#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Comfort-zone study for the CYB50 T+1 strategy.

Reads a CSV of executed trades annotated with market-environment indicators
and prints a multi-part report (also written to a text file) that relates
those indicators to the win rate of the trades.
"""

import io
import sys
import warnings

import numpy as np
import pandas as pd
from scipy import stats

# Re-wrap the standard streams as UTF-8 on Windows (presumably because the
# report text is Chinese and the console default encoding may not handle it).
if sys.platform == 'win32':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

warnings.filterwarnings('ignore')

TRADES_FILE = 'D:/work/project/cyb50-quant/cat-fly/t1/t1_trades_with_environment_20260327_141655.csv'
OUTPUT_FILE = 'D:/work/project/cyb50-quant/cat-fly/t1/comfort_zone_research_result.txt'

# Calendar years compared against each other in the yearly and distribution
# sections of the report.
GOOD_YEARS = [2025, 2026]
BAD_YEARS = [2023, 2024]

# Column names assigned to the CSV by position.
TRADE_COLUMNS = [
    '交易方向', '开仓时间', '平仓时间', '开仓价格', '平仓价格', '仓位',
    '盈亏金额', '盈亏百分比', '退出原因', '持仓周期数', '持仓小时数',
    'T1调整', '原平仓时间', '原平仓价格', '原盈亏', '盈亏变化',
    '入场信号', '开仓市值', '平仓时资金', '市场状态',
    '趋势短期', '趋势中期', '趋势强度', '波动率分位', '波动率水平',
    '成交量分位', '布林带位置', '布林带区域', 'RSI分位', 'RSI区域',
    '1日动量', '入场价格',
]

# Columns coerced to numeric after loading; unparseable cells become NaN.
NUMERIC_COLUMNS = [
    '盈亏百分比', '波动率分位', 'RSI分位', '布林带位置',
    '成交量分位', '趋势强度', '1日动量',
]


def log(msg=''):
    """Print one report line (stdout may be a tee installed by ``main``)."""
    print(msg)


def load_trades(path=TRADES_FILE):
    """Load the trade CSV and add the derived columns used by the report.

    Args:
        path: CSV file to read. Defaults to ``TRADES_FILE``.

    Returns:
        DataFrame whose columns are renamed to ``TRADE_COLUMNS`` by position,
        plus ``年份``/``月份``/``年月`` derived from ``开仓时间`` and a boolean
        ``盈利`` column that is True where ``盈亏金额`` is positive.

    Raises:
        ValueError: if the CSV does not have exactly ``len(TRADE_COLUMNS)``
            columns, since positional renaming would then mislabel the data.
    """
    df = pd.read_csv(path, encoding='utf-8-sig')

    if len(df.columns) != len(TRADE_COLUMNS):
        raise ValueError(
            f'expected {len(TRADE_COLUMNS)} columns in {path!r}, '
            f'found {len(df.columns)}'
        )
    # Rename columns by position.
    df.columns = TRADE_COLUMNS

    df['开仓时间'] = pd.to_datetime(df['开仓时间'])
    df['年份'] = df['开仓时间'].dt.year
    df['月份'] = df['开仓时间'].dt.month
    df['年月'] = df['开仓时间'].dt.to_period('M')
    df['盈利'] = df['盈亏金额'] > 0

    for col in NUMERIC_COLUMNS:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    return df


def section(title, f):
    """Log a section banner. ``f`` is unused and kept for call compatibility."""
    line = '=' * 72
    log(line)
    log(f' {title}')
    log(line)


def yearly_summary(df, f):
    """Log per-year trade count, win rate, profit/loss ratio and total P&L.

    Also logs an aggregate comparison of ``GOOD_YEARS`` against ``BAD_YEARS``.
    ``f`` is only forwarded to ``section``.
    """
    section('第一部分:年度绩效对比', f)
    log()

    for year in sorted(df['年份'].unique()):
        sub = df[df['年份'] == year]
        wins = sub['盈利']
        wr = wins.mean()
        total_pnl = sub['盈亏金额'].sum()
        avg_win = sub[wins]['盈亏金额'].mean() if wins.any() else 0
        avg_loss = sub[~wins]['盈亏金额'].mean() if (~wins).any() else 0
        # With no losing trades (or a zero average loss) the ratio is unbounded.
        plr = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')
        log(f' {year}年: {len(sub)}笔 | 胜率{wr:.1%} | 盈亏比{plr:.2f} | 总盈亏{total_pnl:+,.0f}元')
    log()

    # Good years vs. bad years.
    good = df[df['年份'].isin(GOOD_YEARS)]
    bad = df[df['年份'].isin(BAD_YEARS)]
    good_label = f'{GOOD_YEARS[0]}-{GOOD_YEARS[-1]}'
    bad_label = f'{BAD_YEARS[0]}-{BAD_YEARS[-1]}'
    log(f' 好年份({good_label}): {len(good)}笔, 胜率{good["盈利"].mean():.1%}, 平均盈亏{good["盈亏金额"].mean():+,.0f}元')
    log(f' 差年份({bad_label}): {len(bad)}笔, 胜率{bad["盈利"].mean():.1%}, 平均盈亏{bad["盈亏金额"].mean():+,.0f}元')
    log()


def analyze_categorical(df, col, label, f, min_count=5):
    """Log win rate and P&L for each value of a categorical column.

    Args:
        df: trades DataFrame.
        col: categorical column to group by.
        label: text used in the section title.
        f: only forwarded to ``section``.
        min_count: values with fewer trades than this are omitted.

    Returns:
        List of ``(value, count, win_rate, avg_pnl, total_pnl)`` tuples sorted
        by win rate, highest first.
    """
    section(f'第二部分:类别指标分析 — {label}', f)
    log()

    results = []
    for val in df[col].dropna().unique():
        sub = df[df[col] == val]
        n = len(sub)
        if n < min_count:
            continue
        pnl = sub['盈亏金额']
        results.append((val, n, sub['盈利'].mean(), pnl.mean(), pnl.sum()))

    results.sort(key=lambda item: item[2], reverse=True)

    log(f' {"类别":<20} {"笔数":>5} {"胜率":>7} {"均盈亏":>10} {"总盈亏":>12}')
    log(f' {"-"*20} {"-"*5} {"-"*7} {"-"*10} {"-"*12}')
    for val, n, wr, avg, total in results:
        log(f' {str(val):<20} {n:>5} {wr:>7.1%} {avg:>+10,.0f} {total:>+12,.0f}')
    log()
    return results


def analyze_continuous_bins(df, col, label, f, bins=5):
    """Log win rate per quantile bin of a numeric column, plus its correlation.

    Args:
        df: trades DataFrame.
        col: numeric column to bin with ``pd.qcut``.
        label: text used in the section title.
        f: only forwarded to ``section``.
        bins: requested number of quantile bins (duplicate edges are dropped).

    Returns:
        List of ``(left, right, count, win_rate, avg_pnl)`` tuples, one per
        bin, or ``None`` when fewer than 20 rows have a value for ``col``.
    """
    section(f'第三部分:连续指标分位分析 — {label}', f)
    log()

    valid = df[df[col].notna()].copy()
    if len(valid) < 20:
        log(f' 数据不足,跳过')
        return

    valid['_bin'] = pd.qcut(valid[col], q=bins, duplicates='drop')
    results = []
    for bin_val in valid['_bin'].cat.categories:
        sub = valid[valid['_bin'] == bin_val]
        results.append((
            bin_val.left,
            bin_val.right,
            len(sub),
            sub['盈利'].mean(),
            sub['盈亏金额'].mean(),
        ))

    log(f' {"区间":<25} {"笔数":>5} {"胜率":>7} {"均盈亏":>10}')
    log(f' {"-"*25} {"-"*5} {"-"*7} {"-"*10}')
    for lo, hi, n, wr, avg in results:
        log(f' [{lo:>8.3f}, {hi:>8.3f}] {n:>5} {wr:>7.1%} {avg:>+10,.0f}')
    log()

    # Point-biserial correlation between the indicator and the win flag.
    corr, pval = stats.pointbiserialr(valid[col], valid['盈利'].astype(int))
    log(f' 与胜率相关系数: r={corr:.3f}, p={pval:.3f}{" ★显著" if pval < 0.05 else ""}')
    log()
    return results


def good_vs_bad_distributions(df, f):
    """Compare indicator distributions between ``GOOD_YEARS`` and ``BAD_YEARS``.

    Numeric indicators are compared by mean with an independent two-sample
    t-test; categorical indicators by the share of trades per category.
    ``f`` is only forwarded to ``section``.
    """
    section('第四部分:好/差年份 各指标分布对比', f)
    log()

    good = df[df['年份'].isin(GOOD_YEARS)]
    bad = df[df['年份'].isin(BAD_YEARS)]

    # Continuous indicators: (column, display label).
    num_cols = [
        ('波动率分位', '波动率分位数'),
        ('RSI分位', 'RSI分位数'),
        ('布林带位置', '布林带位置'),
        ('趋势强度', '趋势强度'),
        ('1日动量', '1日动量'),
        ('成交量分位', '成交量分位'),
    ]

    log(f' {"指标":<15} {"差年份均值":>12} {"好年份均值":>12} {"差值":>10} {"t检验p值":>10} {"显著?":>6}')
    log(f' {"-"*15} {"-"*12} {"-"*12} {"-"*10} {"-"*10} {"-"*6}')
    for col, label in num_cols:
        g_vals = good[col].dropna()
        b_vals = bad[col].dropna()
        # Skip indicators with too few observations on either side.
        if len(g_vals) < 5 or len(b_vals) < 5:
            continue
        _, p = stats.ttest_ind(g_vals, b_vals)
        g_mean = g_vals.mean()
        b_mean = b_vals.mean()
        diff = g_mean - b_mean
        sig = '★' if p < 0.05 else ''
        log(f' {label:<15} {b_mean:>12.3f} {g_mean:>12.3f} {diff:>+10.3f} {p:>10.3f} {sig:>6}')
    log()

    # Categorical indicators: (column, display label).
    cat_cols = [
        ('市场状态', '市场状态'),
        ('趋势短期', '趋势短期'),
        ('趋势中期', '趋势中期'),
        ('波动率水平', '波动率水平'),
        ('布林带区域', '布林带区域'),
        ('RSI区域', 'RSI区域'),
    ]
    for col, label in cat_cols:
        log(f' [{label}] 好年份分布 vs 差年份分布:')
        all_vals = df[col].dropna().unique()
        log(f' {"类别":<18} {"差年份占比":>10} {"好年份占比":>10} {"差值":>8}')
        for val in sorted(all_vals):
            b_pct = (bad[col] == val).mean()
            g_pct = (good[col] == val).mean()
            diff = g_pct - b_pct
            # Flag categories whose share differs by more than 5 points.
            marker = ' ←' if abs(diff) > 0.05 else ''
            log(f' {str(val):<18} {b_pct:>10.1%} {g_pct:>10.1%} {diff:>+8.1%}{marker}')
        log()


def winning_condition_scan(df, f):
    """Log single-column conditions with at least 8 trades and win rate >= 60%.

    ``f`` is only forwarded to ``section``.
    """
    section('第五部分:胜率 > 60% 的单指标条件扫描', f)
    log()
    log(' 过滤条件:笔数≥8, 胜率≥60%')
    log()

    cat_cols = ['市场状态', '趋势短期', '趋势中期', '波动率水平', '布林带区域', 'RSI区域', 'T1调整']
    findings = []
    for col in cat_cols:
        for val in df[col].dropna().unique():
            sub = df[df[col] == val]
            if len(sub) < 8:
                continue
            wr = sub['盈利'].mean()
            if wr >= 0.60:
                findings.append((f'{col}={val}', len(sub), wr, sub['盈亏金额'].mean()))

    findings.sort(key=lambda item: item[2], reverse=True)
    log(f' {"条件":<30} {"笔数":>5} {"胜率":>7} {"均盈亏":>10}')
    log(f' {"-"*30} {"-"*5} {"-"*7} {"-"*10}')
    for cond, n, wr, avg in findings:
        log(f' {cond:<30} {n:>5} {wr:>7.1%} {avg:>+10,.0f}')
    log()


def combo_scan(df, f):
    """Log the top 20 two-column conditions with >= 8 trades and win rate >= 60%.

    ``f`` is only forwarded to ``section``.
    """
    section('第六部分:双指标组合扫描 (笔数≥8, 胜率≥60%)', f)
    log()

    cat_cols = ['市场状态', '趋势中期', '波动率水平', '布林带区域', 'RSI区域']
    combos = []
    for i, c1 in enumerate(cat_cols):
        # Only pair c1 with later columns so each unordered pair is seen once.
        for c2 in cat_cols[i + 1:]:
            for v1 in df[c1].dropna().unique():
                for v2 in df[c2].dropna().unique():
                    sub = df[(df[c1] == v1) & (df[c2] == v2)]
                    if len(sub) < 8:
                        continue
                    wr = sub['盈利'].mean()
                    if wr >= 0.60:
                        combos.append((
                            f'{c1}={v1} & {c2}={v2}',
                            len(sub),
                            wr,
                            sub['盈亏金额'].mean(),
                        ))

    combos.sort(key=lambda item: item[2], reverse=True)
    log(f' {"组合条件":<45} {"笔数":>5} {"胜率":>7} {"均盈亏":>10}')
    log(f' {"-"*45} {"-"*5} {"-"*7} {"-"*10}')
    for cond, n, wr, avg in combos[:20]:
        log(f' {cond:<45} {n:>5} {wr:>7.1%} {avg:>+10,.0f}')
    log()


def _log_threshold_table(valid, col, thresholds, title, below=True):
    """Log win rate and average P&L for rows below (or at/above) each threshold.

    Thresholds that select fewer than 5 rows are omitted from the table.
    """
    log(title)
    log(f' {"阈值":>8} {"笔数":>6} {"胜率":>8} {"均盈亏":>10}')
    log(f' {"-"*8} {"-"*6} {"-"*8} {"-"*10}')
    for thr in thresholds:
        mask = valid[col] < thr if below else valid[col] >= thr
        sub = valid[mask]
        if len(sub) < 5:
            continue
        wr = sub['盈利'].mean()
        avg = sub['盈亏金额'].mean()
        log(f' {thr:>8.1f} {len(sub):>6} {wr:>8.1%} {avg:>+10,.0f}')
    log()


def volatility_threshold(df, f):
    """Log win rates for trades below and at/above a range of ``波动率分位`` cut-offs.

    ``f`` is only forwarded to ``section``.
    """
    section('第七部分:波动率阈值精确定位', f)
    log()

    valid = df[df['波动率分位'].notna()].copy()
    thresholds = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
    _log_threshold_table(valid, '波动率分位', thresholds, f' 波动率分位 < X 时的胜率:', below=True)
    _log_threshold_table(valid, '波动率分位', thresholds, f' 波动率分位 >= X 时的胜率:', below=False)


def rsi_threshold(df, f):
    """Log win rates for trades whose ``RSI分位`` is below a range of cut-offs.

    ``f`` is only forwarded to ``section``.
    """
    section('第八部分:RSI分位阈值精确定位', f)
    log()

    valid = df[df['RSI分位'].notna()].copy()
    thresholds = [0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
    _log_threshold_table(valid, 'RSI分位', thresholds, f' RSI分位 < X 时的胜率(低RSI/超卖区间做多):', below=True)


def monthly_rolling(df, f):
    """Log per-month trade count, win rate and total P&L.

    Months with win rate >= 60% are marked good and months below 35% are
    marked bad; totals for both groups are logged afterwards.
    ``f`` is only forwarded to ``section``.
    """
    section('第九部分:滚动月度胜率 — 识别策略周期规律', f)
    log()

    monthly = df.groupby('年月').agg(
        笔数=('盈利', 'count'),
        胜率=('盈利', 'mean'),
        总盈亏=('盈亏金额', 'sum'),
    ).reset_index()
    monthly['年月_str'] = monthly['年月'].astype(str)

    log(f' {"年月":<10} {"笔数":>5} {"胜率":>7} {"总盈亏":>12} {"状态":>6}')
    log(f' {"-"*10} {"-"*5} {"-"*7} {"-"*12} {"-"*6}')
    for _, row in monthly.iterrows():
        if row['胜率'] >= 0.6:
            state = '★好'
        elif row['胜率'] < 0.35:
            state = '△差'
        else:
            state = ' '
        log(f' {row["年月_str"]:<10} {row["笔数"]:>5} {row["胜率"]:>7.1%} {row["总盈亏"]:>+12,.0f} {state:>6}')
    log()

    # Summarise the good and bad months.
    good_months = monthly[monthly['胜率'] >= 0.6]
    bad_months = monthly[monthly['胜率'] < 0.35]
    log(f' 胜率≥60%的月份: {len(good_months)}个, 总计{good_months["笔数"].sum()}笔')
    log(f' 胜率<35%的月份: {len(bad_months)}个, 总计{bad_months["笔数"].sum()}笔')
    log()


def comfort_zone_score(df, f):
    """Score every trade against additive comfort-zone rules and log the result.

    Each rule that holds for a trade adds its points to that trade's score.
    The report shows win rate and P&L per score, the effect of several
    minimum-score cut-offs, and the list of rules.

    Args:
        df: trades DataFrame; it is not modified.
        f: only forwarded to ``section``.

    Returns:
        A copy of ``df`` with an added integer ``舒适区评分`` column.
    """
    section('第十部分:多维舒适区评分模型', f)
    log()
    log(' 基于以上分析,建立量化评分规则(每项满足得分累加):')
    log()

    df2 = df.copy()

    # Rule definitions: (description, row predicate, points).
    rules = []

    # Volatility rules are only registered when the column has any data.
    if df2['波动率分位'].notna().any():
        rules.append(('波动率分位 < 0.4', lambda r: r['波动率分位'] < 0.4 if pd.notna(r['波动率分位']) else False, 2))
        rules.append(('波动率分位 < 0.2', lambda r: r['波动率分位'] < 0.2 if pd.notna(r['波动率分位']) else False, 1))

    # RSI rules, likewise only when the column has any data.
    if df2['RSI分位'].notna().any():
        rules.append(('RSI分位 < 0.4', lambda r: r['RSI分位'] < 0.4 if pd.notna(r['RSI分位']) else False, 1))
        rules.append(('RSI分位 < 0.5 (偏低)', lambda r: r['RSI分位'] < 0.5 if pd.notna(r['RSI分位']) else False, 1))

    # Trend.
    rules.append(('趋势中期=上涨', lambda r: r['趋势中期'] == '上涨', 2))
    rules.append(('趋势短期=上涨', lambda r: r['趋势短期'] == '上涨', 1))

    # Bollinger band zone. The first rule matches any zone containing '下轨'.
    rules.append(('布林带区域=下轨中位', lambda r: '下轨' in str(r['布林带区域']), 1))
    rules.append(('布林带区域=中轨', lambda r: r['布林带区域'] == '中轨', 1))

    # Market state.
    rules.append(('市场状态=强趋势低波', lambda r: r['市场状态'] == '强趋势低波', 2))
    rules.append(('市场状态=趋势上涨', lambda r: '趋势' in str(r['市场状态']) and '上' in str(r['市场状态']), 1))

    def score_trade(row):
        """Return the sum of points of all rules that hold for ``row``."""
        total = 0
        for _, cond, pts in rules:
            try:
                if cond(row):
                    total += pts
            except (KeyError, TypeError):
                # A rule that cannot be evaluated for this row (missing column
                # or non-comparable value) contributes nothing; any other
                # error is a real bug and is allowed to propagate.
                continue
        return total

    df2['舒适区评分'] = df2.apply(score_trade, axis=1)

    # Results grouped by score.
    log(f' {"评分":>6} {"笔数":>5} {"胜率":>7} {"均盈亏":>10} {"总盈亏":>12}')
    log(f' {"-"*6} {"-"*5} {"-"*7} {"-"*10} {"-"*12}')
    for score in sorted(df2['舒适区评分'].unique()):
        sub = df2[df2['舒适区评分'] == score]
        wr = sub['盈利'].mean()
        avg = sub['盈亏金额'].mean()
        total = sub['盈亏金额'].sum()
        log(f' {score:>6} {len(sub):>5} {wr:>7.1%} {avg:>+10,.0f} {total:>+12,.0f}')
    log()

    # Effect of requiring a minimum score.
    for thr in [4, 5, 6, 7, 8]:
        sub = df2[df2['舒适区评分'] >= thr]
        if len(sub) < 5:
            continue
        wr = sub['盈利'].mean()
        avg = sub['盈亏金额'].mean()
        cov = len(sub) / len(df2)
        log(f' 评分≥{thr}: {len(sub)}笔 ({cov:.1%}覆盖) | 胜率{wr:.1%} | 均盈亏{avg:+,.0f}元')
    log()

    # Rule list.
    log(' 评分规则明细:')
    for desc, _, pts in rules:
        log(f' +{pts}分 {desc}')
    log()

    return df2


def momentum_analysis(df, f):
    """Log win rate by sign of ``1日动量`` and its correlation with winning.

    The correlation is skipped when fewer than two rows have a momentum
    value, because ``stats.pointbiserialr`` cannot be computed on them.
    ``f`` is only forwarded to ``section``.
    """
    section('第十一部分:1日动量与胜率关系', f)
    log()

    valid = df[df['1日动量'].notna()].copy()
    up = valid[valid['1日动量'] > 0]
    down = valid[valid['1日动量'] < 0]
    log(f' 1日动量 > 0 (上涨动量): {len(up)}笔, 胜率{up["盈利"].mean():.1%}')
    log(f' 1日动量 < 0 (下跌动量): {len(down)}笔, 胜率{down["盈利"].mean():.1%}')
    log()

    if len(valid) < 2:
        log(f' 数据不足,跳过')
        log()
        return

    corr, pval = stats.pointbiserialr(valid['1日动量'], valid['盈利'].astype(int))
    log(f' 动量与胜率相关系数: r={corr:.3f}, p={pval:.3f}{" ★显著" if pval < 0.05 else ""}')
    log()


def t1_effect(df, f):
    """Compare trades whose ``T1调整`` text mentions T1/T0 with the rest.

    For the adjusted trades it also compares the win rate implied by
    ``原盈亏`` with the win rate implied by ``盈亏金额``.
    ``f`` is only forwarded to ``section``.
    """
    section('第十二部分:T+1调整对胜率的影响', f)
    log()

    # Missing values count as "not adjusted".
    adjusted = df['T1调整'].str.contains('T1|T0', na=False)
    t1 = df[adjusted]
    not_t1 = df[~adjusted]

    log(f' T+1调整交易: {len(t1)}笔, 胜率{t1["盈利"].mean():.1%}, 均盈亏{t1["盈亏金额"].mean():+,.0f}元')
    log(f' 非T+1交易: {len(not_t1)}笔, 胜率{not_t1["盈利"].mean():.1%}, 均盈亏{not_t1["盈亏金额"].mean():+,.0f}元')

    # Original P&L vs. adjusted P&L.
    t1_valid = t1[t1['原盈亏'].notna() & t1['盈亏金额'].notna()]
    if len(t1_valid) > 0:
        orig_wins = (t1_valid['原盈亏'] > 0).mean()
        new_wins = (t1_valid['盈亏金额'] > 0).mean()
        log(f' T+1调整笔中,原始胜率{orig_wins:.1%} → T+1后胜率{new_wins:.1%}')
    log()


def final_summary(f):
    """Log the fixed, hand-written conclusions section of the report.

    ``f`` is only forwarded to ``section``.
    """
    section('第十三部分:综合结论与舒适区定义', f)
    log()
    log(' 基于以上分析,策略舒适区的量化定义:')
    log()
    log(' 【核心必要条件】(缺一不可)')
    log(' 1. 波动率水平 ≠ "极高" (高波动是最大杀手)')
    log(' 2. 波动率分位 < 0.5 (处于历史波动率中位以下)')
    log()
    log(' 【加分条件】(满足越多越好)')
    log(' +2分 趋势中期 = "上涨" (中期趋势配合做多方向)')
    log(' +2分 市场状态 = "强趋势低波" (最优市场环境)')
    log(' +2分 波动率分位 < 0.4 (低波动率环境)')
    log(' +1分 趋势短期 = "上涨" (短期趋势配合)')
    log(' +1分 RSI分位 < 0.5 (RSI未过热)')
    log(' +1分 布林带区域含 "下轨" (价格位于布林带下方回调位)')
    log(' +1分 波动率分位 < 0.2 (极低波动率加成)')
    log()
    log(' 【评分建议】')
    log(' 总分 ≥ 5分: 进入舒适区,正常交易')
    log(' 总分 3-4分: 半舒适区,减半仓位')
    log(' 总分 < 3分: 非舒适区,建议观望或跳过')
    log()
    log(' 【时间规律】')
    log(' 策略在创业板行情趋势明确、波动率收敛后表现最好')
    log(' 2025-2026年市场结构更适合此策略(可能与流动性环境有关)')
    log()


class _Tee:
    """Minimal write-only stream that duplicates output to several streams."""

    def __init__(self, *files):
        self.files = files

    def write(self, data):
        for fh in self.files:
            fh.write(data)

    def flush(self):
        for fh in self.files:
            fh.flush()


def main(trades_file=TRADES_FILE, output_file=OUTPUT_FILE):
    """Run every analysis section, echoing output to the console and a file.

    Args:
        trades_file: CSV passed to ``load_trades``. Defaults to ``TRADES_FILE``.
        output_file: report path, opened for writing as UTF-8. Defaults to
            ``OUTPUT_FILE``.

    ``sys.stdout`` is replaced by a tee for the duration of the run and is
    always restored afterwards, including when an analysis step raises.
    """
    orig_stdout = sys.stdout
    with open(output_file, 'w', encoding='utf-8') as f_out:
        # Duplicate everything printed to both the console and the file.
        sys.stdout = _Tee(orig_stdout, f_out)
        try:
            log('CYB50 T+1 策略舒适区深度研究报告')
            log(f'数据覆盖: 2023-03-27 ~ 2026-03-25')
            log()

            df = load_trades(trades_file)
            log(f'加载交易记录: {len(df)}笔')
            log()

            # Placeholder for the file-handle argument; real output goes
            # through the tee installed above.
            f = None

            yearly_summary(df, f)
            for col in ['市场状态', '趋势中期', '波动率水平', '布林带区域', 'RSI区域']:
                analyze_categorical(df, col, col, f)
            good_vs_bad_distributions(df, f)
            for col in ['波动率分位', 'RSI分位', '布林带位置', '趋势强度']:
                analyze_continuous_bins(df, col, col, f)
            volatility_threshold(df, f)
            rsi_threshold(df, f)
            monthly_rolling(df, f)
            winning_condition_scan(df, f)
            combo_scan(df, f)
            momentum_analysis(df, f)
            t1_effect(df, f)
            comfort_zone_score(df, f)
            final_summary(f)

            log('=' * 72)
            log('分析完成')
            log('=' * 72)
        finally:
            sys.stdout = orig_stdout

    print(f'\n结果已保存到: {output_file}')


if __name__ == '__main__':
    main()