#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CYB50 candlestick + daily-indicator "comfort zone" / "dark zone" deep analysis.

Data pipeline:
    cyb50_30min_2023_to_20260325.csv
        -> resample to 1D
        -> daily indicators (RSI / KDJ / MACD / MA / Bollinger / momentum)
    backtest_vB_fee_*.csv (195 trades, all outside the "death zone",
    already carrying 30-minute bar indicators)
        -> attach_daily
        -> full 30-minute + daily indicator set

Analysis:
    Univariate: bin every indicator -> count / win rate / avg PnL / total PnL
    Bivariate:  2-D combination matrix for key indicator pairs
    Zones:      win rate >= 60% -> comfort zone; win rate <= 35% -> dark zone
"""
import glob
import io
import os
import sys
import warnings
from datetime import datetime

import numpy as np
import pandas as pd

# The report prints CJK text and symbols; force UTF-8 on the Windows console.
# Prefer reconfigure() (keeps the existing stream object and its buffering
# mode); fall back to re-wrapping the raw buffer.  Streams that offer neither
# (e.g. a captured/replaced stdout) are left untouched instead of raising.
if sys.platform == 'win32':
    for _stream_name in ('stdout', 'stderr'):
        _stream = getattr(sys, _stream_name)
        if hasattr(_stream, 'reconfigure'):
            _stream.reconfigure(encoding='utf-8')
        elif hasattr(_stream, 'buffer'):
            setattr(sys, _stream_name,
                    io.TextIOWrapper(_stream.buffer, encoding='utf-8'))

# NOTE: silences every warning process-wide, library deprecation notices included.
warnings.filterwarnings('ignore')

# ──────────────────────────────────────────────────────────────────
# Configuration
# ──────────────────────────────────────────────────────────────────
# 30-minute bar history, expected next to this script.
DATA_CSV = os.path.join(os.path.dirname(__file__), 'cyb50_30min_2023_to_20260325.csv')
MIN_TRADES = 4        # minimum trades per bin (statistical confidence threshold)
WR_COMFORT = 0.60     # win-rate threshold for a comfort zone
WR_DARK = 0.35        # win-rate threshold for a dark zone
TOP_N = 15            # number of zones shown in the summary tables
SEP = '=' * 76        # horizontal rule used by the console report
# ──────────────────────────────────────────────────────────────────
# 1. Data loading
# ──────────────────────────────────────────────────────────────────
def load_backtest_vB() -> pd.DataFrame:
    """Load the newest ``backtest_vB_fee_*.csv`` found next to this script.

    "Newest" means the lexicographically greatest file name.
    NOTE(review): this presumes the suffix is a sortable timestamp — confirm
    against the script that writes these files.

    Returns:
        The trade list with ``开仓时间`` / ``平仓时间`` parsed as datetimes, a
        boolean ``盈利`` column (True where ``盈利标记`` equals ``'盈'``) and
        the known numeric columns coerced to numbers (unparseable -> NaN).

    Raises:
        FileNotFoundError: no matching backtest CSV exists.
    """
    pattern = os.path.join(os.path.dirname(__file__), 'backtest_vB_fee_*.csv')
    files = glob.glob(pattern)
    if not files:
        raise FileNotFoundError(
            "未找到 backtest_vB_fee_*.csv,请先运行 final_strategy.py")
    latest = max(files)
    print(f" 回测数据: {os.path.basename(latest)}")

    df = pd.read_csv(latest, encoding='utf-8-sig')
    df['开仓时间'] = pd.to_datetime(df['开仓时间'])
    df['平仓时间'] = pd.to_datetime(df['平仓时间'])
    df['盈利'] = df['盈利标记'] == '盈'

    numeric_cols = ['实际盈亏', 'RSI', 'K', 'D', 'J', 'MACD_hist', 'BB_width',
                    'ATR_Pct', 'Momentum', 'Volume_Ratio']
    for c in numeric_cols:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors='coerce')
    return df


def build_daily_indicators() -> pd.DataFrame:
    """Aggregate the 30-minute bars in ``DATA_CSV`` to daily bars and add indicators.

    Returns:
        A frame indexed by calendar day (only days that have bars) holding
        OHLCV plus RSI_D, K_D/D_D/J_D, MACD_D/Signal_D/MACDhist_D,
        MA5_D/MA10_D/MA20_D/MA60_D, pct_MA20, pct_MA60, MA5_slope, BB_pos_D,
        Mom5_D and Mom10_D.  Leading rows are NaN until each rolling window
        is full.
    """
    raw = pd.read_csv(DATA_CSV, encoding='utf-8-sig')
    raw['DateTime'] = pd.to_datetime(raw['DateTime'])
    raw = raw.set_index('DateTime').sort_index()

    # Daily OHLCV; resample() emits every calendar day, so drop the days
    # without any bar (no Close).
    d = raw.resample('D').agg(
        Open=('Open', 'first'),
        High=('High', 'max'),
        Low=('Low', 'min'),
        Close=('Close', 'last'),
        Volume=('Volume', 'sum'),
    ).dropna(subset=['Close'])
    close = d['Close']

    # ── RSI(14), simple-average variant ──
    # clip() keeps the NaN of the very first diff, so the first RSI value is
    # produced only once 14 real price changes exist (mapping that NaN to 0
    # would seed the first window with a fake "no change" day).
    delta = close.diff()
    gain = delta.clip(lower=0).rolling(14).mean()
    loss = (-delta).clip(lower=0).rolling(14).mean()
    d['RSI_D'] = 100 - 100 / (1 + gain / (loss + 1e-9))   # 1e-9 avoids /0

    # ── KDJ(9,3,3) ──
    low9 = d['Low'].rolling(9).min()
    high9 = d['High'].rolling(9).max()
    rsv = (close - low9) / (high9 - low9 + 1e-9) * 100
    d['K_D'] = rsv.ewm(com=2, adjust=False).mean()
    d['D_D'] = d['K_D'].ewm(com=2, adjust=False).mean()
    d['J_D'] = 3 * d['K_D'] - 2 * d['D_D']

    # ── MACD(12,26,9) ──
    ema12 = close.ewm(span=12, adjust=False).mean()
    ema26 = close.ewm(span=26, adjust=False).mean()
    d['MACD_D'] = ema12 - ema26
    d['Signal_D'] = d['MACD_D'].ewm(span=9, adjust=False).mean()
    d['MACDhist_D'] = d['MACD_D'] - d['Signal_D']

    # ── Moving averages & percentage deviation from them ──
    d['MA5_D'] = close.rolling(5).mean()
    d['MA10_D'] = close.rolling(10).mean()
    d['MA20_D'] = close.rolling(20).mean()
    d['MA60_D'] = close.rolling(60).mean()
    d['pct_MA20'] = (close - d['MA20_D']) / d['MA20_D'] * 100
    d['pct_MA60'] = (close - d['MA60_D']) / d['MA60_D'] * 100
    d['MA5_slope'] = d['MA5_D'].diff() / d['MA5_D'].shift() * 100   # daily MA5 slope, %

    # ── Bollinger position (20, 2): 0 = lower band, 1 = upper band ──
    bm = close.rolling(20).mean()
    bstd = close.rolling(20).std()
    bl, bu = bm - 2 * bstd, bm + 2 * bstd
    d['BB_pos_D'] = (close - bl) / (bu - bl + 1e-9)

    # ── Momentum (percentage change over 5 / 10 daily bars) ──
    d['Mom5_D'] = close.pct_change(5) * 100
    d['Mom10_D'] = close.pct_change(10) * 100
    return d


def attach_daily(trades: pd.DataFrame, daily: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *trades* with the entry day's daily indicators appended.

    Args:
        trades: trade list with a datetime-like ``开仓时间`` column.
        daily: daily indicator frame indexed by (unique) normalized dates.

    Returns:
        A copy of *trades* plus one column per daily indicator.  Trades whose
        entry day is absent from *daily*, and indicators absent from *daily*,
        are left as NaN.  *trades* itself is not modified.

    NOTE(review): the attached values are built from the entry day's complete
    daily bar, i.e. they include bars after the entry time — confirm this is
    intended for the analysis.
    """
    daily_cols = ['RSI_D', 'K_D', 'D_D', 'J_D', 'MACD_D', 'MACDhist_D',
                  'pct_MA20', 'pct_MA60', 'MA5_slope', 'BB_pos_D',
                  'Mom5_D', 'Mom10_D']
    result = trades.copy()

    # One vectorized lookup instead of a per-cell loop: reindex() yields a NaN
    # row for unknown days and a NaN column for indicators missing in `daily`.
    entry_days = pd.DatetimeIndex(pd.to_datetime(trades['开仓时间']).dt.normalize())
    looked_up = daily.reindex(index=entry_days, columns=daily_cols)

    # Assign positionally (plain arrays) so the result does not depend on the
    # labels of the trade index, which may not be unique.
    for c in daily_cols:
        result[c] = looked_up[c].to_numpy()
    return result
