""" 动量冲浪者智能体 基于价格突破和成交量确认生成短期动量信号 最佳生态:夏季繁荣 持有期:不超过5日 """ from datetime import datetime, timedelta from typing import Dict, Any, Optional import numpy as np import pandas as pd from agents.base import AgentBase, AgentSignal, SignalDirection, SignalStrength from core.ecosystem import MacroRegime, UnifiedEcosystem class MomentumSurferAgent(AgentBase): """ 动量冲浪者智能体 策略逻辑: 1. 价格突破20周期高点 + 成交量>1.5倍均量 = 买入 2. 快速止盈(3-5日) 3. 严格止损(-2%) 4. 避开有毒订单流时段 """ def __init__( self, config: Optional[Dict[str, Any]] = None, breakout_period: int = 20, volume_ratio: float = 1.5, max_hold_days: int = 5, stop_loss: float = 0.02 ): super().__init__( name="momentum_surfer", config=config, max_position=0.6, min_confidence=0.65 ) self.breakout_period = breakout_period self.volume_ratio = volume_ratio self.max_hold_days = max_hold_days self.stop_loss = stop_loss self.preferred_regimes = [MacroRegime.SUMMER] def generate_signal( self, price_data: pd.DataFrame, ecosystem: Optional[UnifiedEcosystem] = None ) -> Optional[AgentSignal]: """生成交易信号""" if len(price_data) < self.breakout_period + 5: return None # 有毒订单流检查 if ecosystem and hasattr(ecosystem, 'micro'): if ecosystem.micro.flow_toxicity.value in ["high", "medium"]: return None # 突破检测 breakout_signal = self._detect_breakout(price_data) if breakout_signal == 0: return None direction = SignalDirection.LONG if breakout_signal > 0 else SignalDirection.SHORT # 成交量确认 volume_confirmed = self._check_volume_confirmation(price_data) if not volume_confirmed: return None # 计算置信度 confidence = abs(breakout_signal) # 动能强度 momentum_strength = self._calculate_momentum_strength(price_data) confidence *= (1 + momentum_strength) confidence = min(1.0, confidence) if not self._validate_signal(direction, confidence, ecosystem): return None position_size = self._calculate_position_size(ecosystem, confidence) strength = ( SignalStrength.STRONG if confidence > 0.8 else SignalStrength.MEDIUM if confidence > 0.70 else SignalStrength.WEAK ) signal = AgentSignal( agent_name=self.name, direction=direction, strength=strength, confidence=confidence, suggested_position=position_size, expected_return=self.get_expected_return(price_data, ecosystem), win_probability=self.get_win_probability(price_data, ecosystem), timestamp=datetime.now(), valid_until=datetime.now() + timedelta(days=self.max_hold_days), metadata={ "breakout_strength": breakout_signal, "momentum_strength": momentum_strength, "max_hold_days": self.max_hold_days, "stop_loss": self.stop_loss } ) self.current_signal = signal self.signal_history.append(signal) return signal def get_expected_return( self, price_data: pd.DataFrame, ecosystem: Optional[UnifiedEcosystem] = None ) -> float: """计算预期收益(短期高收益率)""" base_return = 0.20 # 20%年化 # 短期爆发预期更高 daily_return = base_return / 252 expected_5day = (1 + daily_return) ** 5 - 1 return expected_5day * 10 # 换算回年化展示 def get_win_probability( self, price_data: pd.DataFrame, ecosystem: Optional[UnifiedEcosystem] = None ) -> float: """计算胜率""" base_prob = 0.50 # 趋势强度加成 momentum = self._calculate_momentum_strength(price_data) base_prob += momentum * 0.15 # 生态加成 if ecosystem and ecosystem.macro.regime == MacroRegime.SUMMER: base_prob += 0.12 elif ecosystem and ecosystem.macro.regime == MacroRegime.SPRING: base_prob += 0.05 return min(0.70, base_prob) def _detect_breakout(self, data: pd.DataFrame) -> float: """检测突破信号强度 (-1 到 1)""" current_price = data['close'].iloc[-1] high_20 = data['high'].iloc[-self.breakout_period:].max() low_20 = data['low'].iloc[-self.breakout_period:].min() prev_close = data['close'].iloc[-2] # 向上突破 if current_price > high_20 and current_price > prev_close * 1.01: strength = (current_price - high_20) / high_20 * 50 return min(1.0, 0.6 + strength) # 向下突破 if current_price < low_20 and current_price < prev_close * 0.99: strength = (low_20 - current_price) / low_20 * 50 return -min(1.0, 0.6 + strength) return 0.0 def _check_volume_confirmation(self, data: pd.DataFrame) -> bool: """检查成交量确认""" current_volume = data['volume'].iloc[-1] avg_volume = data['volume'].iloc[-20:].mean() if avg_volume == 0: return False return current_volume > avg_volume * self.volume_ratio def _calculate_momentum_strength(self, data: pd.DataFrame) -> float: """计算动能强度""" if len(data) < 10: return 0.0 # 使用ROC(变动率) roc = (data['close'].iloc[-1] - data['close'].iloc[-10]) / data['close'].iloc[-10] # 使用RSI delta = data['close'].diff() gain = (delta.where(delta > 0, 0)).rolling(14).mean() loss = (-delta.where(delta < 0, 0)).rolling(14).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) current_rsi = rsi.iloc[-1] if not pd.isna(rsi.iloc[-1]) else 50 # 综合动能 momentum = (roc * 10 + (current_rsi - 50) / 50) / 2 return max(-1, min(1, momentum)) def _calculate_position_size( self, ecosystem: Optional[UnifiedEcosystem], confidence: float ) -> float: """计算建议仓位(动量策略仓位较小)""" base_size = self.max_position * confidence * 0.8 if ecosystem: if ecosystem.macro.regime == MacroRegime.SUMMER: base_size *= 1.0 else: base_size *= 0.5 health_factor = ecosystem.meso.health_score / 100 base_size *= health_factor return min(self.max_position, base_size) def _check_regime_match(self, ecosystem: Any) -> float: """检查生态适配度""" if not ecosystem or not hasattr(ecosystem, 'macro'): return 0.7 if ecosystem.macro.regime == MacroRegime.SUMMER: return 1.2 elif ecosystem.macro.regime in [MacroRegime.SPRING]: return 0.9 else: return 0.5