| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- CYB50 T+1 策略舒适区深度研究
- 客观量化哪些指标组合预测策略进入高胜率模式
- """
- import sys
- import io
# On Windows the console streams default to a legacy code page that cannot
# encode the Chinese report text, so both standard streams are switched to
# UTF-8 before anything is printed.
if sys.platform == 'win32':
    for _stream_name in ('stdout', 'stderr'):
        _stream = getattr(sys, _stream_name)
        if hasattr(_stream, 'reconfigure'):
            # Changes the encoding in place and keeps the stream's existing
            # buffering settings, unlike wrapping the raw buffer again.
            _stream.reconfigure(encoding='utf-8')
        elif hasattr(_stream, 'buffer'):
            # Fallback for stream objects without reconfigure().
            setattr(sys, _stream_name,
                    io.TextIOWrapper(_stream.buffer, encoding='utf-8'))
        # Streams offering neither attribute (e.g. replaced by a host
        # application) are left untouched instead of raising AttributeError.
- import pandas as pd
- import numpy as np
- from scipy import stats
- import warnings
# Suppress every warning category for the whole process so that library
# warnings do not interleave with the printed report.
warnings.filterwarnings('ignore')
# Path of the text file the report is written to.
OUTPUT_FILE = 'D:/work/project/cyb50-quant/cat-fly/t1/comfort_zone_research_result.txt'
def log(msg=''):
    """Write one report line to standard output.

    Args:
        msg: Text to print. The default empty string produces a blank line.
    """
    print(msg)
def load_trades(path='D:/work/project/cyb50-quant/cat-fly/t1/t1_trades_with_environment_20260327_141655.csv'):
    """Load the trade log CSV and derive the columns used by the analyses.

    The file's columns are renamed purely by position, so the CSV must contain
    exactly as many columns as the name list below.

    Args:
        path: Location of the CSV file (read as UTF-8 with optional BOM).
            Defaults to the export this script was written against.

    Returns:
        A DataFrame with the renamed columns plus four derived ones:
        ``年份``/``月份``/``年月`` (from the open time) and the boolean ``盈利``
        (P&L amount greater than zero).

    Raises:
        ValueError: If the CSV does not have the expected number of columns.
    """
    df = pd.read_csv(path, encoding='utf-8-sig')
    # Column names are assigned by position, not matched against the header.
    cols = [
        '交易方向', '开仓时间', '平仓时间', '开仓价格', '平仓价格', '仓位',
        '盈亏金额', '盈亏百分比', '退出原因', '持仓周期数', '持仓小时数',
        'T1调整', '原平仓时间', '原平仓价格', '原盈亏', '盈亏变化',
        '入场信号', '开仓市值', '平仓时资金', '市场状态',
        '趋势短期', '趋势中期', '趋势强度', '波动率分位', '波动率水平',
        '成交量分位', '布林带位置', '布林带区域', 'RSI分位', 'RSI区域',
        '1日动量', '入场价格',
    ]
    if len(df.columns) != len(cols):
        # Fail with an explicit message rather than pandas' generic
        # "Length mismatch" error.
        raise ValueError(
            f'expected {len(cols)} columns in {path!r}, found {len(df.columns)}'
        )
    df.columns = cols

    df['开仓时间'] = pd.to_datetime(df['开仓时间'])
    opened = df['开仓时间'].dt
    df['年份'] = opened.year
    df['月份'] = opened.month
    df['年月'] = opened.to_period('M')

    # The P&L amount feeds the win flag and every aggregate downstream, so it
    # is coerced first; unparseable cells become NaN and count as non-wins.
    df['盈亏金额'] = pd.to_numeric(df['盈亏金额'], errors='coerce')
    df['盈利'] = df['盈亏金额'] > 0

    numeric_cols = (
        '盈亏百分比', '波动率分位', 'RSI分位', '布林带位置',
        '成交量分位', '趋势强度', '1日动量',
    )
    for name in numeric_cols:
        df[name] = pd.to_numeric(df[name], errors='coerce')
    return df
def section(title, f):
    """Print a section banner: a rule line, the title, and another rule line.

    Args:
        title: Heading text shown between the two rules.
        f: Not used by this function; accepted so that all report helpers
            share the same call shape.
    """
    rule = '=' * 72
    for text in (rule, f' {title}', rule):
        log(text)
def yearly_summary(df, f):
    """Print per-year trade statistics, then compare two fixed year groups.

    For each distinct ``年份`` value the report shows the trade count, win
    rate, ratio of average win to average loss, and summed P&L. Afterwards
    the groups 2025-2026 and 2023-2024 are each summarised on one line.

    Args:
        df: Trade table with ``年份``, ``盈利`` (bool) and ``盈亏金额`` columns.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第一部分:年度绩效对比', f)
    log()

    for year in sorted(df['年份'].unique()):
        trades = df[df['年份'] == year]
        won = trades['盈利']
        pnl = trades['盈亏金额']
        lost = ~won

        avg_win = pnl[won].mean() if won.any() else 0
        avg_loss = pnl[lost].mean() if lost.any() else 0
        # With no losing trades the ratio is reported as infinity.
        if avg_loss != 0:
            plr = abs(avg_win / avg_loss)
        else:
            plr = float('inf')

        log(f' {year}年: {len(trades)}笔 | 胜率{won.mean():.1%} | 盈亏比{plr:.2f} | 总盈亏{pnl.sum():+,.0f}元')
    log()

    # Good years vs. bad years, as fixed groups.
    groups = (
        ('好年份(2025-2026)', [2025, 2026]),
        ('差年份(2023-2024)', [2023, 2024]),
    )
    for tag, years in groups:
        part = df[df['年份'].isin(years)]
        log(f' {tag}: {len(part)}笔, 胜率{part["盈利"].mean():.1%}, 平均盈亏{part["盈亏金额"].mean():+,.0f}元')
    log()