# ──────────────────────────────────────────────────────────────────
# 2. Indicator bin definitions
# ──────────────────────────────────────────────────────────────────
# Maps indicator column -> (bin edges, bin labels).  Edges are consumed by
# pd.cut with its default right-closed intervals, so a value exactly on an
# edge falls into the lower bin (e.g. RSI == 25 is labelled '超卖<25').
IND_BINS = {
    # ── 30-minute indicators ──
    'RSI': (
        [-np.inf, 25, 35, 45, 55, 65, np.inf],
        ['超卖<25', '弱25-35', '偏弱35-45', '中45-55', '偏强55-65', '超买>65']
    ),
    'K': (
        [-np.inf, 20, 40, 60, 80, np.inf],
        ['超卖<20', '低20-40', '中40-60', '高60-80', '超买>80']
    ),
    'J': (
        [-np.inf, 0, 20, 50, 80, 100, np.inf],
        ['极卖<0', '超卖0-20', '低20-50', '高50-80', '超买80-100', '极买>100']
    ),
    'MACD_hist': (
        [-np.inf, -0.8, -0.2, 0, 0.2, 0.8, np.inf],
        ['强空<-0.8', '空-0.8~-0.2', '弱空~0', '弱多0~0.2', '多0.2~0.8', '强多>0.8']
    ),
    'Momentum': (
        [-np.inf, -0.03, -0.01, 0, 0.01, 0.03, np.inf],
        ['大跌>3%', '跌1-3%', '小跌<1%', '小涨<1%', '涨1-3%', '大涨>3%']
    ),
    'Volume_Ratio': (
        [-np.inf, 0.5, 0.8, 1.2, 1.8, np.inf],
        ['缩量<0.5', '偏低0.5-0.8', '平量0.8-1.2', '放量1.2-1.8', '大放>1.8']
    ),
    'BB_width': (
        [-np.inf, 0.025, 0.04, 0.06, 0.08, np.inf],
        ['极窄<2.5%', '窄2.5-4%', '中4-6%', '宽6-8%', '极宽>8%']
    ),
    'ATR_Pct': (
        [-np.inf, 0.006, 0.009, 0.013, 0.017, np.inf],
        ['极低<0.6%', '低0.6-0.9%', '中0.9-1.3%', '高1.3-1.7%', '极高>1.7%']
    ),
    # ── Daily indicators ──
    'RSI_D': (
        [-np.inf, 30, 40, 50, 60, 70, np.inf],
        ['超卖<30', '弱30-40', '偏弱40-50', '中50-60', '偏强60-70', '超买>70']
    ),
    'K_D': (
        [-np.inf, 20, 40, 60, 80, np.inf],
        ['超卖<20', '低20-40', '中40-60', '高60-80', '超买>80']
    ),
    'J_D': (
        [-np.inf, 0, 20, 50, 80, 100, np.inf],
        ['极卖<0', '超卖0-20', '低20-50', '高50-80', '超买80-100', '极买>100']
    ),
    'MACDhist_D': (
        [-np.inf, -2, -0.5, 0, 0.5, 2, np.inf],
        ['强空<-2', '空-2~-0.5', '弱空~0', '弱多0~0.5', '多0.5~2', '强多>2']
    ),
    'pct_MA20': (
        [-np.inf, -5, -2, 0, 2, 5, np.inf],
        ['大幅低<-5%', '低-5~-2%', '略低-2~0%', '略高0~2%', '高2~5%', '大幅高>5%']
    ),
    'pct_MA60': (
        [-np.inf, -8, -3, 0, 3, 8, np.inf],
        ['大幅低<-8%', '低-8~-3%', '略低-3~0%', '略高0~3%', '高3~8%', '大幅高>8%']
    ),
    'BB_pos_D': (
        [-np.inf, 0.1, 0.3, 0.5, 0.7, 0.9, np.inf],
        ['超下轨<0.1', '下轨0.1-0.3', '中下0.3-0.5', '中上0.5-0.7', '上轨0.7-0.9', '超上轨>0.9']
    ),
    'MA5_slope': (
        [-np.inf, -0.5, -0.1, 0.1, 0.5, np.inf],
        ['急降<-0.5%', '下降-0.5~-0.1%', '平-0.1~0.1%', '上升0.1~0.5%', '急升>0.5%']
    ),
    'Mom5_D': (
        [-np.inf, -4, -2, 0, 2, 4, np.inf],
        ['大跌>4%', '跌2-4%', '小跌<2%', '小涨<2%', '涨2-4%', '大涨>4%']
    ),
    'Mom10_D': (
        [-np.inf, -6, -3, 0, 3, 6, np.inf],
        ['大跌>6%', '跌3-6%', '小跌<3%', '小涨<3%', '涨3-6%', '大涨>6%']
    ),
}


