| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
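"""
Unified Signal Engine.

Provides one shared set of entry/exit signals for three instruments.
A long entry requires all three confirmations:
1. Trend: price > 20MA > 60MA, and the 60MA is rising or flat (5-bar slope > -0.001).
2. Momentum: RSI within [rsi_lower, rsi_upper] (defaults 45-75), and the 5-day return
   exceeds 50% of the 20-day return.
3. Volume: current volume >= volume_threshold (default 1.0) times the 20-day average volume.
"""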
- from typing import Dict, Optional, Tuple
- from dataclasses import dataclass
- from datetime import datetime
- import pandas as pd
- import numpy as np
@dataclass
class SignalResult:
    """Outcome of one signal evaluation, plus the indicator values behind it."""

    # Decision for the current bar: "enter_long", "exit", "hold" or "neutral".
    signal: str
    # Strength of the decision, in the range 0-1.
    confidence: float

    # Which of the three entry confirmations passed on this bar.
    trend_confirmed: bool
    momentum_confirmed: bool
    volume_confirmed: bool

    # Supporting indicator values.
    rsi: float
    price_vs_20ma: float  # (price - fast MA) / fast MA
    ma20_slope: float     # relative change of the fast MA versus its iloc[-5] value
    ma60_slope: float     # relative change of the slow MA versus its iloc[-5] value
    volume_ratio: float   # current volume / recent average volume
    return_5d: float
    return_20d: float
class UnifiedSignalEngine:
    """Rule-based, long-only signal engine shared by every instrument.

    The same rules are applied to all instruments to avoid per-instrument
    over-fitting. The engine is stateful: it remembers whether it is in a
    position, the entry price and the highest price seen since entry, and
    ``generate_signal`` updates that state as a side effect.
    """

    def __init__(
        self,
        rsi_period: int = 14,
        rsi_lower: float = 45,  # relaxed to 45
        rsi_upper: float = 75,  # relaxed to 75
        volume_threshold: float = 1.0,  # relaxed to 1.0 (matching the average is enough)
        ma_fast: int = 20,
        ma_slow: int = 60
    ):
        """Store the indicator parameters and start flat (no position).

        Args:
            rsi_period: Number of price changes averaged by the RSI.
            rsi_lower: Lowest RSI accepted by the momentum check (inclusive).
            rsi_upper: Highest RSI accepted by the momentum check (inclusive).
            volume_threshold: Minimum current/average volume ratio (inclusive).
            ma_fast: Window of the fast moving average.
            ma_slow: Window of the slow moving average.
        """
        self.rsi_period = rsi_period
        self.rsi_lower = rsi_lower
        self.rsi_upper = rsi_upper
        self.volume_threshold = volume_threshold
        self.ma_fast = ma_fast
        self.ma_slow = ma_slow
        # Current position state.
        self.in_position = False
        self.entry_price: Optional[float] = None
        self.highest_price: Optional[float] = None

    def generate_signal(
        self,
        df: pd.DataFrame,
        current_date: Optional[datetime] = None
    ) -> "SignalResult":
        """Evaluate the latest bar and return a trading signal.

        Side effects: an "enter_long" signal marks the engine as in position
        and records the entry/peak price; an "exit" signal clears that state;
        while in position the peak price is updated on every call.

        Args:
            df: Instrument data with at least ``close`` and ``volume`` columns.
            current_date: If given, only rows whose index is <= this value are
                used (for backtesting).

        Returns:
            SignalResult: The signal and its supporting indicator values. A
            neutral result is returned when fewer than ``ma_slow + 5`` rows
            are available.
        """
        # Restrict the data window.
        if current_date is not None:
            df = df[df.index <= current_date]
        if len(df) < self.ma_slow + 5:
            return self._create_neutral_result()

        close = df['close']
        volume = df['volume']

        # Indicators.
        ma20 = close.rolling(self.ma_fast).mean()
        ma60 = close.rolling(self.ma_slow).mean()
        current_price = close.iloc[-1]
        current_ma20 = ma20.iloc[-1]
        current_ma60 = ma60.iloc[-1]
        current_volume = volume.iloc[-1]
        avg_volume = volume.iloc[-20:].mean()  # includes the current bar

        # 1. Trend confirmation.
        trend_confirmed = self._check_trend(
            current_price, current_ma20, current_ma60, ma20, ma60
        )
        # 2. Momentum confirmation.
        momentum_confirmed, rsi_value = self._check_momentum(close)
        # 3. Volume confirmation.
        volume_confirmed, volume_ratio = self._check_volume(current_volume, avg_volume)

        # Returns, measured against close.iloc[-5] / close.iloc[-20].
        return_5d = (close.iloc[-1] - close.iloc[-5]) / close.iloc[-5] if len(close) >= 5 else 0
        return_20d = (close.iloc[-1] - close.iloc[-20]) / close.iloc[-20] if len(close) >= 20 else 0

        if not self.in_position:
            # Flat: enter only when all three confirmations agree.
            if trend_confirmed and momentum_confirmed and volume_confirmed:
                signal = "enter_long"
                confidence = self._calculate_confidence(
                    trend_confirmed, momentum_confirmed, volume_confirmed,
                    rsi_value, volume_ratio
                )
                self.in_position = True
                self.entry_price = current_price
                self.highest_price = current_price
            else:
                signal = "neutral"
                confidence = 0.0
        else:
            # In position: track the peak, then test the exit rules.
            self._update_highest_price(current_price)
            if self._check_exit(current_price, close, ma20, rsi_value):
                signal = "exit"
                confidence = 1.0
                self._reset_position()
            else:
                signal = "hold"
                confidence = 0.5

        return SignalResult(
            signal=signal,
            confidence=confidence,
            trend_confirmed=trend_confirmed,
            momentum_confirmed=momentum_confirmed,
            volume_confirmed=volume_confirmed,
            rsi=rsi_value,
            price_vs_20ma=(current_price - current_ma20) / current_ma20,
            ma20_slope=(ma20.iloc[-1] - ma20.iloc[-5]) / ma20.iloc[-5] if len(ma20) >= 5 else 0,
            ma60_slope=(ma60.iloc[-1] - ma60.iloc[-5]) / ma60.iloc[-5] if len(ma60) >= 5 else 0,
            volume_ratio=volume_ratio,
            return_5d=return_5d,
            return_20d=return_20d
        )

    def _update_highest_price(self, price: float) -> None:
        """Raise the tracked peak to ``price`` if it is a new high.

        Also seeds the peak when it is still ``None``, which happens when the
        position state was restored without an entry price.
        """
        if self.highest_price is None or price > self.highest_price:
            self.highest_price = price

    def _check_trend(
        self,
        price: float,
        ma20: float,
        ma60: float,
        ma20_series: pd.Series,
        ma60_series: pd.Series
    ) -> bool:
        """Return True when price > fast MA > slow MA and the slow MA is not falling."""
        # Price > 20MA > 60MA.
        price_above_ma = price > ma20 > ma60
        # Slow-MA slope > -0.001 (rising or flat), measured against iloc[-5].
        if len(ma60_series) >= 5:
            ma60_slope = (ma60_series.iloc[-1] - ma60_series.iloc[-5]) / ma60_series.iloc[-5]
        else:
            ma60_slope = 0
        ma_slope_positive = ma60_slope > -0.001
        return bool(price_above_ma and ma_slope_positive)

    def _check_momentum(self, close: pd.Series) -> Tuple[bool, float]:
        """Check the momentum condition.

        Returns:
            (confirmed, rsi): ``confirmed`` is True when the RSI lies within
            [rsi_lower, rsi_upper] and the 5-day return exceeds half of the
            20-day return; ``rsi`` is the RSI value used.
        """
        rsi = self._calculate_rsi(close)
        # RSI inside the configured band (strong but not overbought).
        rsi_in_range = self.rsi_lower <= rsi <= self.rsi_upper
        # 5-day return > 50% of 20-day return (momentum accelerating).
        if len(close) >= 20:
            return_5d = (close.iloc[-1] - close.iloc[-5]) / close.iloc[-5]
            return_20d = (close.iloc[-1] - close.iloc[-20]) / close.iloc[-20]
            momentum_accelerating = return_5d > return_20d * 0.5
        else:
            momentum_accelerating = False
        return bool(rsi_in_range and momentum_accelerating), rsi

    def _check_volume(self, current_vol: float, avg_vol: float) -> Tuple[bool, float]:
        """Check the volume condition.

        Returns:
            (confirmed, ratio): ``ratio`` is current/average volume and
            ``confirmed`` is True when it reaches ``volume_threshold``.
            A zero average yields ``(False, 0)``.
        """
        if avg_vol == 0:
            return False, 0.0
        volume_ratio = current_vol / avg_vol
        return bool(volume_ratio >= self.volume_threshold), volume_ratio

    def _check_exit(
        self,
        current_price: float,
        close: pd.Series,
        ma20: pd.Series,
        rsi: float
    ) -> bool:
        """Return True when any exit rule fires for the current bar."""
        # 1. Trend reversal: price falls below the fast moving average.
        if current_price < ma20.iloc[-1]:
            return True
        # 2. Momentum exhaustion: RSI drops from above 60 to below 50 in one bar.
        if len(close) >= 2:
            prev_rsi = self._calculate_rsi(close.iloc[:-1])
            if prev_rsi > 60 and rsi < 50:
                return True
        # 3. Trailing stop: 10% drawdown from the highest price since entry.
        #    Only the peak is needed; the entry price is not part of the rule,
        #    so a restored position without an entry price is still protected.
        peak = self.highest_price
        if peak is not None and peak > 0:
            drawdown_from_peak = (peak - current_price) / peak
            if drawdown_from_peak >= 0.10:
                return True
        return False

    def _calculate_confidence(
        self,
        trend: bool,
        momentum: bool,
        volume: bool,
        rsi: float,
        volume_ratio: float
    ) -> float:
        """Score a signal in [0.5, 1.0] from the confirmations and indicator values."""
        # Base score.
        base = 0.5
        # All three confirmations passed.
        if trend and momentum and volume:
            base += 0.3
        # Bonus for RSI in the 55-65 band.
        if 55 <= rsi <= 65:
            base += 0.1
        # Bonus for strong volume.
        if volume_ratio > 1.5:
            base += 0.1
        return min(1.0, base)

    def _calculate_rsi(self, prices: pd.Series) -> float:
        """Compute the simple-average RSI over the last ``rsi_period`` changes.

        Returns 50.0 when there are fewer than ``rsi_period + 1`` prices.
        When the window contains no losses the result is 100.0 if there were
        gains and 50.0 if prices were flat (previously this case returned 0,
        which read a pure up-trend as maximally weak).
        """
        if len(prices) < self.rsi_period + 1:
            return 50.0
        # Only the last rsi_period changes contribute to the latest value.
        deltas = prices.diff().iloc[-self.rsi_period:]
        # skipna=False keeps a NaN in the window visible as a NaN result.
        avg_gain = deltas.clip(lower=0).mean(skipna=False)
        avg_loss = (-deltas).clip(lower=0).mean(skipna=False)
        if avg_loss == 0:
            return 100.0 if avg_gain > 0 else 50.0
        rs = avg_gain / avg_loss
        return float(100 - (100 / (1 + rs)))

    def _create_neutral_result(self) -> "SignalResult":
        """Build the neutral result returned when there is not enough data."""
        return SignalResult(
            signal="neutral",
            confidence=0.0,
            trend_confirmed=False,
            momentum_confirmed=False,
            volume_confirmed=False,
            rsi=50.0,
            price_vs_20ma=0.0,
            ma20_slope=0.0,
            ma60_slope=0.0,
            volume_ratio=1.0,
            return_5d=0.0,
            return_20d=0.0
        )

    def _reset_position(self):
        """Clear the position state (flat, no entry price, no peak)."""
        self.in_position = False
        self.entry_price = None
        self.highest_price = None

    def set_position_state(self, in_position: bool, entry_price: Optional[float] = None):
        """Set the position state (used to restore state in backtests).

        The peak price is seeded with ``entry_price``. When no entry price is
        given the peak stays ``None`` until the next ``generate_signal`` call
        seeds it with that bar's price.
        """
        self.in_position = in_position
        self.entry_price = entry_price
        self.highest_price = entry_price
|