router.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
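"""
Dynamic routing algorithm.

Dynamically computes each agent's weight on the principle of expected-utility
maximisation and activates the high-utility agents.
"""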
  5. from dataclasses import dataclass, field
  6. from typing import Dict, List, Optional, Tuple, Any
  7. from datetime import datetime
  8. import uuid
  9. import numpy as np
  10. import pandas as pd
  11. from agents.base import AgentBase, AgentSignal
  12. from core.ecosystem import UnifiedEcosystem
  13. @dataclass
  14. class AgentWeight:
  15. """智能体权重数据结构"""
  16. agent_name: str
  17. weight: float # 0-1
  18. expected_utility: float
  19. win_probability: float
  20. expected_return: float
  21. risk_penalty: float
  22. recent_performance: float
  23. correlation_penalty: float = 0.0
  24. is_active: bool = True
  25. @dataclass
  26. class RoutingDecision:
  27. """路由决策结果"""
  28. weights: Dict[str, AgentWeight]
  29. active_agents: List[str]
  30. total_exposure: float
  31. correlation_matrix: Optional[pd.DataFrame] = None
  32. timestamp: datetime = field(default_factory=datetime.now)
  33. routing_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
  34. class DynamicAgentRouter:
  35. """
  36. 动态智能体路由器
  37. 核心算法:
  38. E[U] = P(Win|Regime) * Expected_Return - λ * Risk_Penalty + α * Recent_Performance
  39. 然后:
  40. 1. 仅激活 E[U] > threshold 的智能体
  41. 2. 仓位按 E[U] 比例分配
  42. 3. 当智能体间相关性 > threshold 时,进行组合优化降相关
  43. """
  44. def __init__(
  45. self,
  46. activation_threshold: float = 0.0, # 激进: 无门槛,只要有信号就参与
  47. correlation_threshold: float = 0.8, # 更高相关性阈值才降权
  48. lambda_risk: float = 0.3, # 降低风险惩罚,追求收益
  49. alpha_recent: float = 0.35, # 更重视近期表现
  50. max_correlation_reduction: float = 0.3 # 温和降相关
  51. ):
  52. self.activation_threshold = activation_threshold
  53. self.correlation_threshold = correlation_threshold
  54. self.lambda_risk = lambda_risk
  55. self.alpha_recent = alpha_recent
  56. self.max_correlation_reduction = max_correlation_reduction
  57. self.routing_history: List[RoutingDecision] = []
  58. def route(
  59. self,
  60. agents: Dict[str, AgentBase],
  61. price_data: pd.DataFrame,
  62. ecosystem: UnifiedEcosystem
  63. ) -> RoutingDecision:
  64. """
  65. 执行动态路由
  66. Args:
  67. agents: 所有可用的智能体
  68. price_data: 当前价格数据
  69. ecosystem: 当前市场生态
  70. Returns:
  71. RoutingDecision: 路由决策结果
  72. """
  73. if not agents:
  74. return RoutingDecision(
  75. weights={},
  76. active_agents=[],
  77. total_exposure=0.0
  78. )
  79. # 1. 计算各智能体的期望效用
  80. utilities = self._calculate_utilities(agents, price_data, ecosystem)
  81. # 2. 确定激活的智能体
  82. active_agents = self._determine_active_agents(utilities)
  83. # 3. 计算初始权重
  84. initial_weights = self._calculate_initial_weights(utilities, active_agents)
  85. # 4. 检查相关性并优化
  86. optimized_weights = self._optimize_correlation(
  87. agents, initial_weights, price_data
  88. )
  89. # 5. 归一化权重
  90. final_weights = self._normalize_weights(optimized_weights)
  91. # 6. 构建AgentWeight对象
  92. agent_weights = {}
  93. for name, weight in final_weights.items():
  94. agent_weights[name] = AgentWeight(
  95. agent_name=name,
  96. weight=weight,
  97. expected_utility=utilities[name],
  98. win_probability=agents[name].get_win_probability(price_data, ecosystem),
  99. expected_return=agents[name].get_expected_return(price_data, ecosystem),
  100. risk_penalty=abs(agents[name].performance_stats.get("max_drawdown", 0.1)),
  101. recent_performance=self._calculate_recent_performance(agents[name]),
  102. is_active=name in active_agents
  103. )
  104. # 7. 计算总暴露
  105. total_exposure = sum(
  106. w.weight for w in agent_weights.values() if w.is_active
  107. )
  108. decision = RoutingDecision(
  109. weights=agent_weights,
  110. active_agents=active_agents,
  111. total_exposure=min(1.0, total_exposure)
  112. )
  113. self.routing_history.append(decision)
  114. return decision
  115. def _calculate_utilities(
  116. self,
  117. agents: Dict[str, AgentBase],
  118. price_data: pd.DataFrame,
  119. ecosystem: UnifiedEcosystem
  120. ) -> Dict[str, float]:
  121. """计算各智能体的期望效用"""
  122. utilities = {}
  123. for name, agent in agents.items():
  124. if not agent.is_active:
  125. utilities[name] = float('-inf')
  126. continue
  127. # 使用智能体的calculate_utility方法
  128. utility = agent.calculate_utility(
  129. price_data=price_data,
  130. ecosystem=ecosystem,
  131. lambda_risk=self.lambda_risk,
  132. alpha_recent=self.alpha_recent
  133. )
  134. utilities[name] = utility
  135. return utilities
  136. def _determine_active_agents(self, utilities: Dict[str, float]) -> List[str]:
  137. """确定激活的智能体(E[U] > threshold)"""
  138. active = [
  139. name for name, utility in utilities.items()
  140. if utility > self.activation_threshold and utility != float('-inf')
  141. ]
  142. # 如果没有智能体达到阈值,选择效用最高的一个
  143. if not active and utilities:
  144. best_agent = max(utilities, key=utilities.get)
  145. if utilities[best_agent] != float('-inf'):
  146. active = [best_agent]
  147. return active
  148. def _calculate_initial_weights(
  149. self,
  150. utilities: Dict[str, float],
  151. active_agents: List[str]
  152. ) -> Dict[str, float]:
  153. """计算初始权重(基于效用比例)"""
  154. if not active_agents:
  155. return {}
  156. # 只考虑正效用
  157. positive_utilities = {
  158. name: max(0, utilities[name]) for name in active_agents
  159. }
  160. total_utility = sum(positive_utilities.values())
  161. if total_utility == 0:
  162. # 平均分配
  163. return {name: 1.0 / len(active_agents) for name in active_agents}
  164. weights = {
  165. name: utility / total_utility
  166. for name, utility in positive_utilities.items()
  167. }
  168. return weights
  169. def _optimize_correlation(
  170. self,
  171. agents: Dict[str, AgentBase],
  172. weights: Dict[str, float],
  173. price_data: pd.DataFrame
  174. ) -> Dict[str, float]:
  175. """
  176. 相关性优化
  177. 当智能体间历史信号相关性 > threshold 时,降低相关性高的智能体权重
  178. """
  179. if len(weights) < 2:
  180. return weights
  181. # 计算智能体间历史信号相关性
  182. correlation_matrix = self._calculate_agent_correlations(agents, price_data)
  183. if correlation_matrix is None:
  184. return weights
  185. optimized_weights = weights.copy()
  186. # 找出高相关性对
  187. high_corr_pairs = []
  188. agent_names = list(weights.keys())
  189. for i, name1 in enumerate(agent_names):
  190. for name2 in agent_names[i+1:]:
  191. if name1 in correlation_matrix.index and name2 in correlation_matrix.columns:
  192. corr = abs(correlation_matrix.loc[name1, name2])
  193. if corr > self.correlation_threshold:
  194. high_corr_pairs.append((name1, name2, corr))
  195. # 对高相关性对进行降权
  196. for name1, name2, corr in high_corr_pairs:
  197. # 降低效用较低的智能体权重
  198. if weights[name1] < weights[name2]:
  199. reduction = (corr - self.correlation_threshold) * self.max_correlation_reduction
  200. optimized_weights[name1] *= (1 - reduction)
  201. else:
  202. reduction = (corr - self.correlation_threshold) * self.max_correlation_reduction
  203. optimized_weights[name2] *= (1 - reduction)
  204. return optimized_weights
  205. def _normalize_weights(self, weights: Dict[str, float]) -> Dict[str, float]:
  206. """归一化权重使总和为1"""
  207. if not weights:
  208. return {}
  209. total = sum(weights.values())
  210. if total == 0:
  211. return {name: 1.0 / len(weights) for name in weights}
  212. return {name: w / total for name, w in weights.items()}
  213. def _calculate_agent_correlations(
  214. self,
  215. agents: Dict[str, AgentBase],
  216. price_data: pd.DataFrame
  217. ) -> Optional[pd.DataFrame]:
  218. """计算智能体历史信号的相关性矩阵"""
  219. agent_signals = {}
  220. for name, agent in agents.items():
  221. if len(agent.signal_history) >= 10:
  222. # 提取最近信号的方向序列
  223. signals = [
  224. 1 if s.direction.value == "long" else -1 if s.direction.value == "short" else 0
  225. for s in agent.signal_history[-20:]
  226. ]
  227. agent_signals[name] = signals
  228. if len(agent_signals) < 2:
  229. return None
  230. # 创建DataFrame并计算相关性
  231. df = pd.DataFrame(agent_signals)
  232. return df.corr()
  233. def _calculate_recent_performance(self, agent: AgentBase) -> float:
  234. """计算智能体近期表现"""
  235. recent_trades = agent.trade_history[-10:]
  236. if not recent_trades:
  237. return 0.0
  238. wins = sum(1 for t in recent_trades if t.get("return", 0) > 0)
  239. return wins / len(recent_trades) * 0.1 # 归一化到0-0.1范围
  240. def get_routing_summary(self) -> Dict[str, Any]:
  241. """获取路由历史摘要"""
  242. if not self.routing_history:
  243. return {}
  244. latest = self.routing_history[-1]
  245. return {
  246. "total_routings": len(self.routing_history),
  247. "latest_active_agents": latest.active_agents,
  248. "latest_total_exposure": latest.total_exposure,
  249. "avg_active_agents": np.mean([
  250. len(r.active_agents) for r in self.routing_history
  251. ]),
  252. "avg_exposure": np.mean([
  253. r.total_exposure for r in self.routing_history
  254. ])
  255. }