#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CYB50 T+1 回测引擎 - 正确的资金管理版本 V2 核心逻辑: 1. 按时间顺序处理所有信号 2. 维护持仓状态(是否持仓、持仓期间) 3. 当新信号的开仓时间落在已有持仓期间时,跳过该信号 4. 正确处理T+1延期平仓 """ import pandas as pd import numpy as np from datetime import datetime, timedelta import sys sys.path.insert(0, '/root/.openclaw/workspace/cat-fly') from cyb50_30min_dual_direction import ( ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor ) def simulate_t1_trades_v2(data_df, long_trades_df, initial_capital=1000000): """ 正确的T+1回测 V2 - 正确处理时间重叠 参数: data_df: 包含所有价格数据的DataFrame long_trades_df: 原始做多交易列表(作为信号源) initial_capital: 初始资金 返回: t1_trades_df: T+1规则下实际执行的交易 """ print("\n" + "="*80) print("T+1回测引擎 V2 - 正确处理时间重叠") print("="*80) if len(long_trades_df) == 0: print("没有做多交易记录") return pd.DataFrame() # 准备信号列表 signals = [] for idx, trade in long_trades_df.iterrows(): signals.append({ 'signal_id': idx + 1, 'entry_time': trade['开仓时间'], 'entry_price': trade['开仓价格'], 'exit_time': trade['平仓时间'], 'exit_price': trade['平仓价格'], 'exit_reason': trade['退出原因'], 'entry_signals': trade.get('入场信号', '') }) # 按开仓时间排序 signals = sorted(signals, key=lambda x: x['entry_time']) t1_trades = [] capital = initial_capital # 当前持仓状态 current_position = None # None 或 {'entry_time', 'entry_price', 'can_sell_time', ...} print(f"\n初始资金: {initial_capital:,.0f}元") print(f"总信号数: {len(signals)}笔") print("\n" + "-"*80) for signal in signals: sig_entry = signal['entry_time'] sig_exit = signal['exit_time'] # 检查是否有持仓 if current_position is not None: # 检查新信号是否在持仓期间 hold_end = current_position['actual_exit_time'] if sig_entry < hold_end: # 新信号在持仓期间,跳过 print(f"\n[跳过] 信号 #{signal['signal_id']}: {sig_entry.strftime('%m-%d %H:%M')}") print(f" 原因: 当前持仓中 (持仓期: {current_position['entry_time'].strftime('%m-%d %H:%M')} → {hold_end.strftime('%m-%d %H:%M')})") continue else: # 新信号在持仓结束后,先结算当前持仓 print(f"\n[结算前持仓] 信号 #{signal['signal_id']} 到来时,前持仓已结束") # 持仓已经结束,可以开新仓 current_position = None # 现在可以开仓 entry_time = sig_entry entry_price = signal['entry_price'] # 计算可卖时间(T+1规则) can_sell_time = entry_time + timedelta(days=1) can_sell_time = can_sell_time.replace(hour=9, minute=30) # 确定实际平仓时间 # 原始平仓时间 vs T+1最早可卖时间,取较晚者 actual_exit_time = max(sig_exit, can_sell_time) # 查找实际平仓价格 if actual_exit_time > sig_exit: # 需要延期,查找数据 future_data = data_df[data_df.index >= actual_exit_time] if len(future_data) > 0: actual_exit_time = future_data.index[0] actual_exit_price = future_data.iloc[0]['Open'] t1_adjusted = True else: # 无后续数据,使用原始 actual_exit_time = sig_exit actual_exit_price = signal['exit_price'] t1_adjusted = False else: actual_exit_price = signal['exit_price'] t1_adjusted = False # 计算仓位(使用全部资金) position_size = int(capital / entry_price) if position_size <= 0: print(f"\n[跳过] 信号 #{signal['signal_id']}: {entry_time.strftime('%m-%d %H:%M')} - 资金不足") continue # 记录持仓 current_position = { 'entry_time': entry_time, 'entry_price': entry_price, 'position_size': position_size, 'actual_exit_time': actual_exit_time, 'actual_exit_price': actual_exit_price, 't1_adjusted': t1_adjusted, 'original_exit_time': sig_exit, 'original_exit_price': signal['exit_price'], 'exit_reason': signal['exit_reason'], 'entry_signals': signal['entry_signals'] } # 计算盈亏 gross_pnl = (actual_exit_price - entry_price) * position_size pnl_pct = (actual_exit_price - entry_price) / entry_price * 100 # 更新资金 capital += gross_pnl # 记录交易 trade_record = { '交易方向': '做多', '开仓时间': entry_time, '平仓时间': actual_exit_time, '开仓价格': entry_price, '平仓价格': actual_exit_price, '仓位': position_size, '盈亏金额': gross_pnl, '盈亏百分比': pnl_pct, '退出原因': signal['exit_reason'] if not t1_adjusted else f"T+1延期-{signal['exit_reason']}", '持仓周期数': int((actual_exit_time - entry_time).total_seconds() / 1800), '持仓小时数': (actual_exit_time - entry_time).total_seconds() / 3600, 'T+1调整': '是(T0→T1)' if t1_adjusted else '否', '原平仓时间': sig_exit, '原平仓价格': signal['exit_price'], '入场信号': signal['entry_signals'], '平仓时资金': capital, } t1_trades.append(trade_record) # 打印 status = "✅盈利" if gross_pnl > 0 else "❌亏损" adj_str = "[T+1调整] " if t1_adjusted else "" print(f"\n[执行] 信号 #{signal['signal_id']}: {entry_time.strftime('%m-%d %H:%M')} → {actual_exit_time.strftime('%m-%d %H:%M')}") print(f" {adj_str}价格: {entry_price:.2f} → {actual_exit_price:.2f}") print(f" 盈亏: {gross_pnl:+,.0f}元 ({pnl_pct:+.2f}%) {status}") print(f" 资金: {capital:,.0f}元") # 处理最后一笔持仓(如果数据结束前未平仓) if current_position is not None and current_position['actual_exit_time'] > data_df.index[-1]: final_price = data_df.iloc[-1]['Close'] final_time = data_df.index[-1] entry_time = current_position['entry_time'] entry_price = current_position['entry_price'] position_size = current_position['position_size'] gross_pnl = (final_price - entry_price) * position_size pnl_pct = (final_price - entry_price) / entry_price * 100 capital += gross_pnl trade_record = { '交易方向': '做多', '开仓时间': entry_time, '平仓时间': final_time, '开仓价格': entry_price, '平仓价格': final_price, '仓位': position_size, '盈亏金额': gross_pnl, '盈亏百分比': pnl_pct, '退出原因': '回测强制平仓', '持仓周期数': int((final_time - entry_time).total_seconds() / 1800), '持仓小时数': (final_time - entry_time).total_seconds() / 3600, 'T+1调整': '否', '原平仓时间': final_time, '原平仓价格': final_price, '入场信号': current_position['entry_signals'], '平仓时资金': capital, } t1_trades.append(trade_record) print(f"\n[强制平仓] {final_time.strftime('%m-%d %H:%M')} @ {final_price:.2f}") print(f" 盈亏: {gross_pnl:+,.0f}元") # 生成结果 t1_trades_df = pd.DataFrame(t1_trades) print("\n" + "="*80) print("T+1回测 V2 完成") print("="*80) print(f"原始信号数: {len(signals)}笔") print(f"实际执行: {len(t1_trades)}笔") print(f"跳过信号: {len(signals) - len(t1_trades)}笔") print(f"最终资金: {capital:,.0f}元") print(f"总收益率: {(capital/initial_capital-1)*100:+.2f}%") return t1_trades_df def main(): """主程序 - 测试V2版本""" print("="*80) print("CYB50 T+1 回测引擎 V2") print("="*80) initial_capital = 1000000 # 获取数据和原始交易 print("\n【步骤1】获取数据...") config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) end_date = datetime.now() start_date = end_date - timedelta(days=70) raw_data = fetcher.fetch_30min_data(start_date, end_date) data_with_indicators = fetcher.calculate_intraday_indicators(raw_data) signal_generator = DualDirectionSignalGenerator() signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators) executor = DualDirectionExecutor(initial_capital=initial_capital) results_df, trades_df = executor.execute_dual_direction_trades(signals_df) long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() print(f"✅ 原始做多信号: {len(long_trades)}笔") # 运行V2回测 print("\n【步骤2】运行T+1回测 V2...") t1_trades = simulate_t1_trades_v2(data_with_indicators, long_trades, initial_capital) # 对比 print("\n【步骤3】对比...") orig_pnl = long_trades['盈亏金额'].sum() t1_pnl = t1_trades['盈亏金额'].sum() if len(t1_trades) > 0 else 0 print(f"原始交易: {len(long_trades)}笔, 盈亏 {orig_pnl:+,.0f}元") print(f"T+1交易: {len(t1_trades)}笔, 盈亏 {t1_pnl:+,.0f}元") print(f"差异: {t1_pnl - orig_pnl:+,.0f}元") # 检查时间重叠 if len(t1_trades) > 0: print("\n【步骤4】验证时间无重叠...") prev_exit = None overlap = 0 for _, row in t1_trades.iterrows(): if prev_exit is not None and row['开仓时间'] < prev_exit: overlap += 1 print(f" ⚠️ 重叠: {row['开仓时间'].strftime('%m-%d %H:%M')} 早于前笔平仓 {prev_exit.strftime('%m-%d %H:%M')}") prev_exit = row['平仓时间'] if overlap == 0: print(" ✅ 无时间重叠") else: print(f" ❌ 发现 {overlap} 处重叠") # 保存 if len(t1_trades) > 0: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = f'cyb50_t1_v2_{timestamp}.csv' export_df = t1_trades.copy() for col in ['开仓时间', '平仓时间']: export_df[col] = export_df[col].dt.strftime('%Y-%m-%d %H:%M:%S') export_df.to_csv(output_file, index=False, encoding='utf-8-sig') print(f"\n✅ 已保存: {output_file}") print("\n" + "="*80) print("完成!") print("="*80) if __name__ == "__main__": main()