def analyze_categorical(df, col, label, f, min_count=5):
    """Tabulate win rate and P&L for each value of a categorical column.

    Args:
        df: Trade table with ``盈利`` and ``盈亏金额`` columns.
        col: Name of the categorical column to group by.
        label: Human-readable name shown in the section title.
        f: Forwarded to ``section``; otherwise unused.
        min_count: Categories with fewer trades than this are omitted.

    Returns:
        A list of ``(value, count, win_rate, mean_pnl, total_pnl)`` tuples,
        sorted by win rate in descending order.
    """
    section(f'第二部分:类别指标分析 — {label}', f)
    log()

    collected = []
    for category in df[col].dropna().unique():
        group = df[df[col] == category]
        size = len(group)
        if size >= min_count:
            pnl = group['盈亏金额']
            collected.append(
                (category, size, group['盈利'].mean(), pnl.mean(), pnl.sum())
            )
    results = sorted(collected, key=lambda rec: rec[2], reverse=True)

    log(f' {"类别":<20} {"笔数":>5} {"胜率":>7} {"均盈亏":>10} {"总盈亏":>12}')
    log(' ' + ' '.join('-' * width for width in (20, 5, 7, 10, 12)))
    for category, size, win_rate, mean_pnl, sum_pnl in results:
        log(f' {str(category):<20} {size:>5} {win_rate:>7.1%} {mean_pnl:>+10,.0f} {sum_pnl:>+12,.0f}')
    log()
    return results
def analyze_continuous_bins(df, col, label, f, bins=5):
    """Split a numeric column into quantile bins and report each bin's results.

    Rows where ``col`` is missing are ignored. After the per-bin table, the
    point-biserial correlation between the column and the win flag is printed.

    Args:
        df: Trade table with ``盈利`` and ``盈亏金额`` columns.
        col: Name of the numeric column to bin.
        label: Human-readable name shown in the section title.
        f: Forwarded to ``section``; otherwise unused.
        bins: Number of quantile bins requested (duplicate edges are dropped).

    Returns:
        A list of ``(left, right, count, win_rate, mean_pnl)`` tuples, one per
        bin, or ``None`` when fewer than 20 rows have a value for ``col``.
    """
    section(f'第三部分:连续指标分位分析 — {label}', f)
    log()

    valid = df[df[col].notna()].copy()
    if len(valid) < 20:
        log(f' 数据不足,跳过')
        return

    valid['_bin'] = pd.qcut(valid[col], q=bins, duplicates='drop')

    results = []
    for interval in valid['_bin'].cat.categories:
        members = valid[valid['_bin'] == interval]
        results.append((
            interval.left,
            interval.right,
            len(members),
            members['盈利'].mean(),
            members['盈亏金额'].mean(),
        ))

    log(f' {"区间":<25} {"笔数":>5} {"胜率":>7} {"均盈亏":>10}')
    log(' ' + ' '.join('-' * width for width in (25, 5, 7, 10)))
    for left, right, size, win_rate, mean_pnl in results:
        log(f' [{left:>8.3f}, {right:>8.3f}] {size:>5} {win_rate:>7.1%} {mean_pnl:>+10,.0f}')
    log()

    # Correlation between the raw indicator and the 0/1 win flag.
    corr, pval = stats.pointbiserialr(valid[col], valid['盈利'].astype(int))
    flag = ' ★显著' if pval < 0.05 else ''
    log(f' 与胜率相关系数: r={corr:.3f}, p={pval:.3f}{flag}')
    log()
    return results
