| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- """
- 趋势猎手智能体 - 稳健趋势跟踪
- 核心逻辑:
- 1. 双均线金叉入场
- 2. ADX>20确认趋势
- 3. 价格>MA20确认方向
- 4. 生态过滤:只在Summer/Spring交易
- """
- from datetime import datetime, timedelta
- from typing import Dict, Any, Optional
- import numpy as np
- import pandas as pd
- from agents.base import AgentBase, AgentSignal, SignalDirection, SignalStrength
- from core.ecosystem import MacroRegime, UnifiedEcosystem
- class TrendHunterAgent(AgentBase):
- """
- 趋势猎手 - 稳健版本
- """
- def __init__(
- self,
- config: Optional[Dict[str, Any]] = None,
- ma_fast: int = 5, # 5日均线
- ma_slow: int = 20, # 20日均线
- adx_period: int = 14,
- adx_threshold: float = 20.0, # ADX门槛
- ):
- super().__init__(
- name="trend_hunter",
- config=config,
- max_position=1.0,
- min_confidence=0.55
- )
- self.ma_fast = ma_fast
- self.ma_slow = ma_slow
- self.adx_period = adx_period
- self.adx_threshold = adx_threshold
- self.preferred_regimes = [MacroRegime.SUMMER, MacroRegime.SPRING]
- def generate_signal(
- self,
- price_data: pd.DataFrame,
- ecosystem: Optional[UnifiedEcosystem] = None
- ) -> Optional[AgentSignal]:
- """生成交易信号"""
- if len(price_data) < self.ma_slow + 10:
- return None
- close = price_data['close']
- high = price_data['high']
- low = price_data['low']
- # 计算指标
- ma_fast = close.rolling(self.ma_fast).mean()
- ma_slow = close.rolling(self.ma_slow).mean()
- adx = self._calculate_adx(price_data)
- current_price = close.iloc[-1]
- current_ma_fast = ma_fast.iloc[-1]
- current_ma_slow = ma_slow.iloc[-1]
- prev_ma_fast = ma_fast.iloc[-2]
- prev_ma_slow = ma_slow.iloc[-2]
- # 金叉/死叉判断
- golden_cross = (prev_ma_fast <= prev_ma_slow) and (current_ma_fast > current_ma_slow)
- death_cross = (prev_ma_fast >= prev_ma_slow) and (current_ma_fast < current_ma_slow)
- # 趋势强度
- ma_diff = (current_ma_fast - current_ma_slow) / current_ma_slow
- direction = SignalDirection.NEUTRAL
- confidence = 0.0
- position_size = 0.0
- # 只在Summer/Spring交易
- if ecosystem and ecosystem.macro.regime in self.preferred_regimes:
- health = ecosystem.meso.health_score / 100
- # 入场:简化条件 - 只要在均线上方 + 健康度>50
- if current_price > current_ma_slow and current_ma_fast > current_ma_slow and health > 0.5:
- # 金叉时高仓位,否则维持仓位
- if golden_cross:
- direction = SignalDirection.LONG
- confidence = min(1.0, 0.7 * health)
- position_size = confidence
- else:
- direction = SignalDirection.LONG
- confidence = 0.5 * health
- position_size = confidence * 0.6
- # 出场:死叉或跌破慢线
- elif death_cross or current_price < current_ma_slow * 0.98: # 允许2%回撤
- direction = SignalDirection.NEUTRAL
- confidence = 0.0
- position_size = 0.0
- # 非目标生态,观望
- else:
- direction = SignalDirection.NEUTRAL
- confidence = 0.0
- position_size = 0.0
- # 无信号时返回None
- if direction == SignalDirection.NEUTRAL and position_size == 0:
- return None
- # 确定强度
- if confidence > 0.75:
- strength = SignalStrength.STRONG
- elif confidence > 0.6:
- strength = SignalStrength.MEDIUM
- else:
- strength = SignalStrength.WEAK
- signal = AgentSignal(
- agent_name=self.name,
- direction=direction,
- strength=strength,
- confidence=confidence,
- suggested_position=position_size,
- expected_return=self.get_expected_return(price_data, ecosystem),
- win_probability=self.get_win_probability(price_data, ecosystem),
- timestamp=datetime.now(),
- valid_until=datetime.now() + timedelta(hours=24),
- metadata={
- "current_price": current_price,
- "ma_fast": current_ma_fast,
- "ma_slow": current_ma_slow,
- "adx": adx,
- "golden_cross": golden_cross,
- "death_cross": death_cross,
- "ma_diff": ma_diff,
- "health": ecosystem.meso.health_score if ecosystem else 0
- }
- )
- self.current_signal = signal
- self.signal_history.append(signal)
- return signal
- def _calculate_adx(self, data: pd.DataFrame) -> float:
- """计算ADX指标"""
- if len(data) < self.adx_period * 2:
- return 20.0
- high = data['high']
- low = data['low']
- close = data['close']
- # +DM和-DM
- plus_dm = high.diff()
- minus_dm = -low.diff()
- plus_dm = plus_dm.clip(lower=0)
- minus_dm = minus_dm.clip(lower=0)
- # TR
- tr1 = high - low
- tr2 = (high - close.shift(1)).abs()
- tr3 = (low - close.shift(1)).abs()
- tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
- # ATR
- atr = tr.rolling(self.adx_period).mean()
- # +DI和-DI
- plus_di = 100 * (plus_dm.rolling(self.adx_period).mean() / atr)
- minus_di = 100 * (minus_dm.rolling(self.adx_period).mean() / atr)
- # DX和ADX
- dx = 100 * (plus_di - minus_di).abs() / (plus_di + minus_di)
- adx = dx.rolling(self.adx_period).mean()
- return adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 20.0
- def get_expected_return(self, price_data: pd.DataFrame, ecosystem=None) -> float:
- """预期收益"""
- if ecosystem and ecosystem.macro.regime == MacroRegime.SUMMER:
- return 0.35
- elif ecosystem and ecosystem.macro.regime == MacroRegime.SPRING:
- return 0.25
- return 0.15
- def get_win_probability(self, price_data: pd.DataFrame, ecosystem=None) -> float:
- """胜率估算"""
- base_prob = 0.55
- if ecosystem:
- if ecosystem.macro.regime == MacroRegime.SUMMER:
- base_prob += 0.12
- elif ecosystem.macro.regime == MacroRegime.SPRING:
- base_prob += 0.08
- health = ecosystem.meso.health_score / 100
- base_prob += (health - 0.5) * 0.1
- return min(0.80, base_prob)
- def _calculate_position_size(
- self,
- ecosystem: Optional[UnifiedEcosystem],
- confidence: float
- ) -> float:
- """计算仓位"""
- return self.max_position * confidence
|