#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CYB50 T+1 最终策略回测(完整K线管道版) ────────────────────────────────────────────────────────────────────── 数据管道: 原始K线CSV → 技术指标 → 多空信号 → 交易执行 → T+1转换 → 市场环境标注 → 过滤分析 策略逻辑: 必要条件:市场状态 NOT IN ['下跌趋势低波', '震荡低波'](死亡区过滤) 入场条件:加分模型评分 >= 5(Version D 阈值) 仓位加成:评分≥5 且命中组合规则 → 1.5x 仓位(调整盈亏金额) 对比版本: B — 仅排死亡区(基准) D — 排死亡区 + 评分≥5(入场过滤) F — 排死亡区 + 评分≥5 + 组合规则加仓(最终策略) 组合规则(走前向验证,训练2023-2024,验证2025,盲测2026): 规则1:RSI区域=中性偏弱 & rsi_bin=rsi偏低 (训练67% → 验证80%) 规则2:vol_bin=vol中 & ts_bin=ts强 (训练57% → 验证60%) 规则3:波动率水平=中等 & ts_bin=ts强 (训练57% → 验证60%) 规则4:市场状态=震荡高波 & RSI区域=中性偏弱 (训练57% → 验证50%) 规则5:RSI区域=中性偏弱 & ts_bin=ts弱 (训练57% → 验证50%) """ import sys, io, os, contextlib if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') import pandas as pd import numpy as np from datetime import datetime import warnings warnings.filterwarnings('ignore') from cyb50_30min_dual_direction import ( IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor ) from t1_converter import simulate_t1_trades from comfort_zone_analyzer import MarketEnvironmentAnalyzer, ComfortZoneAnalyzer # ────────────────────────────────────────────────────────────────── # 配置项 # ────────────────────────────────────────────────────────────────── DATA_CSV = os.path.join(os.path.dirname(__file__), 'cyb50_30min_2023_to_20260325.csv') INITIAL = 1_000_000 DEATH_ZONES = {'下跌趋势低波', '震荡低波'} BOOST_MULT = 1.5 # 命中组合规则时的仓位乘数 USE_FEES = True # True=含手续费(t1_converter计算); False=按价差×数量重算 EXPORT_CSV = True # True=每次运行后导出各版本明细到 CSV(含K线指标) VERBOSE = False # True=显示策略执行过程详情(大量输出) SEP = '=' * 72 # ────────────────────────────────────────────────────────────────── # 工具:压制子模块 stdout # ────────────────────────────────────────────────────────────────── @contextlib.contextmanager def suppress_stdout(): old = sys.stdout sys.stdout = io.StringIO() try: yield finally: sys.stdout = old def ctx(): """每次返回新的上下文管理器(VERBOSE=False时压制输出)""" return contextlib.nullcontext() if VERBOSE else suppress_stdout() # ────────────────────────────────────────────────────────────────── # 指标分箱 # ────────────────────────────────────────────────────────────────── def vol_bin(v): if pd.isna(v): return 'unknown' if v < 0.20: return 'vol极低' if v < 0.40: return 'vol低' if v < 0.60: return 'vol中' if v < 0.80: return 'vol高' return 'vol极高' def rsi_bin(v): if pd.isna(v): return 'unknown' if v < 0.05: return 'rsi极底' if v < 0.10: return 'rsi底' if v < 0.20: return 'rsi低' if v < 0.40: return 'rsi偏低' if v < 0.60: return 'rsi中' if v < 0.80: return 'rsi偏高' return 'rsi高' def ts_bin(v): if pd.isna(v): return 'unknown' if v < 1.0: return 'ts弱' if v < 1.5: return 'ts中弱' if v < 2.5: return 'ts中' if v < 4.0: return 'ts强' return 'ts极强' def add_bins(df): df = df.copy() df['vol_bin'] = df['波动率分位'].apply(vol_bin) df['rsi_bin'] = df['RSI分位'].apply(rsi_bin) df['ts_bin'] = df['趋势强度'].apply(ts_bin) return df # ────────────────────────────────────────────────────────────────── # Version G: 日线指标 & 区域净分 # ────────────────────────────────────────────────────────────────── def build_daily_indicators(data_df: pd.DataFrame) -> pd.DataFrame: """从30分钟K线聚合日线,计算日线技术指标""" d = data_df.resample('D').agg( Open=('Open', 'first'), High=('High', 'max'), Low=('Low', 'min'), Close=('Close', 'last'), Volume=('Volume', 'sum') ).dropna(subset=['Close']) delta = d['Close'].diff() gain = delta.where(delta > 0, 0).rolling(14).mean() loss = (-delta.where(delta < 0, 0)).rolling(14).mean() d['RSI_D'] = 100 - 100 / (1 + gain / (loss + 1e-9)) low9 = d['Low'].rolling(9).min() high9 = d['High'].rolling(9).max() rsv = (d['Close'] - low9) / (high9 - low9 + 1e-9) * 100 d['K_D'] = rsv.ewm(com=2, adjust=False).mean() d['D_D'] = d['K_D'].ewm(com=2, adjust=False).mean() d['J_D'] = 3 * d['K_D'] - 2 * d['D_D'] d['MA5_D'] = d['Close'].rolling(5).mean() d['MA20_D'] = d['Close'].rolling(20).mean() d['MA5_slope'] = d['MA5_D'].diff() / d['MA5_D'].shift() * 100 # 日MA5斜率% d['pct_MA20'] = (d['Close'] - d['MA20_D']) / d['MA20_D'] * 100 bm = d['Close'].rolling(20).mean() bs = d['Close'].rolling(20).std() d['BB_pos_D'] = (d['Close'] - (bm - 2*bs)) / (4*bs + 1e-9) d['Mom5_D'] = d['Close'].pct_change(5) * 100 d['Mom10_D'] = d['Close'].pct_change(10) * 100 return d def attach_zone_indicators(enriched: pd.DataFrame, data_with_ind: pd.DataFrame, daily: pd.DataFrame) -> pd.DataFrame: """附加入场时刻的30分钟K线指标 + 当日日线指标""" kline_cols = ['RSI', 'K', 'D', 'J', 'MACD_hist', 'BB_width', 'ATR_Pct', 'Momentum', 'Volume_Ratio'] daily_cols = ['RSI_D', 'K_D', 'J_D', 'MA5_slope', 'pct_MA20', 'BB_pos_D', 'Mom5_D', 'Mom10_D'] out = enriched.copy() # 30分钟指标 — 向量化 for c in kline_cols: if c in data_with_ind.columns: out[c] = out['开仓时间'].map( lambda t, col=c: data_with_ind.at[t, col] if t in data_with_ind.index else float('nan')) # 日线指标 — 向量化 entry_days = out['开仓时间'].dt.normalize() for c in daily_cols: if c in daily.columns: out[c] = entry_days.map( lambda t, col=c: daily.at[t, col] if t in daily.index else float('nan')) return out # 舒适区规则(命中 +1 分)— 来自 kline_zone_analysis 分析结论 COMFORT_RULES = [ # (col1, lo1, hi1, col2, lo2, hi2, 描述) ('J', 20, 50, 'J_D', 20, 50, 'J跨周期共振低位'), # WR 85.7% ('RSI', 45, 55, 'RSI_D', 60, 70, 'RSI 30min温和+日线强'), # WR 80.0% ('K', 20, 40, 'K_D', 80, 999, 'K低位+日线K超买(反弹)'), # WR 71.4% ('K', 20, 40, 'K_D', 20, 40, 'K跨周期共振低位'), # WR 62.5% ('J', 0, 20, 'J_D', 20, 50, 'J超卖+日线低位'), # WR 66.7% ('RSI', 25, 35, 'MA5_slope', 0.1, 0.5, 'RSI弱+日MA5上升'), # WR 66.7% ('RSI', 35, 45, 'RSI_D', 40, 50, 'RSI双偏弱共振'), # WR 66.7% ('RSI', 35, 45, 'MA5_slope', 0.1, 0.5, 'RSI偏弱+MA5上升'), # WR 66.7% ('RSI_D', 60, 70, 'Mom5_D', -2, 0, '日RSI强+小幅回调中'), # WR 63.6% ('BB_pos_D',0.3,0.5, 'J_D', 0, 20, 'BB中下+日KDJ超卖'), # WR 62.5% ('MACD_hist',0.8,999, None, None, None, 'MACD强多头'), # WR 66.7% ('Momentum', 0, 0.01, None, None, None, '30min小涨'), # WR 62.5% ] # 黑暗区规则(命中 -1 分) DARK_RULES = [ ('RSI', 45, 55, 'RSI_D', 50, 60, 'RSI双均衡无方向'), # WR 25% ('Momentum',-999,-0.03,'Mom5_D', -999, -4, '动量双大跌共振'), # WR 20% ('Volume_Ratio',1.8,999,'ATR_Pct', -999,0.006, '大放量+极低波动'), # WR 28.6% ('MACD_hist', 0.2, 0.8, None, None, None, 'MACD弱多头'), # WR 25% ('Volume_Ratio',-999,0.5,None,None, None, '极度缩量'), # WR 25% ('K', 20, 40, 'K_D', 60, 80, 'K低位+日线K高位'), # WR 33.3% ('RSI', 35, 45, 'MA5_slope', 0.5, 999, 'RSI偏弱+急速上升'), # WR 25% ('RSI', 35, 45, 'RSI_D', 70, 999, 'RSI偏弱+日线超买'), # WR 25% ('Momentum',-999,-0.03, None, None, None, '30min大跌'), # WR 26.7% ('RSI', 25, 35, 'MA5_slope', -999,-0.5, 'RSI弱+日MA5急降'), # WR 26.3% ('BB_pos_D',-999, 0.1, 'J_D', 0, 20, 'BB超下轨+日KDJ超卖'), # WR 28.6% ('Mom10_D', 6, 999, None, None, None, '日线10日大涨高位'), # WR 32.1% ] def _in_range(val, lo, hi) -> bool: v = pd.to_numeric(val, errors='coerce') return not pd.isna(v) and lo <= v < hi def zone_net_score(row) -> int: """区域净分 = 舒适规则命中数 − 黑暗规则命中数""" def hit(c1, lo1, hi1, c2, lo2, hi2): if not _in_range(row.get(c1, float('nan')), lo1, hi1): return False if c2 is None: return True return _in_range(row.get(c2, float('nan')), lo2, hi2) comfort = sum(1 for r in COMFORT_RULES if hit(*r[:6])) dark = sum(1 for r in DARK_RULES if hit(*r[:6])) return comfort - dark # ────────────────────────────────────────────────────────────────── # 加分模型(Version D 使用,阈值≥5) # ────────────────────────────────────────────────────────────────── def comfort_score(row) -> int: s = 0 ms = str(row.get('市场状态', '')) vl = str(row.get('波动率水平', '')) rsi = str(row.get('RSI区域', '')) vq = row.get('波动率分位', float('nan')) rq = row.get('RSI分位', float('nan')) ts = row.get('趋势强度', float('nan')) t1 = str(row.get('T1调整', '')) # '否' 或 '是(T0→T1)' if ms == '下跌趋势高波' and vl == '极低': s += 3 # 最优组合 if pd.notna(rq) and 0.05 <= rq < 0.10: s += 3 # 最优RSI区间 if pd.notna(vq) and vq < 0.30: s += 2 # 低波动率分位 if pd.notna(ts) and 1.5 <= ts < 4.0: s += 2 # 适中趋势强度 if rsi == '中性偏弱': s += 2 # 最稳定RSI if 'T0' not in t1: s += 2 # 非T0延期单 if vl in ('极低', '低', '中等'): s += 1 # 低中波动率 if pd.notna(rq) and rq >= 0.60: s += 1 # RSI回升阶段 return s # ────────────────────────────────────────────────────────────────── # 组合规则(走前向验证后锁定) # ────────────────────────────────────────────────────────────────── COMBO_RULES = [ {'RSI区域': '中性偏弱', 'rsi_bin': 'rsi偏低'}, # 规则1: 训练67% → 验证80% {'vol_bin': 'vol中', 'ts_bin': 'ts强'}, # 规则2: 训练57% → 验证60% {'波动率水平': '中等', 'ts_bin': 'ts强'}, # 规则3: 训练57% → 验证60% {'市场状态': '震荡高波', 'RSI区域': '中性偏弱'}, # 规则4: 训练57% → 验证50% {'RSI区域': '中性偏弱', 'ts_bin': 'ts弱'}, # 规则5: 训练57% → 验证50% ] def hits_combo(row) -> bool: for rule in COMBO_RULES: if all(str(row.get(col, '')) == str(val) for col, val in rule.items()): return True return False # ────────────────────────────────────────────────────────────────── # 权益曲线模拟(支持仓位乘数) # ────────────────────────────────────────────────────────────────── def simulate_equity(df, initial=INITIAL): df = df.copy().reset_index(drop=True) if 'pnl_mult' not in df.columns: df['pnl_mult'] = 1.0 cap = float(initial) caps = [] for _, r in df.iterrows(): cap += float(r['盈亏金额']) * float(r['pnl_mult']) caps.append(cap) df['资金余额'] = caps df['实际盈亏'] = df['盈亏金额'] * df['pnl_mult'] df['盈利'] = df['实际盈亏'] > 0 return df # ────────────────────────────────────────────────────────────────── # 绩效统计 # ────────────────────────────────────────────────────────────────── def calc_stats(df, initial=INITIAL): if len(df) == 0: return None wr = df['盈利'].mean() pnl = df['实际盈亏'].sum() cap = df['资金余额'].iloc[-1] ret = (cap - initial) / initial win = df[df['盈利']]['实际盈亏'] los = df[~df['盈利']]['实际盈亏'] plr = abs(win.mean() / los.mean()) if len(los) > 0 and los.mean() != 0 else float('inf') eq = df['资金余额'].values pk = np.maximum.accumulate(np.append([initial], eq)) dd = ((eq - pk[1:]) / pk[1:]).min() if len(eq) > 0 else 0 return dict(n=len(df), wr=wr, pnl=pnl, cap=cap, ret=ret, plr=plr, dd=dd) def print_yearly(df, initial=INITIAL): if len(df) == 0: return df = df.copy() df['年份'] = pd.to_datetime(df['开仓时间']).dt.year prev = initial for y in sorted(df['年份'].unique()): sy = df[df['年份'] == y] pnl = sy['实际盈亏'].sum() wr = sy['盈利'].mean() end = sy['资金余额'].iloc[-1] print(f" {y}年: {len(sy):>3}笔 胜率{wr:.1%} | {pnl:>+12,.0f}元 ({pnl/prev:>+.2%}) → {end:,.0f}元") prev = end def print_regime_breakdown(df): if len(df) == 0 or '市场状态' not in df.columns: return print(f" {'市场状态':<15} {'笔数':>5} {'胜率':>7} {'均盈亏':>10} {'总盈亏':>12}") print(f" {'-'*15} {'-'*5} {'-'*7} {'-'*10} {'-'*12}") for ms in sorted(df['市场状态'].unique()): sub = df[df['市场状态'] == ms] wr = sub['盈利'].mean() avg = sub['实际盈亏'].mean() tot = sub['实际盈亏'].sum() print(f" {ms:<15} {len(sub):>5} {wr:>7.1%} {avg:>+10,.0f} {tot:>+12,.0f}") # ────────────────────────────────────────────────────────────────── # 主流程 # ────────────────────────────────────────────────────────────────── def main(): print(SEP) print(' CYB50 T+1 最终策略回测(完整K线管道版)') print(f' Version D (排死亡区+评分≥5) vs Version F (D + 组合规则加仓{BOOST_MULT}x)') print(SEP) # ── 步骤1: 加载原始K线 CSV ─────────────────────────────────── print(f'\n📂 步骤1: 加载原始K线数据...') raw_csv = pd.read_csv(DATA_CSV, encoding='utf-8-sig') fetcher = IntradayDataFetcher() with ctx(): data = fetcher._process_dataframe( raw_csv, pd.Timestamp('2000-01-01'), pd.Timestamp('2099-12-31') ) print(f' K线: {len(data)}条 {data.index[0].date()} ~ {data.index[-1].date()}') # ── 步骤2: 计算技术指标 ────────────────────────────────────── print(f'\n📈 步骤2: 计算技术指标...') with ctx(): data_with_ind = fetcher.calculate_intraday_indicators(data) ind_cols = [c for c in ['RSI','K','D','J','MACD','MACD_hist','BB_width', 'ATR_Pct','Momentum','Volume_Ratio'] if c in data_with_ind.columns] print(f' 指标列: {ind_cols}') # ── 步骤3: 生成信号 & 执行多空双向交易 ────────────────────── print(f'\n🔄 步骤3: 生成多空双向信号并执行交易...') gen = DualDirectionSignalGenerator() with ctx(): signals_df = gen.generate_dual_direction_signals(data_with_ind) executor = DualDirectionExecutor(initial_capital=INITIAL) with ctx(): _, trades_df = executor.execute_dual_direction_trades(signals_df) long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() short_trades = trades_df[trades_df['交易方向'] == '做空'].copy() print(f' 总交易: {len(trades_df)}笔 做多: {len(long_trades)}笔 做空: {len(short_trades)}笔') # ── 步骤4: T+1规则转换(仅做多) ──────────────────────────── print(f'\n📅 步骤4: T+1规则转换(做多交易)...') with ctx(): t1_trades = simulate_t1_trades(data_with_ind, long_trades, INITIAL) if len(t1_trades) == 0: print(' ⚠️ T+1转换后无交易记录,退出。') return t1_adj = (t1_trades['T+1调整'] == '是(T0→T1)').sum() print(f' T+1交易: {len(t1_trades)}笔 其中T0→T1调整: {t1_adj}笔') # ── 步骤5: 计算市场环境 & 标注每笔交易 ────────────────────── print(f'\n🔬 步骤5: 市场环境分析 & 交易标注...') market_analyzer = MarketEnvironmentAnalyzer(data_with_ind) cza = ComfortZoneAnalyzer(t1_trades, market_analyzer) with ctx(): cza._enrich_trades_with_environment() enriched = cza.enriched_trades.copy() print(f' 成功标注: {len(enriched)}笔交易') # ── 规范化列名 & 数值类型 ──────────────────────────────────── # T+1调整 → T1调整(供 comfort_score 使用) if 'T+1调整' in enriched.columns: enriched.rename(columns={'T+1调整': 'T1调整'}, inplace=True) enriched['开仓时间'] = pd.to_datetime(enriched['开仓时间']) enriched['平仓时间'] = pd.to_datetime(enriched['平仓时间']) for c in ['盈亏金额', '盈亏百分比', '波动率分位', 'RSI分位', '趋势强度', '布林带位置', '1日动量', '成交量分位']: if c in enriched.columns: enriched[c] = pd.to_numeric(enriched[c], errors='coerce') enriched = enriched.sort_values('开仓时间').reset_index(drop=True) enriched = add_bins(enriched) # USE_FEES 开关 if not USE_FEES: enriched['盈亏金额'] = (enriched['平仓价格'] - enriched['开仓价格']) * enriched['仓位'] fee_label = '不含手续费(按价差×数量重算)' else: fee_label = '含手续费(t1_converter计算)' print(f' 数据区间: {enriched["开仓时间"].min().date()} ~ {enriched["开仓时间"].max().date()}') print(f' 手续费模式: {fee_label}') # ── 步骤6: 评分 & 附加日线/K线指标 ────────────────────────── print(f'\n🎯 步骤6: 评分 & 附加日线指标 & 构建 B/D/F/G...') enriched['_score'] = enriched.apply(comfort_score, axis=1) enriched['_combo_hit'] = enriched.apply(hits_combo, axis=1) enriched['_year'] = enriched['开仓时间'].dt.year # 附加 K线+日线指标,计算区域净分 daily = build_daily_indicators(data_with_ind) enriched = attach_zone_indicators(enriched, data_with_ind, daily) enriched['_zone_net'] = enriched.apply(zone_net_score, axis=1) enriched['_zone'] = enriched['_zone_net'].apply( lambda n: '舒适' if n >= 2 else ('黑暗' if n <= -2 else '中性')) not_dead = ~enriched['市场状态'].isin(DEATH_ZONES) high_score = enriched['_score'] >= 5 not_dark = enriched['_zone'] != '黑暗' vB_df = enriched[not_dead].copy(); vB_df['pnl_mult'] = 1.0 vD_df = enriched[not_dead & high_score].copy(); vD_df['pnl_mult'] = 1.0 vF_df = enriched[not_dead & high_score].copy() vF_df['pnl_mult'] = vF_df['_combo_hit'].apply(lambda x: BOOST_MULT if x else 1.0) # Version G:排死亡区 + 评分≥5 + 跳过黑暗区 + 舒适区×1.5仓位 vG_df = enriched[not_dead & high_score & not_dark].copy() vG_df['pnl_mult'] = vG_df['_zone'].apply(lambda z: BOOST_MULT if z == '舒适' else 1.0) vB = simulate_equity(vB_df) vD = simulate_equity(vD_df) vF = simulate_equity(vF_df) vG = simulate_equity(vG_df) sB, sD, sF, sG = calc_stats(vB), calc_stats(vD), calc_stats(vF), calc_stats(vG) # ════════════════════════════════════════════════════════════ # 输出:回测结果对比 # ════════════════════════════════════════════════════════════ print() print(SEP) print(' 版本说明') print(SEP) print(f' Version B = 排死亡区(基准) {sB["n"] if sB else 0}笔') print(f' Version D = 排死亡区 + 评分≥5 {sD["n"] if sD else 0}笔') print(f' Version F = 排死亡区 + 评分≥5 + 组合规则加仓{BOOST_MULT}x {sF["n"] if sF else 0}笔') print(f' Version G = 排死亡区 + 评分≥5 + 区域过滤 + 舒适区{BOOST_MULT}x {sG["n"] if sG else 0}笔') print() print(SEP) print(' 回测结果对比') print(SEP) def fv(s, k, fmt): if s is None: return 'N/A' v = s[k] if fmt == 'n': return str(int(v)) if fmt == 'pct': return f'{v:.1%}' if fmt == 'pct2': return f'{v:+.2%}' if fmt == 'f2': return f'{v:.2f}' if fmt == 'money':return f'{v:+,.0f}' if fmt == 'cap': return f'{v:,.0f}' return str(v) hdr = (f' {"指标":<14} {"Version B(基准)":>14} {"Version D":>12}' f' {"Version F(+加仓)":>16} {"Version G(区域)":>16}') print(hdr) print(' ' + '-' * 74) for name, k, fmt in [ ('交易笔数', 'n', 'n'), ('胜率', 'wr', 'pct'), ('盈亏比', 'plr', 'f2'), ('总收益率', 'ret', 'pct2'), ('最终资金', 'cap', 'cap'), ('总盈亏', 'pnl', 'money'), ('最大回撤', 'dd', 'pct'), ]: print(f' {name:<14} {fv(sB,k,fmt):>14} {fv(sD,k,fmt):>12}' f' {fv(sF,k,fmt):>16} {fv(sG,k,fmt):>16}') # ── 年度明细 ───────────────────────────────────────────────── print(f'\n Version B(排死亡区)年度明细:') print_yearly(vB) print(f'\n Version D(评分≥5)年度明细:') print_yearly(vD) print(f'\n Version F(评分≥5 + 加仓)年度明细:') print_yearly(vF) print(f'\n Version G(区域过滤 + 舒适区加仓)年度明细:') print_yearly(vG) # ── 组合规则命中分析 ───────────────────────────────────────── print(f'\n{SEP}') print(' 组合规则命中分析(在评分≥5的交易中)') print(SEP) combo_hit = vF_df[vF_df['_combo_hit']].copy() combo_hit['实际盈亏'] = combo_hit['盈亏金额'] * BOOST_MULT combo_hit['盈利'] = combo_hit['实际盈亏'] > 0 combo_miss = vF_df[~vF_df['_combo_hit']].copy() combo_miss['实际盈亏'] = combo_miss['盈亏金额'] * 1.0 combo_miss['盈利'] = combo_miss['实际盈亏'] > 0 print(f'\n 命中规则: {len(combo_hit)}笔 ' f'| 胜率{combo_hit["盈利"].mean():.1%} ' f'| 均盈亏{combo_hit["实际盈亏"].mean():+,.0f}元 ' f'| 仓位乘数{BOOST_MULT}x') print(f' 未命中: {len(combo_miss)}笔 ' f'| 胜率{combo_miss["盈利"].mean():.1%} ' f'| 均盈亏{combo_miss["实际盈亏"].mean():+,.0f}元 ' f'| 仓位乘数1.0x') combo_hit['_year'] = pd.to_datetime(combo_hit['开仓时间']).dt.year print(f'\n 命中规则年份分布:') print(f' {"年份":>5} {"命中笔数":>8} {"胜率":>7} {"总盈亏(1.5x)":>14}') print(f' {"-----":>5} {"--------":>8} {"-------":>7} {"----------":>14}') for y in sorted(enriched['_year'].unique()): sy = combo_hit[combo_hit['_year'] == y] if len(sy) == 0: print(f' {y:>5} {"0":>8} {"—":>7} {"0":>14}') else: print(f' {y:>5} {len(sy):>8} {sy["盈利"].mean():>7.1%} {sy["实际盈亏"].sum():>+14,.0f}') # ── 市场状态分布(Version F)──────────────────────────────── print(f'\n{SEP}') print(' Version F 各市场状态表现') print(SEP) print() print_regime_breakdown(vF) # ── Version G 区域分布说明 ─────────────────────────────────── print(f'\n{SEP}') print(' Version G — 区域分布(在 Version D 85笔中)') print(SEP) vD_z = enriched[not_dead & high_score].copy() zone_grp = vD_z.groupby('_zone').agg( n=('盈亏金额', 'count'), wr=('盈亏金额', lambda x: (x > 0).mean()), avg_pnl=('盈亏金额', 'mean'), total_pnl=('盈亏金额', 'sum'), ).reset_index() print(f'\n {"区域":<6} {"笔数":>5} {"胜率":>7} {"均盈亏":>10} {"总盈亏":>12} {"Version G操作"}') print(f' {"-"*6} {"-"*5} {"-"*7} {"-"*10} {"-"*12} {"-"*16}') action_map = {'舒适': f'入场 ×{BOOST_MULT} 仓位', '中性': '入场 ×1.0 仓位', '黑暗': '⛔ 跳过'} for _, r in zone_grp.sort_values('wr', ascending=False).iterrows(): act = action_map.get(r['_zone'], '') print(f' {r["_zone"]:<6} {int(r["n"]):>5} {r["wr"]:>7.1%}' f' {r["avg_pnl"]:>+10,.0f} {r["total_pnl"]:>+12,.0f} {act}') # 各年黑暗区过滤效果 print(f'\n 黑暗区过滤效果(Version D → Version G 减少笔数):') print(f' {"年份":>5} {"D总笔":>6} {"G入场":>6} {"过滤":>5} {"D总盈亏":>12} {"G总盈亏":>12} {"改善":>12}') print(f' {"-"*5} {"-"*6} {"-"*6} {"-"*5} {"-"*12} {"-"*12} {"-"*12}') for y in sorted(enriched['_year'].unique()): d_y = vD[vD['开仓时间'].dt.year == y] if '开仓时间' in vD.columns else pd.DataFrame() g_y = vG[vG['开仓时间'].dt.year == y] if '开仓时间' in vG.columns else pd.DataFrame() nd = len(d_y); ng = len(g_y) pd_ = d_y['实际盈亏'].sum() if nd else 0 pg_ = g_y['实际盈亏'].sum() if ng else 0 print(f' {y:>5} {nd:>6} {ng:>6} {nd-ng:>5}' f' {pd_:>+12,.0f} {pg_:>+12,.0f} {pg_-pd_:>+12,.0f}') # ── K线指标详情(Version F 最近20笔)──────────────────────── print(f'\n{SEP}') print(' Version F 最近20笔 — 入场时K线指标') print(SEP) print(f'\n {"":1} {"开仓时间":<17} {"市场状态":<12} {"评分":>4} {"组合":>4}' f' {"RSI":>6} {"K":>6} {"D":>6} {"J":>6}' f' {"BB位置":>6} {"动量%":>7} {"量比":>6} {"实际盈亏":>10}') print(f' {"-"*1} {"-"*17} {"-"*12} {"-"*4} {"-"*4}' f' {"-"*6} {"-"*6} {"-"*6} {"-"*6}' f' {"-"*6} {"-"*7} {"-"*6} {"-"*10}') for _, r in vF.tail(20).iterrows(): t = r['开仓时间'] win = '★' if r['盈利'] else '✗' boost = 'HIT' if r['pnl_mult'] > 1.0 else ' ' sc = int(enriched.loc[enriched['开仓时间'] == t, '_score'].iloc[0]) \ if (enriched['开仓时间'] == t).any() else -1 ms = str(r.get('市场状态', '')) # K线指标 if t in data_with_ind.index: bar = data_with_ind.loc[t] rsi_v = bar.get('RSI', float('nan')) k_v = bar.get('K', float('nan')) d_v = bar.get('D', float('nan')) j_v = bar.get('J', float('nan')) bb_u = bar.get('BB_upper', float('nan')) bb_l = bar.get('BB_lower', float('nan')) bb_p = (bar.get('Close', float('nan')) - bb_l) / (bb_u - bb_l + 1e-9) mom = bar.get('Momentum', float('nan')) * 100 vr = bar.get('Volume_Ratio', float('nan')) else: rsi_v = k_v = d_v = j_v = bb_p = mom = vr = float('nan') print(f' {win} {str(t)[:16]} {ms:<12} {sc:>4} {boost:>4}' f' {rsi_v:>6.1f} {k_v:>6.1f} {d_v:>6.1f} {j_v:>6.1f}' f' {bb_p:>6.2f} {mom:>7.2f} {vr:>6.2f} {r["实际盈亏"]:>+10,.0f}') # ── 评分分布 ───────────────────────────────────────────────── print(f'\n{SEP}') print(' 评分分布(非死亡区全量)') print(SEP) nondead = enriched[not_dead] print(f'\n {"评分":>5} {"笔数":>5} {"胜率":>7} {"均盈亏":>10} {"累计盈亏":>12} {"选入D/F?":>8}') print(f' {"-"*5} {"-"*5} {"-"*7} {"-"*10} {"-"*12} {"-"*8}') for sc in sorted(nondead['_score'].unique()): sub = nondead[nondead['_score'] == sc] wr = (sub['盈亏金额'] > 0).mean() avg = sub['盈亏金额'].mean() tot = sub['盈亏金额'].sum() flag = '✓ 入场' if sc >= 5 else ' 跳过' print(f' {sc:>5} {len(sub):>5} {wr:>7.1%} {avg:>+10,.0f} {tot:>+12,.0f} {flag}') # ── CSV 导出 ───────────────────────────────────────────────── if EXPORT_CSV: ts = datetime.now().strftime('%Y%m%d_%H%M%S') fee_tag = 'nofee' if not USE_FEES else 'fee' out_dir = os.path.dirname(__file__) # K线指标列(入场时刻) kline_cols = [c for c in ['RSI','K','D','J','MACD','MACD_hist', 'BB_width','ATR_Pct','Momentum','Volume_Ratio'] if c in data_with_ind.columns] def attach_kline(df_in): """给 df_in 每行附加入场时刻的K线指标""" rows = [] for t in df_in['开仓时间']: if t in data_with_ind.index: rows.append(data_with_ind.loc[t, kline_cols].to_dict()) else: rows.append({c: float('nan') for c in kline_cols}) return pd.DataFrame(rows, index=df_in.index) base_cols = [ '交易方向', '开仓时间', '平仓时间', '开仓价格', '平仓价格', '仓位', '盈亏金额', '盈亏百分比', '退出原因', 'T1调整', '市场状态', '波动率水平', '波动率分位', 'RSI区域', 'RSI分位', '趋势强度', '布林带区域', '_score', 'vol_bin', 'rsi_bin', 'ts_bin', '成交量分位', '布林带位置', '1日动量', '_zone_net', '_zone', # Version G 区域字段 ] def build_export(df, label): out = df.copy().reset_index(drop=True) out.insert(0, '序号', range(1, len(out) + 1)) out['版本'] = label out['手续费模式'] = fee_label out['仓位乘数'] = out.get('pnl_mult', pd.Series([1.0]*len(out))) out['实际盈亏'] = out['盈亏金额'] * out['仓位乘数'] out['盈利标记'] = out['实际盈亏'].apply(lambda x: '盈' if x > 0 else '亏') out['验证盈亏%'] = ((out['平仓价格'] - out['开仓价格']) / out['开仓价格'] * 100).round(4) # 附加K线指标 kl = attach_kline(out) for c in kl.columns: out[c] = kl[c].values keep = (['序号', '版本', '手续费模式', '盈利标记', '仓位乘数', '实际盈亏', '资金余额'] + [c for c in base_cols if c in out.columns] + kline_cols + ['验证盈亏%']) keep = [c for c in keep if c in out.columns] return out[keep] print(f'\n📁 导出CSV...') exp_B = build_export(vB, 'B_排死亡区') exp_D = build_export(vD, 'D_评分≥5') exp_F = build_export(vF, 'F_评分≥5+加仓') exp_G = build_export(vG, 'G_区域过滤') for label, exp in [('B', exp_B), ('D', exp_D), ('F', exp_F), ('G', exp_G)]: fname = os.path.join(out_dir, f'backtest_v{label}_{fee_tag}_{ts}.csv') exp.to_csv(fname, index=False, encoding='utf-8-sig') print(f' Version {label}: {os.path.basename(fname)} ({len(exp)}笔)') combined = pd.concat([exp_B, exp_D, exp_F, exp_G], ignore_index=True) fname_all = os.path.join(out_dir, f'backtest_all_{fee_tag}_{ts}.csv') combined.to_csv(fname_all, index=False, encoding='utf-8-sig') print(f' 合并总表: {os.path.basename(fname_all)} ({len(combined)}笔)') print() print(SEP) print(' 回测完成') print(SEP) if __name__ == '__main__': main()