def do_bin(series: pd.Series, col: str) -> pd.Series:
    """Bin an indicator column using the edges/labels registered in ``IND_BINS``.

    Args:
        series: raw indicator values; non-numeric entries are coerced to NaN.
        col: key into ``IND_BINS`` selecting the edges and labels.

    Returns:
        A categorical Series of bin labels (NaN where the value is missing).

    Raises:
        KeyError: *col* has no entry in ``IND_BINS``.
    """
    bins, labels = IND_BINS[col]
    return pd.cut(pd.to_numeric(series, errors='coerce'),
                  bins=bins, labels=labels, include_lowest=True)


# ──────────────────────────────────────────────────────────────────
# 3. Analysis functions
# ──────────────────────────────────────────────────────────────────
def analyze_univariate(df: pd.DataFrame, min_n=MIN_TRADES) -> dict:
    """Per-indicator bin statistics.

    Args:
        df: trades with a boolean ``盈利`` column, a numeric ``实际盈亏``
            column and any subset of the ``IND_BINS`` indicator columns.
        min_n: bins with fewer trades than this are dropped.

    Returns:
        ``{col: DataFrame(bin, n, wr, avg_pnl, total_pnl, col)}`` for every
        ``IND_BINS`` indicator present in *df* (the frame may be empty).
    """
    results = {}
    for col in IND_BINS:
        if col not in df.columns:
            continue
        # Only the two aggregated columns are needed; avoid copying the
        # whole trade frame once per indicator.
        tmp = df[['盈利', '实际盈亏']].assign(_b=do_bin(df[col], col))
        grp = (tmp.groupby('_b', observed=True)
                  .agg(n=('盈利', 'count'),
                       wr=('盈利', 'mean'),
                       avg_pnl=('实际盈亏', 'mean'),
                       total_pnl=('实际盈亏', 'sum'))
                  .reset_index()
                  .rename(columns={'_b': 'bin'}))
        grp = grp[grp['n'] >= min_n].copy()
        grp['col'] = col
        results[col] = grp
    return results


