#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50 T+1 多维度深度分析 """ import pandas as pd import numpy as np from datetime import datetime, timedelta import warnings import sys warnings.filterwarnings('ignore') # Redirect stdout from io import StringIO old_stdout = sys.stdout sys.stdout = StringIO() from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor from t1_converter import simulate_t1_trades def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'): df = pd.read_csv(csv_file) df['DateTime'] = pd.to_datetime(df['DateTime']) df.set_index('DateTime', inplace=True) df.sort_index(inplace=True) if 'Open' not in df.columns and 'o' in df.columns: df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True) for col in ['Open', 'High', 'Low', 'Close', 'Volume']: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') df['Returns'] = df['Close'].pct_change() df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1) df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open'] df.ffill(inplace=True) df.dropna(inplace=True) return df # 运行回测 initial_capital = 1000000 raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv') config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) data_with_indicators = fetcher.calculate_intraday_indicators(raw_data) signal_generator = DualDirectionSignalGenerator() signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators) executor = DualDirectionExecutor(initial_capital=initial_capital) results_df, trades_df = executor.execute_dual_direction_trades(signals_df) long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital) # Restore stdout sys.stdout = old_stdout print('='*80) print('创业板50 T+1 多维度深度分析') print('='*80) # 准备数据 t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间']) t1_trades['平仓时间'] = pd.to_datetime(t1_trades['平仓时间']) t1_trades['持仓小时'] = (t1_trades['平仓时间'] - t1_trades['开仓时间']).dt.total_seconds() / 3600 t1_trades['持仓周期'] = ((t1_trades['平仓时间'] - t1_trades['开仓时间']).dt.total_seconds() / 1800).astype(int) t1_trades['是否盈利'] = t1_trades['盈亏金额'] > 0 t1_trades['盈亏比例'] = t1_trades['盈亏金额'] / initial_capital * 100 t1_trades['开仓年份'] = t1_trades['开仓时间'].dt.year # 将交易与原始数据合并,获取更多维度 for idx, trade in t1_trades.iterrows(): try: # 获取开仓时的市场数据 mask = data_with_indicators.index <= trade['开仓时间'] if mask.any(): current_idx = data_with_indicators.index[mask][-1] current_data = data_with_indicators.loc[current_idx] # 计算价格位置(过去20日高低点中的位置) past_20d = data_with_indicators.loc[:current_idx].tail(20*8) # 20天,每天8个30分钟 if len(past_20d) > 0: high_20d = past_20d['High'].max() low_20d = past_20d['Low'].min() current_price = current_data['Close'] t1_trades.loc[idx, '价格位置_20d'] = (current_price - low_20d) / (high_20d - low_20d) * 100 if high_20d > low_20d else 50 # 获取技术指标 for col in ['RSI_14', 'MACD', 'MACD_signal', 'KDJ_K', 'KDJ_D', '布林带宽度']: if col in current_data.index: t1_trades.loc[idx, col] = current_data[col] except: pass # ========== 1. 持仓时长分析 ========== print('\n' + '='*80) print('【1】持仓时长深度分析') print('='*80) # 按持仓周期分组 period_stats = t1_trades.groupby('持仓周期').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) period_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] period_stats['胜率'] = (period_stats['盈利次数'] / period_stats['交易次数'] * 100).round(1) print('\n按持仓周期统计(30分钟为单位):') print(period_stats[period_stats['交易次数'] >= 5].to_string()) # 持仓时长分布 print('\n持仓时长分布:') t1_trades['持仓时长分类'] = pd.cut(t1_trades['持仓小时'], bins=[0, 1, 2, 4, 8, 16, 100], labels=['<1小时', '1-2小时', '2-4小时', '4-8小时', '8-16小时', '>16小时']) duration_stats = t1_trades.groupby('持仓时长分类').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) duration_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] duration_stats['胜率'] = (duration_stats['盈利次数'] / duration_stats['交易次数'] * 100).round(1) print(duration_stats.to_string()) # ========== 2. 价格位置分析 ========== print('\n' + '='*80) print('【2】开仓价格位置分析') print('='*80) # 将价格位置分箱 t1_trades['价格位置分类'] = pd.cut(t1_trades['价格位置_20d'], bins=[0, 20, 40, 60, 80, 100], labels=['极低(0-20%)', '低位(20-40%)', '中位(40-60%)', '高位(60-80%)', '极高(80-100%)']) position_stats = t1_trades.groupby('价格位置分类').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) position_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] position_stats['胜率'] = (position_stats['盈利次数'] / position_stats['交易次数'] * 100).round(1) print('\n按20日价格位置统计:') print(position_stats.to_string()) # T+1调整与价格位置的关系 print('\nT+1调整与价格位置的关系:') position_t1 = pd.crosstab(t1_trades['价格位置分类'], t1_trades['T+1调整'], values=t1_trades['盈亏金额'], aggfunc='mean') print(position_t1.round(2).to_string()) # ========== 3. 止盈止损分析 ========== print('\n' + '='*80) print('【3】止盈止损触发分析') print('='*80) # 解析退出原因 exit_reasons = t1_trades['退出原因'].str.extract(r'(.*?)(?:触发|$)') exit_reasons.columns = ['退出类型'] exit_reasons['退出类型'] = exit_reasons['退出类型'].str.strip() t1_trades['退出类型'] = exit_reasons['退出类型'] exit_stats = t1_trades.groupby('退出类型').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) exit_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] exit_stats['胜率'] = (exit_stats['盈利次数'] / exit_stats['交易次数'] * 100).round(1) print('\n按退出类型统计:') print(exit_stats.to_string()) # ========== 4. 波动率分析 ========== print('\n' + '='*80) print('【4】市场波动率与交易表现') print('='*80) # 计算开仓时的波动率 t1_trades['开仓波动率'] = np.nan for idx, trade in t1_trades.iterrows(): try: mask = data_with_indicators.index <= trade['开仓时间'] if mask.sum() >= 20: recent_data = data_with_indicators.loc[mask].tail(20) volatility = recent_data['Returns'].std() * np.sqrt(48) # 年化波动率,每天48个30分钟 t1_trades.loc[idx, '开仓波动率'] = volatility except: pass # 波动率分箱 t1_trades['波动率分类'] = pd.cut(t1_trades['开仓波动率'], bins=[0, 0.15, 0.25, 0.35, 0.5, 2.0], labels=['低波动(<15%)', '中低波动(15-25%)', '中等波动(25-35%)', '高波动(35-50%)', '极高波动(>50%)']) vol_stats = t1_trades.groupby('波动率分类').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) vol_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] vol_stats['胜率'] = (vol_stats['盈利次数'] / vol_stats['交易次数'] * 100).round(1) print('\n按市场波动率统计:') print(vol_stats.to_string()) # ========== 5. 交易量分析 ========== print('\n' + '='*80) print('【5】交易量分析') print('='*80) # 计算开仓时的相对成交量 t1_trades['相对成交量'] = np.nan for idx, trade in t1_trades.iterrows(): try: mask = data_with_indicators.index <= trade['开仓时间'] if mask.sum() >= 20: recent_data = data_with_indicators.loc[mask].tail(20) current_vol = recent_data['Volume'].iloc[-1] avg_vol = recent_data['Volume'].mean() t1_trades.loc[idx, '相对成交量'] = current_vol / avg_vol if avg_vol > 0 else 1 except: pass t1_trades['成交量分类'] = pd.cut(t1_trades['相对成交量'], bins=[0, 0.5, 1.0, 1.5, 2.0, 10.0], labels=['缩量(<0.5x)', '正常(0.5-1x)', '放量(1-1.5x)', '大量(1.5-2x)', '巨量(>2x)']) volume_stats = t1_trades.groupby('成交量分类').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) volume_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] volume_stats['胜率'] = (volume_stats['盈利次数'] / volume_stats['交易次数'] * 100).round(1) print('\n按相对成交量统计:') print(volume_stats.to_string()) # ========== 6. 入场信号分析 ========== print('\n' + '='*80) print('【6】入场信号分析') print('='*80) # 解析入场信号 if '入场信号' in t1_trades.columns: # 统计各类信号出现的频率 signal_types = [] for signals in t1_trades['入场信号'].dropna(): if isinstance(signals, str): for s in signals.split(','): signal_types.append(s.strip()) from collections import Counter signal_counter = Counter(signal_types) print('\n入场信号频率统计:') for signal, count in signal_counter.most_common(15): print(f' {signal}: {count}次') # 分析特定信号的表现 print('\n关键信号组合分析:') for signal in ['MACD金叉', 'RSI超卖', 'KDJ金叉', '放量突破']: mask = t1_trades['入场信号'].str.contains(signal, na=False) if mask.sum() > 0: subset = t1_trades[mask] win_rate = (subset['盈亏金额'] > 0).mean() * 100 avg_pnl = subset['盈亏金额'].mean() print(f' {signal}: {mask.sum()}笔, 胜率{win_rate:.1f}%, 平均盈亏{avg_pnl:+,.0f}元') # ========== 7. 盈亏分布分析 ========== print('\n' + '='*80) print('【7】盈亏分布特征') print('='*80) profit_trades = t1_trades[t1_trades['盈亏金额'] > 0] loss_trades = t1_trades[t1_trades['盈亏金额'] < 0] print('\n盈利交易特征:') print(f' 总盈利: {profit_trades["盈亏金额"].sum():,.0f}元') print(f' 平均盈利: {profit_trades["盈亏金额"].mean():,.0f}元') print(f' 最大单笔: {profit_trades["盈亏金额"].max():,.0f}元') print(f' 中位数: {profit_trades["盈亏金额"].median():,.0f}元') print(f' 25分位: {profit_trades["盈亏金额"].quantile(0.25):,.0f}元') print(f' 75分位: {profit_trades["盈亏金额"].quantile(0.75):,.0f}元') print('\n亏损交易特征:') print(f' 总亏损: {loss_trades["盈亏金额"].sum():,.0f}元') print(f' 平均亏损: {loss_trades["盈亏金额"].mean():,.0f}元') print(f' 最大单笔: {loss_trades["盈亏金额"].min():,.0f}元') print(f' 中位数: {loss_trades["盈亏金额"].median():,.0f}元') print(f' 25分位: {loss_trades["盈亏金额"].quantile(0.25):,.0f}元') print(f' 75分位: {loss_trades["盈亏金额"].quantile(0.75):,.0f}元') # 盈亏比分析 total_profit = profit_trades['盈亏金额'].sum() total_loss = abs(loss_trades['盈亏金额'].sum()) profit_factor = total_profit / total_loss if total_loss > 0 else 0 print(f'\n盈亏比: {profit_factor:.2f}') print(f'盈亏比(单笔平均): {abs(profit_trades["盈亏金额"].mean() / loss_trades["盈亏金额"].mean()):.2f}') # ========== 8. 资金曲线分析 ========== print('\n' + '='*80) print('【8】资金曲线与回撤分析') print('='*80) # 计算资金曲线 t1_trades_sorted = t1_trades.sort_values('平仓时间') cumulative = [initial_capital] for pnl in t1_trades_sorted['盈亏金额']: cumulative.append(cumulative[-1] + pnl) # 计算最大回撤 max_drawdown = 0 max_drawdown_pct = 0 peak = initial_capital peak_idx = 0 drawdown_start = 0 drawdown_end = 0 for i, capital in enumerate(cumulative): if capital > peak: peak = capital peak_idx = i else: dd = peak - capital dd_pct = dd / peak * 100 if dd_pct > max_drawdown_pct: max_drawdown = dd max_drawdown_pct = dd_pct drawdown_start = peak_idx drawdown_end = i print(f'\n资金曲线统计:') print(f' 期初资金: {initial_capital:,.0f}元') print(f' 期末资金: {cumulative[-1]:,.0f}元') print(f' 峰值资金: {max(cumulative):,.0f}元') print(f' 谷值资金: {min(cumulative):,.0f}元') print(f' 最大回撤: {max_drawdown:,.0f}元 ({max_drawdown_pct:.2f}%)') print(f' 回撤开始: 第{drawdown_start}笔交易') print(f' 回撤结束: 第{drawdown_end}笔交易') print(f' 恢复耗时: {drawdown_end - drawdown_start}笔交易') # 回撤期间分析 if drawdown_start < len(t1_trades_sorted) and drawdown_end < len(t1_trades_sorted): dd_period = t1_trades_sorted.iloc[drawdown_start:drawdown_end+1] print(f'\n最大回撤期间特征:') print(f' 交易次数: {len(dd_period)}笔') print(f' 期间盈亏: {dd_period["盈亏金额"].sum():,.0f}元') print(f' 盈利笔数: {(dd_period["盈亏金额"] > 0).sum()}笔') print(f' 亏损笔数: {(dd_period["盈亏金额"] < 0).sum()}笔') print(f' 期间胜率: {(dd_period["盈亏金额"] > 0).mean()*100:.1f}%') # ========== 9. 多维度交叉分析 ========== print('\n' + '='*80) print('【9】多维度交叉分析') print('='*80) # 时间 + T+1调整 cross_analysis = t1_trades.groupby(['开仓年份', 'T+1调整']).agg({ '盈亏金额': ['count', 'sum', 'mean'], '持仓小时': 'mean' }).round(2) cross_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '平均持仓'] print('\n年度 x T+1调整:') print(cross_analysis.to_string()) # 价格位置 + 持仓时长 if t1_trades['价格位置分类'].notna().sum() > 0 and t1_trades['持仓时长分类'].notna().sum() > 0: print('\n价格位置 x 持仓时长 (平均盈亏):') cross_pnl = pd.pivot_table(t1_trades, values='盈亏金额', index='价格位置分类', columns='持仓时长分类', aggfunc='mean') print(cross_pnl.round(0).to_string()) # 波动率 + T+1调整 if t1_trades['波动率分类'].notna().sum() > 0: print('\n波动率 x T+1调整 (平均盈亏):') cross_vol = pd.pivot_table(t1_trades, values='盈亏金额', index='波动率分类', columns='T+1调整', aggfunc='mean') print(cross_vol.round(0).to_string()) # ========== 10. 亏损根因分析 ========== print('\n' + '='*80) print('【10】亏损根因深度分析') print('='*80) # 找出最差的交易 worst_trades = t1_trades.nsmallest(10, '盈亏金额') print('\n亏损TOP10交易特征:') for idx, row in worst_trades.iterrows(): print(f"\n #{idx+1} 亏损{row['盈亏金额']:,.0f}元") print(f" 时间: {row['开仓时间']} → {row['平仓时间']}") print(f" 持仓: {row['持仓小时']:.1f}小时") print(f" 退出: {row['退出原因']}") print(f" T+1调整: {row['T+1调整']}") # 亏损交易共性分析 print('\n\n亏损交易共性:') loss_trades = t1_trades[t1_trades['盈亏金额'] < 0] # 退出类型分布 loss_exit = loss_trades['退出类型'].value_counts() print('\n 退出类型分布:') for exit_type, count in loss_exit.head(5).items(): pct = count / len(loss_trades) * 100 avg_loss = loss_trades[loss_trades['退出类型'] == exit_type]['盈亏金额'].mean() print(f' {exit_type}: {count}笔 ({pct:.1f}%), 平均亏损{avg_loss:,.0f}元') # T+1调整分布 loss_t1 = loss_trades['T+1调整'].value_counts() print('\n T+1调整分布:') for t1_type, count in loss_t1.items(): pct = count / len(loss_trades) * 100 avg_loss = loss_trades[loss_trades['T+1调整'] == t1_type]['盈亏金额'].mean() print(f' {t1_type}: {count}笔 ({pct:.1f}%), 平均亏损{avg_loss:,.0f}元') # 持仓时长分布 loss_duration = loss_trades['持仓时长分类'].value_counts() print('\n 亏损交易持仓时长分布:') for duration, count in loss_duration.items(): if pd.notna(duration): pct = count / len(loss_trades) * 100 print(f' {duration}: {count}笔 ({pct:.1f}%)') # ========== 11. 盈利因子分析 ========== print('\n' + '='*80) print('【11】盈利交易成功因子') print('='*80) profit_trades = t1_trades[t1_trades['盈亏金额'] > 0] print('\n盈利交易特征:') # 退出类型分布 profit_exit = profit_trades['退出类型'].value_counts() print('\n 退出类型分布:') for exit_type, count in profit_exit.head(5).items(): pct = count / len(profit_trades) * 100 avg_profit = profit_trades[profit_trades['退出类型'] == exit_type]['盈亏金额'].mean() print(f' {exit_type}: {count}笔 ({pct:.1f}%), 平均盈利{avg_profit:,.0f}元') # T+1调整分布 profit_t1 = profit_trades['T+1调整'].value_counts() print('\n T+1调整分布:') for t1_type, count in profit_t1.items(): pct = count / len(profit_trades) * 100 avg_profit = profit_trades[profit_trades['T+1调整'] == t1_type]['盈亏金额'].mean() print(f' {t1_type}: {count}笔 ({pct:.1f}%), 平均盈利{avg_profit:,.0f}元') # 持仓时长分布 profit_duration = profit_trades['持仓时长分类'].value_counts() print('\n 盈利交易持仓时长分布:') for duration, count in profit_duration.items(): if pd.notna(duration): pct = count / len(profit_trades) * 100 print(f' {duration}: {count}笔 ({pct:.1f}%)') # ========== 12. 综合改进建议 ========== print('\n' + '='*80) print('【12】综合改进建议') print('='*80) print(""" 基于多维度分析,提出以下改进策略: 【A. 时间维度优化】 1. 避开13:00-14:00开仓 (胜率22.9%) 2. 周五减少或停止交易 (胜率32.8%) 3. 优选周二、周三交易 【B. 持仓管理优化】 1. 限制持仓时长 > 16小时的交易 (过夜风险) 2. T+1调整后增加风控: 若当日已盈利,尾盘不平仓 3. 高波动期(>35%)降低仓位50% 【C. 价格位置过滤】 1. 避免在极高位置(>80%)开仓做多 2. 优先在低位(<40%)开仓 3. 结合20日高低点判断趋势 【D. 止盈止损优化】 1. 盈利交易平均持仓较短,可考虑收紧止盈 2. 亏损交易多因止损触发,检查止损位置是否合理 3. 考虑移动止损保护利润 【E. 信号质量提升】 1. 过滤低成交量信号(<0.5倍均量) 2. 增加趋势确认信号,避免逆势交易 3. 连续亏损3笔后暂停交易 【F. 风险管理强化】 1. 单日亏损超过3万当日停止 2. 连续2天亏损后降低仓位至30% 3. 月度亏损超过10万暂停策略复盘 """) print('\n' + '='*80) print('分析完成') print('='*80)