#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CYB50 T+1 回测引擎 - 正确的资金管理版本 核心规则: 1. T+1规则:买入当天不能卖出 2. 资金占用:持仓期间资金被占用,不能开新仓 3. 100%仓位:每次开仓使用全部可用资金 4. 信号连续性:如果前一交易持仓中,新信号出现时如何处理? 回测逻辑: - 遍历原始做多交易信号 - 维护持仓状态和资金 - 检查每笔交易的资金可用性 - 记录实际执行的交易 """ import pandas as pd import numpy as np from datetime import datetime, timedelta import sys sys.path.insert(0, '/root/.openclaw/workspace/cat-fly') from cyb50_30min_dual_direction import ( ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor ) def simulate_t1_trades_correct(data_df, long_trades_df, initial_capital=1000000): """ 正确的T+1回测 - 带资金管理 参数: data_df: 包含所有价格数据的DataFrame long_trades_df: 原始做多交易列表(作为信号源) initial_capital: 初始资金 返回: t1_trades_df: T+1规则下实际执行的交易 """ print("\n" + "="*80) print("T+1回测引擎 - 正确资金管理版本") print("="*80) if len(long_trades_df) == 0: print("没有做多交易记录") return pd.DataFrame() # 按开仓时间排序 signals = long_trades_df.sort_values('开仓时间').reset_index(drop=True) # 状态变量 capital = initial_capital # 总资金 available_capital = initial_capital # 可用资金(未持仓时等于总资金) position = 0 # 持仓数量 entry_price = 0 # 开仓价格 entry_time = None # 开仓时间 can_sell_time = None # 最早可卖出时间(T+1规则) entry_signals_str = '' # 入场信号描述 t1_trades = [] skipped_trades = [] print(f"\n初始资金: {initial_capital:,.0f}元") print(f"总信号数: {len(signals)}笔") print("\n" + "-"*80) for idx, signal in signals.iterrows(): signal_entry_time = signal['开仓时间'] signal_entry_price = signal['开仓价格'] signal_exit_time = signal['平仓时间'] signal_exit_price = signal['平仓价格'] signal_exit_reason = signal['退出原因'] signal_entry_signals = signal.get('入场信号', '') # 情况1:当前无持仓 if position == 0: # 检查可用资金是否足够 if available_capital <= 0: print(f"\n信号 #{idx+1}: {signal_entry_time.strftime('%m-%d %H:%M')} 跳过 - 资金不足") skipped_trades.append({ '信号序号': idx+1, '信号时间': signal_entry_time, '跳过原因': '资金不足', '备注': f'可用资金: {available_capital:,.0f}元' }) continue # 开仓 - 使用全部可用资金 position_size = int(available_capital / signal_entry_price) if position_size <= 0: print(f"\n信号 #{idx+1}: {signal_entry_time.strftime('%m-%d %H:%M')} 跳过 - 股价过高无法开仓") continue cost = position_size * signal_entry_price # 更新状态 position = position_size entry_price = signal_entry_price entry_time = signal_entry_time can_sell_time = signal_entry_time + timedelta(days=1) # T+1:次日才能卖 can_sell_time = can_sell_time.replace(hour=9, minute=30) # 假设次日9:30可卖 entry_signals_str = signal_entry_signals available_capital = 0 # 资金全部占用 print(f"\n[开仓] 信号 #{idx+1}: {entry_time.strftime('%m-%d %H:%M')}") print(f" 价格: {entry_price:.2f}, 数量: {position}股, 成本: {cost:,.0f}元") print(f" 最早可卖: {can_sell_time.strftime('%m-%d %H:%M')} (T+1)") # 确定平仓时间和价格 # 原始信号可能是T0(当天),但T+1要延期 actual_exit_time = max(signal_exit_time, can_sell_time) # 如果延期,需要从data_df中找到对应的价格 if actual_exit_time > signal_exit_time: # 查找延期后的平仓价格 future_data = data_df[data_df.index >= can_sell_time] if len(future_data) > 0: # 使用最早可用时间的价格 actual_exit_time = future_data.index[0] actual_exit_price = future_data.iloc[0]['Open'] t1_adjusted = True print(f" [T+1调整] 原始平仓: {signal_exit_time.strftime('%m-%d %H:%M')} → 新平仓: {actual_exit_time.strftime('%m-%d %H:%M')}") else: # 没有后续数据,使用原始平仓 actual_exit_time = signal_exit_time actual_exit_price = signal_exit_price t1_adjusted = False print(f" [警告] 无法找到T+1数据,使用原始平仓时间") else: actual_exit_price = signal_exit_price t1_adjusted = False print(f" [正常持仓] 平仓: {actual_exit_time.strftime('%m-%d %H:%M')}") # 计算盈亏 gross_pnl = (actual_exit_price - entry_price) * position pnl_pct = (actual_exit_price - entry_price) / entry_price * 100 # 更新资金 capital += gross_pnl available_capital = capital # 平仓后资金释放 # 记录交易 trade_record = { '交易方向': '做多', '开仓时间': entry_time, '平仓时间': actual_exit_time, '开仓价格': entry_price, '平仓价格': actual_exit_price, '仓位': position, '盈亏金额': gross_pnl, '盈亏百分比': pnl_pct, '退出原因': signal_exit_reason if not t1_adjusted else f"T+1延期-{signal_exit_reason}", '持仓周期数': int((actual_exit_time - entry_time).total_seconds() / 1800), '持仓小时数': (actual_exit_time - entry_time).total_seconds() / 3600, 'T+1调整': '是(T0→T1)' if t1_adjusted else '否', '原平仓时间': signal_exit_time, '原平仓价格': signal_exit_price, '入场信号': entry_signals_str, '平仓时资金': capital, } t1_trades.append(trade_record) status = "✅盈利" if gross_pnl > 0 else "❌亏损" print(f" [平仓] {actual_exit_time.strftime('%m-%d %H:%M')} @ {actual_exit_price:.2f}") print(f" {status}: {gross_pnl:+,.0f}元 ({pnl_pct:+.2f}%)") print(f" 当前总资金: {capital:,.0f}元") # 重置持仓状态 position = 0 entry_price = 0 entry_time = None can_sell_time = None # 情况2:当前有持仓 else: # 检查新信号是否在持仓期间 if signal_entry_time < entry_time: print(f"\n信号 #{idx+1}: {signal_entry_time.strftime('%m-%d %H:%M')} 跳过 - 时间早于当前持仓") continue # 信号出现时已有持仓,跳过该信号 print(f"\n信号 #{idx+1}: {signal_entry_time.strftime('%m-%d %H:%M')} 跳过 - 当前持仓中 (持仓从 {entry_time.strftime('%m-%d %H:%M')} 开始)") skipped_trades.append({ '信号序号': idx+1, '信号时间': signal_entry_time, '跳过原因': '持仓中无法开仓', '备注': f'当前持仓: {entry_time.strftime("%m-%d %H:%M")} 买入' }) continue # 处理最后一笔持仓(如果存在) if position > 0: final_price = data_df.iloc[-1]['Close'] final_time = data_df.index[-1] gross_pnl = (final_price - entry_price) * position pnl_pct = (final_price - entry_price) / entry_price * 100 capital += gross_pnl trade_record = { '交易方向': '做多', '开仓时间': entry_time, '平仓时间': final_time, '开仓价格': entry_price, '平仓价格': final_price, '仓位': position, '盈亏金额': gross_pnl, '盈亏百分比': pnl_pct, '退出原因': f'回测强制平仓(最终价格{final_price:.2f})', '持仓周期数': int((final_time - entry_time).total_seconds() / 1800), '持仓小时数': (final_time - entry_time).total_seconds() / 3600, 'T+1调整': '否', '原平仓时间': final_time, '原平仓价格': final_price, '入场信号': entry_signals_str, '平仓时资金': capital, } t1_trades.append(trade_record) print(f"\n[强制平仓] {final_time.strftime('%m-%d %H:%M')} @ {final_price:.2f}") print(f" 盈亏: {gross_pnl:+,.0f}元 ({pnl_pct:+.2f}%)") # 生成结果 t1_trades_df = pd.DataFrame(t1_trades) print("\n" + "="*80) print("T+1回测完成") print("="*80) print(f"原始信号数: {len(signals)}笔") print(f"实际执行: {len(t1_trades)}笔") print(f"跳过信号: {len(skipped_trades)}笔") print(f"最终资金: {capital:,.0f}元") print(f"总收益率: {(capital/initial_capital-1)*100:+.2f}%") if len(skipped_trades) > 0: print("\n【跳过的信号】") for st in skipped_trades: print(f" 信号#{st['信号序号']} {st['信号时间'].strftime('%m-%d %H:%M')}: {st['跳过原因']}") return t1_trades_df def compare_with_original(original_trades, t1_trades, initial_capital=1000000): """对比原始交易和T+1交易""" print("\n" + "="*80) print("对比分析") print("="*80) # 原始统计 orig_pnl = original_trades['盈亏金额'].sum() orig_final = initial_capital + orig_pnl # T+1统计 t1_pnl = t1_trades['盈亏金额'].sum() if len(t1_trades) > 0 else 0 t1_final = initial_capital + t1_pnl print(f"\n【原始交易(假设T0,无资金限制)】") print(f" 交易次数: {len(original_trades)}") print(f" 总盈亏: {orig_pnl:+,.0f}元") print(f" 最终资金: {orig_final:,.0f}元") print(f" 收益率: {(orig_final/initial_capital-1)*100:+.2f}%") print(f"\n【T+1回测(正确资金管理)】") print(f" 交易次数: {len(t1_trades)}") print(f" 总盈亏: {t1_pnl:+,.0f}元") print(f" 最终资金: {t1_final:,.0f}元") print(f" 收益率: {(t1_final/initial_capital-1)*100:+.2f}%") print(f"\n【差异分析】") print(f" 交易次数差异: {len(original_trades) - len(t1_trades)}笔 (被T+1规则过滤)") print(f" 盈亏差异: {t1_pnl - orig_pnl:+,.0f}元") print(f" 收益率差异: {(t1_final - orig_final)/initial_capital*100:+.2f}%") def main(): """主程序 - 测试正确版本""" print("="*80) print("CYB50 T+1 回测引擎 - 正确资金管理版本") print("="*80) initial_capital = 1000000 # 1. 获取数据和原始交易 print("\n【步骤1】获取数据和原始交易信号...") config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) end_date = datetime.now() start_date = end_date - timedelta(days=70) raw_data = fetcher.fetch_30min_data(start_date, end_date) data_with_indicators = fetcher.calculate_intraday_indicators(raw_data) signal_generator = DualDirectionSignalGenerator() signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators) executor = DualDirectionExecutor(initial_capital=initial_capital) results_df, trades_df = executor.execute_dual_direction_trades(signals_df) long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() print(f"✅ 获取到 {len(long_trades)} 笔做多交易信号") # 2. 运行正确的T+1回测 print("\n【步骤2】运行正确的T+1回测...") t1_trades = simulate_t1_trades_correct(data_with_indicators, long_trades, initial_capital) # 3. 对比分析 print("\n【步骤3】对比分析...") compare_with_original(long_trades, t1_trades, initial_capital) # 4. 导出结果 if len(t1_trades) > 0: print("\n【步骤4】导出结果...") export_df = t1_trades.copy() for col in ['开仓时间', '平仓时间', '原平仓时间']: if col in export_df.columns: export_df[col] = export_df[col].dt.strftime('%Y-%m-%d %H:%M:%S') timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = f'cyb50_t1_correct_{timestamp}.csv' export_df.to_csv(output_file, index=False, encoding='utf-8-sig') print(f"✅ 已保存: {output_file}") print("\n" + "="*80) print("完成!") print("="*80) if __name__ == "__main__": main()