""" 统一信号引擎 (Unified Signal Engine) 为三品种提供统一的入场/出场信号: 1. 趋势确认:价格 > 20MA > 60MA,且均线向上 2. 动量确认:RSI 50-70,且5日涨幅 > 50% 20日涨幅 3. 量能确认:成交量 > 1.2倍20日均量 """ from typing import Dict, Optional, Tuple from dataclasses import dataclass from datetime import datetime import pandas as pd import numpy as np @dataclass class SignalResult: """信号结果""" signal: str # "enter_long", "exit", "hold", "neutral" confidence: float # 0-1 trend_confirmed: bool momentum_confirmed: bool volume_confirmed: bool # 详细数据 rsi: float price_vs_20ma: float ma20_slope: float ma60_slope: float volume_ratio: float return_5d: float return_20d: float class UnifiedSignalEngine: """ 统一信号引擎 所有品种使用同一套信号规则,避免过拟合 """ def __init__( self, rsi_period: int = 14, rsi_lower: float = 45, # 放宽至45 rsi_upper: float = 75, # 放宽至75 volume_threshold: float = 1.0, # 放宽至1.0(持平即可) ma_fast: int = 20, ma_slow: int = 60 ): self.rsi_period = rsi_period self.rsi_lower = rsi_lower self.rsi_upper = rsi_upper self.volume_threshold = volume_threshold self.ma_fast = ma_fast self.ma_slow = ma_slow # 当前持仓状态 self.in_position = False self.entry_price = None self.highest_price = None def generate_signal( self, df: pd.DataFrame, current_date: Optional[datetime] = None ) -> SignalResult: """ 生成交易信号 Args: df: 品种数据(OHLCV) current_date: 当前日期(回测用) Returns: SignalResult: 信号结果 """ # 获取数据窗口 if current_date is not None: df = df[df.index <= current_date] if len(df) < self.ma_slow + 5: return self._create_neutral_result() close = df['close'] volume = df['volume'] # 计算指标 ma20 = close.rolling(self.ma_fast).mean() ma60 = close.rolling(self.ma_slow).mean() rsi = self._calculate_rsi(close) current_price = close.iloc[-1] current_ma20 = ma20.iloc[-1] current_ma60 = ma60.iloc[-1] current_volume = volume.iloc[-1] avg_volume = volume.iloc[-20:].mean() # 1. 趋势确认 trend_confirmed = self._check_trend( current_price, current_ma20, current_ma60, ma20, ma60 ) # 2. 动量确认 momentum_confirmed, rsi_value = self._check_momentum(close) # 3. 量能确认 volume_confirmed, volume_ratio = self._check_volume(current_volume, avg_volume) # 计算收益率 return_5d = (close.iloc[-1] - close.iloc[-5]) / close.iloc[-5] if len(close) >= 5 else 0 return_20d = (close.iloc[-1] - close.iloc[-20]) / close.iloc[-20] if len(close) >= 20 else 0 # 生成信号 if not self.in_position: # 空仓:检查入场条件 if trend_confirmed and momentum_confirmed and volume_confirmed: signal = "enter_long" confidence = self._calculate_confidence( trend_confirmed, momentum_confirmed, volume_confirmed, rsi_value, volume_ratio ) self.in_position = True self.entry_price = current_price self.highest_price = current_price else: signal = "neutral" confidence = 0.0 else: # 持仓:检查出场条件 self.highest_price = max(self.highest_price, current_price) # 更新最高价 if current_price > self.highest_price: self.highest_price = current_price # 检查出场条件 should_exit = self._check_exit( current_price, close, ma20, rsi_value ) if should_exit: signal = "exit" confidence = 1.0 self._reset_position() else: signal = "hold" confidence = 0.5 return SignalResult( signal=signal, confidence=confidence, trend_confirmed=trend_confirmed, momentum_confirmed=momentum_confirmed, volume_confirmed=volume_confirmed, rsi=rsi_value, price_vs_20ma=(current_price - current_ma20) / current_ma20, ma20_slope=(ma20.iloc[-1] - ma20.iloc[-5]) / ma20.iloc[-5] if len(ma20) >= 5 else 0, ma60_slope=(ma60.iloc[-1] - ma60.iloc[-5]) / ma60.iloc[-5] if len(ma60) >= 5 else 0, volume_ratio=volume_ratio, return_5d=return_5d, return_20d=return_20d ) def _check_trend( self, price: float, ma20: float, ma60: float, ma20_series: pd.Series, ma60_series: pd.Series ) -> bool: """检查趋势确认条件""" # 价格 > 20MA > 60MA price_above_ma = price > ma20 > ma60 # 60MA斜率 > -0.001(趋势向上或走平) ma60_slope = (ma60_series.iloc[-1] - ma60_series.iloc[-5]) / ma60_series.iloc[-5] \ if len(ma60_series) >= 5 else 0 ma_slope_positive = ma60_slope > -0.001 return price_above_ma and ma_slope_positive def _check_momentum(self, close: pd.Series) -> Tuple[bool, float]: """检查动量确认条件""" rsi = self._calculate_rsi(close) # RSI在50-70之间(强势但非超买) rsi_in_range = self.rsi_lower <= rsi <= self.rsi_upper # 5日涨幅 > 50% 20日涨幅(动能加速) if len(close) >= 20: return_5d = (close.iloc[-1] - close.iloc[-5]) / close.iloc[-5] return_20d = (close.iloc[-1] - close.iloc[-20]) / close.iloc[-20] momentum_accelerating = return_5d > return_20d * 0.5 else: momentum_accelerating = False return rsi_in_range and momentum_accelerating, rsi def _check_volume(self, current_vol: float, avg_vol: float) -> Tuple[bool, float]: """检查量能确认条件""" if avg_vol == 0: return False, 0 volume_ratio = current_vol / avg_vol return volume_ratio >= self.volume_threshold, volume_ratio def _check_exit( self, current_price: float, close: pd.Series, ma20: pd.Series, rsi: float ) -> bool: """检查出场条件""" # 1. 趋势反转:价格跌破20日均线 if current_price < ma20.iloc[-1]: return True # 2. 动量衰竭:RSI从高位跌破50 if len(close) >= 2: prev_rsi = self._calculate_rsi(close.iloc[:-1]) if prev_rsi > 60 and rsi < 50: return True # 3. 移动止盈:从最高点回撤10% if self.highest_price and self.entry_price: drawdown_from_peak = (self.highest_price - current_price) / self.highest_price if drawdown_from_peak >= 0.10: return True return False def _calculate_confidence( self, trend: bool, momentum: bool, volume: bool, rsi: float, volume_ratio: float ) -> float: """计算信号置信度""" # 基础分 base = 0.5 # 三条件都满足 if trend and momentum and volume: base += 0.3 # RSI越强越好(但不超过70) if 55 <= rsi <= 65: base += 0.1 # 量能越大越好 if volume_ratio > 1.5: base += 0.1 return min(1.0, base) def _calculate_rsi(self, prices: pd.Series) -> float: """计算RSI""" if len(prices) < self.rsi_period + 1: return 50.0 deltas = prices.diff() gains = deltas.clip(lower=0) losses = (-deltas).clip(lower=0) avg_gain = gains.rolling(self.rsi_period).mean() avg_loss = losses.rolling(self.rsi_period).mean() rs = avg_gain.iloc[-1] / avg_loss.iloc[-1] if avg_loss.iloc[-1] != 0 else 0 rsi = 100 - (100 / (1 + rs)) return rsi def _create_neutral_result(self) -> SignalResult: """创建中性信号结果""" return SignalResult( signal="neutral", confidence=0.0, trend_confirmed=False, momentum_confirmed=False, volume_confirmed=False, rsi=50.0, price_vs_20ma=0.0, ma20_slope=0.0, ma60_slope=0.0, volume_ratio=1.0, return_5d=0.0, return_20d=0.0 ) def _reset_position(self): """重置持仓状态""" self.in_position = False self.entry_price = None self.highest_price = None def set_position_state(self, in_position: bool, entry_price: Optional[float] = None): """设置持仓状态(用于回测恢复)""" self.in_position = in_position self.entry_price = entry_price self.highest_price = entry_price