| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- """
- 动态路由算法
- 基于期望效用最大化原则,动态计算各智能体权重,激活高效用智能体
- """
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Tuple, Any
- from datetime import datetime
- import uuid
- import numpy as np
- import pandas as pd
- from agents.base import AgentBase, AgentSignal
- from core.ecosystem import UnifiedEcosystem
@dataclass
class AgentWeight:
    """Per-agent weight record produced by a routing pass.

    Bundles the allocation assigned to one agent together with the
    utility components that were recorded alongside it.
    """

    # Name of the agent this record belongs to.
    agent_name: str
    # Allocation share; the original author notes the range as 0-1.
    weight: float
    # Expected utility score recorded for the agent.
    expected_utility: float
    # Win probability recorded for the agent.
    win_probability: float
    # Expected return recorded for the agent.
    expected_return: float
    # Risk penalty term recorded for the agent.
    risk_penalty: float
    # Recent-performance term recorded for the agent.
    recent_performance: float
    # Correlation penalty; defaults to no penalty.
    correlation_penalty: float = 0.0
    # Whether the agent is active; defaults to active.
    is_active: bool = True
@dataclass
class RoutingDecision:
    """Outcome of a single routing pass.

    ``timestamp`` and ``routing_id`` are generated per instance when not
    supplied: the former from ``datetime.now`` and the latter from the
    first eight hex digits of a fresh UUID4.
    """

    # Weight records keyed by agent name.
    weights: Dict[str, AgentWeight]
    # Names of the agents selected as active.
    active_agents: List[str]
    # Total exposure recorded for this decision.
    total_exposure: float
    # Optional correlation matrix; absent unless the caller provides one.
    correlation_matrix: Optional[pd.DataFrame] = None
    # Creation time (naive local time, as returned by datetime.now).
    timestamp: datetime = field(default_factory=datetime.now)
    # Short identifier: the leading 8 hex characters of a random UUID.
    routing_id: str = field(default_factory=lambda: uuid.uuid4().hex[:8])
