engine.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543
  1. """
  2. CYB50-Pro 回测引擎
  3. 支持全链路回测和绩效归因
  4. """
  5. from dataclasses import dataclass, field
  6. from datetime import datetime, timedelta
  7. from typing import Dict, List, Optional, Tuple, Any
  8. import json
  9. import numpy as np
  10. import pandas as pd
  11. from core.engine import CYB50ProEngine
  12. from core.ecosystem import UnifiedEcosystem
  13. from utils.logger import get_logger, EventType
  14. @dataclass
  15. class BacktestTrade:
  16. """回测交易记录"""
  17. entry_time: datetime
  18. exit_time: Optional[datetime]
  19. direction: str
  20. entry_price: float
  21. exit_price: Optional[float]
  22. position_size: float
  23. pnl: float
  24. pnl_pct: float
  25. agent_signals: Dict[str, Any]
  26. ecosystem_state: Dict[str, Any]
  27. @dataclass
  28. class BacktestResult:
  29. """回测结果"""
  30. # 基础信息(必须参数在前)
  31. start_date: datetime
  32. end_date: datetime
  33. initial_capital: float
  34. final_capital: float
  35. # 收益指标
  36. total_return: float
  37. annual_return: float
  38. # 风险指标
  39. max_drawdown: float
  40. max_drawdown_duration: int
  41. volatility: float
  42. sharpe_ratio: float
  43. sortino_ratio: float
  44. # 交易统计
  45. total_trades: int
  46. winning_trades: int
  47. losing_trades: int
  48. win_rate: float
  49. avg_profit: float
  50. avg_loss: float
  51. profit_factor: float
  52. # 归因分析
  53. regime_performance: Dict[str, Dict]
  54. agent_contributions: Dict[str, float]
  55. # 硬止损信息
  56. hard_stop_triggered: bool = False
  57. hard_stop_date: Optional[datetime] = None
  58. # 原始数据(有默认值的参数在后)
  59. daily_returns: pd.Series = field(default_factory=pd.Series)
  60. equity_curve: pd.Series = field(default_factory=pd.Series)
  61. trades: List[BacktestTrade] = field(default_factory=list)
  62. class CYB50ProBacktester:
  63. """
  64. CYB50-Pro 回测器
  65. 功能:
  66. 1. 加载历史数据(2018-2025)
  67. 2. 逐日/逐30分钟运行策略
  68. 3. 记录交易和权益变化
  69. 4. 计算绩效指标
  70. 5. 归因分析
  71. """
  72. def __init__(
  73. self,
  74. initial_capital: float = 1_000_000,
  75. commission_rate: float = 0.0005, # 万5佣金
  76. slippage: float = 0.001, # 0.1%滑点
  77. log_level: str = "INFO",
  78. catastrophic_drawdown: float = 0.15, # -15%熔断(更严格)
  79. position_stop: float = 0.10 # -10%单笔止损
  80. ):
  81. self.initial_capital = initial_capital
  82. self.commission_rate = commission_rate
  83. self.slippage = slippage
  84. self.logger = get_logger(log_level=log_level)
  85. # 熔断参数
  86. self.catastrophic_drawdown = catastrophic_drawdown
  87. self.position_stop = position_stop
  88. # 初始化引擎
  89. self.engine = CYB50ProEngine()
  90. # 回测状态
  91. self.current_capital = initial_capital
  92. self.current_position = 0.0 # 0-1
  93. self.equity_curve: List[Tuple[datetime, float]] = []
  94. self.trades: List[BacktestTrade] = []
  95. self.current_trade: Optional[BacktestTrade] = None
  96. # 追踪峰值用于熔断判断
  97. self.peak_equity = initial_capital
  98. self.entry_equity = initial_capital # 记录入场时的权益
  99. self.trade_peak_equity = initial_capital # 单笔交易峰值
  100. def run_backtest(
  101. self,
  102. price_data: pd.DataFrame,
  103. start_date: Optional[str] = None,
  104. end_date: Optional[str] = None
  105. ) -> BacktestResult:
  106. """
  107. 运行回测
  108. Args:
  109. price_data: 历史价格数据
  110. start_date: 开始日期 (YYYY-MM-DD)
  111. end_date: 结束日期 (YYYY-MM-DD)
  112. Returns:
  113. BacktestResult: 回测结果
  114. """
  115. # 数据预处理
  116. data = price_data.copy()
  117. if start_date:
  118. data = data[data.index >= start_date]
  119. if end_date:
  120. data = data[data.index <= end_date]
  121. self.logger.log_system(
  122. EventType.SYSTEM_START,
  123. f"Starting backtest from {data.index[0]} to {data.index[-1]}",
  124. level="info"
  125. )
  126. # 初始化
  127. self.current_capital = self.initial_capital
  128. self.equity_curve = [(data.index[0], self.initial_capital)]
  129. # 滚动回测
  130. lookback = 60 # 需要60日历史数据
  131. for i in range(lookback, len(data)):
  132. current_time = data.index[i]
  133. current_price = data['close'].iloc[i]
  134. # 获取历史数据窗口
  135. hist_data = data.iloc[i-lookback:i+1]
  136. # 运行引擎
  137. try:
  138. result = self.engine.run_cycle(
  139. price_data=hist_data,
  140. account_value=self.current_capital,
  141. tick_data=None
  142. )
  143. # 处理信号
  144. self._process_signal(
  145. result=result,
  146. current_time=current_time,
  147. current_price=current_price
  148. )
  149. # 更新权益并检查熔断
  150. self._update_equity(current_time, current_price)
  151. except Exception as e:
  152. self.logger.log_system(
  153. EventType.SYSTEM_ERROR,
  154. f"Error at {current_time}: {str(e)}",
  155. level="error"
  156. )
  157. # 平仓最后一笔交易
  158. if self.current_trade:
  159. self._close_trade(
  160. data.index[-1],
  161. data['close'].iloc[-1],
  162. "backtest_end"
  163. )
  164. # 计算绩效指标
  165. return self._calculate_metrics(data)
  166. def _process_signal(
  167. self,
  168. result: Dict,
  169. current_time: datetime,
  170. current_price: float
  171. ):
  172. """处理引擎信号"""
  173. if not result.get("executed"):
  174. return
  175. target_position = result.get("final_position", 0)
  176. # 检查是否需要调仓
  177. if abs(target_position - self.current_position) < 0.05:
  178. return # 变化小于5%,不调仓
  179. # 先平仓
  180. if self.current_position > 0 and self.current_trade:
  181. self._close_trade(current_time, current_price, "signal_change")
  182. # 再开仓
  183. if target_position > 0:
  184. self._open_trade(
  185. current_time,
  186. current_price,
  187. result.get("coordinated", {}).get("direction", "long"),
  188. target_position,
  189. result
  190. )
  191. def _open_trade(
  192. self,
  193. time: datetime,
  194. price: float,
  195. direction: str,
  196. position_size: float,
  197. engine_result: Dict
  198. ):
  199. """开仓"""
  200. # 考虑滑点
  201. if direction == "long":
  202. exec_price = price * (1 + self.slippage)
  203. else:
  204. exec_price = price * (1 - self.slippage)
  205. # 记录交易
  206. self.current_trade = BacktestTrade(
  207. entry_time=time,
  208. exit_time=None,
  209. direction=direction,
  210. entry_price=exec_price,
  211. exit_price=None,
  212. position_size=position_size,
  213. pnl=0.0,
  214. pnl_pct=0.0,
  215. agent_signals=engine_result.get("signals", {}),
  216. ecosystem_state=engine_result.get("ecosystem", {})
  217. )
  218. self.current_position = position_size
  219. self.entry_equity = self.current_capital # 记录入场权益
  220. self.trade_peak_equity = self.current_capital # 重置交易峰值
  221. # 扣除佣金
  222. trade_value = self.current_capital * position_size
  223. commission = trade_value * self.commission_rate
  224. self.current_capital -= commission
  225. def _close_trade(
  226. self,
  227. time: datetime,
  228. price: float,
  229. reason: str
  230. ):
  231. """平仓"""
  232. if not self.current_trade:
  233. return
  234. # 考虑滑点
  235. if self.current_trade.direction == "long":
  236. exec_price = price * (1 - self.slippage)
  237. else:
  238. exec_price = price * (1 + self.slippage)
  239. # 计算盈亏
  240. if self.current_trade.direction == "long":
  241. pnl_pct = (exec_price - self.current_trade.entry_price) / self.current_trade.entry_price
  242. else:
  243. pnl_pct = (self.current_trade.entry_price - exec_price) / self.current_trade.entry_price
  244. pnl = self.current_capital * self.current_position * pnl_pct
  245. # 扣除佣金
  246. trade_value = self.current_capital * self.current_position
  247. commission = trade_value * self.commission_rate
  248. self.current_capital -= commission
  249. # 更新资金
  250. self.current_capital += pnl
  251. # 记录交易
  252. self.current_trade.exit_time = time
  253. self.current_trade.exit_price = exec_price
  254. self.current_trade.pnl = pnl
  255. self.current_trade.pnl_pct = pnl_pct
  256. self.trades.append(self.current_trade)
  257. self.current_trade = None
  258. self.current_position = 0.0
  259. def _update_equity(self, time: datetime, price: float):
  260. """更新权益曲线,仅检查-20%熔断"""
  261. if self.current_position > 0 and self.current_trade:
  262. # 计算浮动盈亏
  263. if self.current_trade.direction == "long":
  264. unrealized_pct = (price - self.current_trade.entry_price) / self.current_trade.entry_price
  265. else:
  266. unrealized_pct = (self.current_trade.entry_price - price) / self.current_trade.entry_price
  267. unrealized_pnl = self.current_capital * self.current_position * unrealized_pct
  268. total_equity = self.current_capital + unrealized_pnl
  269. else:
  270. total_equity = self.current_capital
  271. self.equity_curve.append((time, total_equity))
  272. # 仅检查-20%熔断
  273. self._check_catastrophic_stop(time, price, total_equity)
  274. def _check_catastrophic_stop(self, time: datetime, price: float, total_equity: float):
  275. """
  276. 检查熔断和追踪止盈(路径一优化版)
  277. """
  278. # 更新峰值权益
  279. if total_equity > self.peak_equity:
  280. self.peak_equity = total_equity
  281. # 计算回撤
  282. if self.peak_equity > 0:
  283. drawdown = (total_equity - self.peak_equity) / self.peak_equity
  284. else:
  285. drawdown = 0.0
  286. # 1. 追踪止盈:从峰值回撤5%即出场
  287. if self.current_position > 0:
  288. if total_equity > self.trade_peak_equity:
  289. self.trade_peak_equity = total_equity
  290. trailing_drawdown = (total_equity - self.trade_peak_equity) / self.trade_peak_equity
  291. if trailing_drawdown <= -0.05: # 5%追踪止盈
  292. self.logger.log_system(
  293. EventType.RISK_WARNING,
  294. f"TRAILING STOP: Drawdown from peak {trailing_drawdown:.2%}. Closing trade.",
  295. level="warning"
  296. )
  297. self._close_trade(time, price, f"trailing_stop_{trailing_drawdown:.2%}")
  298. self.peak_equity = total_equity
  299. return
  300. # 2. -15%熔断
  301. if drawdown <= -self.catastrophic_drawdown:
  302. self.logger.log_system(
  303. EventType.RISK_WARNING,
  304. f"CATASTROPHIC STOP: Drawdown {drawdown:.2%}. Liquidating all positions.",
  305. level="critical"
  306. )
  307. if self.current_position > 0:
  308. self._close_trade(time, price, f"catastrophic_stop_{drawdown:.2%}")
  309. self.peak_equity = total_equity
  310. def _calculate_metrics(self, price_data: pd.DataFrame) -> BacktestResult:
  311. """计算回测绩效指标"""
  312. # 构建权益曲线
  313. equity_df = pd.DataFrame(
  314. self.equity_curve,
  315. columns=['timestamp', 'equity']
  316. ).set_index('timestamp')
  317. # 计算日收益率
  318. daily_equity = equity_df['equity'].resample('D').last().dropna()
  319. daily_returns = daily_equity.pct_change().dropna()
  320. # 基础指标
  321. total_return = (daily_equity.iloc[-1] - self.initial_capital) / self.initial_capital
  322. # 年化收益
  323. n_years = (daily_equity.index[-1] - daily_equity.index[0]).days / 365.25
  324. annual_return = (1 + total_return) ** (1 / n_years) - 1 if n_years > 0 else 0
  325. # 最大回撤
  326. rolling_max = daily_equity.cummax()
  327. drawdown = (daily_equity - rolling_max) / rolling_max
  328. max_drawdown = drawdown.min()
  329. # 回撤持续时间
  330. is_drawdown = drawdown < 0
  331. drawdown_starts = is_drawdown.ne(is_drawdown.shift()).cumsum()
  332. drawdown_durations = is_drawdown.groupby(drawdown_starts).sum()
  333. max_drawdown_duration = drawdown_durations.max() if len(drawdown_durations) > 0 else 0
  334. # 波动率和夏普
  335. volatility = daily_returns.std() * np.sqrt(252)
  336. sharpe_ratio = (annual_return - 0.03) / volatility if volatility > 0 else 0
  337. # Sortino比率
  338. downside_returns = daily_returns[daily_returns < 0]
  339. downside_std = downside_returns.std() * np.sqrt(252)
  340. sortino_ratio = (annual_return - 0.03) / downside_std if downside_std > 0 else 0
  341. # 交易统计
  342. total_trades = len(self.trades)
  343. winning_trades = sum(1 for t in self.trades if t.pnl > 0)
  344. losing_trades = total_trades - winning_trades
  345. win_rate = winning_trades / total_trades if total_trades > 0 else 0
  346. profits = [t.pnl for t in self.trades if t.pnl > 0]
  347. losses = [t.pnl for t in self.trades if t.pnl < 0]
  348. avg_profit = np.mean(profits) if profits else 0
  349. avg_loss = np.mean(losses) if losses else 0
  350. profit_factor = abs(sum(profits) / sum(losses)) if losses and sum(losses) != 0 else float('inf')
  351. # 归因分析
  352. regime_perf = self._analyze_regime_performance()
  353. agent_contrib = self._analyze_agent_contributions()
  354. return BacktestResult(
  355. start_date=daily_equity.index[0],
  356. end_date=daily_equity.index[-1],
  357. initial_capital=self.initial_capital,
  358. final_capital=daily_equity.iloc[-1],
  359. total_return=total_return,
  360. annual_return=annual_return,
  361. daily_returns=daily_returns,
  362. max_drawdown=max_drawdown,
  363. max_drawdown_duration=int(max_drawdown_duration),
  364. volatility=volatility,
  365. sharpe_ratio=sharpe_ratio,
  366. sortino_ratio=sortino_ratio,
  367. total_trades=total_trades,
  368. winning_trades=winning_trades,
  369. losing_trades=losing_trades,
  370. win_rate=win_rate,
  371. avg_profit=avg_profit,
  372. avg_loss=avg_loss,
  373. profit_factor=profit_factor,
  374. regime_performance=regime_perf,
  375. agent_contributions=agent_contrib,
  376. hard_stop_triggered=False,
  377. hard_stop_date=None,
  378. equity_curve=daily_equity,
  379. trades=self.trades
  380. )
  381. def _analyze_regime_performance(self) -> Dict[str, Dict]:
  382. """分析不同生态下的表现"""
  383. regime_stats = {}
  384. for trade in self.trades:
  385. regime = trade.ecosystem_state.get('macro', {}).get('regime', 'unknown')
  386. if regime not in regime_stats:
  387. regime_stats[regime] = {
  388. 'trades': 0,
  389. 'wins': 0,
  390. 'total_pnl': 0,
  391. 'avg_pnl': 0
  392. }
  393. regime_stats[regime]['trades'] += 1
  394. regime_stats[regime]['wins'] += 1 if trade.pnl > 0 else 0
  395. regime_stats[regime]['total_pnl'] += trade.pnl
  396. # 计算平均值
  397. for regime in regime_stats:
  398. stats = regime_stats[regime]
  399. stats['win_rate'] = stats['wins'] / stats['trades'] if stats['trades'] > 0 else 0
  400. stats['avg_pnl'] = stats['total_pnl'] / stats['trades'] if stats['trades'] > 0 else 0
  401. return regime_stats
  402. def _analyze_agent_contributions(self) -> Dict[str, float]:
  403. """分析各智能体贡献"""
  404. agent_pnl = {}
  405. for trade in self.trades:
  406. for agent_name, signal in trade.agent_signals.items():
  407. if agent_name not in agent_pnl:
  408. agent_pnl[agent_name] = 0
  409. # 简单归因:按信号置信度加权分配盈亏
  410. confidence = signal.get('confidence', 0.5)
  411. agent_pnl[agent_name] += trade.pnl * confidence
  412. return agent_pnl
  413. def generate_report(self, result: BacktestResult, output_path: str):
  414. """生成回测报告"""
  415. report = {
  416. "summary": {
  417. "测试区间": f"{result.start_date.strftime('%Y-%m-%d')} to {result.end_date.strftime('%Y-%m-%d')}",
  418. "初始资金": f"{result.initial_capital:,.0f}",
  419. "最终资金": f"{result.final_capital:,.0f}",
  420. "总收益率": f"{result.total_return:.2%}",
  421. "年化收益率": f"{result.annual_return:.2%}",
  422. "最大回撤": f"{result.max_drawdown:.2%}",
  423. "夏普比率": f"{result.sharpe_ratio:.2f}",
  424. "Sortino比率": f"{result.sortino_ratio:.2f}"
  425. },
  426. "trading_stats": {
  427. "总交易次数": result.total_trades,
  428. "盈利次数": result.winning_trades,
  429. "亏损次数": result.losing_trades,
  430. "胜率": f"{result.win_rate:.2%}",
  431. "平均盈利": f"{result.avg_profit:,.2f}",
  432. "平均亏损": f"{result.avg_loss:,.2f}",
  433. "盈亏比": f"{result.profit_factor:.2f}"
  434. },
  435. "regime_analysis": result.regime_performance,
  436. "agent_contributions": result.agent_contributions
  437. }
  438. # 目标达成检查
  439. targets = {
  440. "年化收益 25-35%": "✓" if 0.25 <= result.annual_return <= 0.35 else "✗",
  441. "最大回撤 <12%": "✓" if result.max_drawdown > -0.12 else "✗",
  442. "夏普比率 >1.5": "✓" if result.sharpe_ratio > 1.5 else "✗"
  443. }
  444. report["target_validation"] = targets
  445. # 保存JSON
  446. with open(output_path, 'w', encoding='utf-8') as f:
  447. json.dump(report, f, ensure_ascii=False, indent=2)
  448. self.logger.log_system(
  449. EventType.SYSTEM_STOP,
  450. f"Backtest report saved to {output_path}",
  451. level="info"
  452. )
  453. return report