# Key indicator pairs examined by the bivariate analysis
BIVARIATE_PAIRS = [
    ('RSI', 'RSI_D'),              # 30-min oversold × daily oversold resonance
    ('J', 'J_D'),                  # KDJ J value across time frames
    ('Momentum', 'Mom5_D'),        # momentum across time frames
    ('Volume_Ratio', 'ATR_Pct'),   # volume expansion × volatility
    ('MACD_hist', 'MACDhist_D'),   # MACD bull/bear strength across time frames
    ('RSI', 'pct_MA20'),           # oversold × deviation from the moving average
    ('BB_pos_D', 'J_D'),           # daily Bollinger position × daily KDJ
    ('RSI_D', 'Mom5_D'),           # daily oversold × daily momentum
    ('RSI', 'MA5_slope'),          # 30-min oversold × daily trend direction
    ('K', 'K_D'),                  # KDJ K value across time frames
]


def analyze_bivariate(df: pd.DataFrame,
                      pairs=BIVARIATE_PAIRS,
                      min_n=MIN_TRADES) -> dict:
    """Bin statistics for every combination of two binned indicators.

    Args:
        df: trades (same requirements as ``analyze_univariate``).
        pairs: iterable of ``(col1, col2)`` indicator names; pairs with a
            column missing from *df* are skipped.
        min_n: combinations with fewer trades than this are dropped.

    Returns:
        ``{'c1×c2': DataFrame(c1, c2, n, wr, avg_pnl, total_pnl, pair)}``
        where the ``c1`` / ``c2`` columns hold the bin labels.
    """
    results = {}
    for c1, c2 in pairs:
        if c1 not in df.columns or c2 not in df.columns:
            continue
        tmp = df[['盈利', '实际盈亏']].assign(_b1=do_bin(df[c1], c1),
                                         _b2=do_bin(df[c2], c2))
        grp = (tmp.groupby(['_b1', '_b2'], observed=True)
                  .agg(n=('盈利', 'count'),
                       wr=('盈利', 'mean'),
                       avg_pnl=('实际盈亏', 'mean'),
                       total_pnl=('实际盈亏', 'sum'))
                  .reset_index())
        grp.columns = [c1, c2, 'n', 'wr', 'avg_pnl', 'total_pnl']
        grp = grp[grp['n'] >= min_n].copy()
        key = f'{c1}×{c2}'
        grp['pair'] = key
        results[key] = grp
    return results


