"""Dynamic routing algorithm.

Based on the principle of expected-utility maximisation: dynamically computes
a weight for every agent and activates the high-utility ones.
"""

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime
import math
import uuid

import numpy as np
import pandas as pd

from agents.base import AgentBase, AgentSignal
from core.ecosystem import UnifiedEcosystem


@dataclass
class AgentWeight:
    """Weight record for a single agent inside one routing decision."""

    agent_name: str
    weight: float  # 0-1
    expected_utility: float
    win_probability: float
    expected_return: float
    risk_penalty: float
    recent_performance: float
    correlation_penalty: float = 0.0
    is_active: bool = True


@dataclass
class RoutingDecision:
    """Result of one routing pass."""

    weights: Dict[str, AgentWeight]
    active_agents: List[str]
    total_exposure: float
    correlation_matrix: Optional[pd.DataFrame] = None
    timestamp: datetime = field(default_factory=datetime.now)
    # First 8 characters of a random UUID4 string.
    routing_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])


class DynamicAgentRouter:
    """Dynamic agent router.

    Core algorithm (as described by the original author; the utility itself
    is computed by ``AgentBase.calculate_utility``, which is not defined in
    this module):

        E[U] = P(Win|Regime) * Expected_Return
               - lambda * Risk_Penalty + alpha * Recent_Performance

    Then:
        1. Only agents with E[U] > threshold are activated.
        2. Position weight is allocated proportionally to E[U].
        3. When the correlation between two agents exceeds a threshold, the
           lower-weighted agent of the pair is down-weighted.
    """

    # Minimum number of recorded signals before an agent takes part in the
    # correlation estimate.
    _MIN_SIGNAL_HISTORY = 10
    # Maximum number of most-recent signals used for the correlation estimate.
    _SIGNAL_WINDOW = 20
    # Number of most-recent trades used for the recent-performance score.
    _RECENT_TRADE_WINDOW = 10
    # Scale applied to the win rate so the score lies in [0, 0.1].
    _RECENT_PERFORMANCE_SCALE = 0.1

    def __init__(
        self,
        activation_threshold: float = 0.0,  # aggressive: no bar, any signal may participate
        correlation_threshold: float = 0.8,  # only very high correlation is penalised
        lambda_risk: float = 0.3,  # low risk penalty, favours return
        alpha_recent: float = 0.35,  # emphasise recent performance
        max_correlation_reduction: float = 0.3  # mild de-correlation
    ):
        """Store the routing parameters and start with an empty history.

        Args:
            activation_threshold: Utility an agent must exceed to be active.
            correlation_threshold: Absolute correlation above which a pair of
                agents is considered highly correlated.
            lambda_risk: Risk-penalty coefficient forwarded to
                ``agent.calculate_utility``.
            alpha_recent: Recent-performance coefficient forwarded to
                ``agent.calculate_utility``.
            max_correlation_reduction: Multiplier applied to the correlation
                excess when computing the weight reduction.
        """
        self.activation_threshold = activation_threshold
        self.correlation_threshold = correlation_threshold
        self.lambda_risk = lambda_risk
        self.alpha_recent = alpha_recent
        self.max_correlation_reduction = max_correlation_reduction
        self.routing_history: List[RoutingDecision] = []

    def route(
        self,
        agents: Dict[str, AgentBase],
        price_data: pd.DataFrame,
        ecosystem: UnifiedEcosystem
    ) -> RoutingDecision:
        """Run one dynamic routing pass.

        Args:
            agents: All available agents, keyed by name.
            price_data: Current price data.
            ecosystem: Current market ecosystem.

        Returns:
            RoutingDecision: The routing result. It is also appended to
            ``self.routing_history``, except when ``agents`` is empty, in
            which case an empty decision is returned without being recorded.
        """
        if not agents:
            return RoutingDecision(
                weights={},
                active_agents=[],
                total_exposure=0.0
            )

        # 1. Expected utility of every agent.
        utilities = self._calculate_utilities(agents, price_data, ecosystem)

        # 2. Which agents are activated.
        active_agents = self._determine_active_agents(utilities)

        # 3. Initial, utility-proportional weights.
        initial_weights = self._calculate_initial_weights(utilities, active_agents)

        # 4. Penalise highly correlated pairs.
        optimized_weights = self._optimize_correlation(
            agents, initial_weights, price_data
        )

        # 5. Re-normalise so the weights sum to 1.
        final_weights = self._normalize_weights(optimized_weights)

        # 6. Build the AgentWeight records.
        active_set = set(active_agents)
        agent_weights: Dict[str, AgentWeight] = {}
        for name, weight in final_weights.items():
            agent = agents[name]
            agent_weights[name] = AgentWeight(
                agent_name=name,
                weight=weight,
                expected_utility=utilities[name],
                win_probability=agent.get_win_probability(price_data, ecosystem),
                expected_return=agent.get_expected_return(price_data, ecosystem),
                risk_penalty=abs(agent.performance_stats.get("max_drawdown", 0.1)),
                recent_performance=self._calculate_recent_performance(agent),
                is_active=name in active_set
            )

        # 7. Total exposure is the summed weight of active agents, capped at 1.
        total_exposure = sum(
            w.weight for w in agent_weights.values() if w.is_active
        )

        decision = RoutingDecision(
            weights=agent_weights,
            active_agents=active_agents,
            total_exposure=min(1.0, total_exposure)
        )
        self.routing_history.append(decision)
        return decision

    def _calculate_utilities(
        self,
        agents: Dict[str, AgentBase],
        price_data: pd.DataFrame,
        ecosystem: UnifiedEcosystem
    ) -> Dict[str, float]:
        """Compute the expected utility of every agent.

        Agents whose ``is_active`` flag is falsy get ``-inf`` so they can
        never be selected; all others are scored by their own
        ``calculate_utility`` method.
        """
        utilities: Dict[str, float] = {}
        for name, agent in agents.items():
            if not agent.is_active:
                utilities[name] = float('-inf')
                continue

            utilities[name] = agent.calculate_utility(
                price_data=price_data,
                ecosystem=ecosystem,
                lambda_risk=self.lambda_risk,
                alpha_recent=self.alpha_recent
            )
        return utilities

    def _determine_active_agents(self, utilities: Dict[str, float]) -> List[str]:
        """Return the names of the agents to activate (E[U] > threshold).

        If no agent clears the threshold, the single agent with the highest
        utility is activated instead. Agents with a ``-inf`` or NaN utility
        are never chosen by that fallback; if every utility is ``-inf``/NaN
        the result is an empty list.
        """
        active = [
            name for name, utility in utilities.items()
            if utility > self.activation_threshold and utility != float('-inf')
        ]
        if active:
            return active

        # Fallback: best remaining agent. NaN is filtered out because it makes
        # max() order-dependent and could select an agent with no valid score.
        candidates = {
            name: utility for name, utility in utilities.items()
            if utility != float('-inf') and not math.isnan(utility)
        }
        if candidates:
            return [max(candidates, key=candidates.get)]
        return []

    def _calculate_initial_weights(
        self,
        utilities: Dict[str, float],
        active_agents: List[str]
    ) -> Dict[str, float]:
        """Compute initial weights proportional to (non-negative) utility.

        Negative utilities are floored at zero. If the floored utilities sum
        to zero, the weight is split equally among the active agents.
        """
        if not active_agents:
            return {}

        # Only positive utility contributes to the allocation.
        positive_utilities = {
            name: max(0, utilities[name]) for name in active_agents
        }
        total_utility = sum(positive_utilities.values())

        if total_utility == 0:
            # Equal split.
            return {name: 1.0 / len(active_agents) for name in active_agents}

        return {
            name: utility / total_utility
            for name, utility in positive_utilities.items()
        }

    def _optimize_correlation(
        self,
        agents: Dict[str, AgentBase],
        weights: Dict[str, float],
        price_data: pd.DataFrame
    ) -> Dict[str, float]:
        """Down-weight agents whose historical signals are highly correlated.

        For every pair of weighted agents whose absolute signal correlation
        exceeds ``self.correlation_threshold``, the agent with the lower
        input weight (the second of the pair on a tie) has its weight
        multiplied by ``1 - reduction`` where
        ``reduction = (corr - threshold) * max_correlation_reduction``,
        clamped to [0, 1] so a weight can never become negative.

        Returns the input mapping itself when fewer than two agents are
        weighted or no correlation matrix is available; otherwise a new dict.
        The result is not re-normalised.
        """
        if len(weights) < 2:
            return weights

        correlation_matrix = self._calculate_agent_correlations(agents, price_data)
        if correlation_matrix is None:
            return weights

        optimized_weights = weights.copy()

        # Collect the highly correlated pairs. A NaN correlation (e.g. from a
        # constant signal series) fails the comparison and is skipped.
        high_corr_pairs: List[Tuple[str, str, float]] = []
        agent_names = list(weights.keys())
        for i, name1 in enumerate(agent_names):
            for name2 in agent_names[i + 1:]:
                if name1 in correlation_matrix.index and name2 in correlation_matrix.columns:
                    corr = abs(correlation_matrix.loc[name1, name2])
                    if corr > self.correlation_threshold:
                        high_corr_pairs.append((name1, name2, corr))

        # Penalise the lower-weighted member of each pair. The comparison uses
        # the original weights; penalties compound on the working copy.
        for name1, name2, corr in high_corr_pairs:
            reduction = (corr - self.correlation_threshold) * self.max_correlation_reduction
            reduction = min(1.0, max(0.0, reduction))
            loser = name1 if weights[name1] < weights[name2] else name2
            optimized_weights[loser] *= (1 - reduction)

        return optimized_weights

    def _normalize_weights(self, weights: Dict[str, float]) -> Dict[str, float]:
        """Scale the weights so they sum to 1.

        An empty mapping yields an empty dict; a zero total yields an equal
        split across all keys.
        """
        if not weights:
            return {}

        total = sum(weights.values())
        if total == 0:
            return {name: 1.0 / len(weights) for name in weights}

        return {name: w / total for name, w in weights.items()}

    @staticmethod
    def _signal_direction_to_int(signal: Any) -> int:
        """Map a signal to +1 ("long"), -1 ("short") or 0 (anything else)."""
        direction = signal.direction.value
        if direction == "long":
            return 1
        if direction == "short":
            return -1
        return 0

    def _calculate_agent_correlations(
        self,
        agents: Dict[str, AgentBase],
        price_data: pd.DataFrame
    ) -> Optional[pd.DataFrame]:
        """Compute the correlation matrix of the agents' recent signal directions.

        Only agents with at least ``_MIN_SIGNAL_HISTORY`` recorded signals are
        included, each contributing at most its last ``_SIGNAL_WINDOW``
        signals. All series are truncated to the shortest common trailing
        length so agents with different history lengths can be compared;
        building the DataFrame from unequal-length lists would raise
        ``ValueError``.

        ``price_data`` is accepted for interface compatibility but is not
        used by this method.

        Returns:
            The pairwise correlation DataFrame, or ``None`` when fewer than
            two agents have enough history.
        """
        agent_signals: Dict[str, List[int]] = {}
        for name, agent in agents.items():
            history = agent.signal_history
            if len(history) >= self._MIN_SIGNAL_HISTORY:
                # Direction sequence of the most recent signals.
                agent_signals[name] = [
                    self._signal_direction_to_int(s)
                    for s in history[-self._SIGNAL_WINDOW:]
                ]

        if len(agent_signals) < 2:
            return None

        # Align on the most recent `common_len` observations of every agent.
        common_len = min(len(series) for series in agent_signals.values())
        aligned = {
            name: series[-common_len:] for name, series in agent_signals.items()
        }
        return pd.DataFrame(aligned).corr()

    def _calculate_recent_performance(self, agent: AgentBase) -> float:
        """Return the agent's recent win rate scaled into the range 0-0.1.

        Uses the last ``_RECENT_TRADE_WINDOW`` entries of
        ``agent.trade_history``; a trade counts as a win when its "return"
        value (default 0 if missing) is positive. Returns 0.0 when there are
        no trades.
        """
        recent_trades = agent.trade_history[-self._RECENT_TRADE_WINDOW:]
        if not recent_trades:
            return 0.0

        wins = sum(1 for t in recent_trades if t.get("return", 0) > 0)
        return wins / len(recent_trades) * self._RECENT_PERFORMANCE_SCALE

    def get_routing_summary(self) -> Dict[str, Any]:
        """Summarise the routing history.

        Returns an empty dict when no routing has been recorded; otherwise
        the number of routings, the latest decision's active agents and
        exposure, and the means of both across the whole history.
        """
        if not self.routing_history:
            return {}

        latest = self.routing_history[-1]
        return {
            "total_routings": len(self.routing_history),
            "latest_active_agents": latest.active_agents,
            "latest_total_exposure": latest.total_exposure,
            "avg_active_agents": np.mean([
                len(r.active_agents) for r in self.routing_history
            ]),
            "avg_exposure": np.mean([
                r.total_exposure for r in self.routing_history
            ])
        }