class DynamicAgentRouter:
    """Dynamic agent router driven by expected utility.

    Design formula, as stated by the original author (the computation
    itself is delegated to each agent's ``calculate_utility``)::

        E[U] = P(Win|Regime) * Expected_Return
               - lambda * Risk_Penalty + alpha * Recent_Performance

    Routing steps:
        1. Activate only agents whose E[U] exceeds ``activation_threshold``.
        2. Allocate weight in proportion to E[U].
        3. When two agents' signal correlation exceeds
           ``correlation_threshold``, shrink the lower-weighted one.
    """

    # An agent needs at least this many recorded signals to take part in the
    # correlation analysis.
    _MIN_SIGNAL_HISTORY = 10
    # At most this many of the most recent signals are considered per agent.
    _SIGNAL_WINDOW = 20

    def __init__(
        self,
        activation_threshold: float = 0.0,  # aggressive: no hurdle, any signal participates
        correlation_threshold: float = 0.8,  # only down-weight at high correlation
        lambda_risk: float = 0.3,  # lower risk penalty, favouring return
        alpha_recent: float = 0.35,  # put more emphasis on recent performance
        max_correlation_reduction: float = 0.3,  # gentle decorrelation
    ):
        """Store the routing parameters and start with an empty history.

        Args:
            activation_threshold: Utility an agent must exceed to be active.
            correlation_threshold: Absolute signal correlation above which a
                pair of agents is considered redundant.
            lambda_risk: Risk-penalty coefficient forwarded to
                ``calculate_utility``.
            alpha_recent: Recent-performance coefficient forwarded to
                ``calculate_utility``.
            max_correlation_reduction: Scale factor applied to the excess
                correlation when shrinking a weight.
        """
        self.activation_threshold = activation_threshold
        self.correlation_threshold = correlation_threshold
        self.lambda_risk = lambda_risk
        self.alpha_recent = alpha_recent
        self.max_correlation_reduction = max_correlation_reduction
        self.routing_history: List[RoutingDecision] = []

    def route(
        self,
        agents: Dict[str, "AgentBase"],
        price_data: pd.DataFrame,
        ecosystem: "UnifiedEcosystem",
    ) -> "RoutingDecision":
        """Run one routing pass.

        Args:
            agents: All available agents, keyed by name.
            price_data: Current price data.
            ecosystem: Current market ecosystem.

        Returns:
            The routing decision. It is also appended to
            ``routing_history``, except when ``agents`` is empty, in which
            case an empty decision is returned without being recorded.
        """
        if not agents:
            return RoutingDecision(
                weights={},
                active_agents=[],
                total_exposure=0.0,
            )

        # 1. Expected utility per agent.
        utilities = self._calculate_utilities(agents, price_data, ecosystem)
        # 2. Which agents are switched on.
        active_agents = self._determine_active_agents(utilities)
        # 3. Utility-proportional starting weights.
        initial_weights = self._calculate_initial_weights(utilities, active_agents)
        # 4. Shrink redundant (highly correlated) agents.
        optimized_weights = self._optimize_correlation(
            agents, initial_weights, price_data
        )
        # 5. Re-scale so the weights sum to one.
        final_weights = self._normalize_weights(optimized_weights)

        # 6. Build the AgentWeight records.
        active_lookup = set(active_agents)
        agent_weights = {}
        for name, weight in final_weights.items():
            agent = agents[name]
            agent_weights[name] = AgentWeight(
                agent_name=name,
                weight=weight,
                expected_utility=utilities[name],
                win_probability=agent.get_win_probability(price_data, ecosystem),
                expected_return=agent.get_expected_return(price_data, ecosystem),
                risk_penalty=abs(agent.performance_stats.get("max_drawdown", 0.1)),
                recent_performance=self._calculate_recent_performance(agent),
                is_active=name in active_lookup,
            )

        # 7. Total exposure of the active agents, capped at 1.0.
        total_exposure = sum(
            record.weight for record in agent_weights.values() if record.is_active
        )
        decision = RoutingDecision(
            weights=agent_weights,
            active_agents=active_agents,
            total_exposure=min(1.0, total_exposure),
        )
        self.routing_history.append(decision)
        return decision

    def _calculate_utilities(
        self,
        agents: Dict[str, "AgentBase"],
        price_data: pd.DataFrame,
        ecosystem: "UnifiedEcosystem",
    ) -> Dict[str, float]:
        """Compute the expected utility of every agent.

        Agents whose ``is_active`` flag is false get ``-inf`` so they can
        never be selected; the others are scored by their own
        ``calculate_utility`` using this router's coefficients.
        """
        utilities = {}
        for name, agent in agents.items():
            if not agent.is_active:
                utilities[name] = float('-inf')
                continue
            utilities[name] = agent.calculate_utility(
                price_data=price_data,
                ecosystem=ecosystem,
                lambda_risk=self.lambda_risk,
                alpha_recent=self.alpha_recent,
            )
        return utilities

    def _determine_active_agents(self, utilities: Dict[str, float]) -> List[str]:
        """Select the agents whose utility exceeds the activation threshold.

        If nobody clears the threshold, the single best-scoring agent is
        activated instead. NaN and ``-inf`` utilities are never eligible
        for that fallback: comparisons against NaN are always false, so
        letting NaN into ``max`` would make the winner depend on dict order.

        Returns:
            Names of the active agents; empty when no agent has a usable
            utility.
        """
        disabled = float('-inf')
        active = [
            name
            for name, utility in utilities.items()
            if utility > self.activation_threshold and utility != disabled
        ]
        if active:
            return active

        candidates = {
            name: utility
            for name, utility in utilities.items()
            if utility != disabled and not np.isnan(utility)
        }
        if candidates:
            return [max(candidates, key=candidates.get)]
        return []

    def _calculate_initial_weights(
        self,
        utilities: Dict[str, float],
        active_agents: List[str],
    ) -> Dict[str, float]:
        """Compute utility-proportional starting weights for active agents.

        Negative utilities are floored at zero. When every active agent
        ends up at zero the weight is split equally.
        """
        if not active_agents:
            return {}

        # Only positive utility earns weight.
        floored = {name: max(0, utilities[name]) for name in active_agents}
        total_utility = sum(floored.values())
        if total_utility == 0:
            equal_share = 1.0 / len(active_agents)
            return {name: equal_share for name in active_agents}
        return {name: utility / total_utility for name, utility in floored.items()}

    def _optimize_correlation(
        self,
        agents: Dict[str, "AgentBase"],
        weights: Dict[str, float],
        price_data: pd.DataFrame,
    ) -> Dict[str, float]:
        """Shrink the weight of agents that duplicate another agent's signals.

        For every pair whose absolute signal correlation exceeds
        ``correlation_threshold``, the lower-weighted member (the second
        one on ties) is multiplied by
        ``1 - (corr - threshold) * max_correlation_reduction``.
        Comparisons use the incoming weights, so reductions for an agent
        that appears in several pairs compound.

        Returns:
            A new weight mapping (not re-normalized); ``weights`` itself is
            returned unchanged when fewer than two agents are weighted or
            no correlation matrix is available.
        """
        if len(weights) < 2:
            return weights

        correlation_matrix = self._calculate_agent_correlations(agents, price_data)
        if correlation_matrix is None:
            return weights

        optimized_weights = weights.copy()
        agent_names = list(weights.keys())
        for position, name1 in enumerate(agent_names):
            for name2 in agent_names[position + 1:]:
                if name1 not in correlation_matrix.index:
                    continue
                if name2 not in correlation_matrix.columns:
                    continue
                corr = abs(correlation_matrix.loc[name1, name2])
                # A NaN correlation (e.g. a constant signal series) compares
                # false here and is therefore skipped.
                if not corr > self.correlation_threshold:
                    continue
                reduction = (
                    corr - self.correlation_threshold
                ) * self.max_correlation_reduction
                weaker = name1 if weights[name1] < weights[name2] else name2
                optimized_weights[weaker] *= (1 - reduction)
        return optimized_weights

    def _normalize_weights(self, weights: Dict[str, float]) -> Dict[str, float]:
        """Re-scale weights so they sum to one (equal split if they sum to 0)."""
        if not weights:
            return {}
        total = sum(weights.values())
        if total == 0:
            equal_share = 1.0 / len(weights)
            return {name: equal_share for name in weights}
        return {name: value / total for name, value in weights.items()}

    def _calculate_agent_correlations(
        self,
        agents: Dict[str, "AgentBase"],
        price_data: pd.DataFrame,
    ) -> Optional[pd.DataFrame]:
        """Build the correlation matrix of the agents' recent signal directions.

        Each signal is encoded as +1 ("long"), -1 ("short") or 0 (anything
        else). Agents with fewer than ``_MIN_SIGNAL_HISTORY`` signals are
        left out. ``price_data`` is accepted for interface compatibility
        and is not read here.

        Returns:
            The pairwise correlation matrix, or ``None`` when fewer than two
            agents have enough history.
        """
        recent: Dict[str, List[int]] = {}
        for name, agent in agents.items():
            history = agent.signal_history
            if len(history) < self._MIN_SIGNAL_HISTORY:
                continue
            recent[name] = [
                1 if s.direction.value == "long"
                else -1 if s.direction.value == "short"
                else 0
                for s in history[-self._SIGNAL_WINDOW:]
            ]

        if len(recent) < 2:
            return None

        # Histories may hold anywhere between the minimum and the window
        # size; pd.DataFrame rejects columns of unequal length, so keep only
        # the most recent signals that every agent has in common.
        common = min(len(series) for series in recent.values())
        aligned = {name: series[-common:] for name, series in recent.items()}
        return pd.DataFrame(aligned).corr()

    def _calculate_recent_performance(self, agent: "AgentBase") -> float:
        """Score an agent's recent trades.

        Returns:
            The win rate over the last ten trades, scaled into the 0-0.1
            range; ``0.0`` when the agent has no trades.
        """
        recent_trades = agent.trade_history[-10:]
        if not recent_trades:
            return 0.0
        wins = sum(1 for trade in recent_trades if trade.get("return", 0) > 0)
        return wins / len(recent_trades) * 0.1  # scale into the 0-0.1 range

    def get_routing_summary(self) -> Dict[str, Any]:
        """Summarize the routing history.

        Returns:
            An empty dict when nothing has been routed yet; otherwise the
            number of recorded decisions, the latest active agents and
            exposure, and the averages of both across the history.
        """
        if not self.routing_history:
            return {}
        latest = self.routing_history[-1]
        return {
            "total_routings": len(self.routing_history),
            "latest_active_agents": latest.active_agents,
            "latest_total_exposure": latest.total_exposure,
            "avg_active_agents": np.mean(
                [len(r.active_agents) for r in self.routing_history]
            ),
            "avg_exposure": np.mean(
                [r.total_exposure for r in self.routing_history]
            ),
        }
|