# ──────────────────────────────────────────────────────────────────
# 4. Zone identification
# ──────────────────────────────────────────────────────────────────
def identify_zones(uni: dict, biv: dict):
    """Flatten univariate + bivariate results and extract comfort / dark zones.

    Every surviving bin becomes one row with ``score = wr * avg_pnl``.
    Comfort zones are rows with ``wr >= WR_COMFORT`` sorted by score
    descending; dark zones are rows with ``wr <= WR_DARK`` sorted by score
    ascending.

    NOTE(review): because the score multiplies by the win rate, a bin with
    wr == 0 always scores 0 and therefore sorts *after* every dark bin with a
    negative score — confirm this ranking is what is wanted.

    Args:
        uni: output of ``analyze_univariate``.
        biv: output of ``analyze_bivariate`` (keys of the form ``'c1×c2'``).

    Returns:
        ``(all_df, comfort, dark)``; all three always carry the full column
        set, even when no bin survived the minimum-trade filter.
    """
    columns = ['type', 'dim1', 'bin1', 'dim2', 'bin2',
               'n', 'wr', 'avg_pnl', 'total_pnl', 'score']
    rows = []

    # Univariate bins
    for col, grp in uni.items():
        for r in grp.to_dict('records'):
            rows.append({
                'type': '单变量',
                'dim1': col, 'bin1': str(r['bin']),
                'dim2': '', 'bin2': '',
                'n': int(r['n']),
                'wr': r['wr'],
                'avg_pnl': r['avg_pnl'],
                'total_pnl': r['total_pnl'],
                'score': r['wr'] * r['avg_pnl'],
            })

    # Bivariate combinations
    for key, grp in biv.items():
        c1, c2 = key.split('×')
        for r in grp.to_dict('records'):
            rows.append({
                'type': '双变量',
                'dim1': c1, 'bin1': str(r[c1]),
                'dim2': c2, 'bin2': str(r[c2]),
                'n': int(r['n']),
                'wr': r['wr'],
                'avg_pnl': r['avg_pnl'],
                'total_pnl': r['total_pnl'],
                'score': r['wr'] * r['avg_pnl'],
            })

    # Explicit columns keep the frame well-formed when `rows` is empty;
    # otherwise the 'wr' lookups below raise KeyError.
    all_df = pd.DataFrame(rows, columns=columns)
    comfort = (all_df[all_df['wr'] >= WR_COMFORT]
               .sort_values('score', ascending=False)
               .reset_index(drop=True))
    dark = (all_df[all_df['wr'] <= WR_DARK]
            .sort_values('score', ascending=True)
            .reset_index(drop=True))
    return all_df, comfort, dark
# ──────────────────────────────────────────────────────────────────
# 5. Report output
# ──────────────────────────────────────────────────────────────────
def _wr_bar(wr: float, width=20) -> str:
    """Render a win rate as a fixed-width text bar followed by the percentage.

    The filled part is clamped to ``[0, width]`` so the bar always has
    exactly *width* cells; a non-finite *wr* draws an empty bar instead of
    raising.
    """
    filled = int(wr * width) if np.isfinite(wr) else 0
    filled = max(0, min(width, filled))
    bar = '█' * filled + '░' * (width - filled)
    return f'[{bar}] {wr:.1%}'


