#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50 T+1 交易时间维度分析 """ import pandas as pd import numpy as np from datetime import datetime import warnings import sys warnings.filterwarnings('ignore') # Redirect stdout to suppress verbose output from io import StringIO old_stdout = sys.stdout sys.stdout = StringIO() from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor from t1_converter import simulate_t1_trades def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'): df = pd.read_csv(csv_file) df['DateTime'] = pd.to_datetime(df['DateTime']) df.set_index('DateTime', inplace=True) df.sort_index(inplace=True) if 'Open' not in df.columns and 'o' in df.columns: df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True) for col in ['Open', 'High', 'Low', 'Close', 'Volume']: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') df['Returns'] = df['Close'].pct_change() df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1) df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open'] df.ffill(inplace=True) df.dropna(inplace=True) return df # 运行回测 initial_capital = 1000000 raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv') config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) data_with_indicators = fetcher.calculate_intraday_indicators(raw_data) signal_generator = DualDirectionSignalGenerator() signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators) executor = DualDirectionExecutor(initial_capital=initial_capital) results_df, trades_df = executor.execute_dual_direction_trades(signals_df) long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital) # Restore stdout sys.stdout = old_stdout # ============ 时间维度分析 ============ print('='*80) print('创业板50 T+1 交易时间维度深度分析') print('='*80) # 准备数据 t1_trades['开仓日期'] = pd.to_datetime(t1_trades['开仓时间']).dt.date t1_trades['开仓月份'] = pd.to_datetime(t1_trades['开仓时间']).dt.to_period('M') t1_trades['开仓季度'] = pd.to_datetime(t1_trades['开仓时间']).dt.to_period('Q') t1_trades['开仓年份'] = pd.to_datetime(t1_trades['开仓时间']).dt.year t1_trades['开仓小时'] = pd.to_datetime(t1_trades['开仓时间']).dt.hour t1_trades['星期几'] = pd.to_datetime(t1_trades['开仓时间']).dt.day_name() t1_trades['是否盈利'] = t1_trades['盈亏金额'] > 0 t1_trades['T+1调整'] = t1_trades['T+1调整'].fillna('否') # ========== 1. 年度分析 ========== print('\n' + '='*80) print('【1】年度表现分析') print('='*80) yearly_stats = t1_trades.groupby('开仓年份').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) yearly_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] yearly_stats['胜率'] = (yearly_stats['盈利次数'] / yearly_stats['交易次数'] * 100).round(1) yearly_stats['亏损次数'] = yearly_stats['交易次数'] - yearly_stats['盈利次数'] # 计算累计资金 cumulative = initial_capital yearly_stats['期初资金'] = 0.0 yearly_stats['期末资金'] = 0.0 for year in yearly_stats.index: yearly_stats.loc[year, '期初资金'] = float(cumulative) cumulative += yearly_stats.loc[year, '总盈亏'] yearly_stats.loc[year, '期末资金'] = float(cumulative) yearly_stats['年度收益率'] = ((yearly_stats['期末资金'] / yearly_stats['期初资金'] - 1) * 100).round(2) print(yearly_stats[['交易次数', '盈利次数', '亏损次数', '胜率', '总盈亏', '年度收益率']].to_string()) # ========== 2. 月度分析 ========== print('\n' + '='*80) print('【2】月度表现分析 (各年度月份对比)') print('='*80) monthly_stats = t1_trades.groupby(['开仓年份', pd.to_datetime(t1_trades['开仓时间']).dt.month]).agg({ '盈亏金额': ['count', 'sum'], '是否盈利': 'sum' }).round(2) monthly_stats.columns = ['交易次数', '总盈亏', '盈利次数'] monthly_stats['胜率'] = (monthly_stats['盈利次数'] / monthly_stats['交易次数'] * 100).round(1) monthly_stats = monthly_stats[monthly_stats['交易次数'] > 0] print('\n月度盈亏分布:') for year in sorted(t1_trades['开仓年份'].unique()): year_data = monthly_stats.loc[year] if year in monthly_stats.index else None if year_data is not None: print(f'\n{year}年:') print(year_data[['交易次数', '胜率', '总盈亏']].to_string()) # 找出最佳/最差月份 best_months = monthly_stats.nlargest(5, '总盈亏')[['交易次数', '胜率', '总盈亏']] worst_months = monthly_stats.nsmallest(5, '总盈亏')[['交易次数', '胜率', '总盈亏']] print('\n【最佳月份TOP5】') print(best_months.to_string()) print('\n【最差月份TOP5】') print(worst_months.to_string()) # ========== 3. T+1调整的影响分析 ========== print('\n' + '='*80) print('【3】T+1调整深度分析') print('='*80) # 按时间分析T+1调整 t1_by_year = t1_trades.groupby(['开仓年份', 'T+1调整']).agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) t1_by_year.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] t1_by_year['胜率'] = (t1_by_year['盈利次数'] / t1_by_year['交易次数'] * 100).round(1) print('\n按年度和T+1调整类型统计:') print(t1_by_year.to_string()) # 分析T+1调整的盈亏分布 print('\nT+1调整交易的盈亏分布:') t1_adj_trades = t1_trades[t1_trades['T+1调整'] == '是(T0→T1)'] if len(t1_adj_trades) > 0: bins = [-np.inf, -50000, -20000, -10000, -5000, 0, 5000, 10000, 20000, 50000, np.inf] labels = ['< -5万', '-5万~-2万', '-2万~-1万', '-1万~-5千', '-5千~0', '0~5千', '5千~1万', '1万~2万', '2万~5万', '> 5万'] t1_adj_trades['盈亏区间'] = pd.cut(t1_adj_trades['盈亏金额'], bins=bins, labels=labels) print(t1_adj_trades['盈亏区间'].value_counts().sort_index().to_string()) # ========== 4. 日内时间分析 ========== print('\n' + '='*80) print('【4】日内开仓时间分析') print('='*80) hourly_stats = t1_trades.groupby('开仓小时').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) hourly_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] hourly_stats['胜率'] = (hourly_stats['盈利次数'] / hourly_stats['交易次数'] * 100).round(1) print('\n按小时统计:') print(hourly_stats[['交易次数', '胜率', '平均盈亏', '总盈亏']].to_string()) # ========== 5. 星期分析 ========== print('\n' + '='*80) print('【5】星期效应分析') print('='*80) # 星期顺序 weekday_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'] weekday_names = {'Monday': '周一', 'Tuesday': '周二', 'Wednesday': '周三', 'Thursday': '周四', 'Friday': '周五'} weekday_stats = t1_trades.groupby('星期几').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) weekday_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] weekday_stats['胜率'] = (weekday_stats['盈利次数'] / weekday_stats['交易次数'] * 100).round(1) # 按正确顺序排列 weekday_stats = weekday_stats.reindex([d for d in weekday_order if d in weekday_stats.index]) weekday_stats.index = [weekday_names.get(d, d) for d in weekday_stats.index] print(weekday_stats[['交易次数', '胜率', '平均盈亏', '总盈亏']].to_string()) # ========== 6. 连续亏损分析 ========== print('\n' + '='*80) print('【6】连续交易表现分析') print('='*80) # 计算连续盈亏 t1_trades_sorted = t1_trades.sort_values('开仓时间').reset_index(drop=True) t1_trades_sorted['连续盈亏'] = 0 current_streak = 0 streak_type = None streaks = [] current_streak_info = {'类型': None, '长度': 0, '盈亏': 0, '开始': None, '结束': None} for i, row in t1_trades_sorted.iterrows(): is_profit = row['盈亏金额'] > 0 if current_streak_info['类型'] is None: current_streak_info['类型'] = '盈利' if is_profit else '亏损' current_streak_info['开始'] = row['开仓时间'] if (is_profit and current_streak_info['类型'] == '盈利') or (not is_profit and current_streak_info['类型'] == '亏损'): current_streak_info['长度'] += 1 current_streak_info['盈亏'] += row['盈亏金额'] current_streak_info['结束'] = row['平仓时间'] else: streaks.append(current_streak_info.copy()) current_streak_info = { '类型': '盈利' if is_profit else '亏损', '长度': 1, '盈亏': row['盈亏金额'], '开始': row['开仓时间'], '结束': row['平仓时间'] } if current_streak_info['类型'] is not None: streaks.append(current_streak_info) streaks_df = pd.DataFrame(streaks) # 分析连续亏损 loss_streaks = streaks_df[streaks_df['类型'] == '亏损'].copy() if len(loss_streaks) > 0: print(f'\n连续亏损统计:') print(f' 总次数: {len(loss_streaks)}次') print(f' 平均长度: {loss_streaks["长度"].mean():.1f}笔') print(f' 最大连续亏损: {loss_streaks["长度"].max()}笔') print(f' 最长连续亏损详情:') worst_streak = loss_streaks.loc[loss_streaks['长度'].idxmax()] print(f' 时间: {worst_streak["开始"]} ~ {worst_streak["结束"]}') print(f' 连续{worst_streak["长度"]}笔亏损, 总亏损{worst_streak["盈亏"]:,.0f}元') print('\n连续亏损TOP5:') worst5 = loss_streaks.nsmallest(5, '盈亏')[['开始', '结束', '长度', '盈亏']] print(worst5.to_string(index=False)) # ========== 7. 市场环境分析 ========== print('\n' + '='*80) print('【7】市场环境与交易表现') print('='*80) # 计算月度市场收益率 monthly_market = data_with_indicators.resample('ME')['Close'].agg(['first', 'last']) monthly_market['月收益率'] = (monthly_market['last'] / monthly_market['first'] - 1) * 100 # 合并交易数据 t1_trades['月份'] = pd.to_datetime(t1_trades['开仓时间']).dt.to_period('M') monthly_trade_perf = t1_trades.groupby('月份')['盈亏金额'].sum() # 找出市场大跌月份 bear_months = monthly_market[monthly_market['月收益率'] < -5].index print(f'\n市场大跌月份(跌幅>5%): {len(bear_months)}个') for m in bear_months[:5]: if m in monthly_trade_perf.index: print(f' {m}: 市场{monthly_market.loc[m, "月收益率"]:.1f}%, 策略{monthly_trade_perf[m]:+,.0f}元') # ========== 8. 亏损交易特征分析 ========== print('\n' + '='*80) print('【8】亏损交易深度分析') print('='*80) loss_trades = t1_trades[t1_trades['盈亏金额'] < 0].copy() if len(loss_trades) > 0: print(f'\n亏损交易总数: {len(loss_trades)}笔') print(f'总亏损金额: {loss_trades["盈亏金额"].sum():,.0f}元') print(f'平均单笔亏损: {loss_trades["盈亏金额"].mean():,.0f}元') print(f'最大单笔亏损: {loss_trades["盈亏金额"].min():,.0f}元') # 按T+1调整分析亏损 loss_by_t1 = loss_trades.groupby('T+1调整').agg({ '盈亏金额': ['count', 'sum', 'mean'] }).round(2) loss_by_t1.columns = ['亏损笔数', '总亏损', '平均亏损'] print('\n按T+1调整分类的亏损:') print(loss_by_t1.to_string()) # 年度亏损分布 loss_by_year = loss_trades.groupby('开仓年份')['盈亏金额'].sum() print('\n年度亏损分布:') print(loss_by_year.to_string()) # ========== 9. 改进建议 ========== print('\n' + '='*80) print('【9】改进建议') print('='*80) print(""" 基于以上分析,提出以下改进方向: 1. 【T+1规则适应】 - 当前76笔T+1调整交易亏损26.9万,是主要亏损来源 - 建议: 增加隔夜风险判断,避免在尾盘开仓 - 建议: 降低T+1场景的仓位或暂停交易 2. 【时间过滤】 - 分析显示某些月份/时段表现持续不佳 - 建议: 在已知的大跌月份降低仓位或空仓 - 建议: 优化开仓时间窗口 3. 【连续亏损保护】 - 检测到连续亏损模式 - 建议: 连续亏损3笔后暂停交易或降低仓位50% - 建议: 单日亏损超过阈值时当日停止开新仓 4. 【市场环境过滤】 - 大跌月份亏损严重 - 建议: 增加趋势过滤器,下跌趋势中减少交易频率 - 建议: 结合大盘指数趋势进行交易决策 5. 【止盈止损优化】 - 当前胜率40.8%偏低,盈亏比0.92<1 - 建议: 优化止盈止损比例,提高盈亏比 - 建议: 考虑移动止损保护盈利 """) print('\n' + '='*80) print('分析完成') print('='*80)