| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- """
- 智能体基类
- 定义所有策略智能体的标准接口和通用功能
- """
- from abc import ABC, abstractmethod
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import Dict, List, Optional, Any, Tuple
- from enum import Enum
- import uuid
- import numpy as np
- import pandas as pd
class SignalDirection(Enum):
    """Direction of a trading signal."""

    LONG = "long"        # bullish / buy side
    SHORT = "short"      # bearish / sell side
    NEUTRAL = "neutral"  # no directional view
class SignalStrength(Enum):
    """Qualitative strength of a trading signal."""

    STRONG = "strong"
    MEDIUM = "medium"
    WEAK = "weak"
@dataclass
class AgentSignal:
    """Trading signal emitted by an agent."""

    agent_name: str
    direction: SignalDirection
    strength: SignalStrength
    confidence: float  # 0-1
    suggested_position: float  # 0-1
    expected_return: float
    win_probability: float
    timestamp: datetime
    valid_until: Optional[datetime] = None  # None means the signal never expires
    metadata: Dict[str, Any] = field(default_factory=dict)

    def is_valid(self) -> bool:
        """Return True while the signal has not expired.

        A signal without ``valid_until`` never expires. Otherwise the current
        time must be strictly earlier than ``valid_until``.

        Returns:
            bool: True if the signal can still be used.
        """
        deadline = self.valid_until
        if deadline is None:
            return True
        # Comparing a naive and an aware datetime raises TypeError, so take
        # "now" in the same flavour (naive, or aware in the deadline's own
        # timezone) as the deadline itself.
        is_aware = deadline.tzinfo is not None and deadline.utcoffset() is not None
        now = datetime.now(tz=deadline.tzinfo) if is_aware else datetime.now()
        return now < deadline
@dataclass
class AgentHealth:
    """Snapshot of an agent's health evaluation."""

    overall_score: float  # 0-100
    sharpe_ratio: float
    regime_adaptation: float  # 0-100
    signal_stability: float  # 0-100
    compute_efficiency: float  # 0-100
    status: str  # one of "green", "yellow", "icu", "archived"
    last_evaluation: datetime
    recommendations: List[str] = field(default_factory=list)
