| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50 T+1 多维度深度分析
- """
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- import warnings
- import sys
- warnings.filterwarnings('ignore')
- # Redirect stdout
- from io import StringIO
- old_stdout = sys.stdout
- sys.stdout = StringIO()
- from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor
- from t1_converter import simulate_t1_trades
- def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'):
- df = pd.read_csv(csv_file)
- df['DateTime'] = pd.to_datetime(df['DateTime'])
- df.set_index('DateTime', inplace=True)
- df.sort_index(inplace=True)
- if 'Open' not in df.columns and 'o' in df.columns:
- df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True)
- for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
- if col in df.columns:
- df[col] = pd.to_numeric(df[col], errors='coerce')
- df['Returns'] = df['Close'].pct_change()
- df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
- df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']
- df.ffill(inplace=True)
- df.dropna(inplace=True)
- return df
- # 运行回测
- initial_capital = 1000000
- raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv')
- config_manager = ConfigManager('config.json')
- fetcher = IntradayDataFetcher(config_manager)
- data_with_indicators = fetcher.calculate_intraday_indicators(raw_data)
- signal_generator = DualDirectionSignalGenerator()
- signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
- executor = DualDirectionExecutor(initial_capital=initial_capital)
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
- long_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
- t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital)
- # Restore stdout
- sys.stdout = old_stdout
- print('='*80)
- print('创业板50 T+1 多维度深度分析')
- print('='*80)
- # 准备数据
- t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间'])
- t1_trades['平仓时间'] = pd.to_datetime(t1_trades['平仓时间'])
- t1_trades['持仓小时'] = (t1_trades['平仓时间'] - t1_trades['开仓时间']).dt.total_seconds() / 3600
- t1_trades['持仓周期'] = ((t1_trades['平仓时间'] - t1_trades['开仓时间']).dt.total_seconds() / 1800).astype(int)
- t1_trades['是否盈利'] = t1_trades['盈亏金额'] > 0
- t1_trades['盈亏比例'] = t1_trades['盈亏金额'] / initial_capital * 100
- t1_trades['开仓年份'] = t1_trades['开仓时间'].dt.year
- # 将交易与原始数据合并,获取更多维度
- for idx, trade in t1_trades.iterrows():
- try:
- # 获取开仓时的市场数据
- mask = data_with_indicators.index <= trade['开仓时间']
- if mask.any():
- current_idx = data_with_indicators.index[mask][-1]
- current_data = data_with_indicators.loc[current_idx]
- # 计算价格位置(过去20日高低点中的位置)
- past_20d = data_with_indicators.loc[:current_idx].tail(20*8) # 20天,每天8个30分钟
- if len(past_20d) > 0:
- high_20d = past_20d['High'].max()
- low_20d = past_20d['Low'].min()
- current_price = current_data['Close']
- t1_trades.loc[idx, '价格位置_20d'] = (current_price - low_20d) / (high_20d - low_20d) * 100 if high_20d > low_20d else 50
- # 获取技术指标
- for col in ['RSI_14', 'MACD', 'MACD_signal', 'KDJ_K', 'KDJ_D', '布林带宽度']:
- if col in current_data.index:
- t1_trades.loc[idx, col] = current_data[col]
- except:
- pass
- # ========== 1. 持仓时长分析 ==========
- print('\n' + '='*80)
- print('【1】持仓时长深度分析')
- print('='*80)
- # 按持仓周期分组
- period_stats = t1_trades.groupby('持仓周期').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- period_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- period_stats['胜率'] = (period_stats['盈利次数'] / period_stats['交易次数'] * 100).round(1)
- print('\n按持仓周期统计(30分钟为单位):')
- print(period_stats[period_stats['交易次数'] >= 5].to_string())
- # 持仓时长分布
- print('\n持仓时长分布:')
- t1_trades['持仓时长分类'] = pd.cut(t1_trades['持仓小时'],
- bins=[0, 1, 2, 4, 8, 16, 100],
- labels=['<1小时', '1-2小时', '2-4小时', '4-8小时', '8-16小时', '>16小时'])
- duration_stats = t1_trades.groupby('持仓时长分类').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- duration_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- duration_stats['胜率'] = (duration_stats['盈利次数'] / duration_stats['交易次数'] * 100).round(1)
- print(duration_stats.to_string())
- # ========== 2. 价格位置分析 ==========
- print('\n' + '='*80)
- print('【2】开仓价格位置分析')
- print('='*80)
- # 将价格位置分箱
- t1_trades['价格位置分类'] = pd.cut(t1_trades['价格位置_20d'],
- bins=[0, 20, 40, 60, 80, 100],
- labels=['极低(0-20%)', '低位(20-40%)', '中位(40-60%)', '高位(60-80%)', '极高(80-100%)'])
- position_stats = t1_trades.groupby('价格位置分类').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- position_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- position_stats['胜率'] = (position_stats['盈利次数'] / position_stats['交易次数'] * 100).round(1)
- print('\n按20日价格位置统计:')
- print(position_stats.to_string())
- # T+1调整与价格位置的关系
- print('\nT+1调整与价格位置的关系:')
- position_t1 = pd.crosstab(t1_trades['价格位置分类'], t1_trades['T+1调整'],
- values=t1_trades['盈亏金额'], aggfunc='mean')
- print(position_t1.round(2).to_string())
- # ========== 3. 止盈止损分析 ==========
- print('\n' + '='*80)
- print('【3】止盈止损触发分析')
- print('='*80)
- # 解析退出原因
- exit_reasons = t1_trades['退出原因'].str.extract(r'(.*?)(?:触发|$)')
- exit_reasons.columns = ['退出类型']
- exit_reasons['退出类型'] = exit_reasons['退出类型'].str.strip()
- t1_trades['退出类型'] = exit_reasons['退出类型']
- exit_stats = t1_trades.groupby('退出类型').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- exit_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- exit_stats['胜率'] = (exit_stats['盈利次数'] / exit_stats['交易次数'] * 100).round(1)
- print('\n按退出类型统计:')
- print(exit_stats.to_string())
- # ========== 4. 波动率分析 ==========
- print('\n' + '='*80)
- print('【4】市场波动率与交易表现')
- print('='*80)
- # 计算开仓时的波动率
- t1_trades['开仓波动率'] = np.nan
- for idx, trade in t1_trades.iterrows():
- try:
- mask = data_with_indicators.index <= trade['开仓时间']
- if mask.sum() >= 20:
- recent_data = data_with_indicators.loc[mask].tail(20)
- volatility = recent_data['Returns'].std() * np.sqrt(48) # 年化波动率,每天48个30分钟
- t1_trades.loc[idx, '开仓波动率'] = volatility
- except:
- pass
- # 波动率分箱
- t1_trades['波动率分类'] = pd.cut(t1_trades['开仓波动率'],
- bins=[0, 0.15, 0.25, 0.35, 0.5, 2.0],
- labels=['低波动(<15%)', '中低波动(15-25%)', '中等波动(25-35%)', '高波动(35-50%)', '极高波动(>50%)'])
- vol_stats = t1_trades.groupby('波动率分类').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- vol_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- vol_stats['胜率'] = (vol_stats['盈利次数'] / vol_stats['交易次数'] * 100).round(1)
- print('\n按市场波动率统计:')
- print(vol_stats.to_string())
- # ========== 5. 交易量分析 ==========
- print('\n' + '='*80)
- print('【5】交易量分析')
- print('='*80)
- # 计算开仓时的相对成交量
- t1_trades['相对成交量'] = np.nan
- for idx, trade in t1_trades.iterrows():
- try:
- mask = data_with_indicators.index <= trade['开仓时间']
- if mask.sum() >= 20:
- recent_data = data_with_indicators.loc[mask].tail(20)
- current_vol = recent_data['Volume'].iloc[-1]
- avg_vol = recent_data['Volume'].mean()
- t1_trades.loc[idx, '相对成交量'] = current_vol / avg_vol if avg_vol > 0 else 1
- except:
- pass
- t1_trades['成交量分类'] = pd.cut(t1_trades['相对成交量'],
- bins=[0, 0.5, 1.0, 1.5, 2.0, 10.0],
- labels=['缩量(<0.5x)', '正常(0.5-1x)', '放量(1-1.5x)', '大量(1.5-2x)', '巨量(>2x)'])
- volume_stats = t1_trades.groupby('成交量分类').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- volume_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- volume_stats['胜率'] = (volume_stats['盈利次数'] / volume_stats['交易次数'] * 100).round(1)
- print('\n按相对成交量统计:')
- print(volume_stats.to_string())
- # ========== 6. 入场信号分析 ==========
- print('\n' + '='*80)
- print('【6】入场信号分析')
- print('='*80)
- # 解析入场信号
- if '入场信号' in t1_trades.columns:
- # 统计各类信号出现的频率
- signal_types = []
- for signals in t1_trades['入场信号'].dropna():
- if isinstance(signals, str):
- for s in signals.split(','):
- signal_types.append(s.strip())
- from collections import Counter
- signal_counter = Counter(signal_types)
- print('\n入场信号频率统计:')
- for signal, count in signal_counter.most_common(15):
- print(f' {signal}: {count}次')
- # 分析特定信号的表现
- print('\n关键信号组合分析:')
- for signal in ['MACD金叉', 'RSI超卖', 'KDJ金叉', '放量突破']:
- mask = t1_trades['入场信号'].str.contains(signal, na=False)
- if mask.sum() > 0:
- subset = t1_trades[mask]
- win_rate = (subset['盈亏金额'] > 0).mean() * 100
- avg_pnl = subset['盈亏金额'].mean()
- print(f' {signal}: {mask.sum()}笔, 胜率{win_rate:.1f}%, 平均盈亏{avg_pnl:+,.0f}元')
- # ========== 7. 盈亏分布分析 ==========
- print('\n' + '='*80)
- print('【7】盈亏分布特征')
- print('='*80)
- profit_trades = t1_trades[t1_trades['盈亏金额'] > 0]
- loss_trades = t1_trades[t1_trades['盈亏金额'] < 0]
- print('\n盈利交易特征:')
- print(f' 总盈利: {profit_trades["盈亏金额"].sum():,.0f}元')
- print(f' 平均盈利: {profit_trades["盈亏金额"].mean():,.0f}元')
- print(f' 最大单笔: {profit_trades["盈亏金额"].max():,.0f}元')
- print(f' 中位数: {profit_trades["盈亏金额"].median():,.0f}元')
- print(f' 25分位: {profit_trades["盈亏金额"].quantile(0.25):,.0f}元')
- print(f' 75分位: {profit_trades["盈亏金额"].quantile(0.75):,.0f}元')
- print('\n亏损交易特征:')
- print(f' 总亏损: {loss_trades["盈亏金额"].sum():,.0f}元')
- print(f' 平均亏损: {loss_trades["盈亏金额"].mean():,.0f}元')
- print(f' 最大单笔: {loss_trades["盈亏金额"].min():,.0f}元')
- print(f' 中位数: {loss_trades["盈亏金额"].median():,.0f}元')
- print(f' 25分位: {loss_trades["盈亏金额"].quantile(0.25):,.0f}元')
- print(f' 75分位: {loss_trades["盈亏金额"].quantile(0.75):,.0f}元')
- # 盈亏比分析
- total_profit = profit_trades['盈亏金额'].sum()
- total_loss = abs(loss_trades['盈亏金额'].sum())
- profit_factor = total_profit / total_loss if total_loss > 0 else 0
- print(f'\n盈亏比: {profit_factor:.2f}')
- print(f'盈亏比(单笔平均): {abs(profit_trades["盈亏金额"].mean() / loss_trades["盈亏金额"].mean()):.2f}')
- # ========== 8. 资金曲线分析 ==========
- print('\n' + '='*80)
- print('【8】资金曲线与回撤分析')
- print('='*80)
- # 计算资金曲线
- t1_trades_sorted = t1_trades.sort_values('平仓时间')
- cumulative = [initial_capital]
- for pnl in t1_trades_sorted['盈亏金额']:
- cumulative.append(cumulative[-1] + pnl)
- # 计算最大回撤
- max_drawdown = 0
- max_drawdown_pct = 0
- peak = initial_capital
- peak_idx = 0
- drawdown_start = 0
- drawdown_end = 0
- for i, capital in enumerate(cumulative):
- if capital > peak:
- peak = capital
- peak_idx = i
- else:
- dd = peak - capital
- dd_pct = dd / peak * 100
- if dd_pct > max_drawdown_pct:
- max_drawdown = dd
- max_drawdown_pct = dd_pct
- drawdown_start = peak_idx
- drawdown_end = i
- print(f'\n资金曲线统计:')
- print(f' 期初资金: {initial_capital:,.0f}元')
- print(f' 期末资金: {cumulative[-1]:,.0f}元')
- print(f' 峰值资金: {max(cumulative):,.0f}元')
- print(f' 谷值资金: {min(cumulative):,.0f}元')
- print(f' 最大回撤: {max_drawdown:,.0f}元 ({max_drawdown_pct:.2f}%)')
- print(f' 回撤开始: 第{drawdown_start}笔交易')
- print(f' 回撤结束: 第{drawdown_end}笔交易')
- print(f' 恢复耗时: {drawdown_end - drawdown_start}笔交易')
- # 回撤期间分析
- if drawdown_start < len(t1_trades_sorted) and drawdown_end < len(t1_trades_sorted):
- dd_period = t1_trades_sorted.iloc[drawdown_start:drawdown_end+1]
- print(f'\n最大回撤期间特征:')
- print(f' 交易次数: {len(dd_period)}笔')
- print(f' 期间盈亏: {dd_period["盈亏金额"].sum():,.0f}元')
- print(f' 盈利笔数: {(dd_period["盈亏金额"] > 0).sum()}笔')
- print(f' 亏损笔数: {(dd_period["盈亏金额"] < 0).sum()}笔')
- print(f' 期间胜率: {(dd_period["盈亏金额"] > 0).mean()*100:.1f}%')
- # ========== 9. 多维度交叉分析 ==========
- print('\n' + '='*80)
- print('【9】多维度交叉分析')
- print('='*80)
- # 时间 + T+1调整
- cross_analysis = t1_trades.groupby(['开仓年份', 'T+1调整']).agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '持仓小时': 'mean'
- }).round(2)
- cross_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '平均持仓']
- print('\n年度 x T+1调整:')
- print(cross_analysis.to_string())
- # 价格位置 + 持仓时长
- if t1_trades['价格位置分类'].notna().sum() > 0 and t1_trades['持仓时长分类'].notna().sum() > 0:
- print('\n价格位置 x 持仓时长 (平均盈亏):')
- cross_pnl = pd.pivot_table(t1_trades, values='盈亏金额',
- index='价格位置分类',
- columns='持仓时长分类',
- aggfunc='mean')
- print(cross_pnl.round(0).to_string())
- # 波动率 + T+1调整
- if t1_trades['波动率分类'].notna().sum() > 0:
- print('\n波动率 x T+1调整 (平均盈亏):')
- cross_vol = pd.pivot_table(t1_trades, values='盈亏金额',
- index='波动率分类',
- columns='T+1调整',
- aggfunc='mean')
- print(cross_vol.round(0).to_string())
- # ========== 10. 亏损根因分析 ==========
- print('\n' + '='*80)
- print('【10】亏损根因深度分析')
- print('='*80)
- # 找出最差的交易
- worst_trades = t1_trades.nsmallest(10, '盈亏金额')
- print('\n亏损TOP10交易特征:')
- for idx, row in worst_trades.iterrows():
- print(f"\n #{idx+1} 亏损{row['盈亏金额']:,.0f}元")
- print(f" 时间: {row['开仓时间']} → {row['平仓时间']}")
- print(f" 持仓: {row['持仓小时']:.1f}小时")
- print(f" 退出: {row['退出原因']}")
- print(f" T+1调整: {row['T+1调整']}")
- # 亏损交易共性分析
- print('\n\n亏损交易共性:')
- loss_trades = t1_trades[t1_trades['盈亏金额'] < 0]
- # 退出类型分布
- loss_exit = loss_trades['退出类型'].value_counts()
- print('\n 退出类型分布:')
- for exit_type, count in loss_exit.head(5).items():
- pct = count / len(loss_trades) * 100
- avg_loss = loss_trades[loss_trades['退出类型'] == exit_type]['盈亏金额'].mean()
- print(f' {exit_type}: {count}笔 ({pct:.1f}%), 平均亏损{avg_loss:,.0f}元')
- # T+1调整分布
- loss_t1 = loss_trades['T+1调整'].value_counts()
- print('\n T+1调整分布:')
- for t1_type, count in loss_t1.items():
- pct = count / len(loss_trades) * 100
- avg_loss = loss_trades[loss_trades['T+1调整'] == t1_type]['盈亏金额'].mean()
- print(f' {t1_type}: {count}笔 ({pct:.1f}%), 平均亏损{avg_loss:,.0f}元')
- # 持仓时长分布
- loss_duration = loss_trades['持仓时长分类'].value_counts()
- print('\n 亏损交易持仓时长分布:')
- for duration, count in loss_duration.items():
- if pd.notna(duration):
- pct = count / len(loss_trades) * 100
- print(f' {duration}: {count}笔 ({pct:.1f}%)')
- # ========== 11. 盈利因子分析 ==========
- print('\n' + '='*80)
- print('【11】盈利交易成功因子')
- print('='*80)
- profit_trades = t1_trades[t1_trades['盈亏金额'] > 0]
- print('\n盈利交易特征:')
- # 退出类型分布
- profit_exit = profit_trades['退出类型'].value_counts()
- print('\n 退出类型分布:')
- for exit_type, count in profit_exit.head(5).items():
- pct = count / len(profit_trades) * 100
- avg_profit = profit_trades[profit_trades['退出类型'] == exit_type]['盈亏金额'].mean()
- print(f' {exit_type}: {count}笔 ({pct:.1f}%), 平均盈利{avg_profit:,.0f}元')
- # T+1调整分布
- profit_t1 = profit_trades['T+1调整'].value_counts()
- print('\n T+1调整分布:')
- for t1_type, count in profit_t1.items():
- pct = count / len(profit_trades) * 100
- avg_profit = profit_trades[profit_trades['T+1调整'] == t1_type]['盈亏金额'].mean()
- print(f' {t1_type}: {count}笔 ({pct:.1f}%), 平均盈利{avg_profit:,.0f}元')
- # 持仓时长分布
- profit_duration = profit_trades['持仓时长分类'].value_counts()
- print('\n 盈利交易持仓时长分布:')
- for duration, count in profit_duration.items():
- if pd.notna(duration):
- pct = count / len(profit_trades) * 100
- print(f' {duration}: {count}笔 ({pct:.1f}%)')
- # ========== 12. 综合改进建议 ==========
- print('\n' + '='*80)
- print('【12】综合改进建议')
- print('='*80)
- print("""
- 基于多维度分析,提出以下改进策略:
- 【A. 时间维度优化】
- 1. 避开13:00-14:00开仓 (胜率22.9%)
- 2. 周五减少或停止交易 (胜率32.8%)
- 3. 优选周二、周三交易
- 【B. 持仓管理优化】
- 1. 限制持仓时长 > 16小时的交易 (过夜风险)
- 2. T+1调整后增加风控: 若当日已盈利,尾盘不平仓
- 3. 高波动期(>35%)降低仓位50%
- 【C. 价格位置过滤】
- 1. 避免在极高位置(>80%)开仓做多
- 2. 优先在低位(<40%)开仓
- 3. 结合20日高低点判断趋势
- 【D. 止盈止损优化】
- 1. 盈利交易平均持仓较短,可考虑收紧止盈
- 2. 亏损交易多因止损触发,检查止损位置是否合理
- 3. 考虑移动止损保护利润
- 【E. 信号质量提升】
- 1. 过滤低成交量信号(<0.5倍均量)
- 2. 增加趋势确认信号,避免逆势交易
- 3. 连续亏损3笔后暂停交易
- 【F. 风险管理强化】
- 1. 单日亏损超过3万当日停止
- 2. 连续2天亏损后降低仓位至30%
- 3. 月度亏损超过10万暂停策略复盘
- """)
- print('\n' + '='*80)
- print('分析完成')
- print('='*80)
|