| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 舒适区规则优化
- 步骤:
- 1. 只在非死亡区数据上操作
- 2. 2023-2024 → 训练集:穷举组合,筛选有效规则
- 3. 2025 → 验证集:调优组合入选门槛
- 4. 2026 → 盲测集:最终验证(不允许回看调整)
- 5. 输出新规则,运行完整回测对比
- """
import sys, io
# On Windows, re-wrap stdout/stderr as UTF-8 text streams. This runs before
# anything is printed; presumably it is there so the Chinese report text is
# emitted correctly regardless of the console code page (TODO confirm).
if sys.platform == 'win32':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
import os
import pandas as pd
import numpy as np
from itertools import combinations
import warnings
# Silence every warning category for the whole run.
warnings.filterwarnings('ignore')
# Separator line printed above and below each section header of the report.
SEP = '=' * 70
# Default starting capital used by the equity simulation and the statistics.
INITIAL = 1_000_000
# Market-state labels treated as "death zones"; trades whose market state is
# one of these are excluded before any rule mining or backtest comparison.
DEATH_ZONES = {'下跌趋势低波', '震荡低波'}
- # ── 加载数据 ────────────────────────────────────────────────────
def load_trades(csv_path=None):
    """Load the trade log CSV and normalise it for rule mining.

    The header row found in the file is replaced by a fixed list of 32
    column names, so the CSV must contain exactly those columns in that
    order.

    Args:
        csv_path: Optional path to the trade CSV. When omitted, the file
            ``t1_trades_with_environment_20260327_141655.csv`` located next
            to this script is read.

    Returns:
        A DataFrame sorted by open time with a fresh 0..n-1 index. The open
        time is parsed to datetime, five metric columns are coerced to
        numeric (unparseable cells become NaN), and two derived columns are
        added: ``盈利`` (PnL amount > 0) and ``年份`` (year of the open time).

    Raises:
        ValueError: If the CSV does not have the expected number of columns.
    """
    if csv_path is None:
        csv_path = os.path.join(
            os.path.dirname(__file__),
            't1_trades_with_environment_20260327_141655.csv',
        )
    df = pd.read_csv(csv_path, encoding='utf-8-sig')

    cols = [
        '交易方向', '开仓时间', '平仓时间', '开仓价格', '平仓价格', '仓位',
        '盈亏金额', '盈亏百分比', '退出原因', '持仓周期数', '持仓小时数',
        'T1调整', '原平仓时间', '原平仓价格', '原盈亏', '盈亏变化',
        '入场信号', '开仓市值', '平仓时资金', '市场状态',
        '趋势短期', '趋势中期', '趋势强度', '波动率分位', '波动率水平',
        '成交量分位', '布林带位置', '布林带区域', 'RSI分位', 'RSI区域',
        '1日动量', '入场价格',
    ]
    # Columns are renamed purely by position, so a different column count
    # would silently mislabel data; fail with an explicit message instead.
    if len(df.columns) != len(cols):
        raise ValueError(
            f'expected {len(cols)} columns in {csv_path!r}, '
            f'found {len(df.columns)}'
        )
    df.columns = cols

    df['开仓时间'] = pd.to_datetime(df['开仓时间'])
    for name in ('盈亏金额', '盈亏百分比', '波动率分位', 'RSI分位', '趋势强度'):
        df[name] = pd.to_numeric(df[name], errors='coerce')
    df['盈利'] = df['盈亏金额'] > 0
    df['年份'] = df['开仓时间'].dt.year
    return df.sort_values('开仓时间').reset_index(drop=True)
- # ── 权益曲线模拟 ─────────────────────────────────────────────────
def simulate_equity(df, initial=INITIAL):
    """Attach a running account balance to a sequence of trades.

    Trades are taken in the row order of ``df``; the caller is responsible
    for passing them in the intended (chronological) order.

    Args:
        df: Trades with a ``盈亏金额`` (PnL amount) column.
        initial: Starting capital before the first trade.

    Returns:
        A copy of ``df`` with a fresh 0..n-1 index and two columns set:
        ``资金余额`` (balance after each trade) and ``盈利`` (PnL amount > 0).
        The input frame is not modified.
    """
    out = df.copy().reset_index(drop=True)
    pnl = out['盈亏金额'].to_numpy(dtype=float)
    # Seed the running sum with the starting capital so every balance is
    # produced by the same left-to-right additions as a manual loop would
    # perform, then drop the seed element itself.
    out['资金余额'] = np.cumsum(np.concatenate(([float(initial)], pnl)))[1:]
    out['盈利'] = out['盈亏金额'] > 0
    return out
- # ── 指标分箱(将连续指标离散化用于组合搜索)────────────────────────
def add_bins(df):
    """Return a copy of ``df`` with discretised indicator columns added.

    Added columns, in this order:
        vol_bin: bucket of ``波动率分位`` (volatility percentile).
        rsi_bin: bucket of ``RSI分位`` (RSI percentile).
        ts_bin:  bucket of ``趋势强度`` (trend strength).
        t1_flag: 'T1是' when the text 'T0' occurs in str(``T1调整``),
                 otherwise 'T1否'.

    Each bucket is chosen by the first upper bound the value is strictly
    below; values at or above the last bound get the top label, and missing
    values are labelled 'unknown'.
    """
    # (exclusive upper bound, label) pairs in ascending order.
    vol_steps = ((0.20, 'vol极低'), (0.40, 'vol低'), (0.60, 'vol中'), (0.80, 'vol高'))
    rsi_steps = (
        (0.05, 'rsi极底'), (0.10, 'rsi底'), (0.20, 'rsi低'),
        (0.40, 'rsi偏低'), (0.60, 'rsi中'), (0.80, 'rsi偏高'),
    )
    ts_steps = ((1.0, 'ts弱'), (1.5, 'ts中弱'), (2.5, 'ts中'), (4.0, 'ts强'))

    def bucket(value, steps, top_label):
        if pd.isna(value):
            return 'unknown'
        for upper, label in steps:
            if value < upper:
                return label
        return top_label

    def t1_label(raw):
        if 'T0' in str(raw):
            return 'T1是'
        return 'T1否'

    binned = df.copy()
    binned['vol_bin'] = binned['波动率分位'].apply(lambda v: bucket(v, vol_steps, 'vol极高'))
    binned['rsi_bin'] = binned['RSI分位'].apply(lambda v: bucket(v, rsi_steps, 'rsi高'))
    binned['ts_bin'] = binned['趋势强度'].apply(lambda v: bucket(v, ts_steps, 'ts极强'))
    binned['t1_flag'] = binned['T1调整'].apply(t1_label)
    return binned
- # ── 组合穷举(在指定数据集上) ──────────────────────────────────────
# Categorical columns that may appear as conditions in a combination rule.
CAT_COLS = ['市场状态', '波动率水平', 'RSI区域', 'vol_bin', 'rsi_bin', 'ts_bin']


def scan_combos(df, min_trades=6, min_wr=0.55, max_combos=2, cat_cols=None):
    """Enumerate condition combinations and keep those that win often enough.

    Every combination of 1 up to ``max_combos`` categorical columns is
    grouped by its distinct value tuples; a group becomes a candidate rule
    when it has at least ``min_trades`` trades and a win rate of at least
    ``min_wr``.

    Args:
        df: Trades with the categorical columns plus ``盈利`` (bool) and
            ``盈亏金额`` (PnL amount).
        min_trades: Minimum number of trades in a group.
        min_wr: Minimum win rate (mean of ``盈利``) of a group.
        max_combos: Largest number of columns combined into one rule.
        cat_cols: Columns to combine; defaults to ``CAT_COLS``.

    Returns:
        A DataFrame with one row per rule and the columns ``n_cols``,
        ``conditions`` (dict column -> required value), ``cond_str``,
        ``trades``, ``win_rate``, ``avg_pnl`` and ``total_pnl``; a completely
        empty DataFrame when no group qualifies.
    """
    columns = CAT_COLS if cat_cols is None else list(cat_cols)
    results = []
    for n in range(1, max_combos + 1):
        for cols in combinations(columns, n):
            for key, sub in df.groupby(list(cols)):
                if len(sub) < min_trades:
                    continue
                wr = sub['盈利'].mean()
                if not wr >= min_wr:
                    continue
                # Grouping by a one-element list yields a scalar key on older
                # pandas and a 1-tuple on pandas >= 2.0. Normalise to a tuple
                # so a single-column condition always stores the bare value;
                # a stored tuple would never compare equal to a row's value.
                values = key if isinstance(key, tuple) else (key,)
                cond = dict(zip(cols, values))
                pnl = sub['盈亏金额']
                results.append({
                    'n_cols': n,
                    'conditions': cond,
                    'cond_str': ' & '.join(f'{k}={v}' for k, v in cond.items()),
                    'trades': len(sub),
                    'win_rate': wr,
                    'avg_pnl': pnl.mean(),
                    'total_pnl': pnl.sum(),
                })
    return pd.DataFrame(results) if results else pd.DataFrame()
def apply_combo_rule(row, combo_rules):
    """Tell whether a trade is hit by at least one combination rule.

    A rule hits when, for every (column, value) pair in its ``conditions``
    dict, the string form of the trade's value equals the string form of the
    required value. A column missing from ``row`` is read as ''.

    Args:
        row: Mapping-like trade record supporting ``.get(key, default)``.
        combo_rules: Iterable of dicts, each with a ``conditions`` mapping.

    Returns:
        True if any rule hits, otherwise False.
    """
    return any(
        all(
            str(row.get(field, '')) == str(wanted)
            for field, wanted in rule['conditions'].items()
        )
        for rule in combo_rules
    )
- # ── 绩效统计 ─────────────────────────────────────────────────────
def stats(df, initial=INITIAL):
    """Summarise a simulated trade sequence.

    Args:
        df: Trades carrying ``盈利`` (bool), ``盈亏金额`` (PnL amount) and
            ``资金余额`` (running balance) columns.
        initial: Starting capital; used for the total return and as the
            first equity peak in the drawdown calculation.

    Returns:
        None for an empty frame, otherwise a dict with:
            n   - number of trades
            wr  - win rate
            pnl - summed PnL
            cap - last balance
            ret - (cap - initial) / initial
            plr - |mean winning PnL / mean losing PnL|, or inf when there
                  are no losing trades or their mean is 0
            dd  - most negative relative drop of the balance below its
                  running peak
    """
    if len(df) == 0:
        return None

    is_win = df['盈利']
    amounts = df['盈亏金额']
    final_cap = df['资金余额'].iloc[-1]

    losing = amounts[~is_win]
    if len(losing) == 0 or losing.mean() == 0:
        payoff = float('inf')
    else:
        payoff = abs(amounts[is_win].mean() / losing.mean())

    equity = df['资金余额'].values
    # Running peak including the starting capital, aligned with `equity`.
    peaks = np.maximum.accumulate(np.append([initial], equity))[1:]
    worst_dd = ((equity - peaks) / peaks).min()

    return dict(
        n=len(df),
        wr=is_win.mean(),
        pnl=amounts.sum(),
        cap=final_cap,
        ret=(final_cap - initial) / initial,
        plr=payoff,
        dd=worst_dd,
    )
def print_yearly(df, initial=INITIAL):
    """Print one summary line per calendar year of a simulated trade list.

    For each year (ascending, taken from ``开仓时间``) the line shows the
    number of trades, the win rate, the year's PnL, that PnL relative to the
    balance at the end of the previous year (``initial`` for the first
    year), and the balance after the year's last trade.

    Args:
        df: Trades with ``开仓时间``, ``盈亏金额``, ``盈利`` and ``资金余额``.
        initial: Balance the first year's PnL percentage is measured against.
    """
    frame = df.copy()
    frame['年份'] = pd.to_datetime(frame['开仓时间']).dt.year
    base = initial
    for year in sorted(frame['年份'].unique()):
        rows = frame[frame['年份'] == year]
        year_pnl = rows['盈亏金额'].sum()
        year_wr = rows['盈利'].mean()
        closing = rows['资金余额'].iloc[-1]
        print(f" {year}年: {len(rows):>3}笔 胜率{year_wr:.1%} | {year_pnl:>+12,.0f}元 ({year_pnl/base:>+.2%}) → {closing:,.0f}元")
        # The next year's percentage is measured against this year's close.
        base = closing
- # ═══════════════════════════════════════════════════════════════
- # 主流程
- # ═══════════════════════════════════════════════════════════════
def _banner(*lines, leading_blank=True):
    """Print the given title lines framed by separator lines."""
    print(f'\n{SEP}' if leading_blank else SEP)
    for line in lines:
        print(line)
    print(SEP)


def _rule_mask(df, rules):
    """Return a boolean Series (aligned to df.index) of rows hit by any rule.

    Built row by row instead of with DataFrame.apply(axis=1) so that an
    empty frame still yields an (empty) boolean Series that can be negated
    and used for indexing.
    """
    hits = [apply_combo_rule(row, rules) for _, row in df.iterrows()]
    return pd.Series(hits, index=df.index, dtype=bool)


# Format specs for the comparison table, keyed by the kind tag used in main().
_STAT_SPECS = {
    'pct': '.1%',
    'pct2': '+.2%',
    'f2': '.2f',
    'money': '+,.0f',
    'cap': ',.0f',
}


def _format_stat(summary, key, kind):
    """Render one statistic of a stats() dict; 'N/A' when there is no dict."""
    if summary is None:
        return 'N/A'
    value = summary[key]
    if kind == 'd':
        return str(int(value))
    spec = _STAT_SPECS.get(kind)
    return str(value) if spec is None else format(value, spec)


def main():
    """Run the time-stratified rule search and print the full report.

    Steps, all restricted to trades outside DEATH_ZONES:
        1. Split trades by open year: 2023-2024 train, 2025 validation,
           2026 blind test.
        2. Enumerate 1- and 2-column combination rules on the training set.
        3. Re-measure every candidate on the validation set and keep those
           with at least 3 hits and a win rate of at least 50%.
        4. Show how the kept rules behave on the blind-test trades.
        5. Compare the backtest of "death-zone filter only" (Version B)
           with "death-zone filter + kept rules" (Version E).
        6. Print the final rule set.

    Returns early (after printing a message) when step 2 finds no candidate
    or when no candidate has enough validation samples.
    """
    _banner(' 舒适区规则优化 — 时间分层法', leading_blank=False)
    raw = add_bins(load_trades())

    # ── Step 1: split the non-death-zone trades by year ─────────────
    _banner(' Step 1: 数据分层')
    all_nondead = raw[~raw['市场状态'].isin(DEATH_ZONES)].copy()
    train = all_nondead[all_nondead['年份'].isin([2023, 2024])].copy()
    valid = all_nondead[all_nondead['年份'] == 2025].copy()
    test = all_nondead[all_nondead['年份'] == 2026].copy()
    print(f'\n 全量非死亡区: {len(all_nondead)}笔 | 胜率{all_nondead["盈利"].mean():.1%}')
    print(f' 训练集(23-24): {len(train)}笔 | 胜率{train["盈利"].mean():.1%}')
    print(f' 验证集(2025): {len(valid)}笔 | 胜率{valid["盈利"].mean():.1%}')
    print(f' 盲测集(2026): {len(test)}笔 | 胜率{test["盈利"].mean():.1%}')

    # ── Step 2: enumerate candidate rules on the training set ───────
    _banner(' Step 2: 训练集(2023-2024) 组合规则穷举',
            ' 筛选条件: 笔数≥6, 胜率≥55%')
    combos_df = scan_combos(train, min_trades=6, min_wr=0.55, max_combos=2)
    if combos_df.empty:
        print(' 未找到满足条件的组合')
        return
    combos_df = combos_df.sort_values('win_rate', ascending=False).reset_index(drop=True)
    print(f'\n 共找到 {len(combos_df)} 条候选规则:')
    print(f' {"条件":<45} {"笔数":>5} {"胜率":>7} {"均盈亏":>10}')
    print(f' {"-"*45} {"-"*5} {"-"*7} {"-"*10}')
    for _, r in combos_df.iterrows():
        print(f' {r["cond_str"]:<45} {r["trades"]:>5} {r["win_rate"]:>7.1%} {r["avg_pnl"]:>+10,.0f}')

    # ── Step 3: re-measure every candidate on the validation set ────
    _banner(' Step 3: 验证集(2025) 规则命中率与胜率')
    rule_valid_stats = []
    for _, cand in combos_df.iterrows():
        rule = {'conditions': cand['conditions'], 'cond_str': cand['cond_str']}
        sub = valid[_rule_mask(valid, [rule])]
        if len(sub) < 3:
            continue
        valid_wr = sub['盈利'].mean()
        rule_valid_stats.append({
            'cond_str': cand['cond_str'],
            # Carried along so the rule does not have to be looked up again
            # by its display string after filtering.
            'conditions': cand['conditions'],
            'train_wr': cand['win_rate'],
            'train_n': cand['trades'],
            'valid_n': len(sub),
            'valid_wr': valid_wr,
            'valid_avg': sub['盈亏金额'].mean(),
            # Validation win rate within 20 points of the training win rate.
            'stable': abs(valid_wr - cand['win_rate']) < 0.20,
        })
    valid_rules_df = pd.DataFrame(rule_valid_stats)
    if len(valid_rules_df) == 0:
        print(' 无规则在验证集有足够样本')
        return
    valid_rules_df = valid_rules_df.sort_values('valid_wr', ascending=False)
    print(f'\n {"条件":<45} {"训练":>8} {"验证":>8} {"稳定?":>6}')
    print(f' {"-"*45} {"-"*8} {"-"*8} {"-"*6}')
    for _, r in valid_rules_df.iterrows():
        stab = '✓' if r['stable'] else '✗偏移'
        print(f' {r["cond_str"]:<45} '
              f'{r["train_n"]}笔{r["train_wr"]:.0%} '
              f'{r["valid_n"]}笔{r["valid_wr"]:.0%} '
              f'{stab:>6}')

    # Keep rules with a validation win rate >= 50% and at least 3 hits.
    # NOTE(review): the 'stable' flag is only displayed above and is not part
    # of this filter — confirm that is intended.
    stable_rules = valid_rules_df[
        (valid_rules_df['valid_wr'] >= 0.50) &
        (valid_rules_df['valid_n'] >= 3)
    ].copy()
    print(f'\n 通过验证的规则: {len(stable_rules)}条')
    passed_rule_list = []
    for _, r in stable_rules.iterrows():
        passed_rule_list.append({
            'conditions': r['conditions'],
            'cond_str': r['cond_str'],
            'train_wr': r['train_wr'],
            'valid_wr': r['valid_wr'],
        })
        print(f' ✓ {r["cond_str"]} | 训练{r["train_wr"]:.0%} → 验证{r["valid_wr"]:.0%}')
    if not passed_rule_list:
        print('\n 无规则通过验证,将仅使用死亡区过滤(Version B)')

    # ── Step 4: blind test on 2026 (rules are frozen at this point) ─
    _banner(' Step 4: 盲测集(2026) — 规则锁定后不允许回看调整')
    if passed_rule_list:
        test_mask = _rule_mask(test, passed_rule_list)
        test_pass = test[test_mask]
        test_skip = test[~test_mask]
        print(f'\n 规则命中: {len(test_pass)}笔, 胜率{test_pass["盈利"].mean():.1%}, '
              f'均盈亏{test_pass["盈亏金额"].mean():+,.0f}元')
        print(f' 规则未命中: {len(test_skip)}笔, 胜率{test_skip["盈利"].mean():.1%}, '
              f'均盈亏{test_skip["盈亏金额"].mean():+,.0f}元')
        print()
        for (_, r), is_hit in zip(test.iterrows(), test_mask):
            hit = '★命中' if is_hit else ' 跳过'
            win = 'WIN ' if r['盈利'] else 'LOSS'
            print(f' {hit} {win} {str(r["开仓时间"])[:10]} | '
                  f'{r["市场状态"]:<12} {r["波动率水平"]:<5} {r["RSI区域"]:<10} | '
                  f'{r["盈亏金额"]:>+9,.0f}元')

    # ── Step 5: full backtest comparison ─────────────────────────────
    _banner(' Step 5: 完整回测对比')
    # Version B: death-zone filter only.
    vB_df = raw[~raw['市场状态'].isin(DEATH_ZONES)].copy()
    vB = simulate_equity(vB_df)
    # Version E: death-zone filter plus the rules that passed validation.
    if passed_rule_list:
        vE_df = vB_df[_rule_mask(vB_df, passed_rule_list)].copy()
    else:
        vE_df = vB_df.copy()
        print(' (无有效组合规则,Version E = Version B)')
    vE = simulate_equity(vE_df)
    sB = stats(vB)
    sE = stats(vE)
    print(f'\n {"指标":<14} {"Version B(排死亡区)":>20} {"Version E(新组合规则)":>22}')
    print(f' {"-"*14} {"-"*20} {"-"*22}')
    rows = [
        ('交易笔数', 'n', 'd'),
        ('胜率', 'wr', 'pct'),
        ('盈亏比', 'plr', 'f2'),
        ('总收益率', 'ret', 'pct2'),
        ('最终资金', 'cap', 'cap'),
        ('总盈亏', 'pnl', 'money'),
        ('最大回撤', 'dd', 'pct'),
    ]
    for name, k, fmt in rows:
        print(f' {name:<14} {_format_stat(sB, k, fmt):>20} {_format_stat(sE, k, fmt):>22}')
    print(f'\n Version B 年度明细:')
    print_yearly(vB)
    print(f'\n Version E 年度明细:')
    print_yearly(vE)

    # ── Step 6: final rule set ───────────────────────────────────────
    _banner(' Step 6: 最终规则(可直接用于生产)')
    print(f'\n [必要条件] 市场状态 NOT IN {sorted(DEATH_ZONES)}')
    if passed_rule_list:
        print(f' [充分条件] 满足以下任意一条组合规则:')
        for i, rule in enumerate(passed_rule_list, 1):
            print(f' 规则{i}: {rule["cond_str"]}')
            print(f' 训练胜率{rule["train_wr"]:.0%} → 验证胜率{rule["valid_wr"]:.0%}')
    else:
        print(f' [充分条件] 无(仅死亡区过滤即为最优)')
    _banner(' 优化完成')


if __name__ == '__main__':
    main()
|