class AgentBase(ABC):
    """Abstract base class for strategy agents.

    Subclasses must implement ``generate_signal``, ``get_expected_return`` and
    ``get_win_probability``. The base class stores signal, trade and health
    history, keeps running performance statistics, and supplies default
    health-score and expected-utility calculations that subclasses may
    override.
    """

    # A standard deviation below this is treated as zero when computing the
    # Sharpe ratio: a constant return series can leave a rounding-sized
    # residual that would otherwise blow the ratio up.
    _ZERO_VOLATILITY_EPS = 1e-12

    def __init__(
        self,
        name: str,
        config: Optional[Dict[str, Any]] = None,
        max_position: float = 1.0,
        min_confidence: float = 0.5,
    ):
        """Initialise the agent.

        Args:
            name: Human-readable agent name.
            config: Optional configuration mapping; an empty dict is used
                when omitted or falsy.
            max_position: Stored as ``self.max_position``.
            min_confidence: Minimum confidence accepted by
                ``_validate_signal``.
        """
        self.name = name
        self.config = config or {}
        self.max_position = max_position
        self.min_confidence = min_confidence
        # Short identifier: the first 8 characters of a random UUID4.
        self.agent_id = str(uuid.uuid4())[:8]

        # History
        self.signal_history: List[AgentSignal] = []
        self.trade_history: List[Dict] = []
        self.health_history: List[AgentHealth] = []

        # State
        self.is_active = True
        self.current_signal: Optional[AgentSignal] = None
        self.performance_stats = {
            "total_trades": 0,
            "winning_trades": 0,
            "total_return": 0.0,
            "max_drawdown": 0.0,
        }

    @abstractmethod
    def generate_signal(
        self,
        price_data: pd.DataFrame,
        ecosystem: Optional[Any] = None,
    ) -> Optional[AgentSignal]:
        """Generate a trading signal.

        Args:
            price_data: Price data.
            ecosystem: Current market ecosystem (optional).

        Returns:
            The trading signal, or None when there is no signal.
        """

    @abstractmethod
    def get_expected_return(
        self,
        price_data: pd.DataFrame,
        ecosystem: Optional[Any] = None,
    ) -> float:
        """Compute the expected return.

        Returns:
            float: Expected return (annualised).
        """

    @abstractmethod
    def get_win_probability(
        self,
        price_data: pd.DataFrame,
        ecosystem: Optional[Any] = None,
    ) -> float:
        """Compute the win probability.

        Returns:
            float: Win probability in the range 0-1.
        """

    def get_health_score(self) -> AgentHealth:
        """Compute the agent's health score (default implementation).

        With fewer than 20 recorded trades a neutral placeholder ("yellow",
        score 50) is returned and nothing is added to ``health_history``.
        Otherwise the score is::

            sharpe * 30 + regime_adaptation * 0.3
            + signal_stability * 0.2 + compute_efficiency * 0.1 + 50

        clamped to 0-100, where the Sharpe ratio is taken over the last 20
        trades. The result is appended to ``health_history``.

        Returns:
            AgentHealth: The evaluation result.
        """
        if len(self.trade_history) < 20:
            return AgentHealth(
                overall_score=50.0,
                sharpe_ratio=0.0,
                regime_adaptation=50.0,
                signal_stability=50.0,
                compute_efficiency=100.0,
                status="yellow",
                last_evaluation=datetime.now(),
                recommendations=["样本不足,需要更多交易数据"],
            )

        # Sharpe ratio over the most recent trades.
        recent_returns = [t.get("return", 0) for t in self.trade_history[-20:]]
        sharpe = self._calculate_sharpe(recent_returns)

        regime_adaptation = self._calculate_regime_adaptation()
        signal_stability = self._calculate_signal_stability()
        # Fixed value in the base class; subclasses may override this method.
        compute_efficiency = 95.0

        overall = (
            sharpe * 30
            + regime_adaptation * 0.3
            + signal_stability * 0.2
            + compute_efficiency * 0.1
        )
        overall = min(100, max(0, overall + 50))  # shift and clamp to 0-100

        if overall >= 80:
            status = "green"
            recommendations = []
        elif overall >= 60:
            status = "yellow"
            recommendations = ["健康度一般,建议监控"]
        elif overall >= 30:
            status = "yellow"
            recommendations = ["健康度偏低,需要优化参数"]
        else:
            status = "icu"
            recommendations = ["健康度严重不足,建议暂停交易"]

        health = AgentHealth(
            overall_score=overall,
            sharpe_ratio=sharpe,
            regime_adaptation=regime_adaptation,
            signal_stability=signal_stability,
            compute_efficiency=compute_efficiency,
            status=status,
            last_evaluation=datetime.now(),
            recommendations=recommendations,
        )
        self.health_history.append(health)
        return health

    def calculate_utility(
        self,
        price_data: pd.DataFrame,
        ecosystem: Any,
        lambda_risk: float = 0.5,
        alpha_recent: float = 0.3,
    ) -> float:
        """Compute the expected utility E[U].

        E[U] = (P(win) * expected_return - lambda * risk_penalty
                + alpha * recent_performance) * regime_match

        ``risk_penalty`` is the absolute value of the recorded maximum
        drawdown. ``recent_performance`` is the win rate of the last 10
        trades scaled by 0.1, or 0.0 when there are no trades.

        Args:
            price_data: Price data.
            ecosystem: Market ecosystem.
            lambda_risk: Risk-penalty coefficient.
            alpha_recent: Recent-performance coefficient.

        Returns:
            float: Expected utility.
        """
        p_win = self.get_win_probability(price_data, ecosystem)
        expected_return = self.get_expected_return(price_data, ecosystem)

        # Risk penalty based on the maximum drawdown seen so far.
        risk_penalty = abs(self.performance_stats.get("max_drawdown", 0.1))

        # Recent performance: win rate of the last 10 trades.
        recent_trades = self.trade_history[-10:]
        if recent_trades:
            wins = sum(1 for t in recent_trades if t.get("return", 0) > 0)
            recent_performance = wins / len(recent_trades) * 0.1
        else:
            recent_performance = 0.0

        regime_match = self._check_regime_match(ecosystem)
        utility = (
            p_win * expected_return
            - lambda_risk * risk_penalty
            + alpha_recent * recent_performance
        ) * regime_match
        return utility

    def update_trade_result(self, trade_result: Dict):
        """Record a finished trade and update the running statistics.

        Args:
            trade_result: Mapping describing the trade. The keys read here
                are ``"return"`` and ``"drawdown"``; both default to 0.
        """
        self.trade_history.append(trade_result)

        trade_return = trade_result.get("return", 0)
        stats = self.performance_stats
        stats["total_trades"] += 1
        if trade_return > 0:
            stats["winning_trades"] += 1
        stats["total_return"] += trade_return

        # The maximum drawdown is stored as a non-positive number. Normalise
        # the sign so a drawdown reported as a positive magnitude is recorded
        # instead of being silently ignored.
        drawdown = -abs(trade_result.get("drawdown", 0))
        if drawdown < stats["max_drawdown"]:
            stats["max_drawdown"] = drawdown

    def activate(self):
        """Mark the agent as active."""
        self.is_active = True

    def deactivate(self):
        """Mark the agent as inactive."""
        self.is_active = False

    def get_performance_summary(self) -> Dict[str, Any]:
        """Return a summary of identity, state and performance statistics.

        ``win_rate`` is 0.0 when no trades have been recorded, and
        ``current_signal`` is the direction value of the current signal or
        None when there is none.
        """
        total = self.performance_stats["total_trades"]
        wins = self.performance_stats["winning_trades"]
        return {
            "agent_name": self.name,
            "agent_id": self.agent_id,
            "is_active": self.is_active,
            "total_trades": total,
            "win_rate": wins / total if total > 0 else 0.0,
            "total_return": self.performance_stats["total_return"],
            "max_drawdown": self.performance_stats["max_drawdown"],
            "current_signal": (
                self.current_signal.direction.value if self.current_signal else None
            ),
        }

    # Helper methods

    def _calculate_sharpe(
        self, returns: List[float], risk_free_rate: float = 0.03
    ) -> float:
        """Compute the annualised Sharpe ratio of ``returns``.

        The risk-free rate is divided by 252 before being subtracted from
        each return, and the ratio is scaled by sqrt(252).

        Args:
            returns: Per-period returns.
            risk_free_rate: Annual risk-free rate.

        Returns:
            float: The Sharpe ratio, or 0.0 when there are fewer than two
            observations or the returns have (numerically) no variance.
        """
        if len(returns) < 2:
            return 0.0
        excess_returns = np.array(returns) - risk_free_rate / 252
        volatility = np.std(excess_returns, ddof=1)
        # An exact ``== 0`` test misses constant series whose standard
        # deviation is a tiny rounding residual.
        if volatility < self._ZERO_VOLATILITY_EPS:
            return 0.0
        return float(np.mean(excess_returns) / volatility * np.sqrt(252))

    def _calculate_regime_adaptation(self) -> float:
        """Compute regime adaptation (simplified).

        Returns 50.0 with fewer than 20 trades and a fixed 60.0 otherwise;
        subclasses may override.
        """
        if len(self.trade_history) < 20:
            return 50.0
        return 60.0

    def _calculate_signal_stability(self) -> float:
        """Compute signal stability on a 0-100 scale.

        Looks at the last 20 signals and returns the share of consecutive
        pairs whose direction did not change, times 100. Returns 50.0 when
        fewer than 10 signals have been recorded.
        """
        if len(self.signal_history) < 10:
            return 50.0
        recent_signals = self.signal_history[-20:]
        direction_changes = sum(
            1
            for previous, current in zip(recent_signals, recent_signals[1:])
            if current.direction != previous.direction
        )
        stability = 1 - direction_changes / (len(recent_signals) - 1)
        return stability * 100

    def _check_regime_match(self, ecosystem: Any) -> float:
        """Return the regime-match multiplier; 1.0 by default, overridable."""
        return 1.0

    def _validate_signal(
        self,
        direction: SignalDirection,
        confidence: float,
        ecosystem: Optional[Any] = None,
    ) -> bool:
        """Check whether a prospective signal is acceptable.

        A signal is rejected when ``confidence`` is below
        ``self.min_confidence``, or when the ecosystem reports "high" or
        "medium" flow toxicity and ``confidence`` is below 0.8. An ecosystem
        without a ``micro`` attribute (or with ``micro`` set to None) skips
        the toxicity check.

        Args:
            direction: Signal direction (not used by the base implementation).
            confidence: Signal confidence.
            ecosystem: Optional market ecosystem.

        Returns:
            bool: True if the signal passes validation.
        """
        if confidence < self.min_confidence:
            return False
        # Look the attribute up directly rather than testing the truthiness
        # of the ecosystem object, which may be falsy or ambiguous.
        micro = getattr(ecosystem, "micro", None)
        if micro is not None and micro.flow_toxicity.value in ("high", "medium"):
            # A toxic environment demands a higher confidence.
            if confidence < 0.8:
                return False
        return True
|