def good_vs_bad_distributions(df, f):
    """Compare indicator distributions between the good and bad year groups.

    The good group is 2025-2026 and the bad group is 2023-2024. Numeric
    indicators are compared by mean with an independent two-sample t-test;
    categorical indicators are compared by the share of trades per category.

    Args:
        df: Trade table including ``年份`` and the indicator columns.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第四部分:好/差年份 各指标分布对比', f)
    log()

    years = df['年份']
    good = df[years.isin([2025, 2026])]
    bad = df[years.isin([2023, 2024])]

    # Numeric indicators: (column name, display label).
    numeric_specs = [
        ('波动率分位', '波动率分位数'),
        ('RSI分位', 'RSI分位数'),
        ('布林带位置', '布林带位置'),
        ('趋势强度', '趋势强度'),
        ('1日动量', '1日动量'),
        ('成交量分位', '成交量分位'),
    ]
    log(f' {"指标":<15} {"差年份均值":>12} {"好年份均值":>12} {"差值":>10} {"t检验p值":>10} {"显著?":>6}')
    log(' ' + ' '.join('-' * width for width in (15, 12, 12, 10, 10, 6)))
    for col, label in numeric_specs:
        good_vals = good[col].dropna()
        bad_vals = bad[col].dropna()
        # Skip indicators with too few observations on either side.
        if min(len(good_vals), len(bad_vals)) < 5:
            continue
        _, p = stats.ttest_ind(good_vals, bad_vals)
        good_mean = good_vals.mean()
        bad_mean = bad_vals.mean()
        diff = good_mean - bad_mean
        sig = '★' if p < 0.05 else ''
        log(f' {label:<15} {bad_mean:>12.3f} {good_mean:>12.3f} {diff:>+10.3f} {p:>10.3f} {sig:>6}')
    log()

    # Categorical indicators: the display label equals the column name.
    for col in ('市场状态', '趋势短期', '趋势中期', '波动率水平', '布林带区域', 'RSI区域'):
        log(f' [{col}] 好年份分布 vs 差年份分布:')
        categories = df[col].dropna().unique()
        log(f' {"类别":<18} {"差年份占比":>10} {"好年份占比":>10} {"差值":>8}')
        for category in sorted(categories):
            bad_share = (bad[col] == category).mean()
            good_share = (good[col] == category).mean()
            shift = good_share - bad_share
            # Flag shifts of more than five percentage points.
            marker = ' ←' if abs(shift) > 0.05 else ''
            log(f' {str(category):<18} {bad_share:>10.1%} {good_share:>10.1%} {shift:>+8.1%}{marker}')
        log()
def winning_condition_scan(df, f):
    """List single-indicator conditions with at least 8 trades and a 60% win rate.

    Every value of each scanned categorical column is treated as a candidate
    condition; qualifying ones are printed sorted by win rate, best first.

    Args:
        df: Trade table with ``盈利``, ``盈亏金额`` and the scanned columns.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第五部分:胜率 > 60% 的单指标条件扫描', f)
    log()
    log(' 过滤条件:笔数≥8, 胜率≥60%')
    log()

    scanned = ('市场状态', '趋势短期', '趋势中期', '波动率水平', '布林带区域', 'RSI区域', 'T1调整')
    hits = []
    for col in scanned:
        for value in df[col].dropna().unique():
            matched = df[df[col] == value]
            size = len(matched)
            if size < 8:
                continue
            win_rate = matched['盈利'].mean()
            if win_rate >= 0.60:
                hits.append((f'{col}={value}', size, win_rate, matched['盈亏金额'].mean()))
    hits = sorted(hits, key=lambda rec: rec[2], reverse=True)

    log(f' {"条件":<30} {"笔数":>5} {"胜率":>7} {"均盈亏":>10}')
    log(' ' + ' '.join('-' * width for width in (30, 5, 7, 10)))
    for condition, size, win_rate, mean_pnl in hits:
        log(f' {condition:<30} {size:>5} {win_rate:>7.1%} {mean_pnl:>+10,.0f}')
    log()
