| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543 |
- """
- CYB50-Pro 回测引擎
- 支持全链路回测和绩效归因
- """
- from dataclasses import dataclass, field
- from datetime import datetime, timedelta
- from typing import Dict, List, Optional, Tuple, Any
- import json
- import numpy as np
- import pandas as pd
- from core.engine import CYB50ProEngine
- from core.ecosystem import UnifiedEcosystem
- from utils.logger import get_logger, EventType
- @dataclass
- class BacktestTrade:
- """回测交易记录"""
- entry_time: datetime
- exit_time: Optional[datetime]
- direction: str
- entry_price: float
- exit_price: Optional[float]
- position_size: float
- pnl: float
- pnl_pct: float
- agent_signals: Dict[str, Any]
- ecosystem_state: Dict[str, Any]
- @dataclass
- class BacktestResult:
- """回测结果"""
- # 基础信息(必须参数在前)
- start_date: datetime
- end_date: datetime
- initial_capital: float
- final_capital: float
- # 收益指标
- total_return: float
- annual_return: float
- # 风险指标
- max_drawdown: float
- max_drawdown_duration: int
- volatility: float
- sharpe_ratio: float
- sortino_ratio: float
- # 交易统计
- total_trades: int
- winning_trades: int
- losing_trades: int
- win_rate: float
- avg_profit: float
- avg_loss: float
- profit_factor: float
- # 归因分析
- regime_performance: Dict[str, Dict]
- agent_contributions: Dict[str, float]
- # 硬止损信息
- hard_stop_triggered: bool = False
- hard_stop_date: Optional[datetime] = None
- # 原始数据(有默认值的参数在后)
- daily_returns: pd.Series = field(default_factory=pd.Series)
- equity_curve: pd.Series = field(default_factory=pd.Series)
- trades: List[BacktestTrade] = field(default_factory=list)
- class CYB50ProBacktester:
- """
- CYB50-Pro 回测器
- 功能:
- 1. 加载历史数据(2018-2025)
- 2. 逐日/逐30分钟运行策略
- 3. 记录交易和权益变化
- 4. 计算绩效指标
- 5. 归因分析
- """
- def __init__(
- self,
- initial_capital: float = 1_000_000,
- commission_rate: float = 0.0005, # 万5佣金
- slippage: float = 0.001, # 0.1%滑点
- log_level: str = "INFO",
- catastrophic_drawdown: float = 0.15, # -15%熔断(更严格)
- position_stop: float = 0.10 # -10%单笔止损
- ):
- self.initial_capital = initial_capital
- self.commission_rate = commission_rate
- self.slippage = slippage
- self.logger = get_logger(log_level=log_level)
- # 熔断参数
- self.catastrophic_drawdown = catastrophic_drawdown
- self.position_stop = position_stop
- # 初始化引擎
- self.engine = CYB50ProEngine()
- # 回测状态
- self.current_capital = initial_capital
- self.current_position = 0.0 # 0-1
- self.equity_curve: List[Tuple[datetime, float]] = []
- self.trades: List[BacktestTrade] = []
- self.current_trade: Optional[BacktestTrade] = None
- # 追踪峰值用于熔断判断
- self.peak_equity = initial_capital
- self.entry_equity = initial_capital # 记录入场时的权益
- self.trade_peak_equity = initial_capital # 单笔交易峰值
- def run_backtest(
- self,
- price_data: pd.DataFrame,
- start_date: Optional[str] = None,
- end_date: Optional[str] = None
- ) -> BacktestResult:
- """
- 运行回测
- Args:
- price_data: 历史价格数据
- start_date: 开始日期 (YYYY-MM-DD)
- end_date: 结束日期 (YYYY-MM-DD)
- Returns:
- BacktestResult: 回测结果
- """
- # 数据预处理
- data = price_data.copy()
- if start_date:
- data = data[data.index >= start_date]
- if end_date:
- data = data[data.index <= end_date]
- self.logger.log_system(
- EventType.SYSTEM_START,
- f"Starting backtest from {data.index[0]} to {data.index[-1]}",
- level="info"
- )
- # 初始化
- self.current_capital = self.initial_capital
- self.equity_curve = [(data.index[0], self.initial_capital)]
- # 滚动回测
- lookback = 60 # 需要60日历史数据
- for i in range(lookback, len(data)):
- current_time = data.index[i]
- current_price = data['close'].iloc[i]
- # 获取历史数据窗口
- hist_data = data.iloc[i-lookback:i+1]
- # 运行引擎
- try:
- result = self.engine.run_cycle(
- price_data=hist_data,
- account_value=self.current_capital,
- tick_data=None
- )
- # 处理信号
- self._process_signal(
- result=result,
- current_time=current_time,
- current_price=current_price
- )
- # 更新权益并检查熔断
- self._update_equity(current_time, current_price)
- except Exception as e:
- self.logger.log_system(
- EventType.SYSTEM_ERROR,
- f"Error at {current_time}: {str(e)}",
- level="error"
- )
- # 平仓最后一笔交易
- if self.current_trade:
- self._close_trade(
- data.index[-1],
- data['close'].iloc[-1],
- "backtest_end"
- )
- # 计算绩效指标
- return self._calculate_metrics(data)
- def _process_signal(
- self,
- result: Dict,
- current_time: datetime,
- current_price: float
- ):
- """处理引擎信号"""
- if not result.get("executed"):
- return
- target_position = result.get("final_position", 0)
- # 检查是否需要调仓
- if abs(target_position - self.current_position) < 0.05:
- return # 变化小于5%,不调仓
- # 先平仓
- if self.current_position > 0 and self.current_trade:
- self._close_trade(current_time, current_price, "signal_change")
- # 再开仓
- if target_position > 0:
- self._open_trade(
- current_time,
- current_price,
- result.get("coordinated", {}).get("direction", "long"),
- target_position,
- result
- )
- def _open_trade(
- self,
- time: datetime,
- price: float,
- direction: str,
- position_size: float,
- engine_result: Dict
- ):
- """开仓"""
- # 考虑滑点
- if direction == "long":
- exec_price = price * (1 + self.slippage)
- else:
- exec_price = price * (1 - self.slippage)
- # 记录交易
- self.current_trade = BacktestTrade(
- entry_time=time,
- exit_time=None,
- direction=direction,
- entry_price=exec_price,
- exit_price=None,
- position_size=position_size,
- pnl=0.0,
- pnl_pct=0.0,
- agent_signals=engine_result.get("signals", {}),
- ecosystem_state=engine_result.get("ecosystem", {})
- )
- self.current_position = position_size
- self.entry_equity = self.current_capital # 记录入场权益
- self.trade_peak_equity = self.current_capital # 重置交易峰值
- # 扣除佣金
- trade_value = self.current_capital * position_size
- commission = trade_value * self.commission_rate
- self.current_capital -= commission
- def _close_trade(
- self,
- time: datetime,
- price: float,
- reason: str
- ):
- """平仓"""
- if not self.current_trade:
- return
- # 考虑滑点
- if self.current_trade.direction == "long":
- exec_price = price * (1 - self.slippage)
- else:
- exec_price = price * (1 + self.slippage)
- # 计算盈亏
- if self.current_trade.direction == "long":
- pnl_pct = (exec_price - self.current_trade.entry_price) / self.current_trade.entry_price
- else:
- pnl_pct = (self.current_trade.entry_price - exec_price) / self.current_trade.entry_price
- pnl = self.current_capital * self.current_position * pnl_pct
- # 扣除佣金
- trade_value = self.current_capital * self.current_position
- commission = trade_value * self.commission_rate
- self.current_capital -= commission
- # 更新资金
- self.current_capital += pnl
- # 记录交易
- self.current_trade.exit_time = time
- self.current_trade.exit_price = exec_price
- self.current_trade.pnl = pnl
- self.current_trade.pnl_pct = pnl_pct
- self.trades.append(self.current_trade)
- self.current_trade = None
- self.current_position = 0.0
- def _update_equity(self, time: datetime, price: float):
- """更新权益曲线,仅检查-20%熔断"""
- if self.current_position > 0 and self.current_trade:
- # 计算浮动盈亏
- if self.current_trade.direction == "long":
- unrealized_pct = (price - self.current_trade.entry_price) / self.current_trade.entry_price
- else:
- unrealized_pct = (self.current_trade.entry_price - price) / self.current_trade.entry_price
- unrealized_pnl = self.current_capital * self.current_position * unrealized_pct
- total_equity = self.current_capital + unrealized_pnl
- else:
- total_equity = self.current_capital
- self.equity_curve.append((time, total_equity))
- # 仅检查-20%熔断
- self._check_catastrophic_stop(time, price, total_equity)
- def _check_catastrophic_stop(self, time: datetime, price: float, total_equity: float):
- """
- 检查熔断和追踪止盈(路径一优化版)
- """
- # 更新峰值权益
- if total_equity > self.peak_equity:
- self.peak_equity = total_equity
- # 计算回撤
- if self.peak_equity > 0:
- drawdown = (total_equity - self.peak_equity) / self.peak_equity
- else:
- drawdown = 0.0
- # 1. 追踪止盈:从峰值回撤5%即出场
- if self.current_position > 0:
- if total_equity > self.trade_peak_equity:
- self.trade_peak_equity = total_equity
- trailing_drawdown = (total_equity - self.trade_peak_equity) / self.trade_peak_equity
- if trailing_drawdown <= -0.05: # 5%追踪止盈
- self.logger.log_system(
- EventType.RISK_WARNING,
- f"TRAILING STOP: Drawdown from peak {trailing_drawdown:.2%}. Closing trade.",
- level="warning"
- )
- self._close_trade(time, price, f"trailing_stop_{trailing_drawdown:.2%}")
- self.peak_equity = total_equity
- return
- # 2. -15%熔断
- if drawdown <= -self.catastrophic_drawdown:
- self.logger.log_system(
- EventType.RISK_WARNING,
- f"CATASTROPHIC STOP: Drawdown {drawdown:.2%}. Liquidating all positions.",
- level="critical"
- )
- if self.current_position > 0:
- self._close_trade(time, price, f"catastrophic_stop_{drawdown:.2%}")
- self.peak_equity = total_equity
- def _calculate_metrics(self, price_data: pd.DataFrame) -> BacktestResult:
- """计算回测绩效指标"""
- # 构建权益曲线
- equity_df = pd.DataFrame(
- self.equity_curve,
- columns=['timestamp', 'equity']
- ).set_index('timestamp')
- # 计算日收益率
- daily_equity = equity_df['equity'].resample('D').last().dropna()
- daily_returns = daily_equity.pct_change().dropna()
- # 基础指标
- total_return = (daily_equity.iloc[-1] - self.initial_capital) / self.initial_capital
- # 年化收益
- n_years = (daily_equity.index[-1] - daily_equity.index[0]).days / 365.25
- annual_return = (1 + total_return) ** (1 / n_years) - 1 if n_years > 0 else 0
- # 最大回撤
- rolling_max = daily_equity.cummax()
- drawdown = (daily_equity - rolling_max) / rolling_max
- max_drawdown = drawdown.min()
- # 回撤持续时间
- is_drawdown = drawdown < 0
- drawdown_starts = is_drawdown.ne(is_drawdown.shift()).cumsum()
- drawdown_durations = is_drawdown.groupby(drawdown_starts).sum()
- max_drawdown_duration = drawdown_durations.max() if len(drawdown_durations) > 0 else 0
- # 波动率和夏普
- volatility = daily_returns.std() * np.sqrt(252)
- sharpe_ratio = (annual_return - 0.03) / volatility if volatility > 0 else 0
- # Sortino比率
- downside_returns = daily_returns[daily_returns < 0]
- downside_std = downside_returns.std() * np.sqrt(252)
- sortino_ratio = (annual_return - 0.03) / downside_std if downside_std > 0 else 0
- # 交易统计
- total_trades = len(self.trades)
- winning_trades = sum(1 for t in self.trades if t.pnl > 0)
- losing_trades = total_trades - winning_trades
- win_rate = winning_trades / total_trades if total_trades > 0 else 0
- profits = [t.pnl for t in self.trades if t.pnl > 0]
- losses = [t.pnl for t in self.trades if t.pnl < 0]
- avg_profit = np.mean(profits) if profits else 0
- avg_loss = np.mean(losses) if losses else 0
- profit_factor = abs(sum(profits) / sum(losses)) if losses and sum(losses) != 0 else float('inf')
- # 归因分析
- regime_perf = self._analyze_regime_performance()
- agent_contrib = self._analyze_agent_contributions()
- return BacktestResult(
- start_date=daily_equity.index[0],
- end_date=daily_equity.index[-1],
- initial_capital=self.initial_capital,
- final_capital=daily_equity.iloc[-1],
- total_return=total_return,
- annual_return=annual_return,
- daily_returns=daily_returns,
- max_drawdown=max_drawdown,
- max_drawdown_duration=int(max_drawdown_duration),
- volatility=volatility,
- sharpe_ratio=sharpe_ratio,
- sortino_ratio=sortino_ratio,
- total_trades=total_trades,
- winning_trades=winning_trades,
- losing_trades=losing_trades,
- win_rate=win_rate,
- avg_profit=avg_profit,
- avg_loss=avg_loss,
- profit_factor=profit_factor,
- regime_performance=regime_perf,
- agent_contributions=agent_contrib,
- hard_stop_triggered=False,
- hard_stop_date=None,
- equity_curve=daily_equity,
- trades=self.trades
- )
- def _analyze_regime_performance(self) -> Dict[str, Dict]:
- """分析不同生态下的表现"""
- regime_stats = {}
- for trade in self.trades:
- regime = trade.ecosystem_state.get('macro', {}).get('regime', 'unknown')
- if regime not in regime_stats:
- regime_stats[regime] = {
- 'trades': 0,
- 'wins': 0,
- 'total_pnl': 0,
- 'avg_pnl': 0
- }
- regime_stats[regime]['trades'] += 1
- regime_stats[regime]['wins'] += 1 if trade.pnl > 0 else 0
- regime_stats[regime]['total_pnl'] += trade.pnl
- # 计算平均值
- for regime in regime_stats:
- stats = regime_stats[regime]
- stats['win_rate'] = stats['wins'] / stats['trades'] if stats['trades'] > 0 else 0
- stats['avg_pnl'] = stats['total_pnl'] / stats['trades'] if stats['trades'] > 0 else 0
- return regime_stats
- def _analyze_agent_contributions(self) -> Dict[str, float]:
- """分析各智能体贡献"""
- agent_pnl = {}
- for trade in self.trades:
- for agent_name, signal in trade.agent_signals.items():
- if agent_name not in agent_pnl:
- agent_pnl[agent_name] = 0
- # 简单归因:按信号置信度加权分配盈亏
- confidence = signal.get('confidence', 0.5)
- agent_pnl[agent_name] += trade.pnl * confidence
- return agent_pnl
- def generate_report(self, result: BacktestResult, output_path: str):
- """生成回测报告"""
- report = {
- "summary": {
- "测试区间": f"{result.start_date.strftime('%Y-%m-%d')} to {result.end_date.strftime('%Y-%m-%d')}",
- "初始资金": f"{result.initial_capital:,.0f}",
- "最终资金": f"{result.final_capital:,.0f}",
- "总收益率": f"{result.total_return:.2%}",
- "年化收益率": f"{result.annual_return:.2%}",
- "最大回撤": f"{result.max_drawdown:.2%}",
- "夏普比率": f"{result.sharpe_ratio:.2f}",
- "Sortino比率": f"{result.sortino_ratio:.2f}"
- },
- "trading_stats": {
- "总交易次数": result.total_trades,
- "盈利次数": result.winning_trades,
- "亏损次数": result.losing_trades,
- "胜率": f"{result.win_rate:.2%}",
- "平均盈利": f"{result.avg_profit:,.2f}",
- "平均亏损": f"{result.avg_loss:,.2f}",
- "盈亏比": f"{result.profit_factor:.2f}"
- },
- "regime_analysis": result.regime_performance,
- "agent_contributions": result.agent_contributions
- }
- # 目标达成检查
- targets = {
- "年化收益 25-35%": "✓" if 0.25 <= result.annual_return <= 0.35 else "✗",
- "最大回撤 <12%": "✓" if result.max_drawdown > -0.12 else "✗",
- "夏普比率 >1.5": "✓" if result.sharpe_ratio > 1.5 else "✗"
- }
- report["target_validation"] = targets
- # 保存JSON
- with open(output_path, 'w', encoding='utf-8') as f:
- json.dump(report, f, ensure_ascii=False, indent=2)
- self.logger.log_system(
- EventType.SYSTEM_STOP,
- f"Backtest report saved to {output_path}",
- level="info"
- )
- return report
|