| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
"""
CYB50-Pro 交易引擎主控
整合所有模块的主控循环
"""
- from typing import Dict, Optional, Any
- from datetime import datetime
- import pandas as pd
- from core.ecosystem import EcosystemFusion, UnifiedEcosystem
- from core.signal_fusion import SignalCollector, BayesianSignalFusion
- from agents import (
- AgentBase, DynamicAgentRouter, AgentCoordinator,
- BreakoutAgent, MeanReversionAgent
- )
- from risk import RiskManager
- from utils.logger import get_logger, EventType
class CYB50ProEngine:
    """CYB50-Pro trading engine: the main control loop tying all modules together.

    Flow of one cycle:
        1. Obtain market data (supplied by the caller).
        2. Identify the market ecosystem.
        3. Route agents and generate signals.
        4. Coordinate the signals.
        5. Run the risk-management check.
        6. Execute the trade (reported through the cycle result).
    """

    # Monthly risk budget used when the config does not provide one.
    DEFAULT_MONTHLY_BUDGET = 1_000_000

    def __init__(self, config: Optional[Dict] = None):
        """Create the engine and all of its collaborating modules.

        Args:
            config: Optional settings mapping. The ``"monthly_budget"`` key
                overrides ``DEFAULT_MONTHLY_BUDGET``; the mapping is kept on
                ``self.config``.
        """
        self.config = config or {}
        self.logger = get_logger()

        # Initialise the modules.
        self.ecosystem_fusion = EcosystemFusion()
        self.signal_collector = SignalCollector()
        self.signal_fusion = BayesianSignalFusion()
        self.agent_router = DynamicAgentRouter()
        self.agent_coordinator = AgentCoordinator()
        self.risk_manager = RiskManager()

        # Initialise the risk budget (configurable, same default as before).
        self.risk_manager.budget_manager.allocate_monthly_budget(
            self.config.get("monthly_budget", self.DEFAULT_MONTHLY_BUDGET)
        )

        # Initialise the agents - Breakout only (currently the best single
        # strategy).
        self.agents: Dict[str, AgentBase] = {
            "breakout": BreakoutAgent(),
        }
        self.is_running = False

    def run_cycle(
        self,
        price_data: pd.DataFrame,
        account_value: float,
        tick_data: Optional[pd.DataFrame] = None,
    ) -> Dict[str, Any]:
        """Run one trading cycle.

        Args:
            price_data: Price data; the last value of its ``close`` column is
                used as the entry price for the risk check.
            account_value: Account value.
            tick_data: Tick data (optional).

        Returns:
            The cycle result dict. It always holds ``"timestamp"``,
            ``"signals"`` and ``"executed"``; ``"ecosystem"``, ``"routing"``,
            ``"coordinated"``, ``"risk_check"`` and ``"final_position"`` are
            added as the corresponding steps complete. Any exception raised
            during the cycle is logged and reported under ``"error"`` instead
            of propagating.
        """
        results: Dict[str, Any] = {
            "timestamp": datetime.now(),
            "signals": {},
            "executed": False,
        }
        try:
            # 1. Identify the market ecosystem.
            ecosystem = self.ecosystem_fusion.fuse(
                price_data=price_data,
                tick_data=tick_data,
            )
            results["ecosystem"] = ecosystem.to_dict()
            self.logger.log_ecosystem(
                EventType.ECOSYSTEM_CHANGE,
                ecosystem.macro.regime.value,
                ecosystem.meso.health_score,
                ecosystem.micro.state.value,
                ecosystem.confidence,
            )

            # 2. Agent routing.
            routing = self.agent_router.route(
                self.agents,
                price_data,
                ecosystem,
            )
            # Built once; the coordinator below receives its own copy so the
            # reported result cannot be affected by anything it does.
            agent_weights = {
                name: entry.weight for name, entry in routing.weights.items()
            }
            results["routing"] = {
                "active_agents": routing.active_agents,
                "weights": agent_weights,
            }

            # 3. Generate signals from every routed agent this engine owns.
            agent_signals = {}
            for name in routing.active_agents:
                agent = self.agents.get(name)
                if agent is None:
                    continue
                signal = agent.generate_signal(price_data, ecosystem)
                if not signal:
                    continue
                agent_signals[name] = signal
                results["signals"][name] = {
                    "direction": signal.direction.value,
                    "confidence": signal.confidence,
                    "position": signal.suggested_position,
                }

            # 4. Coordination - only when at least one agent produced a signal.
            if agent_signals:
                coordinated = self.agent_coordinator.coordinate(
                    agent_signals,
                    dict(agent_weights),
                )
                results["coordinated"] = {
                    "direction": coordinated.final_direction.value,
                    "position": coordinated.final_position,
                    "reasoning": coordinated.reasoning,
                }

                # 5. Risk-management check.
                risk_check = self.risk_manager.check_trade_permission(
                    account_value=account_value,
                    proposed_position=coordinated.final_position,
                    entry_price=price_data['close'].iloc[-1],
                )
                results["risk_check"] = {
                    "can_trade": risk_check.can_trade,
                    "allowed_position": risk_check.allowed_position,
                    "risk_level": risk_check.risk_level,
                    "warnings": risk_check.warnings,
                }

                if risk_check.can_trade and coordinated.final_position > 0:
                    results["executed"] = True
                    # Never exceed what the risk manager allows.
                    results["final_position"] = min(
                        coordinated.final_position,
                        risk_check.allowed_position,
                    )

            # Update the risk state.
            # NOTE(review): placed at cycle level (runs even when no signal
            # was produced) - confirm against the intended nesting.
            self.risk_manager.update_account_state(account_value)
        except Exception as e:
            # Cycle boundary: record the failure instead of crashing the loop.
            self.logger.log_system(
                EventType.SYSTEM_ERROR,
                f"Engine cycle error: {str(e)}",
                level="error",
            )
            results["error"] = str(e)
        return results

    def start(self):
        """Start the engine: set the running flag and log the start event."""
        self.is_running = True
        self.logger.log_system(
            EventType.SYSTEM_START,
            "CYB50-Pro engine started",
            level="info",
        )

    def stop(self):
        """Stop the engine: clear the running flag and log the stop event."""
        self.is_running = False
        self.logger.log_system(
            EventType.SYSTEM_STOP,
            "CYB50-Pro engine stopped",
            level="info",
        )

    def get_status(self) -> Dict[str, Any]:
        """Return the engine status.

        Returns:
            A dict with the running flag (``"is_running"``), each agent's
            performance summary keyed by agent name (``"agents"``) and the
            risk manager's summary (``"risk_summary"``).
        """
        return {
            "is_running": self.is_running,
            "agents": {
                name: agent.get_performance_summary()
                for name, agent in self.agents.items()
            },
            "risk_summary": self.risk_manager.get_risk_summary(),
        }
|