def combo_scan(df, f):
    """List two-indicator combinations with at least 8 trades and a 60% win rate.

    All unordered pairs of the scanned categorical columns are crossed value
    by value. Qualifying combinations are sorted by win rate, and the top 20
    are printed.

    Args:
        df: Trade table with ``盈利``, ``盈亏金额`` and the scanned columns.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第六部分:双指标组合扫描 (笔数≥8, 胜率≥60%)', f)
    log()

    scanned = ['市场状态', '趋势中期', '波动率水平', '布林带区域', 'RSI区域']
    # Each column paired with every column that follows it in the list.
    column_pairs = [
        (first, second)
        for pos, first in enumerate(scanned)
        for second in scanned[pos + 1:]
    ]

    hits = []
    for first, second in column_pairs:
        for first_val in df[first].dropna().unique():
            for second_val in df[second].dropna().unique():
                matched = df[(df[first] == first_val) & (df[second] == second_val)]
                size = len(matched)
                if size < 8:
                    continue
                win_rate = matched['盈利'].mean()
                if win_rate >= 0.60:
                    hits.append((
                        f'{first}={first_val} & {second}={second_val}',
                        size,
                        win_rate,
                        matched['盈亏金额'].mean(),
                    ))
    hits = sorted(hits, key=lambda rec: rec[2], reverse=True)

    log(f' {"组合条件":<45} {"笔数":>5} {"胜率":>7} {"均盈亏":>10}')
    log(' ' + ' '.join('-' * width for width in (45, 5, 7, 10)))
    for condition, size, win_rate, mean_pnl in hits[:20]:
        log(f' {condition:<45} {size:>5} {win_rate:>7.1%} {mean_pnl:>+10,.0f}')
    log()
def volatility_threshold(df, f):
    """Report win rate below and at-or-above a series of volatility cut-offs.

    Two tables are printed over the same cut-offs (0.1 to 0.8): trades whose
    ``波动率分位`` is below the cut-off, then trades at or above it. Cut-offs
    selecting fewer than 5 trades are skipped.

    Args:
        df: Trade table with ``波动率分位``, ``盈利`` and ``盈亏金额`` columns.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第七部分:波动率阈值精确定位', f)
    log()

    valid = df[df['波动率分位'].notna()].copy()
    percentile = valid['波动率分位']
    cut_points = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]

    # (caption, comparison producing a boolean mask for a given cut-off)
    views = (
        (' 波动率分位 < X 时的胜率:', percentile.lt),
        (' 波动率分位 >= X 时的胜率:', percentile.ge),
    )
    for caption, compare in views:
        log(caption)
        log(f' {"阈值":>8} {"笔数":>6} {"胜率":>8} {"均盈亏":>10}')
        log(' ' + ' '.join('-' * width for width in (8, 6, 8, 10)))
        for cut in cut_points:
            chosen = valid[compare(cut)]
            if len(chosen) < 5:
                continue
            win_rate = chosen['盈利'].mean()
            mean_pnl = chosen['盈亏金额'].mean()
            log(f' {cut:>8.1f} {len(chosen):>6} {win_rate:>8.1%} {mean_pnl:>+10,.0f}')
        log()
