engine.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. """
  2. 三龙出海主控引擎 (Dragon Rotation Engine)
  3. 整合多品种数据、品种选择、信号生成、仓位管理、风控体系
  4. 实现每日交易循环:数据更新 → 品种选择 → 信号生成 → 仓位调整 → 风控检查
  5. """
  6. from typing import Dict, List, Optional, Any
  7. from datetime import datetime, timedelta
  8. import pandas as pd
  9. import numpy as np
  10. from core.ecosystem import EcosystemFusion, UnifiedEcosystem
  11. from .data_loader import MultiAssetDataLoader
  12. from .selector import DragonSelector, SymbolScore
  13. from .signal_engine import UnifiedSignalEngine, SignalResult
  14. from .position_manager import PositionManager
  15. from .risk_manager import MultiLayerRiskManager, RiskCheckResult, RiskStatus
  16. class DragonRotationEngine:
  17. """
  18. 三龙出海多品种轮动引擎
  19. 核心流程:
  20. 1. 每日更新三品种数据
  21. 2. DragonSelector选择最优品种
  22. 3. UnifiedSignalEngine生成交易信号
  23. 4. PositionManager计算建议仓位
  24. 5. RiskManager执行风控检查
  25. 6. 执行交易并记录
  26. """
  27. def __init__(
  28. self,
  29. initial_capital: float = 1_000_000,
  30. score_threshold: float = 60,
  31. data_dir: str = "quant"
  32. ):
  33. self.initial_capital = initial_capital
  34. self.current_capital = initial_capital
  35. self.score_threshold = score_threshold
  36. # 初始化各模块
  37. self.data_loader = MultiAssetDataLoader(data_dir)
  38. self.ecosystem_fusion = EcosystemFusion()
  39. self.selector = DragonSelector(score_threshold=score_threshold)
  40. self.signal_engine = UnifiedSignalEngine()
  41. self.position_manager = PositionManager()
  42. self.risk_manager = MultiLayerRiskManager()
  43. # 当前状态
  44. self.current_symbol: Optional[str] = None
  45. self.current_position: float = 0.0
  46. self.in_position: bool = False
  47. # 数据缓存
  48. self.data_dict: Dict[str, pd.DataFrame] = {}
  49. # 交易记录
  50. self.trades: List[Dict] = []
  51. self.daily_stats: List[Dict] = []
  52. def initialize(self, start_date: Optional[str] = None):
  53. """初始化引擎,加载历史数据"""
  54. print("Initializing Dragon Rotation Engine...")
  55. # 加载三品种数据
  56. self.data_dict = self.data_loader.load_all_data(start_date)
  57. if len(self.data_dict) < 3:
  58. raise ValueError(f"Failed to load data. Only {len(self.data_dict)} symbols loaded.")
  59. print(f"Loaded data for: {list(self.data_dict.keys())}")
  60. # 初始化风险管理
  61. self.risk_manager.initialize(self.initial_capital, datetime.now())
  62. print("Engine initialized successfully.")
  63. def run_cycle(
  64. self,
  65. current_date: Optional[datetime] = None,
  66. verbose: bool = False
  67. ) -> Dict[str, Any]:
  68. """
  69. 执行一个交易周期
  70. Args:
  71. current_date: 当前日期(回测用)
  72. verbose: 是否打印详细信息
  73. Returns:
  74. Dict: 周期结果
  75. """
  76. if current_date is None:
  77. current_date = datetime.now()
  78. result = {
  79. "date": current_date,
  80. "symbol": None,
  81. "signal": "neutral",
  82. "position": 0.0,
  83. "executed": False,
  84. "reason": ""
  85. }
  86. try:
  87. # 1. 获取三品种当前数据窗口
  88. symbol_data = {}
  89. for symbol in self.data_dict.keys():
  90. df = self.data_loader.get_symbol_data_at_date(
  91. self.data_dict, symbol, current_date, lookback=80
  92. )
  93. if df is not None and len(df) >= 60:
  94. symbol_data[symbol] = df
  95. if len(symbol_data) < 3:
  96. result["reason"] = "Insufficient data"
  97. return result
  98. # 2. 识别当前生态
  99. # 使用第一个品种的数据进行生态识别
  100. first_symbol = list(symbol_data.keys())[0]
  101. ecosystem = self.ecosystem_fusion.fuse(
  102. price_data=symbol_data[first_symbol],
  103. tick_data=None
  104. )
  105. # 3. 品种选择
  106. selected_symbol, score_details = self.selector.select(
  107. symbol_data, ecosystem, current_date
  108. )
  109. if selected_symbol is None:
  110. # 空仓
  111. if self.in_position:
  112. # 平仓
  113. self._close_position(current_date, "no_selection")
  114. result["reason"] = "No symbol meets threshold"
  115. return result
  116. result["symbol"] = selected_symbol
  117. # 4. 检查是否需要切换品种
  118. if self.current_symbol is not None and self.current_symbol != selected_symbol:
  119. # 切换品种
  120. self._switch_symbol(self.current_symbol, selected_symbol, current_date)
  121. self.current_symbol = selected_symbol
  122. # 5. 生成交易信号
  123. signal = self.signal_engine.generate_signal(
  124. symbol_data[selected_symbol],
  125. current_date
  126. )
  127. result["signal"] = signal.signal
  128. # 6. 风控检查
  129. if self.in_position:
  130. # 更新持仓价格
  131. current_price = symbol_data[selected_symbol]['close'].iloc[-1]
  132. self.risk_manager.update_position(selected_symbol, current_price)
  133. # 检查风险
  134. risk_check = self.risk_manager.check_risk(selected_symbol, current_date)
  135. if risk_check.should_close_position:
  136. self._close_position(current_date, risk_check.message)
  137. result["reason"] = risk_check.message
  138. return result
  139. if risk_check.should_reduce_position:
  140. self._reduce_position(
  141. selected_symbol,
  142. risk_check.reduction_pct,
  143. current_date
  144. )
  145. # 7. 入场信号处理
  146. if signal.signal == "enter_long" and not self.in_position:
  147. # 计算仓位
  148. volatility = score_details.volatility_60d if score_details else 0.25
  149. position_size = self.position_manager.calculate_position(
  150. selected_symbol,
  151. signal.confidence,
  152. volatility,
  153. current_date
  154. )
  155. if position_size > 0:
  156. # 执行入场
  157. self._open_position(
  158. selected_symbol,
  159. position_size,
  160. symbol_data[selected_symbol]['close'].iloc[-1],
  161. current_date
  162. )
  163. result["position"] = position_size
  164. result["executed"] = True
  165. result["reason"] = "Entry signal"
  166. elif signal.signal == "exit" and self.in_position:
  167. # 出场信号
  168. self._close_position(current_date, "signal_exit")
  169. result["reason"] = "Exit signal"
  170. elif signal.signal == "hold" and self.in_position:
  171. # 持仓中,检查再平衡
  172. if self.position_manager.should_rebalance(current_date):
  173. self.position_manager.rebalance(symbol_data, current_date)
  174. # 8. 更新权益
  175. self._update_equity(current_date)
  176. # 记录每日统计
  177. self.daily_stats.append({
  178. "date": current_date,
  179. "symbol": selected_symbol,
  180. "score": score_details.total_score if score_details else 0,
  181. "signal": signal.signal,
  182. "position": self.current_position,
  183. "equity": self.current_capital
  184. })
  185. if verbose:
  186. print(f"[{current_date.strftime('%Y-%m-%d')}] "
  187. f"Symbol: {selected_symbol}, Signal: {signal.signal}, "
  188. f"Position: {self.current_position:.2%}")
  189. except Exception as e:
  190. result["reason"] = f"Error: {str(e)}"
  191. print(f"Error in cycle: {e}")
  192. return result
  193. def _open_position(
  194. self,
  195. symbol: str,
  196. position_size: float,
  197. price: float,
  198. date: datetime
  199. ):
  200. """开仓"""
  201. self.in_position = True
  202. self.current_position = position_size
  203. # 注册到风险管理
  204. self.risk_manager.register_position(symbol, price, position_size, date)
  205. # 记录交易
  206. self.trades.append({
  207. "date": date,
  208. "action": "open",
  209. "symbol": symbol,
  210. "price": price,
  211. "position": position_size,
  212. "equity": self.current_capital
  213. })
  214. def _close_position(self, date: datetime, reason: str):
  215. """平仓"""
  216. if not self.in_position:
  217. return
  218. # 记录交易
  219. self.trades.append({
  220. "date": date,
  221. "action": "close",
  222. "symbol": self.current_symbol,
  223. "reason": reason,
  224. "position": self.current_position,
  225. "equity": self.current_capital
  226. })
  227. # 清理状态
  228. self.in_position = False
  229. self.current_position = 0.0
  230. self.risk_manager.close_position(self.current_symbol)
  231. self.signal_engine._reset_position()
  232. def _reduce_position(self, symbol: str, reduction_pct: float, date: datetime):
  233. """减仓"""
  234. new_size = self.position_manager.reduce_position(symbol, reduction_pct)
  235. self.current_position = new_size
  236. self.trades.append({
  237. "date": date,
  238. "action": "reduce",
  239. "symbol": symbol,
  240. "reduction": reduction_pct,
  241. "new_position": new_size
  242. })
  243. def _switch_symbol(self, old_symbol: str, new_symbol: str, date: datetime):
  244. """切换品种"""
  245. # 先平旧仓位
  246. if self.in_position:
  247. self._close_position(date, f"switch_to_{new_symbol}")
  248. # 重置信号引擎状态
  249. self.signal_engine._reset_position()
  250. def _update_equity(self, date: datetime):
  251. """更新账户权益"""
  252. # 简化计算:实际权益更新在回测引擎中处理
  253. self.risk_manager.update_equity(self.current_capital, date)
  254. def get_performance_summary(self) -> Dict[str, Any]:
  255. """获取绩效摘要"""
  256. if not self.daily_stats:
  257. return {}
  258. equity_curve = [s["equity"] for s in self.daily_stats]
  259. # 计算收益率
  260. total_return = (equity_curve[-1] - self.initial_capital) / self.initial_capital
  261. # 计算最大回撤
  262. peak = self.initial_capital
  263. max_drawdown = 0.0
  264. for equity in equity_curve:
  265. if equity > peak:
  266. peak = equity
  267. drawdown = (peak - equity) / peak
  268. max_drawdown = max(max_drawdown, drawdown)
  269. # 计算年化收益
  270. days = len(self.daily_stats)
  271. annual_return = (1 + total_return) ** (252 / days) - 1 if days > 0 else 0
  272. # 计算波动率
  273. daily_returns = [(equity_curve[i] - equity_curve[i-1]) / equity_curve[i-1]
  274. for i in range(1, len(equity_curve))]
  275. volatility = np.std(daily_returns) * np.sqrt(252) if daily_returns else 0
  276. # 计算夏普比率
  277. sharpe = (annual_return - 0.03) / volatility if volatility > 0 else 0
  278. return {
  279. "total_return": total_return,
  280. "annual_return": annual_return,
  281. "max_drawdown": max_drawdown,
  282. "volatility": volatility,
  283. "sharpe_ratio": sharpe,
  284. "total_trades": len([t for t in self.trades if t["action"] == "open"]),
  285. "days": days
  286. }
  287. def reset(self):
  288. """重置引擎状态"""
  289. self.current_capital = self.initial_capital
  290. self.current_symbol = None
  291. self.current_position = 0.0
  292. self.in_position = False
  293. self.trades = []
  294. self.daily_stats = []
  295. self.selector.current_symbol = None
  296. self.signal_engine._reset_position()
  297. self.position_manager = PositionManager()
  298. self.risk_manager = MultiLayerRiskManager()
  299. self.risk_manager.initialize(self.initial_capital, datetime.now())