| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- CYB50 T+1 最终策略回测(完整K线管道版)
- ──────────────────────────────────────────────────────────────────────
- 数据管道:
- 原始K线CSV → 技术指标 → 多空信号 → 交易执行 → T+1转换 → 市场环境标注 → 过滤分析
- 策略逻辑:
- 必要条件:市场状态 NOT IN ['下跌趋势低波', '震荡低波'](死亡区过滤)
- 入场条件:加分模型评分 >= 5(Version D 阈值)
- 仓位加成:评分≥5 且命中组合规则 → 1.5x 仓位(调整盈亏金额)
- 对比版本:
- B — 仅排死亡区(基准)
- D — 排死亡区 + 评分≥5(入场过滤)
- F — 排死亡区 + 评分≥5 + 组合规则加仓(最终策略)
- 组合规则(走前向验证,训练2023-2024,验证2025,盲测2026):
- 规则1:RSI区域=中性偏弱 & rsi_bin=rsi偏低 (训练67% → 验证80%)
- 规则2:vol_bin=vol中 & ts_bin=ts强 (训练57% → 验证60%)
- 规则3:波动率水平=中等 & ts_bin=ts强 (训练57% → 验证60%)
- 规则4:市场状态=震荡高波 & RSI区域=中性偏弱 (训练57% → 验证50%)
- 规则5:RSI区域=中性偏弱 & ts_bin=ts弱 (训练57% → 验证50%)
- """
- import sys, io, os, contextlib
- if sys.platform == 'win32':
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
- sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
- import pandas as pd
- import numpy as np
- from datetime import datetime
- import warnings
- warnings.filterwarnings('ignore')
- from cyb50_30min_dual_direction import (
- IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor
- )
- from t1_converter import simulate_t1_trades
- from comfort_zone_analyzer import MarketEnvironmentAnalyzer, ComfortZoneAnalyzer
- # ──────────────────────────────────────────────────────────────────
- # 配置项
- # ──────────────────────────────────────────────────────────────────
- DATA_CSV = os.path.join(os.path.dirname(__file__), 'cyb50_30min_2023_to_20260325.csv')
- INITIAL = 1_000_000
- DEATH_ZONES = {'下跌趋势低波', '震荡低波'}
- BOOST_MULT = 1.5 # 命中组合规则时的仓位乘数
- USE_FEES = True # True=含手续费(t1_converter计算); False=按价差×数量重算
- EXPORT_CSV = True # True=每次运行后导出各版本明细到 CSV(含K线指标)
- VERBOSE = False # True=显示策略执行过程详情(大量输出)
- SEP = '=' * 72
- # ──────────────────────────────────────────────────────────────────
- # 工具:压制子模块 stdout
- # ──────────────────────────────────────────────────────────────────
- @contextlib.contextmanager
- def suppress_stdout():
- old = sys.stdout
- sys.stdout = io.StringIO()
- try:
- yield
- finally:
- sys.stdout = old
- def ctx():
- """每次返回新的上下文管理器(VERBOSE=False时压制输出)"""
- return contextlib.nullcontext() if VERBOSE else suppress_stdout()
- # ──────────────────────────────────────────────────────────────────
- # 指标分箱
- # ──────────────────────────────────────────────────────────────────
- def vol_bin(v):
- if pd.isna(v): return 'unknown'
- if v < 0.20: return 'vol极低'
- if v < 0.40: return 'vol低'
- if v < 0.60: return 'vol中'
- if v < 0.80: return 'vol高'
- return 'vol极高'
- def rsi_bin(v):
- if pd.isna(v): return 'unknown'
- if v < 0.05: return 'rsi极底'
- if v < 0.10: return 'rsi底'
- if v < 0.20: return 'rsi低'
- if v < 0.40: return 'rsi偏低'
- if v < 0.60: return 'rsi中'
- if v < 0.80: return 'rsi偏高'
- return 'rsi高'
- def ts_bin(v):
- if pd.isna(v): return 'unknown'
- if v < 1.0: return 'ts弱'
- if v < 1.5: return 'ts中弱'
- if v < 2.5: return 'ts中'
- if v < 4.0: return 'ts强'
- return 'ts极强'
- def add_bins(df):
- df = df.copy()
- df['vol_bin'] = df['波动率分位'].apply(vol_bin)
- df['rsi_bin'] = df['RSI分位'].apply(rsi_bin)
- df['ts_bin'] = df['趋势强度'].apply(ts_bin)
- return df
- # ──────────────────────────────────────────────────────────────────
- # Version G: 日线指标 & 区域净分
- # ──────────────────────────────────────────────────────────────────
- def build_daily_indicators(data_df: pd.DataFrame) -> pd.DataFrame:
- """从30分钟K线聚合日线,计算日线技术指标"""
- d = data_df.resample('D').agg(
- Open=('Open', 'first'), High=('High', 'max'),
- Low=('Low', 'min'), Close=('Close', 'last'),
- Volume=('Volume', 'sum')
- ).dropna(subset=['Close'])
- delta = d['Close'].diff()
- gain = delta.where(delta > 0, 0).rolling(14).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
- d['RSI_D'] = 100 - 100 / (1 + gain / (loss + 1e-9))
- low9 = d['Low'].rolling(9).min()
- high9 = d['High'].rolling(9).max()
- rsv = (d['Close'] - low9) / (high9 - low9 + 1e-9) * 100
- d['K_D'] = rsv.ewm(com=2, adjust=False).mean()
- d['D_D'] = d['K_D'].ewm(com=2, adjust=False).mean()
- d['J_D'] = 3 * d['K_D'] - 2 * d['D_D']
- d['MA5_D'] = d['Close'].rolling(5).mean()
- d['MA20_D'] = d['Close'].rolling(20).mean()
- d['MA5_slope'] = d['MA5_D'].diff() / d['MA5_D'].shift() * 100 # 日MA5斜率%
- d['pct_MA20'] = (d['Close'] - d['MA20_D']) / d['MA20_D'] * 100
- bm = d['Close'].rolling(20).mean()
- bs = d['Close'].rolling(20).std()
- d['BB_pos_D'] = (d['Close'] - (bm - 2*bs)) / (4*bs + 1e-9)
- d['Mom5_D'] = d['Close'].pct_change(5) * 100
- d['Mom10_D'] = d['Close'].pct_change(10) * 100
- return d
- def attach_zone_indicators(enriched: pd.DataFrame,
- data_with_ind: pd.DataFrame,
- daily: pd.DataFrame) -> pd.DataFrame:
- """附加入场时刻的30分钟K线指标 + 当日日线指标"""
- kline_cols = ['RSI', 'K', 'D', 'J', 'MACD_hist',
- 'BB_width', 'ATR_Pct', 'Momentum', 'Volume_Ratio']
- daily_cols = ['RSI_D', 'K_D', 'J_D', 'MA5_slope',
- 'pct_MA20', 'BB_pos_D', 'Mom5_D', 'Mom10_D']
- out = enriched.copy()
- # 30分钟指标 — 向量化
- for c in kline_cols:
- if c in data_with_ind.columns:
- out[c] = out['开仓时间'].map(
- lambda t, col=c: data_with_ind.at[t, col]
- if t in data_with_ind.index else float('nan'))
- # 日线指标 — 向量化
- entry_days = out['开仓时间'].dt.normalize()
- for c in daily_cols:
- if c in daily.columns:
- out[c] = entry_days.map(
- lambda t, col=c: daily.at[t, col]
- if t in daily.index else float('nan'))
- return out
- # 舒适区规则(命中 +1 分)— 来自 kline_zone_analysis 分析结论
- COMFORT_RULES = [
- # (col1, lo1, hi1, col2, lo2, hi2, 描述)
- ('J', 20, 50, 'J_D', 20, 50, 'J跨周期共振低位'), # WR 85.7%
- ('RSI', 45, 55, 'RSI_D', 60, 70, 'RSI 30min温和+日线强'), # WR 80.0%
- ('K', 20, 40, 'K_D', 80, 999, 'K低位+日线K超买(反弹)'), # WR 71.4%
- ('K', 20, 40, 'K_D', 20, 40, 'K跨周期共振低位'), # WR 62.5%
- ('J', 0, 20, 'J_D', 20, 50, 'J超卖+日线低位'), # WR 66.7%
- ('RSI', 25, 35, 'MA5_slope', 0.1, 0.5, 'RSI弱+日MA5上升'), # WR 66.7%
- ('RSI', 35, 45, 'RSI_D', 40, 50, 'RSI双偏弱共振'), # WR 66.7%
- ('RSI', 35, 45, 'MA5_slope', 0.1, 0.5, 'RSI偏弱+MA5上升'), # WR 66.7%
- ('RSI_D', 60, 70, 'Mom5_D', -2, 0, '日RSI强+小幅回调中'), # WR 63.6%
- ('BB_pos_D',0.3,0.5, 'J_D', 0, 20, 'BB中下+日KDJ超卖'), # WR 62.5%
- ('MACD_hist',0.8,999, None, None, None, 'MACD强多头'), # WR 66.7%
- ('Momentum', 0, 0.01, None, None, None, '30min小涨'), # WR 62.5%
- ]
- # 黑暗区规则(命中 -1 分)
- DARK_RULES = [
- ('RSI', 45, 55, 'RSI_D', 50, 60, 'RSI双均衡无方向'), # WR 25%
- ('Momentum',-999,-0.03,'Mom5_D', -999, -4, '动量双大跌共振'), # WR 20%
- ('Volume_Ratio',1.8,999,'ATR_Pct', -999,0.006, '大放量+极低波动'), # WR 28.6%
- ('MACD_hist', 0.2, 0.8, None, None, None, 'MACD弱多头'), # WR 25%
- ('Volume_Ratio',-999,0.5,None,None, None, '极度缩量'), # WR 25%
- ('K', 20, 40, 'K_D', 60, 80, 'K低位+日线K高位'), # WR 33.3%
- ('RSI', 35, 45, 'MA5_slope', 0.5, 999, 'RSI偏弱+急速上升'), # WR 25%
- ('RSI', 35, 45, 'RSI_D', 70, 999, 'RSI偏弱+日线超买'), # WR 25%
- ('Momentum',-999,-0.03, None, None, None, '30min大跌'), # WR 26.7%
- ('RSI', 25, 35, 'MA5_slope', -999,-0.5, 'RSI弱+日MA5急降'), # WR 26.3%
- ('BB_pos_D',-999, 0.1, 'J_D', 0, 20, 'BB超下轨+日KDJ超卖'), # WR 28.6%
- ('Mom10_D', 6, 999, None, None, None, '日线10日大涨高位'), # WR 32.1%
- ]
- def _in_range(val, lo, hi) -> bool:
- v = pd.to_numeric(val, errors='coerce')
- return not pd.isna(v) and lo <= v < hi
- def zone_net_score(row) -> int:
- """区域净分 = 舒适规则命中数 − 黑暗规则命中数"""
- def hit(c1, lo1, hi1, c2, lo2, hi2):
- if not _in_range(row.get(c1, float('nan')), lo1, hi1):
- return False
- if c2 is None:
- return True
- return _in_range(row.get(c2, float('nan')), lo2, hi2)
- comfort = sum(1 for r in COMFORT_RULES if hit(*r[:6]))
- dark = sum(1 for r in DARK_RULES if hit(*r[:6]))
- return comfort - dark
- # ──────────────────────────────────────────────────────────────────
- # 加分模型(Version D 使用,阈值≥5)
- # ──────────────────────────────────────────────────────────────────
- def comfort_score(row) -> int:
- s = 0
- ms = str(row.get('市场状态', ''))
- vl = str(row.get('波动率水平', ''))
- rsi = str(row.get('RSI区域', ''))
- vq = row.get('波动率分位', float('nan'))
- rq = row.get('RSI分位', float('nan'))
- ts = row.get('趋势强度', float('nan'))
- t1 = str(row.get('T1调整', '')) # '否' 或 '是(T0→T1)'
- if ms == '下跌趋势高波' and vl == '极低': s += 3 # 最优组合
- if pd.notna(rq) and 0.05 <= rq < 0.10: s += 3 # 最优RSI区间
- if pd.notna(vq) and vq < 0.30: s += 2 # 低波动率分位
- if pd.notna(ts) and 1.5 <= ts < 4.0: s += 2 # 适中趋势强度
- if rsi == '中性偏弱': s += 2 # 最稳定RSI
- if 'T0' not in t1: s += 2 # 非T0延期单
- if vl in ('极低', '低', '中等'): s += 1 # 低中波动率
- if pd.notna(rq) and rq >= 0.60: s += 1 # RSI回升阶段
- return s
- # ──────────────────────────────────────────────────────────────────
- # 组合规则(走前向验证后锁定)
- # ──────────────────────────────────────────────────────────────────
- COMBO_RULES = [
- {'RSI区域': '中性偏弱', 'rsi_bin': 'rsi偏低'}, # 规则1: 训练67% → 验证80%
- {'vol_bin': 'vol中', 'ts_bin': 'ts强'}, # 规则2: 训练57% → 验证60%
- {'波动率水平': '中等', 'ts_bin': 'ts强'}, # 规则3: 训练57% → 验证60%
- {'市场状态': '震荡高波', 'RSI区域': '中性偏弱'}, # 规则4: 训练57% → 验证50%
- {'RSI区域': '中性偏弱', 'ts_bin': 'ts弱'}, # 规则5: 训练57% → 验证50%
- ]
- def hits_combo(row) -> bool:
- for rule in COMBO_RULES:
- if all(str(row.get(col, '')) == str(val) for col, val in rule.items()):
- return True
- return False
- # ──────────────────────────────────────────────────────────────────
- # 权益曲线模拟(支持仓位乘数)
- # ──────────────────────────────────────────────────────────────────
- def simulate_equity(df, initial=INITIAL):
- df = df.copy().reset_index(drop=True)
- if 'pnl_mult' not in df.columns:
- df['pnl_mult'] = 1.0
- cap = float(initial)
- caps = []
- for _, r in df.iterrows():
- cap += float(r['盈亏金额']) * float(r['pnl_mult'])
- caps.append(cap)
- df['资金余额'] = caps
- df['实际盈亏'] = df['盈亏金额'] * df['pnl_mult']
- df['盈利'] = df['实际盈亏'] > 0
- return df
- # ──────────────────────────────────────────────────────────────────
- # 绩效统计
- # ──────────────────────────────────────────────────────────────────
- def calc_stats(df, initial=INITIAL):
- if len(df) == 0:
- return None
- wr = df['盈利'].mean()
- pnl = df['实际盈亏'].sum()
- cap = df['资金余额'].iloc[-1]
- ret = (cap - initial) / initial
- win = df[df['盈利']]['实际盈亏']
- los = df[~df['盈利']]['实际盈亏']
- plr = abs(win.mean() / los.mean()) if len(los) > 0 and los.mean() != 0 else float('inf')
- eq = df['资金余额'].values
- pk = np.maximum.accumulate(np.append([initial], eq))
- dd = ((eq - pk[1:]) / pk[1:]).min() if len(eq) > 0 else 0
- return dict(n=len(df), wr=wr, pnl=pnl, cap=cap, ret=ret, plr=plr, dd=dd)
- def print_yearly(df, initial=INITIAL):
- if len(df) == 0:
- return
- df = df.copy()
- df['年份'] = pd.to_datetime(df['开仓时间']).dt.year
- prev = initial
- for y in sorted(df['年份'].unique()):
- sy = df[df['年份'] == y]
- pnl = sy['实际盈亏'].sum()
- wr = sy['盈利'].mean()
- end = sy['资金余额'].iloc[-1]
- print(f" {y}年: {len(sy):>3}笔 胜率{wr:.1%} | {pnl:>+12,.0f}元 ({pnl/prev:>+.2%}) → {end:,.0f}元")
- prev = end
- def print_regime_breakdown(df):
- if len(df) == 0 or '市场状态' not in df.columns:
- return
- print(f" {'市场状态':<15} {'笔数':>5} {'胜率':>7} {'均盈亏':>10} {'总盈亏':>12}")
- print(f" {'-'*15} {'-'*5} {'-'*7} {'-'*10} {'-'*12}")
- for ms in sorted(df['市场状态'].unique()):
- sub = df[df['市场状态'] == ms]
- wr = sub['盈利'].mean()
- avg = sub['实际盈亏'].mean()
- tot = sub['实际盈亏'].sum()
- print(f" {ms:<15} {len(sub):>5} {wr:>7.1%} {avg:>+10,.0f} {tot:>+12,.0f}")
- # ──────────────────────────────────────────────────────────────────
- # 主流程
- # ──────────────────────────────────────────────────────────────────
- def main():
- print(SEP)
- print(' CYB50 T+1 最终策略回测(完整K线管道版)')
- print(f' Version D (排死亡区+评分≥5) vs Version F (D + 组合规则加仓{BOOST_MULT}x)')
- print(SEP)
- # ── 步骤1: 加载原始K线 CSV ───────────────────────────────────
- print(f'\n📂 步骤1: 加载原始K线数据...')
- raw_csv = pd.read_csv(DATA_CSV, encoding='utf-8-sig')
- fetcher = IntradayDataFetcher()
- with ctx():
- data = fetcher._process_dataframe(
- raw_csv,
- pd.Timestamp('2000-01-01'),
- pd.Timestamp('2099-12-31')
- )
- print(f' K线: {len(data)}条 {data.index[0].date()} ~ {data.index[-1].date()}')
- # ── 步骤2: 计算技术指标 ──────────────────────────────────────
- print(f'\n📈 步骤2: 计算技术指标...')
- with ctx():
- data_with_ind = fetcher.calculate_intraday_indicators(data)
- ind_cols = [c for c in ['RSI','K','D','J','MACD','MACD_hist','BB_width',
- 'ATR_Pct','Momentum','Volume_Ratio'] if c in data_with_ind.columns]
- print(f' 指标列: {ind_cols}')
- # ── 步骤3: 生成信号 & 执行多空双向交易 ──────────────────────
- print(f'\n🔄 步骤3: 生成多空双向信号并执行交易...')
- gen = DualDirectionSignalGenerator()
- with ctx():
- signals_df = gen.generate_dual_direction_signals(data_with_ind)
- executor = DualDirectionExecutor(initial_capital=INITIAL)
- with ctx():
- _, trades_df = executor.execute_dual_direction_trades(signals_df)
- long_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
- short_trades = trades_df[trades_df['交易方向'] == '做空'].copy()
- print(f' 总交易: {len(trades_df)}笔 做多: {len(long_trades)}笔 做空: {len(short_trades)}笔')
- # ── 步骤4: T+1规则转换(仅做多) ────────────────────────────
- print(f'\n📅 步骤4: T+1规则转换(做多交易)...')
- with ctx():
- t1_trades = simulate_t1_trades(data_with_ind, long_trades, INITIAL)
- if len(t1_trades) == 0:
- print(' ⚠️ T+1转换后无交易记录,退出。')
- return
- t1_adj = (t1_trades['T+1调整'] == '是(T0→T1)').sum()
- print(f' T+1交易: {len(t1_trades)}笔 其中T0→T1调整: {t1_adj}笔')
- # ── 步骤5: 计算市场环境 & 标注每笔交易 ──────────────────────
- print(f'\n🔬 步骤5: 市场环境分析 & 交易标注...')
- market_analyzer = MarketEnvironmentAnalyzer(data_with_ind)
- cza = ComfortZoneAnalyzer(t1_trades, market_analyzer)
- with ctx():
- cza._enrich_trades_with_environment()
- enriched = cza.enriched_trades.copy()
- print(f' 成功标注: {len(enriched)}笔交易')
- # ── 规范化列名 & 数值类型 ────────────────────────────────────
- # T+1调整 → T1调整(供 comfort_score 使用)
- if 'T+1调整' in enriched.columns:
- enriched.rename(columns={'T+1调整': 'T1调整'}, inplace=True)
- enriched['开仓时间'] = pd.to_datetime(enriched['开仓时间'])
- enriched['平仓时间'] = pd.to_datetime(enriched['平仓时间'])
- for c in ['盈亏金额', '盈亏百分比', '波动率分位', 'RSI分位', '趋势强度',
- '布林带位置', '1日动量', '成交量分位']:
- if c in enriched.columns:
- enriched[c] = pd.to_numeric(enriched[c], errors='coerce')
- enriched = enriched.sort_values('开仓时间').reset_index(drop=True)
- enriched = add_bins(enriched)
- # USE_FEES 开关
- if not USE_FEES:
- enriched['盈亏金额'] = (enriched['平仓价格'] - enriched['开仓价格']) * enriched['仓位']
- fee_label = '不含手续费(按价差×数量重算)'
- else:
- fee_label = '含手续费(t1_converter计算)'
- print(f' 数据区间: {enriched["开仓时间"].min().date()} ~ {enriched["开仓时间"].max().date()}')
- print(f' 手续费模式: {fee_label}')
- # ── 步骤6: 评分 & 附加日线/K线指标 ──────────────────────────
- print(f'\n🎯 步骤6: 评分 & 附加日线指标 & 构建 B/D/F/G...')
- enriched['_score'] = enriched.apply(comfort_score, axis=1)
- enriched['_combo_hit'] = enriched.apply(hits_combo, axis=1)
- enriched['_year'] = enriched['开仓时间'].dt.year
- # 附加 K线+日线指标,计算区域净分
- daily = build_daily_indicators(data_with_ind)
- enriched = attach_zone_indicators(enriched, data_with_ind, daily)
- enriched['_zone_net'] = enriched.apply(zone_net_score, axis=1)
- enriched['_zone'] = enriched['_zone_net'].apply(
- lambda n: '舒适' if n >= 2 else ('黑暗' if n <= -2 else '中性'))
- not_dead = ~enriched['市场状态'].isin(DEATH_ZONES)
- high_score = enriched['_score'] >= 5
- not_dark = enriched['_zone'] != '黑暗'
- vB_df = enriched[not_dead].copy(); vB_df['pnl_mult'] = 1.0
- vD_df = enriched[not_dead & high_score].copy(); vD_df['pnl_mult'] = 1.0
- vF_df = enriched[not_dead & high_score].copy()
- vF_df['pnl_mult'] = vF_df['_combo_hit'].apply(lambda x: BOOST_MULT if x else 1.0)
- # Version G:排死亡区 + 评分≥5 + 跳过黑暗区 + 舒适区×1.5仓位
- vG_df = enriched[not_dead & high_score & not_dark].copy()
- vG_df['pnl_mult'] = vG_df['_zone'].apply(lambda z: BOOST_MULT if z == '舒适' else 1.0)
- vB = simulate_equity(vB_df)
- vD = simulate_equity(vD_df)
- vF = simulate_equity(vF_df)
- vG = simulate_equity(vG_df)
- sB, sD, sF, sG = calc_stats(vB), calc_stats(vD), calc_stats(vF), calc_stats(vG)
- # ════════════════════════════════════════════════════════════
- # 输出:回测结果对比
- # ════════════════════════════════════════════════════════════
- print()
- print(SEP)
- print(' 版本说明')
- print(SEP)
- print(f' Version B = 排死亡区(基准) {sB["n"] if sB else 0}笔')
- print(f' Version D = 排死亡区 + 评分≥5 {sD["n"] if sD else 0}笔')
- print(f' Version F = 排死亡区 + 评分≥5 + 组合规则加仓{BOOST_MULT}x {sF["n"] if sF else 0}笔')
- print(f' Version G = 排死亡区 + 评分≥5 + 区域过滤 + 舒适区{BOOST_MULT}x {sG["n"] if sG else 0}笔')
- print()
- print(SEP)
- print(' 回测结果对比')
- print(SEP)
- def fv(s, k, fmt):
- if s is None: return 'N/A'
- v = s[k]
- if fmt == 'n': return str(int(v))
- if fmt == 'pct': return f'{v:.1%}'
- if fmt == 'pct2': return f'{v:+.2%}'
- if fmt == 'f2': return f'{v:.2f}'
- if fmt == 'money':return f'{v:+,.0f}'
- if fmt == 'cap': return f'{v:,.0f}'
- return str(v)
- hdr = (f' {"指标":<14} {"Version B(基准)":>14} {"Version D":>12}'
- f' {"Version F(+加仓)":>16} {"Version G(区域)":>16}')
- print(hdr)
- print(' ' + '-' * 74)
- for name, k, fmt in [
- ('交易笔数', 'n', 'n'),
- ('胜率', 'wr', 'pct'),
- ('盈亏比', 'plr', 'f2'),
- ('总收益率', 'ret', 'pct2'),
- ('最终资金', 'cap', 'cap'),
- ('总盈亏', 'pnl', 'money'),
- ('最大回撤', 'dd', 'pct'),
- ]:
- print(f' {name:<14} {fv(sB,k,fmt):>14} {fv(sD,k,fmt):>12}'
- f' {fv(sF,k,fmt):>16} {fv(sG,k,fmt):>16}')
- # ── 年度明细 ─────────────────────────────────────────────────
- print(f'\n Version B(排死亡区)年度明细:')
- print_yearly(vB)
- print(f'\n Version D(评分≥5)年度明细:')
- print_yearly(vD)
- print(f'\n Version F(评分≥5 + 加仓)年度明细:')
- print_yearly(vF)
- print(f'\n Version G(区域过滤 + 舒适区加仓)年度明细:')
- print_yearly(vG)
- # ── 组合规则命中分析 ─────────────────────────────────────────
- print(f'\n{SEP}')
- print(' 组合规则命中分析(在评分≥5的交易中)')
- print(SEP)
- combo_hit = vF_df[vF_df['_combo_hit']].copy()
- combo_hit['实际盈亏'] = combo_hit['盈亏金额'] * BOOST_MULT
- combo_hit['盈利'] = combo_hit['实际盈亏'] > 0
- combo_miss = vF_df[~vF_df['_combo_hit']].copy()
- combo_miss['实际盈亏'] = combo_miss['盈亏金额'] * 1.0
- combo_miss['盈利'] = combo_miss['实际盈亏'] > 0
- print(f'\n 命中规则: {len(combo_hit)}笔 '
- f'| 胜率{combo_hit["盈利"].mean():.1%} '
- f'| 均盈亏{combo_hit["实际盈亏"].mean():+,.0f}元 '
- f'| 仓位乘数{BOOST_MULT}x')
- print(f' 未命中: {len(combo_miss)}笔 '
- f'| 胜率{combo_miss["盈利"].mean():.1%} '
- f'| 均盈亏{combo_miss["实际盈亏"].mean():+,.0f}元 '
- f'| 仓位乘数1.0x')
- combo_hit['_year'] = pd.to_datetime(combo_hit['开仓时间']).dt.year
- print(f'\n 命中规则年份分布:')
- print(f' {"年份":>5} {"命中笔数":>8} {"胜率":>7} {"总盈亏(1.5x)":>14}')
- print(f' {"-----":>5} {"--------":>8} {"-------":>7} {"----------":>14}')
- for y in sorted(enriched['_year'].unique()):
- sy = combo_hit[combo_hit['_year'] == y]
- if len(sy) == 0:
- print(f' {y:>5} {"0":>8} {"—":>7} {"0":>14}')
- else:
- print(f' {y:>5} {len(sy):>8} {sy["盈利"].mean():>7.1%} {sy["实际盈亏"].sum():>+14,.0f}')
- # ── 市场状态分布(Version F)────────────────────────────────
- print(f'\n{SEP}')
- print(' Version F 各市场状态表现')
- print(SEP)
- print()
- print_regime_breakdown(vF)
- # ── Version G 区域分布说明 ───────────────────────────────────
- print(f'\n{SEP}')
- print(' Version G — 区域分布(在 Version D 85笔中)')
- print(SEP)
- vD_z = enriched[not_dead & high_score].copy()
- zone_grp = vD_z.groupby('_zone').agg(
- n=('盈亏金额', 'count'),
- wr=('盈亏金额', lambda x: (x > 0).mean()),
- avg_pnl=('盈亏金额', 'mean'),
- total_pnl=('盈亏金额', 'sum'),
- ).reset_index()
- print(f'\n {"区域":<6} {"笔数":>5} {"胜率":>7} {"均盈亏":>10} {"总盈亏":>12} {"Version G操作"}')
- print(f' {"-"*6} {"-"*5} {"-"*7} {"-"*10} {"-"*12} {"-"*16}')
- action_map = {'舒适': f'入场 ×{BOOST_MULT} 仓位', '中性': '入场 ×1.0 仓位', '黑暗': '⛔ 跳过'}
- for _, r in zone_grp.sort_values('wr', ascending=False).iterrows():
- act = action_map.get(r['_zone'], '')
- print(f' {r["_zone"]:<6} {int(r["n"]):>5} {r["wr"]:>7.1%}'
- f' {r["avg_pnl"]:>+10,.0f} {r["total_pnl"]:>+12,.0f} {act}')
- # 各年黑暗区过滤效果
- print(f'\n 黑暗区过滤效果(Version D → Version G 减少笔数):')
- print(f' {"年份":>5} {"D总笔":>6} {"G入场":>6} {"过滤":>5} {"D总盈亏":>12} {"G总盈亏":>12} {"改善":>12}')
- print(f' {"-"*5} {"-"*6} {"-"*6} {"-"*5} {"-"*12} {"-"*12} {"-"*12}')
- for y in sorted(enriched['_year'].unique()):
- d_y = vD[vD['开仓时间'].dt.year == y] if '开仓时间' in vD.columns else pd.DataFrame()
- g_y = vG[vG['开仓时间'].dt.year == y] if '开仓时间' in vG.columns else pd.DataFrame()
- nd = len(d_y); ng = len(g_y)
- pd_ = d_y['实际盈亏'].sum() if nd else 0
- pg_ = g_y['实际盈亏'].sum() if ng else 0
- print(f' {y:>5} {nd:>6} {ng:>6} {nd-ng:>5}'
- f' {pd_:>+12,.0f} {pg_:>+12,.0f} {pg_-pd_:>+12,.0f}')
- # ── K线指标详情(Version F 最近20笔)────────────────────────
- print(f'\n{SEP}')
- print(' Version F 最近20笔 — 入场时K线指标')
- print(SEP)
- print(f'\n {"":1} {"开仓时间":<17} {"市场状态":<12} {"评分":>4} {"组合":>4}'
- f' {"RSI":>6} {"K":>6} {"D":>6} {"J":>6}'
- f' {"BB位置":>6} {"动量%":>7} {"量比":>6} {"实际盈亏":>10}')
- print(f' {"-"*1} {"-"*17} {"-"*12} {"-"*4} {"-"*4}'
- f' {"-"*6} {"-"*6} {"-"*6} {"-"*6}'
- f' {"-"*6} {"-"*7} {"-"*6} {"-"*10}')
- for _, r in vF.tail(20).iterrows():
- t = r['开仓时间']
- win = '★' if r['盈利'] else '✗'
- boost = 'HIT' if r['pnl_mult'] > 1.0 else ' '
- sc = int(enriched.loc[enriched['开仓时间'] == t, '_score'].iloc[0]) \
- if (enriched['开仓时间'] == t).any() else -1
- ms = str(r.get('市场状态', ''))
- # K线指标
- if t in data_with_ind.index:
- bar = data_with_ind.loc[t]
- rsi_v = bar.get('RSI', float('nan'))
- k_v = bar.get('K', float('nan'))
- d_v = bar.get('D', float('nan'))
- j_v = bar.get('J', float('nan'))
- bb_u = bar.get('BB_upper', float('nan'))
- bb_l = bar.get('BB_lower', float('nan'))
- bb_p = (bar.get('Close', float('nan')) - bb_l) / (bb_u - bb_l + 1e-9)
- mom = bar.get('Momentum', float('nan')) * 100
- vr = bar.get('Volume_Ratio', float('nan'))
- else:
- rsi_v = k_v = d_v = j_v = bb_p = mom = vr = float('nan')
- print(f' {win} {str(t)[:16]} {ms:<12} {sc:>4} {boost:>4}'
- f' {rsi_v:>6.1f} {k_v:>6.1f} {d_v:>6.1f} {j_v:>6.1f}'
- f' {bb_p:>6.2f} {mom:>7.2f} {vr:>6.2f} {r["实际盈亏"]:>+10,.0f}')
- # ── 评分分布 ─────────────────────────────────────────────────
- print(f'\n{SEP}')
- print(' 评分分布(非死亡区全量)')
- print(SEP)
- nondead = enriched[not_dead]
- print(f'\n {"评分":>5} {"笔数":>5} {"胜率":>7} {"均盈亏":>10} {"累计盈亏":>12} {"选入D/F?":>8}')
- print(f' {"-"*5} {"-"*5} {"-"*7} {"-"*10} {"-"*12} {"-"*8}')
- for sc in sorted(nondead['_score'].unique()):
- sub = nondead[nondead['_score'] == sc]
- wr = (sub['盈亏金额'] > 0).mean()
- avg = sub['盈亏金额'].mean()
- tot = sub['盈亏金额'].sum()
- flag = '✓ 入场' if sc >= 5 else ' 跳过'
- print(f' {sc:>5} {len(sub):>5} {wr:>7.1%} {avg:>+10,.0f} {tot:>+12,.0f} {flag}')
- # ── CSV 导出 ─────────────────────────────────────────────────
- if EXPORT_CSV:
- ts = datetime.now().strftime('%Y%m%d_%H%M%S')
- fee_tag = 'nofee' if not USE_FEES else 'fee'
- out_dir = os.path.dirname(__file__)
- # K线指标列(入场时刻)
- kline_cols = [c for c in ['RSI','K','D','J','MACD','MACD_hist',
- 'BB_width','ATR_Pct','Momentum','Volume_Ratio']
- if c in data_with_ind.columns]
- def attach_kline(df_in):
- """给 df_in 每行附加入场时刻的K线指标"""
- rows = []
- for t in df_in['开仓时间']:
- if t in data_with_ind.index:
- rows.append(data_with_ind.loc[t, kline_cols].to_dict())
- else:
- rows.append({c: float('nan') for c in kline_cols})
- return pd.DataFrame(rows, index=df_in.index)
- base_cols = [
- '交易方向', '开仓时间', '平仓时间', '开仓价格', '平仓价格', '仓位',
- '盈亏金额', '盈亏百分比', '退出原因', 'T1调整',
- '市场状态', '波动率水平', '波动率分位', 'RSI区域', 'RSI分位',
- '趋势强度', '布林带区域', '_score', 'vol_bin', 'rsi_bin', 'ts_bin',
- '成交量分位', '布林带位置', '1日动量',
- '_zone_net', '_zone', # Version G 区域字段
- ]
- def build_export(df, label):
- out = df.copy().reset_index(drop=True)
- out.insert(0, '序号', range(1, len(out) + 1))
- out['版本'] = label
- out['手续费模式'] = fee_label
- out['仓位乘数'] = out.get('pnl_mult', pd.Series([1.0]*len(out)))
- out['实际盈亏'] = out['盈亏金额'] * out['仓位乘数']
- out['盈利标记'] = out['实际盈亏'].apply(lambda x: '盈' if x > 0 else '亏')
- out['验证盈亏%'] = ((out['平仓价格'] - out['开仓价格'])
- / out['开仓价格'] * 100).round(4)
- # 附加K线指标
- kl = attach_kline(out)
- for c in kl.columns:
- out[c] = kl[c].values
- keep = (['序号', '版本', '手续费模式', '盈利标记', '仓位乘数',
- '实际盈亏', '资金余额'] +
- [c for c in base_cols if c in out.columns] +
- kline_cols +
- ['验证盈亏%'])
- keep = [c for c in keep if c in out.columns]
- return out[keep]
- print(f'\n📁 导出CSV...')
- exp_B = build_export(vB, 'B_排死亡区')
- exp_D = build_export(vD, 'D_评分≥5')
- exp_F = build_export(vF, 'F_评分≥5+加仓')
- exp_G = build_export(vG, 'G_区域过滤')
- for label, exp in [('B', exp_B), ('D', exp_D), ('F', exp_F), ('G', exp_G)]:
- fname = os.path.join(out_dir, f'backtest_v{label}_{fee_tag}_{ts}.csv')
- exp.to_csv(fname, index=False, encoding='utf-8-sig')
- print(f' Version {label}: {os.path.basename(fname)} ({len(exp)}笔)')
- combined = pd.concat([exp_B, exp_D, exp_F, exp_G], ignore_index=True)
- fname_all = os.path.join(out_dir, f'backtest_all_{fee_tag}_{ts}.csv')
- combined.to_csv(fname_all, index=False, encoding='utf-8-sig')
- print(f' 合并总表: {os.path.basename(fname_all)} ({len(combined)}笔)')
- print()
- print(SEP)
- print(' 回测完成')
- print(SEP)
- if __name__ == '__main__':
- main()
|