def rsi_threshold(df, f):
    """Report win rate for trades whose RSI percentile is below each cut-off.

    Cut-offs run from 0.2 to 0.8; those selecting fewer than 5 trades are
    skipped. Rows with a missing ``RSI分位`` are ignored.

    Args:
        df: Trade table with ``RSI分位``, ``盈利`` and ``盈亏金额`` columns.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第八部分:RSI分位阈值精确定位', f)
    log()

    valid = df[df['RSI分位'].notna()].copy()
    percentile = valid['RSI分位']

    log(' RSI分位 < X 时的胜率(低RSI/超卖区间做多):')
    log(f' {"阈值":>8} {"笔数":>6} {"胜率":>8} {"均盈亏":>10}')
    log(' ' + ' '.join('-' * width for width in (8, 6, 8, 10)))
    for cut in (0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8):
        chosen = valid[percentile < cut]
        if len(chosen) >= 5:
            win_rate = chosen['盈利'].mean()
            mean_pnl = chosen['盈亏金额'].mean()
            log(f' {cut:>8.1f} {len(chosen):>6} {win_rate:>8.1%} {mean_pnl:>+10,.0f}')
    log()
def monthly_rolling(df, f):
    """Print trade count, win rate and total P&L for each calendar month.

    Months with a win rate of at least 60% are tagged as good and months
    below 35% as bad; a closing summary counts both kinds.

    Args:
        df: Trade table with ``年月``, ``盈利`` and ``盈亏金额`` columns.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第九部分:滚动月度胜率 — 识别策略周期规律', f)
    log()

    monthly = df.groupby('年月').agg(
        笔数=('盈利', 'count'),
        胜率=('盈利', 'mean'),
        总盈亏=('盈亏金额', 'sum')
    ).reset_index()
    monthly['年月_str'] = monthly['年月'].astype(str)

    log(f' {"年月":<10} {"笔数":>5} {"胜率":>7} {"总盈亏":>12} {"状态":>6}')
    log(' ' + ' '.join('-' * width for width in (10, 5, 7, 12, 6)))
    columns = (monthly['年月_str'], monthly['笔数'], monthly['胜率'], monthly['总盈亏'])
    for month, count, win_rate, total in zip(*columns):
        if win_rate >= 0.6:
            state = '★好'
        elif win_rate < 0.35:
            state = '△差'
        else:
            state = ' '
        log(f' {month:<10} {count:>5} {win_rate:>7.1%} {total:>+12,.0f} {state:>6}')
    log()

    # Summarise how many months fall in each extreme.
    good_months = monthly[monthly['胜率'] >= 0.6]
    bad_months = monthly[monthly['胜率'] < 0.35]
    log(f' 胜率≥60%的月份: {len(good_months)}个, 总计{good_months["笔数"].sum()}笔')
    log(f' 胜率<35%的月份: {len(bad_months)}个, 总计{bad_months["笔数"].sum()}笔')
    log()
def comfort_zone_score(df, f):
    """Score every trade against a fixed rule set and report results by score.

    Each rule is a ``(description, predicate, points)`` triple; a trade's
    score is the sum of the points of all rules whose predicate holds for
    that row. The volatility and RSI rules are only registered when the
    corresponding column has at least one non-missing value.

    Three tables are printed: results per exact score, results for
    score >= threshold (thresholds 4 to 8, skipped below 5 trades), and the
    rule list itself.

    Args:
        df: Trade table with ``盈利``, ``盈亏金额`` and the indicator columns
            referenced by the rules. It is not modified.
        f: Forwarded to ``section``; otherwise unused.

    Returns:
        A copy of ``df`` with an added ``舒适区评分`` column.
    """
    section('第十部分:多维舒适区评分模型', f)
    log()
    log(' 基于以上分析,建立量化评分规则(每项满足得分累加):')
    log()
    df2 = df.copy()
    # Rule definition: (description, predicate on a row, points)
    rules = []
    # Volatility
    if df2['波动率分位'].notna().any():
        rules.append(('波动率分位 < 0.4', lambda r: r['波动率分位'] < 0.4 if pd.notna(r['波动率分位']) else False, 2))
        rules.append(('波动率分位 < 0.2', lambda r: r['波动率分位'] < 0.2 if pd.notna(r['波动率分位']) else False, 1))
    # RSI
    if df2['RSI分位'].notna().any():
        rules.append(('RSI分位 < 0.4', lambda r: r['RSI分位'] < 0.4 if pd.notna(r['RSI分位']) else False, 1))
        rules.append(('RSI分位 < 0.5 (偏低)', lambda r: r['RSI分位'] < 0.5 if pd.notna(r['RSI分位']) else False, 1))
    # Trend
    rules.append(('趋势中期=上涨', lambda r: r['趋势中期'] == '上涨', 2))
    rules.append(('趋势短期=上涨', lambda r: r['趋势短期'] == '上涨', 1))
    # Bollinger band (the first rule matches any region containing '下轨')
    rules.append(('布林带区域=下轨中位', lambda r: '下轨' in str(r['布林带区域']), 1))
    rules.append(('布林带区域=中轨', lambda r: r['布林带区域'] == '中轨', 1))
    # Market state
    rules.append(('市场状态=强趋势低波', lambda r: r['市场状态'] == '强趋势低波', 2))
    rules.append(('市场状态=趋势上涨', lambda r: '趋势' in str(r['市场状态']) and '上' in str(r['市场状态']), 1))

    def score_trade(row):
        """Return the total points of all rules satisfied by ``row``."""
        total_points = 0
        for _, cond, pts in rules:
            try:
                if cond(row):
                    total_points += pts
            except (KeyError, TypeError, ValueError):
                # A rule that cannot be evaluated for this row (missing
                # column or incomparable value) scores nothing. Anything
                # else, such as KeyboardInterrupt, must propagate.
                continue
        return total_points

    df2['舒适区评分'] = df2.apply(score_trade, axis=1)

    # Results grouped by exact score.
    log(f' {"评分":>6} {"笔数":>5} {"胜率":>7} {"均盈亏":>10} {"总盈亏":>12}')
    log(f' {"-"*6} {"-"*5} {"-"*7} {"-"*10} {"-"*12}')
    for score in sorted(df2['舒适区评分'].unique()):
        sub = df2[df2['舒适区评分'] == score]
        wr = sub['盈利'].mean()
        avg = sub['盈亏金额'].mean()
        total = sub['盈亏金额'].sum()
        log(f' {score:>6} {len(sub):>5} {wr:>7.1%} {avg:>+10,.0f} {total:>+12,.0f}')
    log()

    # Candidate thresholds: results for all trades scoring at least ``thr``.
    for thr in [4, 5, 6, 7, 8]:
        sub = df2[df2['舒适区评分'] >= thr]
        if len(sub) < 5:
            continue
        wr = sub['盈利'].mean()
        avg = sub['盈亏金额'].mean()
        cov = len(sub) / len(df2)
        log(f' 评分≥{thr}: {len(sub)}笔 ({cov:.1%}覆盖) | 胜率{wr:.1%} | 均盈亏{avg:+,.0f}元')
    log()

    # Print the rule list.
    log(' 评分规则明细:')
    for desc, _, pts in rules:
        log(f' +{pts}分 {desc}')
    log()
    return df2
