"""
CYB50-Pro trading engine controller.

Main control loop that ties all of the modules together.
"""

from typing import Dict, Optional, Any
from datetime import datetime

import pandas as pd

from core.ecosystem import EcosystemFusion, UnifiedEcosystem
from core.signal_fusion import SignalCollector, BayesianSignalFusion
from agents import (
    AgentBase,
    DynamicAgentRouter,
    AgentCoordinator,
    BreakoutAgent,
    MeanReversionAgent
)
from risk import RiskManager
from utils.logger import get_logger, EventType


class CYB50ProEngine:
    """
    CYB50-Pro trading engine.

    Main control flow (as described by the original author):
    1. Fetch market data
    2. Identify the market ecosystem
    3. Route agents and generate signals
    4. Fuse signals
    5. Run risk-management checks
    6. Execute trades

    NOTE(review): within this class, step 6 only sets ``results["executed"]``
    and ``results["final_position"]``; no order-placement call is made here.
    """

    # Monthly risk budget used when ``config`` does not provide one.
    DEFAULT_MONTHLY_BUDGET = 1_000_000

    def __init__(self, config: Optional[Dict] = None):
        """
        Build the engine and all of its sub-modules.

        Args:
            config: Optional settings dictionary. The only key read here is
                ``"monthly_budget"``, the amount passed to the risk budget
                manager; it defaults to ``DEFAULT_MONTHLY_BUDGET``.
        """
        self.config = config or {}
        self.logger = get_logger()

        # Initialise the individual modules.
        self.ecosystem_fusion = EcosystemFusion()
        self.signal_collector = SignalCollector()
        self.signal_fusion = BayesianSignalFusion()
        self.agent_router = DynamicAgentRouter()
        self.agent_coordinator = AgentCoordinator()
        self.risk_manager = RiskManager()

        # Initialise the risk budget (previously hard-coded to 1_000_000).
        monthly_budget = self.config.get(
            "monthly_budget", self.DEFAULT_MONTHLY_BUDGET
        )
        self.risk_manager.budget_manager.allocate_monthly_budget(monthly_budget)

        # Initialise agents - Breakout only (noted by the original author as
        # the current best single strategy).
        self.agents: Dict[str, AgentBase] = {
            "breakout": BreakoutAgent(),
        }

        self.is_running = False

    def run_cycle(
        self,
        price_data: pd.DataFrame,
        account_value: float,
        tick_data: Optional[pd.DataFrame] = None
    ) -> Dict[str, Any]:
        """
        Run one trading cycle.

        Any exception raised inside the cycle is logged as a system error and
        reported under the ``"error"`` key instead of propagating.

        Args:
            price_data: Price data; the last value of its ``'close'`` column
                is used as the entry price for the risk check.
            account_value: Current account value.
            tick_data: Tick data (optional).

        Returns:
            Result dictionary. ``"timestamp"``, ``"signals"`` and
            ``"executed"`` are always present. ``"ecosystem"``,
            ``"routing"``, ``"coordinated"``, ``"risk_check"``,
            ``"final_position"`` and ``"error"`` are added only when the
            corresponding step is reached.
        """
        results = {
            "timestamp": datetime.now(),
            "signals": {},
            "executed": False
        }

        try:
            # 1. Identify the market ecosystem.
            ecosystem = self.ecosystem_fusion.fuse(
                price_data=price_data,
                tick_data=tick_data
            )
            results["ecosystem"] = ecosystem.to_dict()

            self.logger.log_ecosystem(
                EventType.ECOSYSTEM_CHANGE,
                ecosystem.macro.regime.value,
                ecosystem.meso.health_score,
                ecosystem.micro.state.value,
                ecosystem.confidence
            )

            # 2. Route agents.
            routing = self.agent_router.route(
                self.agents,
                price_data,
                ecosystem
            )
            # Built once; reused for the result payload and the coordinator.
            agent_weights = {k: v.weight for k, v in routing.weights.items()}
            results["routing"] = {
                "active_agents": routing.active_agents,
                "weights": agent_weights
            }

            # 3. Generate signals.
            agent_signals = {}
            for name in routing.active_agents:
                # The router may name agents this engine does not hold.
                if name not in self.agents:
                    continue
                signal = self.agents[name].generate_signal(price_data, ecosystem)
                if signal:
                    agent_signals[name] = signal
                    results["signals"][name] = {
                        "direction": signal.direction.value,
                        "confidence": signal.confidence,
                        "position": signal.suggested_position
                    }

            # 4. Coordinate the signals.
            if agent_signals:
                coordinated = self.agent_coordinator.coordinate(
                    agent_signals,
                    agent_weights
                )
                results["coordinated"] = {
                    "direction": coordinated.final_direction.value,
                    "position": coordinated.final_position,
                    "reasoning": coordinated.reasoning
                }

                # 5. Risk-management check.
                risk_check = self.risk_manager.check_trade_permission(
                    account_value=account_value,
                    proposed_position=coordinated.final_position,
                    entry_price=price_data['close'].iloc[-1]
                )
                results["risk_check"] = {
                    "can_trade": risk_check.can_trade,
                    "allowed_position": risk_check.allowed_position,
                    "risk_level": risk_check.risk_level,
                    "warnings": risk_check.warnings
                }

                # Only strictly positive positions are marked as executed,
                # capped at what the risk manager allows.
                if risk_check.can_trade and coordinated.final_position > 0:
                    results["executed"] = True
                    results["final_position"] = min(
                        coordinated.final_position,
                        risk_check.allowed_position
                    )

            # Update the risk state.
            # NOTE(review): the source arrived with its indentation collapsed;
            # this call is assumed to run on every successful cycle, not only
            # when signals exist - confirm against the original file.
            self.risk_manager.update_account_state(account_value)

        except Exception as e:
            # Top-level boundary for a cycle: report the failure to the caller
            # through the result dict rather than raising.
            message = str(e)
            self.logger.log_system(
                EventType.SYSTEM_ERROR,
                f"Engine cycle error: {message}",
                level="error"
            )
            results["error"] = message

        return results

    def start(self):
        """Mark the engine as running and log the start event."""
        self.is_running = True
        self.logger.log_system(
            EventType.SYSTEM_START,
            "CYB50-Pro engine started",
            level="info"
        )

    def stop(self):
        """Mark the engine as stopped and log the stop event."""
        self.is_running = False
        self.logger.log_system(
            EventType.SYSTEM_STOP,
            "CYB50-Pro engine stopped",
            level="info"
        )

    def get_status(self) -> Dict[str, Any]:
        """
        Return the engine status.

        Returns:
            Dictionary with the running flag (``"is_running"``), each agent's
            performance summary keyed by agent name (``"agents"``), and the
            risk manager's summary (``"risk_summary"``).
        """
        return {
            "is_running": self.is_running,
            "agents": {
                name: agent.get_performance_summary()
                for name, agent in self.agents.items()
            },
            "risk_summary": self.risk_manager.get_risk_summary()
        }