def print_univariate_summary(uni: dict):
    """Print, per indicator, its bins ordered by win rate (highest first).

    Args:
        uni: output of ``analyze_univariate``; indicators with no surviving
            bin are skipped.
    """
    print(f'\n{"指标":<15} {"分箱":<20} {"笔数":>5} {"胜率":>28} {"均盈亏":>10} {"累计盈亏":>12}')
    print('-' * 92)
    for col, grp in uni.items():
        if grp.empty:
            continue
        ordered = grp.sort_values('wr', ascending=False)
        for pos, (_, r) in enumerate(ordered.iterrows()):
            col_disp = col if pos == 0 else ''   # indicator name on its first row only
            # Signed 10-wide field: the sign stays attached to the number and
            # positive / negative rows have the same width, so the total
            # column lines up.
            print(f'{col_disp:<15} {str(r["bin"]):<20} {int(r["n"]):>5} '
                  f'{_wr_bar(r["wr"])} {r["avg_pnl"]:>+10,.0f} {r["total_pnl"]:>+12,.0f}')
        print()


def print_bivariate_heatmap(biv: dict, top_each=6):
    """Print, per indicator pair, its highest-win-rate combinations and dark ones.

    Args:
        biv: output of ``analyze_bivariate``.
        top_each: number of top rows per pair; at most ``min(3, top_each)``
            dark rows (``wr <= WR_DARK``) are listed after them.  A dark row
            can also appear in the top list when a pair has few combinations.
    """
    def _row(flag, r, c1, c2):
        print(f' {flag} {str(r[c1]):<22} {str(r[c2]):<22} {int(r["n"]):>4} '
              f'{r["wr"]:>7.1%} {r["avg_pnl"]:>+10,.0f} {r["total_pnl"]:>+12,.0f}')

    for pair_key, grp in biv.items():
        if grp.empty:
            continue
        c1, c2 = pair_key.split('×')
        print(f'\n ◆ {pair_key} (共{len(grp)}个有效组合,最少{MIN_TRADES}笔)')
        print(f' {"":1} {c1:<22} {c2:<22} {"笔数":>4} {"胜率":>7} {"均盈亏":>10} {"累计盈亏":>12}')
        print(f' {"-"} {"-"*22} {"-"*22} {"-"*4} {"-"*7} {"-"*10} {"-"*12}')

        # Best combinations; the star marks the ones that qualify as comfort zones
        for _, r in grp.nlargest(top_each, 'wr').iterrows():
            _row('★' if r['wr'] >= WR_COMFORT else ' ', r, c1, c2)

        # Dark combinations (if any)
        dark_rows = grp[grp['wr'] <= WR_DARK].nsmallest(min(3, top_each), 'wr')
        if not dark_rows.empty:
            print(' --- 黑暗 ---')
            for _, r in dark_rows.iterrows():
                _row('✗', r, c1, c2)


def _print_zone_table(title: str, zones: pd.DataFrame, top_n: int) -> None:
    """Print one titled, numbered table with the first *top_n* rows of *zones*."""
    print(f'\n{title:=<76}')
    print(f' {"#":>3} {"类型":<5} {"条件":<52} {"笔数":>4} {"胜率":>7} {"均盈亏":>10}')
    print(f' {"---":>3} {"-----":<5} {"-"*52} {"-"*4} {"-"*7} {"-"*10}')
    for i, (_, r) in enumerate(zones.head(top_n).iterrows(), 1):
        # One condition for a univariate zone, two joined by '&' for a bivariate one
        cond = f"{r['dim1']}={r['bin1']}"
        if r['dim2']:
            cond = f"{cond} & {r['dim2']}={r['bin2']}"
        print(f' {i:>3} {r["type"]:<5} {cond:<52} {int(r["n"]):>4} '
              f'{r["wr"]:>7.1%} {r["avg_pnl"]:>+10,.0f}')


def print_zones(comfort: pd.DataFrame, dark: pd.DataFrame, top_n=TOP_N):
    """Print the final comfort-zone and dark-zone summary tables.

    Args:
        comfort: comfort zones as returned by ``identify_zones``.
        dark: dark zones as returned by ``identify_zones``.
        top_n: maximum number of rows per table.
    """
    _print_zone_table('★ 舒适区 TOP', comfort, top_n)
    _print_zone_table('✗ 黑暗区 TOP', dark, top_n)


