| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- CYB50 T+1 回测引擎 - 正确的资金管理版本 V2
- 核心逻辑:
- 1. 按时间顺序处理所有信号
- 2. 维护持仓状态(是否持仓、持仓期间)
- 3. 当新信号的开仓时间落在已有持仓期间时,跳过该信号
- 4. 正确处理T+1延期平仓
- """
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- import sys
- sys.path.insert(0, '/root/.openclaw/workspace/cat-fly')
- from cyb50_30min_dual_direction import (
- ConfigManager, IntradayDataFetcher,
- DualDirectionSignalGenerator, DualDirectionExecutor
- )
- def simulate_t1_trades_v2(data_df, long_trades_df, initial_capital=1000000):
- """
- 正确的T+1回测 V2 - 正确处理时间重叠
-
- 参数:
- data_df: 包含所有价格数据的DataFrame
- long_trades_df: 原始做多交易列表(作为信号源)
- initial_capital: 初始资金
-
- 返回:
- t1_trades_df: T+1规则下实际执行的交易
- """
- print("\n" + "="*80)
- print("T+1回测引擎 V2 - 正确处理时间重叠")
- print("="*80)
-
- if len(long_trades_df) == 0:
- print("没有做多交易记录")
- return pd.DataFrame()
-
- # 准备信号列表
- signals = []
- for idx, trade in long_trades_df.iterrows():
- signals.append({
- 'signal_id': idx + 1,
- 'entry_time': trade['开仓时间'],
- 'entry_price': trade['开仓价格'],
- 'exit_time': trade['平仓时间'],
- 'exit_price': trade['平仓价格'],
- 'exit_reason': trade['退出原因'],
- 'entry_signals': trade.get('入场信号', '')
- })
-
- # 按开仓时间排序
- signals = sorted(signals, key=lambda x: x['entry_time'])
-
- t1_trades = []
- capital = initial_capital
-
- # 当前持仓状态
- current_position = None # None 或 {'entry_time', 'entry_price', 'can_sell_time', ...}
-
- print(f"\n初始资金: {initial_capital:,.0f}元")
- print(f"总信号数: {len(signals)}笔")
- print("\n" + "-"*80)
-
- for signal in signals:
- sig_entry = signal['entry_time']
- sig_exit = signal['exit_time']
-
- # 检查是否有持仓
- if current_position is not None:
- # 检查新信号是否在持仓期间
- hold_end = current_position['actual_exit_time']
-
- if sig_entry < hold_end:
- # 新信号在持仓期间,跳过
- print(f"\n[跳过] 信号 #{signal['signal_id']}: {sig_entry.strftime('%m-%d %H:%M')}")
- print(f" 原因: 当前持仓中 (持仓期: {current_position['entry_time'].strftime('%m-%d %H:%M')} → {hold_end.strftime('%m-%d %H:%M')})")
- continue
- else:
- # 新信号在持仓结束后,先结算当前持仓
- print(f"\n[结算前持仓] 信号 #{signal['signal_id']} 到来时,前持仓已结束")
- # 持仓已经结束,可以开新仓
- current_position = None
-
- # 现在可以开仓
- entry_time = sig_entry
- entry_price = signal['entry_price']
-
- # 计算可卖时间(T+1规则)
- can_sell_time = entry_time + timedelta(days=1)
- can_sell_time = can_sell_time.replace(hour=9, minute=30)
-
- # 确定实际平仓时间
- # 原始平仓时间 vs T+1最早可卖时间,取较晚者
- actual_exit_time = max(sig_exit, can_sell_time)
-
- # 查找实际平仓价格
- if actual_exit_time > sig_exit:
- # 需要延期,查找数据
- future_data = data_df[data_df.index >= actual_exit_time]
- if len(future_data) > 0:
- actual_exit_time = future_data.index[0]
- actual_exit_price = future_data.iloc[0]['Open']
- t1_adjusted = True
- else:
- # 无后续数据,使用原始
- actual_exit_time = sig_exit
- actual_exit_price = signal['exit_price']
- t1_adjusted = False
- else:
- actual_exit_price = signal['exit_price']
- t1_adjusted = False
-
- # 计算仓位(使用全部资金)
- position_size = int(capital / entry_price)
- if position_size <= 0:
- print(f"\n[跳过] 信号 #{signal['signal_id']}: {entry_time.strftime('%m-%d %H:%M')} - 资金不足")
- continue
-
- # 记录持仓
- current_position = {
- 'entry_time': entry_time,
- 'entry_price': entry_price,
- 'position_size': position_size,
- 'actual_exit_time': actual_exit_time,
- 'actual_exit_price': actual_exit_price,
- 't1_adjusted': t1_adjusted,
- 'original_exit_time': sig_exit,
- 'original_exit_price': signal['exit_price'],
- 'exit_reason': signal['exit_reason'],
- 'entry_signals': signal['entry_signals']
- }
-
- # 计算盈亏
- gross_pnl = (actual_exit_price - entry_price) * position_size
- pnl_pct = (actual_exit_price - entry_price) / entry_price * 100
-
- # 更新资金
- capital += gross_pnl
-
- # 记录交易
- # 计算原始盈亏(假设按原始时间平仓)
- if t1_adjusted:
- original_pnl = (signal['exit_price'] - entry_price) * position_size
- pnl_change = gross_pnl - original_pnl
- # 为T+1延期交易生成新的退出原因(基于实际平仓价格)
- stop_loss = entry_price * 0.992
- take_profit = entry_price * 1.02
- if actual_exit_price <= stop_loss:
- exit_reason = f"T+1止损(价格{actual_exit_price:.2f}触及止损线{stop_loss:.2f},实际亏损{abs(pnl_pct):.2f}%)"
- elif actual_exit_price >= take_profit:
- exit_reason = f"T+1止盈(价格{actual_exit_price:.2f}触及止盈线{take_profit:.2f},实际盈利{pnl_pct:.2f}%)"
- else:
- exit_reason = f"T+1平仓(价格{actual_exit_price:.2f},盈亏{pnl_pct:+.2f}%)"
- else:
- original_pnl = gross_pnl
- pnl_change = 0
- exit_reason = signal['exit_reason']
-
- trade_record = {
- '交易方向': '做多',
- '开仓时间': entry_time,
- '平仓时间': actual_exit_time,
- '开仓价格': entry_price,
- '平仓价格': actual_exit_price,
- '仓位': position_size,
- '盈亏金额': gross_pnl,
- '盈亏百分比': pnl_pct,
- '退出原因': exit_reason,
- '持仓周期数': int((actual_exit_time - entry_time).total_seconds() / 1800),
- '持仓小时数': (actual_exit_time - entry_time).total_seconds() / 3600,
- 'T+1调整': '是(T0→T1)' if t1_adjusted else '否',
- '原平仓时间': sig_exit,
- '原平仓价格': signal['exit_price'],
- '原盈亏': original_pnl,
- '盈亏变化': pnl_change,
- '入场信号': signal['entry_signals'],
- '平仓时资金': capital,
- }
- t1_trades.append(trade_record)
-
- # 打印
- status = "✅盈利" if gross_pnl > 0 else "❌亏损"
- adj_str = "[T+1调整] " if t1_adjusted else ""
- print(f"\n[执行] 信号 #{signal['signal_id']}: {entry_time.strftime('%m-%d %H:%M')} → {actual_exit_time.strftime('%m-%d %H:%M')}")
- print(f" {adj_str}价格: {entry_price:.2f} → {actual_exit_price:.2f}")
- print(f" 盈亏: {gross_pnl:+,.0f}元 ({pnl_pct:+.2f}%) {status}")
- print(f" 资金: {capital:,.0f}元")
-
- # 处理最后一笔持仓(如果数据结束前未平仓)
- if current_position is not None and current_position['actual_exit_time'] > data_df.index[-1]:
- final_price = data_df.iloc[-1]['Close']
- final_time = data_df.index[-1]
-
- entry_time = current_position['entry_time']
- entry_price = current_position['entry_price']
- position_size = current_position['position_size']
-
- gross_pnl = (final_price - entry_price) * position_size
- pnl_pct = (final_price - entry_price) / entry_price * 100
- capital += gross_pnl
-
- trade_record = {
- '交易方向': '做多',
- '开仓时间': entry_time,
- '平仓时间': final_time,
- '开仓价格': entry_price,
- '平仓价格': final_price,
- '仓位': position_size,
- '盈亏金额': gross_pnl,
- '盈亏百分比': pnl_pct,
- '退出原因': '回测强制平仓',
- '持仓周期数': int((final_time - entry_time).total_seconds() / 1800),
- '持仓小时数': (final_time - entry_time).total_seconds() / 3600,
- 'T+1调整': '否',
- '原平仓时间': final_time,
- '原平仓价格': final_price,
- '原盈亏': gross_pnl,
- '盈亏变化': 0,
- '入场信号': current_position['entry_signals'],
- '平仓时资金': capital,
- }
- t1_trades.append(trade_record)
-
- print(f"\n[强制平仓] {final_time.strftime('%m-%d %H:%M')} @ {final_price:.2f}")
- print(f" 盈亏: {gross_pnl:+,.0f}元")
-
- # 生成结果
- t1_trades_df = pd.DataFrame(t1_trades)
-
- print("\n" + "="*80)
- print("T+1回测 V2 完成")
- print("="*80)
- print(f"原始信号数: {len(signals)}笔")
- print(f"实际执行: {len(t1_trades)}笔")
- print(f"跳过信号: {len(signals) - len(t1_trades)}笔")
- print(f"最终资金: {capital:,.0f}元")
- print(f"总收益率: {(capital/initial_capital-1)*100:+.2f}%")
-
- return t1_trades_df
- def compare_results(original_trades, t1_trades, initial_capital=1000000):
- """对比原始交易和T+1交易"""
- print("\n" + "="*80)
- print("T+1转换前后对比")
- print("="*80)
-
- # 原始统计
- original_total_pnl = original_trades['盈亏金额'].sum()
- original_final = initial_capital + original_total_pnl
- original_return = (original_final / initial_capital - 1) * 100
- original_win_rate = (original_trades['盈亏金额'] > 0).sum() / len(original_trades) * 100
-
- # T+1统计
- t1_total_pnl = t1_trades['盈亏金额'].sum() if len(t1_trades) > 0 else 0
- t1_final = initial_capital + t1_total_pnl
- t1_return = (t1_final / initial_capital - 1) * 100
- t1_win_rate = (t1_trades['盈亏金额'] > 0).sum() / len(t1_trades) * 100 if len(t1_trades) > 0 else 0
-
- # T0交易统计
- t0_adjusted = t1_trades[t1_trades['T+1调整'] == '是(T0→T1)'] if len(t1_trades) > 0 else pd.DataFrame()
-
- print(f"\n【原始交易(T0规则)】")
- print(f" 交易次数: {len(original_trades)}")
- print(f" 总盈亏: {original_total_pnl:+,.2f}元")
- print(f" 最终资金: {original_final:,.2f}元")
- print(f" 收益率: {original_return:+.2f}%")
- print(f" 胜率: {original_win_rate:.1f}%")
-
- print(f"\n【T+1转换后】")
- print(f" 交易次数: {len(t1_trades)}")
- print(f" 总盈亏: {t1_total_pnl:+,.2f}元")
- print(f" 最终资金: {t1_final:,.2f}元")
- print(f" 收益率: {t1_return:+.2f}%")
- print(f" 胜率: {t1_win_rate:.1f}%")
-
- print(f"\n【T+1调整统计】")
- print(f" T0→T1调整交易数: {len(t0_adjusted)}笔")
- if len(t0_adjusted) > 0:
- print(f" 调整后盈亏变化: {t0_adjusted['盈亏变化'].sum():+,.2f}元")
- print(f" 平均每笔变化: {t0_adjusted['盈亏变化'].mean():+,.2f}元")
-
- print(f"\n【收益差异】")
- print(f" 收益率变化: {(t1_return - original_return):+.2f}%")
- print(f" 绝对盈亏差: {(t1_total_pnl - original_total_pnl):+,.2f}元")
- def main():
- """主程序 - 测试V2版本"""
- print("="*80)
- print("CYB50 T+1 回测引擎 V2")
- print("="*80)
-
- initial_capital = 1000000
-
- # 获取数据和原始交易
- print("\n【步骤1】获取数据...")
- config_manager = ConfigManager('config.json')
- fetcher = IntradayDataFetcher(config_manager)
-
- end_date = datetime.now()
- start_date = end_date - timedelta(days=70)
-
- raw_data = fetcher.fetch_30min_data(start_date, end_date)
- data_with_indicators = fetcher.calculate_intraday_indicators(raw_data)
-
- signal_generator = DualDirectionSignalGenerator()
- signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
-
- executor = DualDirectionExecutor(initial_capital=initial_capital)
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
-
- long_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
- print(f"✅ 原始做多信号: {len(long_trades)}笔")
-
- # 运行V2回测
- print("\n【步骤2】运行T+1回测 V2...")
- t1_trades = simulate_t1_trades_v2(data_with_indicators, long_trades, initial_capital)
-
- # 对比
- print("\n【步骤3】对比...")
- orig_pnl = long_trades['盈亏金额'].sum()
- t1_pnl = t1_trades['盈亏金额'].sum() if len(t1_trades) > 0 else 0
-
- print(f"原始交易: {len(long_trades)}笔, 盈亏 {orig_pnl:+,.0f}元")
- print(f"T+1交易: {len(t1_trades)}笔, 盈亏 {t1_pnl:+,.0f}元")
- print(f"差异: {t1_pnl - orig_pnl:+,.0f}元")
-
- # 检查时间重叠
- if len(t1_trades) > 0:
- print("\n【步骤4】验证时间无重叠...")
- prev_exit = None
- overlap = 0
- for _, row in t1_trades.iterrows():
- if prev_exit is not None and row['开仓时间'] < prev_exit:
- overlap += 1
- print(f" ⚠️ 重叠: {row['开仓时间'].strftime('%m-%d %H:%M')} 早于前笔平仓 {prev_exit.strftime('%m-%d %H:%M')}")
- prev_exit = row['平仓时间']
-
- if overlap == 0:
- print(" ✅ 无时间重叠")
- else:
- print(f" ❌ 发现 {overlap} 处重叠")
-
- # 保存
- if len(t1_trades) > 0:
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- output_file = f'cyb50_t1_v2_{timestamp}.csv'
-
- export_df = t1_trades.copy()
- for col in ['开仓时间', '平仓时间']:
- export_df[col] = export_df[col].dt.strftime('%Y-%m-%d %H:%M:%S')
-
- export_df.to_csv(output_file, index=False, encoding='utf-8-sig')
- print(f"\n✅ 已保存: {output_file}")
-
- print("\n" + "="*80)
- print("完成!")
- print("="*80)
- if __name__ == "__main__":
- main()
|