engine.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. """
  2. CYB50-Pro 交易引擎主控
  3. 整合所有模块的主控循环
  4. """
  5. from typing import Dict, Optional, Any
  6. from datetime import datetime
  7. import pandas as pd
  8. from core.ecosystem import EcosystemFusion, UnifiedEcosystem
  9. from core.signal_fusion import SignalCollector, BayesianSignalFusion
  10. from agents import (
  11. AgentBase, DynamicAgentRouter, AgentCoordinator,
  12. BreakoutAgent, MeanReversionAgent
  13. )
  14. from risk import RiskManager
  15. from utils.logger import get_logger, EventType
  16. class CYB50ProEngine:
  17. """
  18. CYB50-Pro 交易引擎
  19. 主控流程:
  20. 1. 获取市场数据
  21. 2. 识别市场生态
  22. 3. 路由智能体并生成信号
  23. 4. 融合信号
  24. 5. 风险管理检查
  25. 6. 执行交易
  26. """
  27. def __init__(self, config: Optional[Dict] = None):
  28. self.config = config or {}
  29. self.logger = get_logger()
  30. # 初始化各模块
  31. self.ecosystem_fusion = EcosystemFusion()
  32. self.signal_collector = SignalCollector()
  33. self.signal_fusion = BayesianSignalFusion()
  34. self.agent_router = DynamicAgentRouter()
  35. self.agent_coordinator = AgentCoordinator()
  36. self.risk_manager = RiskManager()
  37. # 初始化风险预算
  38. self.risk_manager.budget_manager.allocate_monthly_budget(1_000_000)
  39. # 初始化智能体 - 仅Breakout(当前最优单策略)
  40. self.agents: Dict[str, AgentBase] = {
  41. "breakout": BreakoutAgent(),
  42. }
  43. self.is_running = False
  44. def run_cycle(
  45. self,
  46. price_data: pd.DataFrame,
  47. account_value: float,
  48. tick_data: Optional[pd.DataFrame] = None
  49. ) -> Dict[str, Any]:
  50. """
  51. 执行一个交易周期
  52. Args:
  53. price_data: 价格数据
  54. account_value: 账户价值
  55. tick_data: tick数据(可选)
  56. Returns:
  57. 周期结果字典
  58. """
  59. results = {
  60. "timestamp": datetime.now(),
  61. "signals": {},
  62. "executed": False
  63. }
  64. try:
  65. # 1. 识别市场生态
  66. ecosystem = self.ecosystem_fusion.fuse(
  67. price_data=price_data,
  68. tick_data=tick_data
  69. )
  70. results["ecosystem"] = ecosystem.to_dict()
  71. self.logger.log_ecosystem(
  72. EventType.ECOSYSTEM_CHANGE,
  73. ecosystem.macro.regime.value,
  74. ecosystem.meso.health_score,
  75. ecosystem.micro.state.value,
  76. ecosystem.confidence
  77. )
  78. # 2. 智能体路由
  79. routing = self.agent_router.route(
  80. self.agents,
  81. price_data,
  82. ecosystem
  83. )
  84. results["routing"] = {
  85. "active_agents": routing.active_agents,
  86. "weights": {k: v.weight for k, v in routing.weights.items()}
  87. }
  88. # 3. 生成信号
  89. agent_signals = {}
  90. for name in routing.active_agents:
  91. if name in self.agents:
  92. signal = self.agents[name].generate_signal(price_data, ecosystem)
  93. if signal:
  94. agent_signals[name] = signal
  95. results["signals"][name] = {
  96. "direction": signal.direction.value,
  97. "confidence": signal.confidence,
  98. "position": signal.suggested_position
  99. }
  100. # 4. 协同处理
  101. if agent_signals:
  102. coordinated = self.agent_coordinator.coordinate(
  103. agent_signals,
  104. {k: v.weight for k, v in routing.weights.items()}
  105. )
  106. results["coordinated"] = {
  107. "direction": coordinated.final_direction.value,
  108. "position": coordinated.final_position,
  109. "reasoning": coordinated.reasoning
  110. }
  111. # 5. 风险管理检查
  112. risk_check = self.risk_manager.check_trade_permission(
  113. account_value=account_value,
  114. proposed_position=coordinated.final_position,
  115. entry_price=price_data['close'].iloc[-1]
  116. )
  117. results["risk_check"] = {
  118. "can_trade": risk_check.can_trade,
  119. "allowed_position": risk_check.allowed_position,
  120. "risk_level": risk_check.risk_level,
  121. "warnings": risk_check.warnings
  122. }
  123. if risk_check.can_trade and coordinated.final_position > 0:
  124. results["executed"] = True
  125. results["final_position"] = min(
  126. coordinated.final_position,
  127. risk_check.allowed_position
  128. )
  129. # 更新风险状态
  130. self.risk_manager.update_account_state(account_value)
  131. except Exception as e:
  132. self.logger.log_system(
  133. EventType.SYSTEM_ERROR,
  134. f"Engine cycle error: {str(e)}",
  135. level="error"
  136. )
  137. results["error"] = str(e)
  138. return results
  139. def start(self):
  140. """启动引擎"""
  141. self.is_running = True
  142. self.logger.log_system(
  143. EventType.SYSTEM_START,
  144. "CYB50-Pro engine started",
  145. level="info"
  146. )
  147. def stop(self):
  148. """停止引擎"""
  149. self.is_running = False
  150. self.logger.log_system(
  151. EventType.SYSTEM_STOP,
  152. "CYB50-Pro engine stopped",
  153. level="info"
  154. )
  155. def get_status(self) -> Dict[str, Any]:
  156. """获取引擎状态"""
  157. return {
  158. "is_running": self.is_running,
  159. "agents": {name: agent.get_performance_summary()
  160. for name, agent in self.agents.items()},
  161. "risk_summary": self.risk_manager.get_risk_summary()
  162. }