#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Comfort-zone rule optimisation.

Steps:
1. Operate only on trades outside the death zones.
2. 2023-2024 -> training set: enumerate combinations, keep effective rules.
3. 2025 -> validation set: tune the admission threshold for combinations.
4. 2026 -> blind test set: final verification (no look-back adjustment).
5. Output the new rules and run a full backtest comparison.
"""
import sys, io
if sys.platform == 'win32':
    # Force UTF-8 console output on Windows so the Chinese report text prints.
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
import os
import pandas as pd
import numpy as np
from bisect import bisect_right
from itertools import combinations
import warnings
warnings.filterwarnings('ignore')

SEP = '=' * 70
INITIAL = 1_000_000
# Market-state labels that are excluded from every analysis below.
DEATH_ZONES = {'下跌趋势低波', '震荡低波'}


# ── Data loading ────────────────────────────────────────────────
def load_trades():
    """Load the trade log CSV that sits next to this script.

    The CSV header is replaced positionally by the fixed column list below
    (pandas raises ``ValueError`` if the column count differs). The open
    time is parsed to datetime, the numeric indicator columns are coerced
    with ``errors='coerce'``, and two derived columns are added:
    ``盈利`` (PnL > 0) and ``年份`` (year of the open time).

    Returns:
        DataFrame sorted by open time with a fresh 0..n-1 index.
    """
    csv = os.path.join(os.path.dirname(__file__),
                       't1_trades_with_environment_20260327_141655.csv')
    df = pd.read_csv(csv, encoding='utf-8-sig')
    cols = [
        '交易方向', '开仓时间', '平仓时间', '开仓价格', '平仓价格', '仓位',
        '盈亏金额', '盈亏百分比', '退出原因', '持仓周期数', '持仓小时数',
        'T1调整', '原平仓时间', '原平仓价格', '原盈亏', '盈亏变化',
        '入场信号', '开仓市值', '平仓时资金', '市场状态',
        '趋势短期', '趋势中期', '趋势强度', '波动率分位', '波动率水平',
        '成交量分位', '布林带位置', '布林带区域', 'RSI分位', 'RSI区域',
        '1日动量', '入场价格',
    ]
    df.columns = cols
    df['开仓时间'] = pd.to_datetime(df['开仓时间'])
    for c in ['盈亏金额', '盈亏百分比', '波动率分位', 'RSI分位', '趋势强度']:
        df[c] = pd.to_numeric(df[c], errors='coerce')
    df['盈利'] = df['盈亏金额'] > 0
    df['年份'] = df['开仓时间'].dt.year
    return df.sort_values('开仓时间').reset_index(drop=True)


# ── Equity-curve simulation ─────────────────────────────────────
def simulate_equity(df, initial=INITIAL):
    """Return a copy of *df* with a running balance column ``资金余额``.

    The balance after each trade is ``initial`` plus the cumulative sum of
    ``盈亏金额`` in row order. The ``盈利`` flag is (re)computed as PnL > 0.
    The input frame is not modified; the result has a fresh 0..n-1 index.
    """
    df = df.copy().reset_index(drop=True)
    pnl = df['盈亏金额'].astype(float).to_numpy()
    # Seed the cumulative sum with the starting capital so the additions
    # happen in the same order as a row-by-row running total.
    df['资金余额'] = np.cumsum(np.concatenate(([float(initial)], pnl)))[1:]
    df['盈利'] = df['盈亏金额'] > 0
    return df


# ── Indicator binning (discretise continuous indicators for the
#    combination search) ─────────────────────────────────────────
# Each table is (upper edges, labels); a value v falls in the first bin
# whose edge satisfies v < edge, and in the last label otherwise.
_VOL_BINS = ((0.20, 0.40, 0.60, 0.80),
             ('vol极低', 'vol低', 'vol中', 'vol高', 'vol极高'))
_RSI_BINS = ((0.05, 0.10, 0.20, 0.40, 0.60, 0.80),
             ('rsi极底', 'rsi底', 'rsi低', 'rsi偏低', 'rsi中', 'rsi偏高', 'rsi高'))
_TS_BINS = ((1.0, 1.5, 2.5, 4.0),
            ('ts弱', 'ts中弱', 'ts中', 'ts强', 'ts极强'))


def _bin_label(value, edges, labels):
    """Map *value* to its bin label; missing values map to ``'unknown'``."""
    if pd.isna(value):
        return 'unknown'
    # bisect_right counts the edges <= value, i.e. half-open bins [lo, hi).
    return labels[bisect_right(edges, value)]


def add_bins(df):
    """Return a copy of *df* with categorical bin columns added.

    Adds ``vol_bin`` (from ``波动率分位``), ``rsi_bin`` (from ``RSI分位``),
    ``ts_bin`` (from ``趋势强度``) and ``t1_flag`` (from ``T1调整``).
    """
    df = df.copy()
    df['vol_bin'] = df['波动率分位'].apply(lambda v: _bin_label(v, *_VOL_BINS))
    df['rsi_bin'] = df['RSI分位'].apply(lambda v: _bin_label(v, *_RSI_BINS))
    df['ts_bin'] = df['趋势强度'].apply(lambda v: _bin_label(v, *_TS_BINS))
    # NOTE(review): the flag is set when the text contains 'T0' although the
    # column is named 'T1调整' — confirm against the CSV's actual values.
    df['t1_flag'] = df['T1调整'].apply(
        lambda x: 'T1是' if 'T0' in str(x) else 'T1否')
    return df


# ── Combination enumeration (on a given data set) ───────────────
CAT_COLS = ['市场状态', '波动率水平', 'RSI区域', 'vol_bin', 'rsi_bin', 'ts_bin']


def scan_combos(df, min_trades=6, min_wr=0.55, max_combos=2):
    """Enumerate rules over 1..max_combos columns of ``CAT_COLS``.

    For every column combination the trades are grouped by the observed
    value tuples; a group becomes a rule when it has at least *min_trades*
    trades and a win rate of at least *min_wr*.

    Returns:
        DataFrame with one row per rule and the columns ``n_cols``,
        ``conditions`` (dict column -> value), ``cond_str``, ``trades``,
        ``win_rate``, ``avg_pnl`` and ``total_pnl``; an empty DataFrame
        when nothing qualifies.
    """
    results = []
    for n in range(1, max_combos + 1):
        for cols in combinations(CAT_COLS, n):
            for key, sub in df.groupby(list(cols)):
                if len(sub) < min_trades:
                    continue
                wr = sub['盈利'].mean()
                avg = sub['盈亏金额'].mean()
                if wr >= min_wr:
                    # Grouping by a one-element list yields a scalar key on
                    # older pandas and a 1-tuple on pandas >= 2.0; normalise
                    # so single-column conditions store the plain value.
                    values = key if isinstance(key, tuple) else (key,)
                    cond = dict(zip(cols, values))
                    results.append({
                        'n_cols': n,
                        'conditions': cond,
                        'cond_str': ' & '.join(
                            f'{k}={v}' for k, v in cond.items()),
                        'trades': len(sub),
                        'win_rate': wr,
                        'avg_pnl': avg,
                        'total_pnl': sub['盈亏金额'].sum(),
                    })
    return pd.DataFrame(results) if results else pd.DataFrame()


def apply_combo_rule(row, combo_rules):
    """Return True when *row* satisfies every condition of at least one rule.

    Values are compared by their string form; a column missing from *row*
    compares as the empty string.
    """
    return any(
        all(str(row.get(col, '')) == str(val)
            for col, val in rule['conditions'].items())
        for rule in combo_rules
    )


def _rule_mask(df, combo_rules):
    """Boolean Series (aligned to *df*) marking rows hit by any rule.

    Built explicitly so that an empty frame still yields a boolean Series.
    """
    return pd.Series(
        [apply_combo_rule(row, combo_rules) for _, row in df.iterrows()],
        index=df.index, dtype=bool,
    )


# ── Performance statistics ──────────────────────────────────────
def stats(df, initial=INITIAL):
    """Summarise a frame produced by ``simulate_equity``.

    Returns:
        ``None`` for an empty frame, otherwise a dict with ``n`` (trades),
        ``wr`` (win rate), ``pnl`` (total PnL), ``cap`` (final balance),
        ``ret`` (total return on *initial*), ``plr`` (|avg win / avg loss|;
        ``inf`` when there are no losses or the average loss is zero,
        ``0.0`` when there are no wins) and ``dd`` (most negative relative
        drop from the running balance peak, a value <= 0).
    """
    if len(df) == 0:
        return None
    wr = df['盈利'].mean()
    pnl = df['盈亏金额'].sum()
    cap = df['资金余额'].iloc[-1]
    ret = (cap - initial) / initial
    win = df[df['盈利']]['盈亏金额']
    los = df[~df['盈利']]['盈亏金额']
    if len(los) == 0 or los.mean() == 0:
        plr = float('inf')
    elif len(win) == 0:
        # No winners: report 0 instead of the NaN that mean() would give.
        plr = 0.0
    else:
        plr = abs(win.mean() / los.mean())
    eq = df['资金余额'].values
    # Running peak includes the starting capital as the first reference.
    pk = np.maximum.accumulate(np.append([initial], eq))
    dd = ((eq - pk[1:]) / pk[1:]).min()
    return dict(n=len(df), wr=wr, pnl=pnl, cap=cap, ret=ret, plr=plr, dd=dd)


def print_yearly(df, initial=INITIAL):
    """Print per-year trade count, win rate, PnL and year-end balance.

    The yearly percentage is the year's PnL relative to the balance at the
    end of the previous year (*initial* for the first year).
    """
    df = df.copy()
    df['年份'] = pd.to_datetime(df['开仓时间']).dt.year
    prev = initial
    for y in sorted(df['年份'].unique()):
        sy = df[df['年份'] == y]
        pnl = sy['盈亏金额'].sum()
        wr = sy['盈利'].mean()
        end = sy['资金余额'].iloc[-1]
        print(f" {y}年: {len(sy):>3}笔 胜率{wr:.1%} | {pnl:>+12,.0f}元 ({pnl/prev:>+.2%}) → {end:,.0f}元")
        prev = end


# ═══════════════════════════════════════════════════════════════
# Main flow
# ═══════════════════════════════════════════════════════════════
def main():
    """Run the full pipeline and print the report to stdout."""
    print(SEP)
    print(' 舒适区规则优化 — 时间分层法')
    print(SEP)

    raw = load_trades()
    raw = add_bins(raw)

    # ── Step 1: split the data by year ───────────────────────
    print(f'\n{SEP}')
    print(' Step 1: 数据分层')
    print(SEP)

    all_nondead = raw[~raw['市场状态'].isin(DEATH_ZONES)].copy()
    train = all_nondead[all_nondead['年份'].isin([2023, 2024])].copy()
    valid = all_nondead[all_nondead['年份'] == 2025].copy()
    test = all_nondead[all_nondead['年份'] == 2026].copy()

    print(f'\n 全量非死亡区: {len(all_nondead)}笔 | 胜率{all_nondead["盈利"].mean():.1%}')
    print(f' 训练集(23-24): {len(train)}笔 | 胜率{train["盈利"].mean():.1%}')
    print(f' 验证集(2025): {len(valid)}笔 | 胜率{valid["盈利"].mean():.1%}')
    print(f' 盲测集(2026): {len(test)}笔 | 胜率{test["盈利"].mean():.1%}')

    # ── Step 2: enumerate combination rules on the training set ──
    print(f'\n{SEP}')
    print(' Step 2: 训练集(2023-2024) 组合规则穷举')
    print(' 筛选条件: 笔数≥6, 胜率≥55%')
    print(SEP)

    combos_df = scan_combos(train, min_trades=6, min_wr=0.55, max_combos=2)
    if combos_df.empty:
        print(' 未找到满足条件的组合')
        return

    combos_df = combos_df.sort_values(
        'win_rate', ascending=False).reset_index(drop=True)
    print(f'\n 共找到 {len(combos_df)} 条候选规则:')
    print(f' {"条件":<45} {"笔数":>5} {"胜率":>7} {"均盈亏":>10}')
    print(f' {"-"*45} {"-"*5} {"-"*7} {"-"*10}')
    for _, r in combos_df.iterrows():
        print(f' {r["cond_str"]:<45} {r["trades"]:>5} {r["win_rate"]:>7.1%} {r["avg_pnl"]:>+10,.0f}')

    # ── Step 3: evaluate every candidate rule on the validation set ──
    print(f'\n{SEP}')
    print(' Step 3: 验证集(2025) 规则命中率与胜率')
    print(SEP)

    rule_valid_stats = []
    for _, rule_row in combos_df.iterrows():
        rule = [{'conditions': rule_row['conditions'],
                 'cond_str': rule_row['cond_str']}]
        sub = valid[_rule_mask(valid, rule)]
        if len(sub) < 3:
            continue
        wr = sub['盈利'].mean()
        avg = sub['盈亏金额'].mean()
        rule_valid_stats.append({
            'cond_str': rule_row['cond_str'],
            # Carried along so the rule does not have to be looked up again.
            'conditions': rule_row['conditions'],
            'train_wr': rule_row['win_rate'],
            'train_n': rule_row['trades'],
            'valid_n': len(sub),
            'valid_wr': wr,
            'valid_avg': avg,
            # Validation win rate within 20 points of the training win rate.
            'stable': abs(wr - rule_row['win_rate']) < 0.20,
        })

    valid_rules_df = pd.DataFrame(rule_valid_stats)
    if len(valid_rules_df) == 0:
        print(' 无规则在验证集有足够样本')
        return

    valid_rules_df = valid_rules_df.sort_values('valid_wr', ascending=False)
    print(f'\n {"条件":<45} {"训练":>8} {"验证":>8} {"稳定?":>6}')
    print(f' {"-"*45} {"-"*8} {"-"*8} {"-"*6}')
    for _, r in valid_rules_df.iterrows():
        stab = '✓' if r['stable'] else '✗偏移'
        print(f' {r["cond_str"]:<45} '
              f'{r["train_n"]}笔{r["train_wr"]:.0%} '
              f'{r["valid_n"]}笔{r["valid_wr"]:.0%} '
              f'{stab:>6}')

    # Keep rules with validation win rate >= 50% and at least 3 samples.
    # NOTE(review): the 'stable' flag is only displayed above; it is not part
    # of this filter — confirm whether it was meant to be.
    stable_rules = valid_rules_df[
        (valid_rules_df['valid_wr'] >= 0.50) &
        (valid_rules_df['valid_n'] >= 3)
    ].copy()

    print(f'\n 通过验证的规则: {len(stable_rules)}条')
    passed_rule_list = []
    for _, r in stable_rules.iterrows():
        passed_rule_list.append({
            'conditions': r['conditions'],
            'cond_str': r['cond_str'],
            'train_wr': r['train_wr'],
            'valid_wr': r['valid_wr'],
        })
        print(f' ✓ {r["cond_str"]} | 训练{r["train_wr"]:.0%} → 验证{r["valid_wr"]:.0%}')

    if not passed_rule_list:
        print('\n 无规则通过验证,将仅使用死亡区过滤(Version B)')

    # ── Step 4: blind test set (2026) ─────────────────────────
    print(f'\n{SEP}')
    print(' Step 4: 盲测集(2026) — 规则锁定后不允许回看调整')
    print(SEP)

    if passed_rule_list:
        test_mask = _rule_mask(test, passed_rule_list)
        test_pass = test[test_mask]
        test_skip = test[~test_mask]
        print(f'\n 规则命中: {len(test_pass)}笔, 胜率{test_pass["盈利"].mean():.1%}, '
              f'均盈亏{test_pass["盈亏金额"].mean():+,.0f}元')
        print(f' 规则未命中: {len(test_skip)}笔, 胜率{test_skip["盈利"].mean():.1%}, '
              f'均盈亏{test_skip["盈亏金额"].mean():+,.0f}元')
        print()
        for (_, r), is_hit in zip(test.iterrows(), test_mask):
            hit = '★命中' if is_hit else ' 跳过'
            win = 'WIN ' if r['盈利'] else 'LOSS'
            print(f' {hit} {win} {str(r["开仓时间"])[:10]} | '
                  f'{r["市场状态"]:<12} {r["波动率水平"]:<5} {r["RSI区域"]:<10} | '
                  f'{r["盈亏金额"]:>+9,.0f}元')

    # ── Step 5: full backtest comparison ──────────────────────
    print(f'\n{SEP}')
    print(' Step 5: 完整回测对比')
    print(SEP)

    # Version B: death zones excluded only.
    vB_df = raw[~raw['市场状态'].isin(DEATH_ZONES)].copy()
    vB = simulate_equity(vB_df)

    # Version E: death zones excluded + the rules derived above.
    if passed_rule_list:
        vE_df = vB_df[_rule_mask(vB_df, passed_rule_list)].copy()
    else:
        vE_df = vB_df.copy()
        print(' (无有效组合规则,Version E = Version B)')
    vE = simulate_equity(vE_df)

    sB = stats(vB)
    sE = stats(vE)

    print(f'\n {"指标":<14} {"Version B(排死亡区)":>20} {"Version E(新组合规则)":>22}')
    print(f' {"-"*14} {"-"*20} {"-"*22}')

    rows = [
        ('交易笔数', 'n', 'd'),
        ('胜率', 'wr', 'pct'),
        ('盈亏比', 'plr', 'f2'),
        ('总收益率', 'ret', 'pct2'),
        ('最终资金', 'cap', 'cap'),
        ('总盈亏', 'pnl', 'money'),
        ('最大回撤', 'dd', 'pct'),
    ]

    def fv(s, k, fmt):
        """Format statistic *k* of stats dict *s*; 'N/A' when *s* is None."""
        if s is None:
            return 'N/A'
        v = s[k]
        if fmt == 'd':
            return str(int(v))
        if fmt == 'pct':
            return f'{v:.1%}'
        if fmt == 'pct2':
            return f'{v:+.2%}'
        if fmt == 'f2':
            return f'{v:.2f}'
        if fmt == 'money':
            return f'{v:+,.0f}'
        if fmt == 'cap':
            return f'{v:,.0f}'
        return str(v)

    for name, k, fmt in rows:
        print(f' {name:<14} {fv(sB,k,fmt):>20} {fv(sE,k,fmt):>22}')

    print('\n Version B 年度明细:')
    print_yearly(vB)
    print('\n Version E 年度明细:')
    print_yearly(vE)

    # ── Step 6: final rule output ─────────────────────────────
    print(f'\n{SEP}')
    print(' Step 6: 最终规则(可直接用于生产)')
    print(SEP)
    print(f'\n [必要条件] 市场状态 NOT IN {sorted(DEATH_ZONES)}')
    if passed_rule_list:
        print(' [充分条件] 满足以下任意一条组合规则:')
        for i, rule in enumerate(passed_rule_list, 1):
            print(f' 规则{i}: {rule["cond_str"]}')
            print(f' 训练胜率{rule["train_wr"]:.0%} → 验证胜率{rule["valid_wr"]:.0%}')
    else:
        print(' [充分条件] 无(仅死亡区过滤即为最优)')

    print()
    print(SEP)
    print(' 优化完成')
    print(SEP)


if __name__ == '__main__':
    main()