| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- """
- 动量冲浪者智能体
- 基于价格突破和成交量确认生成短期动量信号
- 最佳生态:夏季繁荣
- 持有期:不超过5日
- """
- from datetime import datetime, timedelta
- from typing import Dict, Any, Optional
- import numpy as np
- import pandas as pd
- from agents.base import AgentBase, AgentSignal, SignalDirection, SignalStrength
- from core.ecosystem import MacroRegime, UnifiedEcosystem
class MomentumSurferAgent(AgentBase):
    """Short-horizon momentum agent driven by price breakouts.

    Strategy outline:
      1. Enter when the close breaks out of the range formed by the prior
         ``breakout_period`` bars and the latest volume exceeds
         ``volume_ratio`` times its recent average.
      2. Signals are short-lived: they expire after ``max_hold_days``.
      3. A tight stop-loss (``stop_loss``, 2% by default) is attached to the
         signal metadata.
      4. No signal is produced while order-flow toxicity is "high" or
         "medium".

    The preferred macro regime is ``MacroRegime.SUMMER``.
    """

    # Number of most recent bars used for the average-volume baseline.
    _VOLUME_LOOKBACK = 20
    # The rate of change compares the latest close with the close this many
    # rows from the end of the series.
    _ROC_LOOKBACK = 10
    # Averaging window of the RSI component.
    _RSI_PERIOD = 14
    # Baseline annualised return assumption used by ``get_expected_return``.
    _BASE_ANNUAL_RETURN = 0.20
    # Fixed scale applied to the holding-period return.
    # NOTE(review): the original comment called this "annualising", but a true
    # annualisation factor would be 252 / hold days; kept as-is so downstream
    # consumers see unchanged numbers -- confirm the intended meaning.
    _RETURN_DISPLAY_SCALE = 10

    def __init__(
        self,
        config: Optional[Dict[str, Any]] = None,
        breakout_period: int = 20,
        volume_ratio: float = 1.5,
        max_hold_days: int = 5,
        stop_loss: float = 0.02
    ):
        """Create the agent.

        Args:
            config: Optional configuration mapping forwarded to ``AgentBase``.
            breakout_period: Number of prior bars that define the breakout
                range.
            volume_ratio: Minimum ratio of the latest volume to the average
                volume required to confirm a breakout.
            max_hold_days: Signal lifetime in days.
            stop_loss: Stop-loss fraction reported in the signal metadata.
        """
        super().__init__(
            name="momentum_surfer",
            config=config,
            max_position=0.6,
            min_confidence=0.65
        )
        self.breakout_period = breakout_period
        self.volume_ratio = volume_ratio
        self.max_hold_days = max_hold_days
        self.stop_loss = stop_loss
        self.preferred_regimes = [MacroRegime.SUMMER]

    def generate_signal(
        self,
        price_data: pd.DataFrame,
        ecosystem: Optional[UnifiedEcosystem] = None
    ) -> Optional[AgentSignal]:
        """Build a trading signal from the latest bar, or return ``None``.

        Args:
            price_data: Frame with ``close``, ``high``, ``low`` and ``volume``
                columns; the last row is treated as the current bar.
            ecosystem: Optional market-state snapshot used for the toxicity
                filter, validation and position sizing.

        Returns:
            The new ``AgentSignal`` (also stored in ``current_signal`` and
            appended to ``signal_history``), or ``None`` when any filter
            rejects the setup.
        """
        if len(price_data) < self.breakout_period + 5:
            return None

        # Stand aside while order flow is toxic.
        if ecosystem and hasattr(ecosystem, 'micro'):
            if ecosystem.micro.flow_toxicity.value in ["high", "medium"]:
                return None

        breakout_signal = self._detect_breakout(price_data)
        if breakout_signal == 0:
            return None
        is_long = breakout_signal > 0
        direction = SignalDirection.LONG if is_long else SignalDirection.SHORT

        if not self._check_volume_confirmation(price_data):
            return None

        # Momentum is signed (positive = bullish). Align it with the trade
        # direction so that a downside breakout backed by negative momentum
        # gains confidence instead of losing it.
        momentum_strength = self._calculate_momentum_strength(price_data)
        aligned_momentum = momentum_strength if is_long else -momentum_strength
        confidence = min(1.0, abs(breakout_signal) * (1 + aligned_momentum))

        if not self._validate_signal(direction, confidence, ecosystem):
            return None

        position_size = self._calculate_position_size(ecosystem, confidence)

        if confidence > 0.8:
            strength = SignalStrength.STRONG
        elif confidence > 0.70:
            strength = SignalStrength.MEDIUM
        else:
            strength = SignalStrength.WEAK

        # Take the clock once so valid_until is exactly timestamp + hold days.
        now = datetime.now()
        signal = AgentSignal(
            agent_name=self.name,
            direction=direction,
            strength=strength,
            confidence=confidence,
            suggested_position=position_size,
            expected_return=self.get_expected_return(price_data, ecosystem),
            win_probability=self.get_win_probability(
                price_data, ecosystem, direction=direction
            ),
            timestamp=now,
            valid_until=now + timedelta(days=self.max_hold_days),
            metadata={
                "breakout_strength": breakout_signal,
                "momentum_strength": momentum_strength,
                "max_hold_days": self.max_hold_days,
                "stop_loss": self.stop_loss
            }
        )
        self.current_signal = signal
        self.signal_history.append(signal)
        return signal

    def get_expected_return(
        self,
        price_data: pd.DataFrame,
        ecosystem: Optional[UnifiedEcosystem] = None
    ) -> float:
        """Return the scaled expected return over the holding period.

        The 20% annual baseline is converted to a daily rate, compounded over
        ``max_hold_days`` and multiplied by a fixed scale of 10. Neither
        argument influences the result; they are accepted for interface
        compatibility.
        """
        daily_return = self._BASE_ANNUAL_RETURN / 252
        hold_period_return = (1 + daily_return) ** self.max_hold_days - 1
        return hold_period_return * self._RETURN_DISPLAY_SCALE

    def get_win_probability(
        self,
        price_data: pd.DataFrame,
        ecosystem: Optional[UnifiedEcosystem] = None,
        direction: Optional[SignalDirection] = None
    ) -> float:
        """Estimate the win probability, capped at 0.70.

        Args:
            price_data: Price frame passed to the momentum calculation.
            ecosystem: Optional market state; SUMMER adds 0.12 and SPRING
                adds 0.05 to the estimate.
            direction: Optional trade direction. When it is
                ``SignalDirection.SHORT`` the momentum sign is flipped so
                that falling prices support the trade. When omitted, momentum
                is used as-is (bullish-positive), matching the previous
                behaviour.

        Returns:
            Probability starting from a 0.50 base, never above 0.70.
        """
        base_prob = 0.50

        momentum = self._calculate_momentum_strength(price_data)
        if direction == SignalDirection.SHORT:
            momentum = -momentum
        base_prob += momentum * 0.15

        if ecosystem and ecosystem.macro.regime == MacroRegime.SUMMER:
            base_prob += 0.12
        elif ecosystem and ecosystem.macro.regime == MacroRegime.SPRING:
            base_prob += 0.05
        return min(0.70, base_prob)

    def _detect_breakout(self, data: pd.DataFrame) -> float:
        """Return the signed breakout strength in ``[-1, 1]`` (0 = none).

        The reference range is built from the ``breakout_period`` bars
        *before* the current one. Including the current bar would make a
        breakout impossible, because a close cannot exceed its own bar's high
        or fall below its own bar's low.

        An upside breakout additionally requires the close to be more than 1%
        above the previous close; a downside breakout requires it to be more
        than 1% below.
        """
        closes = data['close']
        if len(closes) < 2:
            return 0.0

        current_price = closes.iloc[-1]
        prev_close = closes.iloc[-2]
        window_start = -(self.breakout_period + 1)
        prior_high = data['high'].iloc[window_start:-1].max()
        prior_low = data['low'].iloc[window_start:-1].min()

        # Comparisons against NaN (empty window / missing data) are False, so
        # such inputs fall through to "no breakout".
        if current_price > prior_high and current_price > prev_close * 1.01:
            excess = (
                (current_price - prior_high) / prior_high * 50
                if prior_high > 0 else 0.0
            )
            return float(min(1.0, 0.6 + excess))

        if current_price < prior_low and current_price < prev_close * 0.99:
            excess = (
                (prior_low - current_price) / prior_low * 50
                if prior_low > 0 else 0.0
            )
            return float(-min(1.0, 0.6 + excess))

        return 0.0

    def _check_volume_confirmation(self, data: pd.DataFrame) -> bool:
        """Return True when the latest volume exceeds the required multiple.

        The baseline is the mean volume of the last 20 bars (current bar
        included). A zero or missing baseline never confirms.
        """
        volumes = data['volume']
        current_volume = volumes.iloc[-1]
        avg_volume = volumes.iloc[-self._VOLUME_LOOKBACK:].mean()
        if pd.isna(avg_volume) or avg_volume == 0:
            return False
        return bool(current_volume > avg_volume * self.volume_ratio)

    def _calculate_momentum_strength(self, data: pd.DataFrame) -> float:
        """Return a signed momentum score in ``[-1, 1]`` (positive = bullish).

        The score is the average of two components: ten times the rate of
        change against the close ``_ROC_LOOKBACK`` rows from the end, and the
        14-period simple-average RSI rescaled from ``[0, 100]`` to
        ``[-1, 1]``. Returns 0.0 when fewer than 10 rows are available or
        when the result is not a number.
        """
        if len(data) < self._ROC_LOOKBACK:
            return 0.0

        closes = data['close']

        # Rate of change; a zero or missing reference price contributes 0
        # instead of producing inf/NaN.
        reference = closes.iloc[-self._ROC_LOOKBACK]
        if pd.isna(reference) or reference == 0:
            roc = 0.0
        else:
            roc = (closes.iloc[-1] - reference) / reference

        # RSI only depends on the last 14 price changes, so the tail of the
        # series is sufficient to reproduce the full-series rolling value.
        delta = closes.iloc[-(self._RSI_PERIOD + 1):].diff()
        avg_gain = (
            delta.where(delta > 0, 0).rolling(self._RSI_PERIOD).mean().iloc[-1]
        )
        avg_loss = (
            (-delta.where(delta < 0, 0)).rolling(self._RSI_PERIOD).mean().iloc[-1]
        )
        if pd.isna(avg_gain) or pd.isna(avg_loss):
            current_rsi = 50.0  # not enough history: neutral
        elif avg_loss == 0:
            # No losses in the window: fully overbought if there were gains,
            # neutral if prices did not move at all.
            current_rsi = 100.0 if avg_gain > 0 else 50.0
        else:
            current_rsi = 100 - (100 / (1 + avg_gain / avg_loss))

        momentum = (roc * 10 + (current_rsi - 50) / 50) / 2
        if pd.isna(momentum):
            # Python's min()/max() would silently turn NaN into +1 here.
            return 0.0
        return float(max(-1, min(1, momentum)))

    def _calculate_position_size(
        self,
        ecosystem: Optional[UnifiedEcosystem],
        confidence: float
    ) -> float:
        """Return the suggested position, within ``[0, max_position]``.

        Starts from ``max_position * confidence * 0.8``. With an ecosystem,
        the size is halved outside the SUMMER regime and scaled by
        ``meso.health_score / 100``.
        """
        size = self.max_position * confidence * 0.8
        if ecosystem:
            if ecosystem.macro.regime != MacroRegime.SUMMER:
                size *= 0.5
            size *= ecosystem.meso.health_score / 100
        return max(0.0, min(self.max_position, size))

    def _check_regime_match(self, ecosystem: Any) -> float:
        """Return the regime-fit multiplier for this agent.

        0.7 when no ecosystem (or no ``macro`` attribute) is available,
        1.2 in SUMMER, 0.9 in SPRING and 0.5 in any other regime.
        """
        if not ecosystem or not hasattr(ecosystem, 'macro'):
            return 0.7
        regime = ecosystem.macro.regime
        if regime == MacroRegime.SUMMER:
            return 1.2
        if regime == MacroRegime.SPRING:
            return 0.9
        return 0.5
|