""" CYB50-Pro 回测引擎 支持全链路回测和绩效归因 """ from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple, Any import json import numpy as np import pandas as pd from core.engine import CYB50ProEngine from core.ecosystem import UnifiedEcosystem from utils.logger import get_logger, EventType @dataclass class BacktestTrade: """回测交易记录""" entry_time: datetime exit_time: Optional[datetime] direction: str entry_price: float exit_price: Optional[float] position_size: float pnl: float pnl_pct: float agent_signals: Dict[str, Any] ecosystem_state: Dict[str, Any] @dataclass class BacktestResult: """回测结果""" # 基础信息(必须参数在前) start_date: datetime end_date: datetime initial_capital: float final_capital: float # 收益指标 total_return: float annual_return: float # 风险指标 max_drawdown: float max_drawdown_duration: int volatility: float sharpe_ratio: float sortino_ratio: float # 交易统计 total_trades: int winning_trades: int losing_trades: int win_rate: float avg_profit: float avg_loss: float profit_factor: float # 归因分析 regime_performance: Dict[str, Dict] agent_contributions: Dict[str, float] # 硬止损信息 hard_stop_triggered: bool = False hard_stop_date: Optional[datetime] = None # 原始数据(有默认值的参数在后) daily_returns: pd.Series = field(default_factory=pd.Series) equity_curve: pd.Series = field(default_factory=pd.Series) trades: List[BacktestTrade] = field(default_factory=list) class CYB50ProBacktester: """ CYB50-Pro 回测器 功能: 1. 加载历史数据(2018-2025) 2. 逐日/逐30分钟运行策略 3. 记录交易和权益变化 4. 计算绩效指标 5. 归因分析 """ def __init__( self, initial_capital: float = 1_000_000, commission_rate: float = 0.0005, # 万5佣金 slippage: float = 0.001, # 0.1%滑点 log_level: str = "INFO", catastrophic_drawdown: float = 0.15, # -15%熔断(更严格) position_stop: float = 0.10 # -10%单笔止损 ): self.initial_capital = initial_capital self.commission_rate = commission_rate self.slippage = slippage self.logger = get_logger(log_level=log_level) # 熔断参数 self.catastrophic_drawdown = catastrophic_drawdown self.position_stop = position_stop # 初始化引擎 self.engine = CYB50ProEngine() # 回测状态 self.current_capital = initial_capital self.current_position = 0.0 # 0-1 self.equity_curve: List[Tuple[datetime, float]] = [] self.trades: List[BacktestTrade] = [] self.current_trade: Optional[BacktestTrade] = None # 追踪峰值用于熔断判断 self.peak_equity = initial_capital self.entry_equity = initial_capital # 记录入场时的权益 self.trade_peak_equity = initial_capital # 单笔交易峰值 def run_backtest( self, price_data: pd.DataFrame, start_date: Optional[str] = None, end_date: Optional[str] = None ) -> BacktestResult: """ 运行回测 Args: price_data: 历史价格数据 start_date: 开始日期 (YYYY-MM-DD) end_date: 结束日期 (YYYY-MM-DD) Returns: BacktestResult: 回测结果 """ # 数据预处理 data = price_data.copy() if start_date: data = data[data.index >= start_date] if end_date: data = data[data.index <= end_date] self.logger.log_system( EventType.SYSTEM_START, f"Starting backtest from {data.index[0]} to {data.index[-1]}", level="info" ) # 初始化 self.current_capital = self.initial_capital self.equity_curve = [(data.index[0], self.initial_capital)] # 滚动回测 lookback = 60 # 需要60日历史数据 for i in range(lookback, len(data)): current_time = data.index[i] current_price = data['close'].iloc[i] # 获取历史数据窗口 hist_data = data.iloc[i-lookback:i+1] # 运行引擎 try: result = self.engine.run_cycle( price_data=hist_data, account_value=self.current_capital, tick_data=None ) # 处理信号 self._process_signal( result=result, current_time=current_time, current_price=current_price ) # 更新权益并检查熔断 self._update_equity(current_time, current_price) except Exception as e: self.logger.log_system( EventType.SYSTEM_ERROR, f"Error at {current_time}: {str(e)}", level="error" ) # 平仓最后一笔交易 if self.current_trade: self._close_trade( data.index[-1], data['close'].iloc[-1], "backtest_end" ) # 计算绩效指标 return self._calculate_metrics(data) def _process_signal( self, result: Dict, current_time: datetime, current_price: float ): """处理引擎信号""" if not result.get("executed"): return target_position = result.get("final_position", 0) # 检查是否需要调仓 if abs(target_position - self.current_position) < 0.05: return # 变化小于5%,不调仓 # 先平仓 if self.current_position > 0 and self.current_trade: self._close_trade(current_time, current_price, "signal_change") # 再开仓 if target_position > 0: self._open_trade( current_time, current_price, result.get("coordinated", {}).get("direction", "long"), target_position, result ) def _open_trade( self, time: datetime, price: float, direction: str, position_size: float, engine_result: Dict ): """开仓""" # 考虑滑点 if direction == "long": exec_price = price * (1 + self.slippage) else: exec_price = price * (1 - self.slippage) # 记录交易 self.current_trade = BacktestTrade( entry_time=time, exit_time=None, direction=direction, entry_price=exec_price, exit_price=None, position_size=position_size, pnl=0.0, pnl_pct=0.0, agent_signals=engine_result.get("signals", {}), ecosystem_state=engine_result.get("ecosystem", {}) ) self.current_position = position_size self.entry_equity = self.current_capital # 记录入场权益 self.trade_peak_equity = self.current_capital # 重置交易峰值 # 扣除佣金 trade_value = self.current_capital * position_size commission = trade_value * self.commission_rate self.current_capital -= commission def _close_trade( self, time: datetime, price: float, reason: str ): """平仓""" if not self.current_trade: return # 考虑滑点 if self.current_trade.direction == "long": exec_price = price * (1 - self.slippage) else: exec_price = price * (1 + self.slippage) # 计算盈亏 if self.current_trade.direction == "long": pnl_pct = (exec_price - self.current_trade.entry_price) / self.current_trade.entry_price else: pnl_pct = (self.current_trade.entry_price - exec_price) / self.current_trade.entry_price pnl = self.current_capital * self.current_position * pnl_pct # 扣除佣金 trade_value = self.current_capital * self.current_position commission = trade_value * self.commission_rate self.current_capital -= commission # 更新资金 self.current_capital += pnl # 记录交易 self.current_trade.exit_time = time self.current_trade.exit_price = exec_price self.current_trade.pnl = pnl self.current_trade.pnl_pct = pnl_pct self.trades.append(self.current_trade) self.current_trade = None self.current_position = 0.0 def _update_equity(self, time: datetime, price: float): """更新权益曲线,仅检查-20%熔断""" if self.current_position > 0 and self.current_trade: # 计算浮动盈亏 if self.current_trade.direction == "long": unrealized_pct = (price - self.current_trade.entry_price) / self.current_trade.entry_price else: unrealized_pct = (self.current_trade.entry_price - price) / self.current_trade.entry_price unrealized_pnl = self.current_capital * self.current_position * unrealized_pct total_equity = self.current_capital + unrealized_pnl else: total_equity = self.current_capital self.equity_curve.append((time, total_equity)) # 仅检查-20%熔断 self._check_catastrophic_stop(time, price, total_equity) def _check_catastrophic_stop(self, time: datetime, price: float, total_equity: float): """ 检查熔断和追踪止盈(路径一优化版) """ # 更新峰值权益 if total_equity > self.peak_equity: self.peak_equity = total_equity # 计算回撤 if self.peak_equity > 0: drawdown = (total_equity - self.peak_equity) / self.peak_equity else: drawdown = 0.0 # 1. 追踪止盈:从峰值回撤5%即出场 if self.current_position > 0: if total_equity > self.trade_peak_equity: self.trade_peak_equity = total_equity trailing_drawdown = (total_equity - self.trade_peak_equity) / self.trade_peak_equity if trailing_drawdown <= -0.05: # 5%追踪止盈 self.logger.log_system( EventType.RISK_WARNING, f"TRAILING STOP: Drawdown from peak {trailing_drawdown:.2%}. Closing trade.", level="warning" ) self._close_trade(time, price, f"trailing_stop_{trailing_drawdown:.2%}") self.peak_equity = total_equity return # 2. -15%熔断 if drawdown <= -self.catastrophic_drawdown: self.logger.log_system( EventType.RISK_WARNING, f"CATASTROPHIC STOP: Drawdown {drawdown:.2%}. Liquidating all positions.", level="critical" ) if self.current_position > 0: self._close_trade(time, price, f"catastrophic_stop_{drawdown:.2%}") self.peak_equity = total_equity def _calculate_metrics(self, price_data: pd.DataFrame) -> BacktestResult: """计算回测绩效指标""" # 构建权益曲线 equity_df = pd.DataFrame( self.equity_curve, columns=['timestamp', 'equity'] ).set_index('timestamp') # 计算日收益率 daily_equity = equity_df['equity'].resample('D').last().dropna() daily_returns = daily_equity.pct_change().dropna() # 基础指标 total_return = (daily_equity.iloc[-1] - self.initial_capital) / self.initial_capital # 年化收益 n_years = (daily_equity.index[-1] - daily_equity.index[0]).days / 365.25 annual_return = (1 + total_return) ** (1 / n_years) - 1 if n_years > 0 else 0 # 最大回撤 rolling_max = daily_equity.cummax() drawdown = (daily_equity - rolling_max) / rolling_max max_drawdown = drawdown.min() # 回撤持续时间 is_drawdown = drawdown < 0 drawdown_starts = is_drawdown.ne(is_drawdown.shift()).cumsum() drawdown_durations = is_drawdown.groupby(drawdown_starts).sum() max_drawdown_duration = drawdown_durations.max() if len(drawdown_durations) > 0 else 0 # 波动率和夏普 volatility = daily_returns.std() * np.sqrt(252) sharpe_ratio = (annual_return - 0.03) / volatility if volatility > 0 else 0 # Sortino比率 downside_returns = daily_returns[daily_returns < 0] downside_std = downside_returns.std() * np.sqrt(252) sortino_ratio = (annual_return - 0.03) / downside_std if downside_std > 0 else 0 # 交易统计 total_trades = len(self.trades) winning_trades = sum(1 for t in self.trades if t.pnl > 0) losing_trades = total_trades - winning_trades win_rate = winning_trades / total_trades if total_trades > 0 else 0 profits = [t.pnl for t in self.trades if t.pnl > 0] losses = [t.pnl for t in self.trades if t.pnl < 0] avg_profit = np.mean(profits) if profits else 0 avg_loss = np.mean(losses) if losses else 0 profit_factor = abs(sum(profits) / sum(losses)) if losses and sum(losses) != 0 else float('inf') # 归因分析 regime_perf = self._analyze_regime_performance() agent_contrib = self._analyze_agent_contributions() return BacktestResult( start_date=daily_equity.index[0], end_date=daily_equity.index[-1], initial_capital=self.initial_capital, final_capital=daily_equity.iloc[-1], total_return=total_return, annual_return=annual_return, daily_returns=daily_returns, max_drawdown=max_drawdown, max_drawdown_duration=int(max_drawdown_duration), volatility=volatility, sharpe_ratio=sharpe_ratio, sortino_ratio=sortino_ratio, total_trades=total_trades, winning_trades=winning_trades, losing_trades=losing_trades, win_rate=win_rate, avg_profit=avg_profit, avg_loss=avg_loss, profit_factor=profit_factor, regime_performance=regime_perf, agent_contributions=agent_contrib, hard_stop_triggered=False, hard_stop_date=None, equity_curve=daily_equity, trades=self.trades ) def _analyze_regime_performance(self) -> Dict[str, Dict]: """分析不同生态下的表现""" regime_stats = {} for trade in self.trades: regime = trade.ecosystem_state.get('macro', {}).get('regime', 'unknown') if regime not in regime_stats: regime_stats[regime] = { 'trades': 0, 'wins': 0, 'total_pnl': 0, 'avg_pnl': 0 } regime_stats[regime]['trades'] += 1 regime_stats[regime]['wins'] += 1 if trade.pnl > 0 else 0 regime_stats[regime]['total_pnl'] += trade.pnl # 计算平均值 for regime in regime_stats: stats = regime_stats[regime] stats['win_rate'] = stats['wins'] / stats['trades'] if stats['trades'] > 0 else 0 stats['avg_pnl'] = stats['total_pnl'] / stats['trades'] if stats['trades'] > 0 else 0 return regime_stats def _analyze_agent_contributions(self) -> Dict[str, float]: """分析各智能体贡献""" agent_pnl = {} for trade in self.trades: for agent_name, signal in trade.agent_signals.items(): if agent_name not in agent_pnl: agent_pnl[agent_name] = 0 # 简单归因:按信号置信度加权分配盈亏 confidence = signal.get('confidence', 0.5) agent_pnl[agent_name] += trade.pnl * confidence return agent_pnl def generate_report(self, result: BacktestResult, output_path: str): """生成回测报告""" report = { "summary": { "测试区间": f"{result.start_date.strftime('%Y-%m-%d')} to {result.end_date.strftime('%Y-%m-%d')}", "初始资金": f"{result.initial_capital:,.0f}", "最终资金": f"{result.final_capital:,.0f}", "总收益率": f"{result.total_return:.2%}", "年化收益率": f"{result.annual_return:.2%}", "最大回撤": f"{result.max_drawdown:.2%}", "夏普比率": f"{result.sharpe_ratio:.2f}", "Sortino比率": f"{result.sortino_ratio:.2f}" }, "trading_stats": { "总交易次数": result.total_trades, "盈利次数": result.winning_trades, "亏损次数": result.losing_trades, "胜率": f"{result.win_rate:.2%}", "平均盈利": f"{result.avg_profit:,.2f}", "平均亏损": f"{result.avg_loss:,.2f}", "盈亏比": f"{result.profit_factor:.2f}" }, "regime_analysis": result.regime_performance, "agent_contributions": result.agent_contributions } # 目标达成检查 targets = { "年化收益 25-35%": "✓" if 0.25 <= result.annual_return <= 0.35 else "✗", "最大回撤 <12%": "✓" if result.max_drawdown > -0.12 else "✗", "夏普比率 >1.5": "✓" if result.sharpe_ratio > 1.5 else "✗" } report["target_validation"] = targets # 保存JSON with open(output_path, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) self.logger.log_system( EventType.SYSTEM_STOP, f"Backtest report saved to {output_path}", level="info" ) return report