| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50 T+1 交易时间维度分析
- """
- import pandas as pd
- import numpy as np
- from datetime import datetime
- import warnings
- import sys
- warnings.filterwarnings('ignore')
- # Redirect stdout to suppress verbose output
- from io import StringIO
- old_stdout = sys.stdout
- sys.stdout = StringIO()
- from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor
- from t1_converter import simulate_t1_trades
- def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'):
- df = pd.read_csv(csv_file)
- df['DateTime'] = pd.to_datetime(df['DateTime'])
- df.set_index('DateTime', inplace=True)
- df.sort_index(inplace=True)
- if 'Open' not in df.columns and 'o' in df.columns:
- df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True)
- for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
- if col in df.columns:
- df[col] = pd.to_numeric(df[col], errors='coerce')
- df['Returns'] = df['Close'].pct_change()
- df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
- df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']
- df.ffill(inplace=True)
- df.dropna(inplace=True)
- return df
- # 运行回测
- initial_capital = 1000000
- raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv')
- config_manager = ConfigManager('config.json')
- fetcher = IntradayDataFetcher(config_manager)
- data_with_indicators = fetcher.calculate_intraday_indicators(raw_data)
- signal_generator = DualDirectionSignalGenerator()
- signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
- executor = DualDirectionExecutor(initial_capital=initial_capital)
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
- long_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
- t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital)
- # Restore stdout
- sys.stdout = old_stdout
- # ============ 时间维度分析 ============
- print('='*80)
- print('创业板50 T+1 交易时间维度深度分析')
- print('='*80)
- # 准备数据
- t1_trades['开仓日期'] = pd.to_datetime(t1_trades['开仓时间']).dt.date
- t1_trades['开仓月份'] = pd.to_datetime(t1_trades['开仓时间']).dt.to_period('M')
- t1_trades['开仓季度'] = pd.to_datetime(t1_trades['开仓时间']).dt.to_period('Q')
- t1_trades['开仓年份'] = pd.to_datetime(t1_trades['开仓时间']).dt.year
- t1_trades['开仓小时'] = pd.to_datetime(t1_trades['开仓时间']).dt.hour
- t1_trades['星期几'] = pd.to_datetime(t1_trades['开仓时间']).dt.day_name()
- t1_trades['是否盈利'] = t1_trades['盈亏金额'] > 0
- t1_trades['T+1调整'] = t1_trades['T+1调整'].fillna('否')
- # ========== 1. 年度分析 ==========
- print('\n' + '='*80)
- print('【1】年度表现分析')
- print('='*80)
- yearly_stats = t1_trades.groupby('开仓年份').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- yearly_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- yearly_stats['胜率'] = (yearly_stats['盈利次数'] / yearly_stats['交易次数'] * 100).round(1)
- yearly_stats['亏损次数'] = yearly_stats['交易次数'] - yearly_stats['盈利次数']
- # 计算累计资金
- cumulative = initial_capital
- yearly_stats['期初资金'] = 0.0
- yearly_stats['期末资金'] = 0.0
- for year in yearly_stats.index:
- yearly_stats.loc[year, '期初资金'] = float(cumulative)
- cumulative += yearly_stats.loc[year, '总盈亏']
- yearly_stats.loc[year, '期末资金'] = float(cumulative)
- yearly_stats['年度收益率'] = ((yearly_stats['期末资金'] / yearly_stats['期初资金'] - 1) * 100).round(2)
- print(yearly_stats[['交易次数', '盈利次数', '亏损次数', '胜率', '总盈亏', '年度收益率']].to_string())
- # ========== 2. 月度分析 ==========
- print('\n' + '='*80)
- print('【2】月度表现分析 (各年度月份对比)')
- print('='*80)
- monthly_stats = t1_trades.groupby(['开仓年份', pd.to_datetime(t1_trades['开仓时间']).dt.month]).agg({
- '盈亏金额': ['count', 'sum'],
- '是否盈利': 'sum'
- }).round(2)
- monthly_stats.columns = ['交易次数', '总盈亏', '盈利次数']
- monthly_stats['胜率'] = (monthly_stats['盈利次数'] / monthly_stats['交易次数'] * 100).round(1)
- monthly_stats = monthly_stats[monthly_stats['交易次数'] > 0]
- print('\n月度盈亏分布:')
- for year in sorted(t1_trades['开仓年份'].unique()):
- year_data = monthly_stats.loc[year] if year in monthly_stats.index else None
- if year_data is not None:
- print(f'\n{year}年:')
- print(year_data[['交易次数', '胜率', '总盈亏']].to_string())
- # 找出最佳/最差月份
- best_months = monthly_stats.nlargest(5, '总盈亏')[['交易次数', '胜率', '总盈亏']]
- worst_months = monthly_stats.nsmallest(5, '总盈亏')[['交易次数', '胜率', '总盈亏']]
- print('\n【最佳月份TOP5】')
- print(best_months.to_string())
- print('\n【最差月份TOP5】')
- print(worst_months.to_string())
- # ========== 3. T+1调整的影响分析 ==========
- print('\n' + '='*80)
- print('【3】T+1调整深度分析')
- print('='*80)
- # 按时间分析T+1调整
- t1_by_year = t1_trades.groupby(['开仓年份', 'T+1调整']).agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- t1_by_year.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- t1_by_year['胜率'] = (t1_by_year['盈利次数'] / t1_by_year['交易次数'] * 100).round(1)
- print('\n按年度和T+1调整类型统计:')
- print(t1_by_year.to_string())
- # 分析T+1调整的盈亏分布
- print('\nT+1调整交易的盈亏分布:')
- t1_adj_trades = t1_trades[t1_trades['T+1调整'] == '是(T0→T1)']
- if len(t1_adj_trades) > 0:
- bins = [-np.inf, -50000, -20000, -10000, -5000, 0, 5000, 10000, 20000, 50000, np.inf]
- labels = ['< -5万', '-5万~-2万', '-2万~-1万', '-1万~-5千', '-5千~0',
- '0~5千', '5千~1万', '1万~2万', '2万~5万', '> 5万']
- t1_adj_trades['盈亏区间'] = pd.cut(t1_adj_trades['盈亏金额'], bins=bins, labels=labels)
- print(t1_adj_trades['盈亏区间'].value_counts().sort_index().to_string())
- # ========== 4. 日内时间分析 ==========
- print('\n' + '='*80)
- print('【4】日内开仓时间分析')
- print('='*80)
- hourly_stats = t1_trades.groupby('开仓小时').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- hourly_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- hourly_stats['胜率'] = (hourly_stats['盈利次数'] / hourly_stats['交易次数'] * 100).round(1)
- print('\n按小时统计:')
- print(hourly_stats[['交易次数', '胜率', '平均盈亏', '总盈亏']].to_string())
- # ========== 5. 星期分析 ==========
- print('\n' + '='*80)
- print('【5】星期效应分析')
- print('='*80)
- # 星期顺序
- weekday_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday']
- weekday_names = {'Monday': '周一', 'Tuesday': '周二', 'Wednesday': '周三',
- 'Thursday': '周四', 'Friday': '周五'}
- weekday_stats = t1_trades.groupby('星期几').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- weekday_stats.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- weekday_stats['胜率'] = (weekday_stats['盈利次数'] / weekday_stats['交易次数'] * 100).round(1)
- # 按正确顺序排列
- weekday_stats = weekday_stats.reindex([d for d in weekday_order if d in weekday_stats.index])
- weekday_stats.index = [weekday_names.get(d, d) for d in weekday_stats.index]
- print(weekday_stats[['交易次数', '胜率', '平均盈亏', '总盈亏']].to_string())
- # ========== 6. 连续亏损分析 ==========
- print('\n' + '='*80)
- print('【6】连续交易表现分析')
- print('='*80)
- # 计算连续盈亏
- t1_trades_sorted = t1_trades.sort_values('开仓时间').reset_index(drop=True)
- t1_trades_sorted['连续盈亏'] = 0
- current_streak = 0
- streak_type = None
- streaks = []
- current_streak_info = {'类型': None, '长度': 0, '盈亏': 0, '开始': None, '结束': None}
- for i, row in t1_trades_sorted.iterrows():
- is_profit = row['盈亏金额'] > 0
- if current_streak_info['类型'] is None:
- current_streak_info['类型'] = '盈利' if is_profit else '亏损'
- current_streak_info['开始'] = row['开仓时间']
- if (is_profit and current_streak_info['类型'] == '盈利') or (not is_profit and current_streak_info['类型'] == '亏损'):
- current_streak_info['长度'] += 1
- current_streak_info['盈亏'] += row['盈亏金额']
- current_streak_info['结束'] = row['平仓时间']
- else:
- streaks.append(current_streak_info.copy())
- current_streak_info = {
- '类型': '盈利' if is_profit else '亏损',
- '长度': 1,
- '盈亏': row['盈亏金额'],
- '开始': row['开仓时间'],
- '结束': row['平仓时间']
- }
- if current_streak_info['类型'] is not None:
- streaks.append(current_streak_info)
- streaks_df = pd.DataFrame(streaks)
- # 分析连续亏损
- loss_streaks = streaks_df[streaks_df['类型'] == '亏损'].copy()
- if len(loss_streaks) > 0:
- print(f'\n连续亏损统计:')
- print(f' 总次数: {len(loss_streaks)}次')
- print(f' 平均长度: {loss_streaks["长度"].mean():.1f}笔')
- print(f' 最大连续亏损: {loss_streaks["长度"].max()}笔')
- print(f' 最长连续亏损详情:')
- worst_streak = loss_streaks.loc[loss_streaks['长度'].idxmax()]
- print(f' 时间: {worst_streak["开始"]} ~ {worst_streak["结束"]}')
- print(f' 连续{worst_streak["长度"]}笔亏损, 总亏损{worst_streak["盈亏"]:,.0f}元')
- print('\n连续亏损TOP5:')
- worst5 = loss_streaks.nsmallest(5, '盈亏')[['开始', '结束', '长度', '盈亏']]
- print(worst5.to_string(index=False))
- # ========== 7. 市场环境分析 ==========
- print('\n' + '='*80)
- print('【7】市场环境与交易表现')
- print('='*80)
- # 计算月度市场收益率
- monthly_market = data_with_indicators.resample('ME')['Close'].agg(['first', 'last'])
- monthly_market['月收益率'] = (monthly_market['last'] / monthly_market['first'] - 1) * 100
- # 合并交易数据
- t1_trades['月份'] = pd.to_datetime(t1_trades['开仓时间']).dt.to_period('M')
- monthly_trade_perf = t1_trades.groupby('月份')['盈亏金额'].sum()
- # 找出市场大跌月份
- bear_months = monthly_market[monthly_market['月收益率'] < -5].index
- print(f'\n市场大跌月份(跌幅>5%): {len(bear_months)}个')
- for m in bear_months[:5]:
- if m in monthly_trade_perf.index:
- print(f' {m}: 市场{monthly_market.loc[m, "月收益率"]:.1f}%, 策略{monthly_trade_perf[m]:+,.0f}元')
- # ========== 8. 亏损交易特征分析 ==========
- print('\n' + '='*80)
- print('【8】亏损交易深度分析')
- print('='*80)
- loss_trades = t1_trades[t1_trades['盈亏金额'] < 0].copy()
- if len(loss_trades) > 0:
- print(f'\n亏损交易总数: {len(loss_trades)}笔')
- print(f'总亏损金额: {loss_trades["盈亏金额"].sum():,.0f}元')
- print(f'平均单笔亏损: {loss_trades["盈亏金额"].mean():,.0f}元')
- print(f'最大单笔亏损: {loss_trades["盈亏金额"].min():,.0f}元')
- # 按T+1调整分析亏损
- loss_by_t1 = loss_trades.groupby('T+1调整').agg({
- '盈亏金额': ['count', 'sum', 'mean']
- }).round(2)
- loss_by_t1.columns = ['亏损笔数', '总亏损', '平均亏损']
- print('\n按T+1调整分类的亏损:')
- print(loss_by_t1.to_string())
- # 年度亏损分布
- loss_by_year = loss_trades.groupby('开仓年份')['盈亏金额'].sum()
- print('\n年度亏损分布:')
- print(loss_by_year.to_string())
- # ========== 9. 改进建议 ==========
- print('\n' + '='*80)
- print('【9】改进建议')
- print('='*80)
- print("""
- 基于以上分析,提出以下改进方向:
- 1. 【T+1规则适应】
- - 当前76笔T+1调整交易亏损26.9万,是主要亏损来源
- - 建议: 增加隔夜风险判断,避免在尾盘开仓
- - 建议: 降低T+1场景的仓位或暂停交易
- 2. 【时间过滤】
- - 分析显示某些月份/时段表现持续不佳
- - 建议: 在已知的大跌月份降低仓位或空仓
- - 建议: 优化开仓时间窗口
- 3. 【连续亏损保护】
- - 检测到连续亏损模式
- - 建议: 连续亏损3笔后暂停交易或降低仓位50%
- - 建议: 单日亏损超过阈值时当日停止开新仓
- 4. 【市场环境过滤】
- - 大跌月份亏损严重
- - 建议: 增加趋势过滤器,下跌趋势中减少交易频率
- - 建议: 结合大盘指数趋势进行交易决策
- 5. 【止盈止损优化】
- - 当前胜率40.8%偏低,盈亏比0.92<1
- - 建议: 优化止盈止损比例,提高盈亏比
- - 建议: 考虑移动止损保护盈利
- """)
- print('\n' + '='*80)
- print('分析完成')
- print('='*80)
|