""" 智能体基类 定义所有策略智能体的标准接口和通用功能 """ from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Optional, Any, Tuple from enum import Enum import uuid import numpy as np import pandas as pd class SignalDirection(Enum): """信号方向""" LONG = "long" SHORT = "short" NEUTRAL = "neutral" class SignalStrength(Enum): """信号强度""" STRONG = "strong" MEDIUM = "medium" WEAK = "weak" @dataclass class AgentSignal: """智能体信号数据结构""" agent_name: str direction: SignalDirection strength: SignalStrength confidence: float # 0-1 suggested_position: float # 0-1 expected_return: float win_probability: float timestamp: datetime valid_until: Optional[datetime] = None metadata: Dict[str, Any] = field(default_factory=dict) def is_valid(self) -> bool: """检查信号是否有效""" if self.valid_until is None: return True return datetime.now() < self.valid_until @dataclass class AgentHealth: """智能体健康度数据结构""" overall_score: float # 0-100 sharpe_ratio: float regime_adaptation: float # 0-100 signal_stability: float # 0-100 compute_efficiency: float # 0-100 status: str # "green", "yellow", "icu", "archived" last_evaluation: datetime recommendations: List[str] = field(default_factory=list) class AgentBase(ABC): """ 智能体基类 所有策略智能体必须继承此类并实现抽象方法 """ def __init__( self, name: str, config: Optional[Dict[str, Any]] = None, max_position: float = 1.0, min_confidence: float = 0.5 ): self.name = name self.config = config or {} self.max_position = max_position self.min_confidence = min_confidence self.agent_id = str(uuid.uuid4())[:8] # 历史记录 self.signal_history: List[AgentSignal] = [] self.trade_history: List[Dict] = [] self.health_history: List[AgentHealth] = [] # 状态 self.is_active = True self.current_signal: Optional[AgentSignal] = None self.performance_stats = { "total_trades": 0, "winning_trades": 0, "total_return": 0.0, "max_drawdown": 0.0 } @abstractmethod def generate_signal( self, price_data: pd.DataFrame, ecosystem: Optional[Any] = None ) -> Optional[AgentSignal]: """ 生成交易信号 Args: price_data: 价格数据 ecosystem: 当前市场生态(可选) Returns: AgentSignal: 交易信号,无信号时返回None """ pass @abstractmethod def get_expected_return( self, price_data: pd.DataFrame, ecosystem: Optional[Any] = None ) -> float: """ 计算预期收益 Returns: float: 预期收益率(年化) """ pass @abstractmethod def get_win_probability( self, price_data: pd.DataFrame, ecosystem: Optional[Any] = None ) -> float: """ 计算胜率 Returns: float: 胜率 (0-1) """ pass def get_health_score(self) -> AgentHealth: """ 计算健康度评分 默认实现,子类可覆盖 """ if len(self.trade_history) < 20: return AgentHealth( overall_score=50.0, sharpe_ratio=0.0, regime_adaptation=50.0, signal_stability=50.0, compute_efficiency=100.0, status="yellow", last_evaluation=datetime.now(), recommendations=["样本不足,需要更多交易数据"] ) # 计算近期夏普比率 recent_returns = [t.get("return", 0) for t in self.trade_history[-20:]] sharpe = self._calculate_sharpe(recent_returns) # 生态适应性(简化计算) regime_adaptation = self._calculate_regime_adaptation() # 信号稳定性 signal_stability = self._calculate_signal_stability() # 计算效率(固定值,子类可覆盖) compute_efficiency = 95.0 # 综合评分 overall = ( sharpe * 30 + regime_adaptation * 0.3 + signal_stability * 0.2 + compute_efficiency * 0.1 ) overall = min(100, max(0, overall + 50)) # 调整到0-100 # 确定状态 if overall >= 80: status = "green" recommendations = [] elif overall >= 60: status = "yellow" recommendations = ["健康度一般,建议监控"] elif overall >= 30: status = "yellow" recommendations = ["健康度偏低,需要优化参数"] else: status = "icu" recommendations = ["健康度严重不足,建议暂停交易"] health = AgentHealth( overall_score=overall, sharpe_ratio=sharpe, regime_adaptation=regime_adaptation, signal_stability=signal_stability, compute_efficiency=compute_efficiency, status=status, last_evaluation=datetime.now(), recommendations=recommendations ) self.health_history.append(health) return health def calculate_utility( self, price_data: pd.DataFrame, ecosystem: Any, lambda_risk: float = 0.5, alpha_recent: float = 0.3 ) -> float: """ 计算期望效用 E[U] E[U] = P(Win|Regime) * Expected_Return - λ * Risk_Penalty + α * Recent_Performance Args: price_data: 价格数据 ecosystem: 市场生态 lambda_risk: 风险惩罚系数 alpha_recent: 近期表现系数 Returns: float: 期望效用 """ # 基础参数 p_win = self.get_win_probability(price_data, ecosystem) expected_return = self.get_expected_return(price_data, ecosystem) # 风险惩罚(使用最大回撤) risk_penalty = abs(self.performance_stats.get("max_drawdown", 0.1)) # 近期表现(最近10笔交易的胜率) recent_trades = self.trade_history[-10:] if recent_trades: recent_win_rate = sum(1 for t in recent_trades if t.get("return", 0) > 0) / len(recent_trades) recent_performance = recent_win_rate * 0.1 # 归一化 else: recent_performance = 0.0 # 生态适配加成 regime_match = self._check_regime_match(ecosystem) utility = ( p_win * expected_return - lambda_risk * risk_penalty + alpha_recent * recent_performance ) * regime_match return utility def update_trade_result(self, trade_result: Dict): """更新交易结果""" self.trade_history.append(trade_result) self.performance_stats["total_trades"] += 1 if trade_result.get("return", 0) > 0: self.performance_stats["winning_trades"] += 1 self.performance_stats["total_return"] += trade_result.get("return", 0) # 更新最大回撤 drawdown = trade_result.get("drawdown", 0) if drawdown < self.performance_stats["max_drawdown"]: self.performance_stats["max_drawdown"] = drawdown def activate(self): """激活智能体""" self.is_active = True def deactivate(self): """停用智能体""" self.is_active = False def get_performance_summary(self) -> Dict[str, Any]: """获取业绩摘要""" total = self.performance_stats["total_trades"] wins = self.performance_stats["winning_trades"] return { "agent_name": self.name, "agent_id": self.agent_id, "is_active": self.is_active, "total_trades": total, "win_rate": wins / total if total > 0 else 0.0, "total_return": self.performance_stats["total_return"], "max_drawdown": self.performance_stats["max_drawdown"], "current_signal": self.current_signal.direction.value if self.current_signal else None } # 辅助方法 def _calculate_sharpe(self, returns: List[float], risk_free_rate: float = 0.03) -> float: """计算夏普比率""" if len(returns) < 2: return 0.0 returns_array = np.array(returns) excess_returns = returns_array - risk_free_rate / 252 std = np.std(excess_returns, ddof=1) if std == 0: return 0.0 return np.mean(excess_returns) / std * np.sqrt(252) def _calculate_regime_adaptation(self) -> float: """计算生态适应性(简化实现)""" if len(self.trade_history) < 20: return 50.0 # 假设有生态信息,计算不同生态下的表现差异 # 简化:返回固定值,子类可覆盖 return 60.0 def _calculate_signal_stability(self) -> float: """计算信号稳定性""" if len(self.signal_history) < 10: return 50.0 recent_signals = self.signal_history[-20:] if len(recent_signals) < 2: return 50.0 # 计算信号方向变化的频率 direction_changes = sum( 1 for i in range(1, len(recent_signals)) if recent_signals[i].direction != recent_signals[i-1].direction ) stability = 1 - (direction_changes / (len(recent_signals) - 1)) return stability * 100 def _check_regime_match(self, ecosystem: Any) -> float: """检查当前生态适配度""" # 默认返回1.0,子类可覆盖 return 1.0 def _validate_signal( self, direction: SignalDirection, confidence: float, ecosystem: Optional[Any] = None ) -> bool: """验证信号有效性""" if confidence < self.min_confidence: return False # 检查生态毒性 if ecosystem and hasattr(ecosystem, 'micro'): if ecosystem.micro.flow_toxicity.value in ["high", "medium"]: if confidence < 0.8: # 有毒环境下需要更高置信度 return False return True