def momentum_analysis(df, f):
    """Compare win rates for positive and negative 1-day momentum.

    Rows with a missing ``1日动量`` are ignored, and trades with exactly zero
    momentum appear in neither group. The point-biserial correlation between
    momentum and the win flag is printed last.

    Args:
        df: Trade table with ``1日动量`` and ``盈利`` columns.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第十一部分:1日动量与胜率关系', f)
    log()

    valid = df[df['1日动量'].notna()].copy()
    momentum = valid['1日动量']
    rising = valid[momentum > 0]
    falling = valid[momentum < 0]

    log(f' 1日动量 > 0 (上涨动量): {len(rising)}笔, 胜率{rising["盈利"].mean():.1%}')
    log(f' 1日动量 < 0 (下跌动量): {len(falling)}笔, 胜率{falling["盈利"].mean():.1%}')
    log()

    corr, pval = stats.pointbiserialr(momentum, valid['盈利'].astype(int))
    flag = ' ★显著' if pval < 0.05 else ''
    log(f' 动量与胜率相关系数: r={corr:.3f}, p={pval:.3f}{flag}')
    log()
def t1_effect(df, f):
    """Compare trades whose ``T1调整`` text mentions T1/T0 with all others.

    For the matching trades that have both an original and a final P&L, the
    win rate before and after the adjustment is also printed.

    Args:
        df: Trade table with ``T1调整``, ``原盈亏``, ``盈亏金额`` and ``盈利``.
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第十二部分:T+1调整对胜率的影响', f)
    log()

    # Missing values in the text column count as "not adjusted".
    adjusted_mask = df['T1调整'].str.contains('T1|T0', na=False)
    adjusted = df[adjusted_mask]
    untouched = df[~adjusted_mask]

    log(f' T+1调整交易: {len(adjusted)}笔, 胜率{adjusted["盈利"].mean():.1%}, 均盈亏{adjusted["盈亏金额"].mean():+,.0f}元')
    log(f' 非T+1交易: {len(untouched)}笔, 胜率{untouched["盈利"].mean():.1%}, 均盈亏{untouched["盈亏金额"].mean():+,.0f}元')

    # Original P&L vs. P&L after adjustment, on rows where both are present.
    comparable = adjusted[adjusted['原盈亏'].notna() & adjusted['盈亏金额'].notna()]
    if len(comparable) > 0:
        before = (comparable['原盈亏'] > 0).mean()
        after = (comparable['盈亏金额'] > 0).mean()
        log(f' T+1调整笔中,原始胜率{before:.1%} → T+1后胜率{after:.1%}')
    log()
