| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- """
- 均值回归者智能体 - 简化高效版
- 核心逻辑:
- 1. RSI<30超卖区 = 买入信号
- 2. 价格偏离20日均线>5% = 加仓信号
- 3. RSI>70或回归均线 = 卖出
- 4. 仅Spring/Winter交易,仓位60%
- """
- from datetime import datetime, timedelta
- from typing import Dict, Any, Optional
- import numpy as np
- import pandas as pd
- from agents.base import AgentBase, AgentSignal, SignalDirection, SignalStrength
- from core.ecosystem import MacroRegime, UnifiedEcosystem
- class MeanReversionAgent(AgentBase):
- """
- 均值回归者 - Spring/Winter专用
- """
- def __init__(
- self,
- config: Optional[Dict[str, Any]] = None,
- rsi_period: int = 14,
- rsi_oversold: float = 35.0, # RSI超卖线
- rsi_overbought: float = 65.0, # RSI超买线
- ma_period: int = 20,
- deviation_threshold: float = 0.05 # 偏离5%
- ):
- super().__init__(
- name="mean_reversion",
- config=config,
- max_position=0.6, # 最大60%仓位
- min_confidence=0.5
- )
- self.rsi_period = rsi_period
- self.rsi_oversold = rsi_oversold
- self.rsi_overbought = rsi_overbought
- self.ma_period = ma_period
- self.deviation_threshold = deviation_threshold
- self.preferred_regimes = [MacroRegime.SPRING, MacroRegime.WINTER]
- self.in_position = False
- self.entry_price = None
- def generate_signal(
- self,
- price_data: pd.DataFrame,
- ecosystem: Optional[UnifiedEcosystem] = None
- ) -> Optional[AgentSignal]:
- """生成交易信号 - RSI超卖反弹"""
- if len(price_data) < max(self.rsi_period, self.ma_period) + 5:
- return None
- close = price_data['close']
- current_price = close.iloc[-1]
- # 计算RSI
- rsi = self._calculate_rsi(close)
- # 计算均线偏离
- ma = close.rolling(self.ma_period).mean()
- ma_deviation = (current_price - ma.iloc[-1]) / ma.iloc[-1]
- direction = SignalDirection.NEUTRAL
- confidence = 0.0
- position_size = 0.0
- signal_type = "neutral"
- # 持仓中,检查出场
- if self.in_position:
- # RSI超买或回归均线,出场
- if rsi > self.rsi_overbought or abs(ma_deviation) < 0.02:
- direction = SignalDirection.NEUTRAL
- confidence = 0.0
- position_size = 0.0
- signal_type = "exit_rsi_mean"
- self._reset_state()
- else:
- # 持仓中
- direction = SignalDirection.LONG
- confidence = 0.6
- position_size = 0.6
- signal_type = "hold"
- # 空仓中,检查入场(仅Spring/Winter)
- else:
- # 生态过滤
- if ecosystem and ecosystem.macro.regime not in self.preferred_regimes:
- return None
- # RSI超卖 + 价格偏离均线 = 买入
- if rsi < self.rsi_oversold and ma_deviation < -self.deviation_threshold:
- direction = SignalDirection.LONG
- confidence = min(1.0, (self.rsi_oversold - rsi) / 20 + 0.6)
- position_size = 0.6
- signal_type = "entry_rsi_oversold"
- self.in_position = True
- self.entry_price = current_price
- # 仅偏离无RSI = 轻仓试探
- elif ma_deviation < -self.deviation_threshold * 1.5:
- direction = SignalDirection.LONG
- confidence = 0.5
- position_size = 0.3
- signal_type = "entry_deviation"
- self.in_position = True
- self.entry_price = current_price
- # 无信号
- if direction == SignalDirection.NEUTRAL and position_size == 0:
- return None
- return AgentSignal(
- agent_name=self.name,
- direction=direction,
- strength=SignalStrength.STRONG if confidence > 0.7 else SignalStrength.MEDIUM,
- confidence=confidence,
- suggested_position=position_size,
- expected_return=0.12,
- win_probability=0.55,
- timestamp=datetime.now(),
- valid_until=datetime.now() + timedelta(hours=24),
- metadata={
- "signal_type": signal_type,
- "rsi": rsi,
- "ma_deviation": ma_deviation,
- "current_price": current_price,
- "regime": ecosystem.macro.regime.value if ecosystem else "unknown"
- }
- )
- def _calculate_rsi(self, prices: pd.Series) -> float:
- """计算RSI"""
- if len(prices) < self.rsi_period + 1:
- return 50.0
- deltas = prices.diff()
- gains = deltas.clip(lower=0)
- losses = (-deltas).clip(lower=0)
- avg_gain = gains.rolling(self.rsi_period).mean()
- avg_loss = losses.rolling(self.rsi_period).mean()
- rs = avg_gain.iloc[-1] / avg_loss.iloc[-1] if avg_loss.iloc[-1] != 0 else 0
- rsi = 100 - (100 / (1 + rs))
- return rsi
- def _reset_state(self):
- """重置状态"""
- self.in_position = False
- self.entry_price = None
- def get_expected_return(self, price_data: pd.DataFrame, ecosystem=None) -> float:
- return 0.12
- def get_win_probability(self, price_data: pd.DataFrame, ecosystem=None) -> float:
- return 0.55
- def calculate_utility(self, price_data, ecosystem, lambda_risk=0.3, alpha_recent=0.3):
- """Spring/Winter高优先级"""
- if ecosystem and ecosystem.macro.regime == MacroRegime.SPRING:
- return 0.48 # Spring最高优先级(略低于Summer的breakout)
- elif ecosystem and ecosystem.macro.regime == MacroRegime.WINTER:
- return 0.40 # Winter高优先级
- elif ecosystem and ecosystem.macro.regime == MacroRegime.SUMMER:
- return 0.05 # Summer几乎不交易
- return 0.20 # 其他中等优先级
|