# ──────────────────────────────────────────────────────────────────
# 6. CSV export
# ──────────────────────────────────────────────────────────────────
def export_csv(trades: pd.DataFrame, all_zones: pd.DataFrame,
               comfort: pd.DataFrame, dark: pd.DataFrame):
    """Write the four result frames as timestamped CSV files next to this script.

    Files (UTF-8 with BOM, no index column), all sharing one timestamp:
    ``zone_trades_*`` (per-trade indicators incl. daily ones),
    ``zone_analysis_*`` (every bin), ``zone_comfort_*`` and ``zone_dark_*``.
    A one-line confirmation is printed per file.
    """
    ts = datetime.now().strftime('%Y%m%d_%H%M%S')
    out = os.path.dirname(__file__)
    # (file stem, frame, console label, unit appended to the row count)
    exports = [
        ('zone_trades', trades, '交易明细(含日线指标)', '笔'),
        ('zone_analysis', all_zones, '全量区域分析', '行'),
        ('zone_comfort', comfort, '舒适区', '个区域'),
        ('zone_dark', dark, '黑暗区', '个区域'),
    ]
    for stem, frame, label, unit in exports:
        path = os.path.join(out, f'{stem}_{ts}.csv')
        frame.to_csv(path, index=False, encoding='utf-8-sig')
        print(f' {label}: {os.path.basename(path)} ({len(frame)}{unit})')
# ──────────────────────────────────────────────────────────────────
# 7. Main flow
# ──────────────────────────────────────────────────────────────────
def main():
    """Run the whole pipeline: load, enrich, analyse, report, export."""

    def _banner(*title_lines):
        # A block of title lines framed by two separator rules.
        print(SEP)
        for line in title_lines:
            print(line)
        print(SEP)

    _banner(
        ' CYB50 K线 + 日线指标 舒适区 / 黑暗区 深度分析',
        f' Version B(非死亡区全量)| 最少笔数≥{MIN_TRADES} | '
        f'舒适≥{WR_COMFORT:.0%} | 黑暗≤{WR_DARK:.0%}',
    )

    # ── Load data ──
    print('\n📂 加载数据...')
    trades = load_backtest_vB()
    first_entry = trades["开仓时间"].min().date()
    last_entry = trades["开仓时间"].max().date()
    print(f' 回测交易: {len(trades)}笔 {first_entry} ~ {last_entry}')

    print('\n📈 计算日线指标...')
    daily = build_daily_indicators()
    first_day = daily.index[0].date()
    last_day = daily.index[-1].date()
    print(f' 日线: {len(daily)}条 {first_day} ~ {last_day}')

    print('\n🔗 关联日线指标...')
    trades = attach_daily(trades, daily)
    # RSI_D stands in for "a daily row was found for this trade"
    daily_hits = trades['RSI_D'].notna().sum()
    print(f' 成功关联日线指标: {daily_hits}/{len(trades)} 笔')

    # ── Analysis ──
    print(f'\n🔬 单变量分析({len(IND_BINS)}个指标)...')
    uni = analyze_univariate(trades)
    print(f' 完成: {len(uni)}个指标')

    print(f'\n🔬 双变量分析({len(BIVARIATE_PAIRS)}个组合对)...')
    biv = analyze_bivariate(trades)
    print(f' 完成: {len(biv)}个组合对')

    print('\n🎯 识别舒适区 / 黑暗区...')
    all_zones, comfort, dark = identify_zones(uni, biv)
    print(f' 舒适区(WR≥{WR_COMFORT:.0%}): {len(comfort)}个')
    print(f' 黑暗区(WR≤{WR_DARK:.0%}): {len(dark)}个')

    # ── Report ──
    print()
    _banner(' 单变量分析 — 各指标分箱胜率')
    print_univariate_summary(uni)

    print()
    _banner(' 双变量分析 — 关键指标对组合')
    print_bivariate_heatmap(biv)

    print()
    _banner(' 综合区域汇总')
    print_zones(comfort, dark)

    # ── Export ──
    print('\n📁 导出CSV...')
    export_csv(trades, all_zones, comfort, dark)

    print()
    _banner(' 分析完成')


if __name__ == '__main__':
    main()