def final_summary(f):
    """Print the fixed closing section with the comfort-zone definition.

    The text is static; nothing is computed from the data here.

    Args:
        f: Forwarded to ``section``; otherwise unused.
    """
    section('第十三部分:综合结论与舒适区定义', f)
    # Report lines in output order; an empty string is a blank line.
    body = (
        '',
        ' 基于以上分析,策略舒适区的量化定义:',
        '',
        ' 【核心必要条件】(缺一不可)',
        ' 1. 波动率水平 ≠ "极高" (高波动是最大杀手)',
        ' 2. 波动率分位 < 0.5 (处于历史波动率中位以下)',
        '',
        ' 【加分条件】(满足越多越好)',
        ' +2分 趋势中期 = "上涨" (中期趋势配合做多方向)',
        ' +2分 市场状态 = "强趋势低波" (最优市场环境)',
        ' +2分 波动率分位 < 0.4 (低波动率环境)',
        ' +1分 趋势短期 = "上涨" (短期趋势配合)',
        ' +1分 RSI分位 < 0.5 (RSI未过热)',
        ' +1分 布林带区域含 "下轨" (价格位于布林带下方回调位)',
        ' +1分 波动率分位 < 0.2 (极低波动率加成)',
        '',
        ' 【评分建议】',
        ' 总分 ≥ 5分: 进入舒适区,正常交易',
        ' 总分 3-4分: 半舒适区,减半仓位',
        ' 总分 < 3分: 非舒适区,建议观望或跳过',
        '',
        ' 【时间规律】',
        ' 策略在创业板行情趋势明确、波动率收敛后表现最好',
        ' 2025-2026年市场结构更适合此策略(可能与流动性环境有关)',
        '',
    )
    for text in body:
        log(text)
def main():
    """Run every analysis section, mirroring output to console and OUTPUT_FILE.

    While the report is generated, ``sys.stdout`` is replaced by a tee that
    writes to both the original stdout and the report file. The original
    stream is restored even if a section raises, so later output never goes
    to a closed file.
    """

    class Tee:
        """Minimal write-only stream that duplicates output to several files."""

        def __init__(self, *files):
            self.files = files

        def write(self, data):
            for fh in self.files:
                fh.write(data)

        def flush(self):
            for fh in self.files:
                fh.flush()

    orig_stdout = sys.stdout
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f_out:
        # Dual output: console + file.
        sys.stdout = Tee(orig_stdout, f_out)
        try:
            log('CYB50 T+1 策略舒适区深度研究报告')
            log(f'数据覆盖: 2023-03-27 ~ 2026-03-25')
            log()
            df = load_trades()
            log(f'加载交易记录: {len(df)}笔')
            log()
            f = None  # Placeholder for the helpers' file argument; output goes through Tee.
            yearly_summary(df, f)
            analyze_categorical(df, '市场状态', '市场状态', f)
            analyze_categorical(df, '趋势中期', '趋势中期', f)
            analyze_categorical(df, '波动率水平', '波动率水平', f)
            analyze_categorical(df, '布林带区域', '布林带区域', f)
            analyze_categorical(df, 'RSI区域', 'RSI区域', f)
            good_vs_bad_distributions(df, f)
            analyze_continuous_bins(df, '波动率分位', '波动率分位', f)
            analyze_continuous_bins(df, 'RSI分位', 'RSI分位', f)
            analyze_continuous_bins(df, '布林带位置', '布林带位置', f)
            analyze_continuous_bins(df, '趋势强度', '趋势强度', f)
            volatility_threshold(df, f)
            rsi_threshold(df, f)
            monthly_rolling(df, f)
            winning_condition_scan(df, f)
            combo_scan(df, f)
            momentum_analysis(df, f)
            t1_effect(df, f)
            comfort_zone_score(df, f)
            final_summary(f)
            log('=' * 72)
            log('分析完成')
            log('=' * 72)
        finally:
            # Restore before the file closes, on success and on failure alike.
            sys.stdout = orig_stdout
    print(f'\n结果已保存到: {OUTPUT_FILE}')


if __name__ == '__main__':
    main()
|