| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- """
- 一级信号采集器
- 采集所有原始信号并标准化为[-1, 1]区间
- """
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Any
- from datetime import datetime
- import pandas as pd
- import numpy as np
@dataclass
class PrimarySignal:
    """One first-level signal: a normalized value plus the input it came from."""

    # Origin of the signal (the collector in this file uses "ecosystem",
    # "indicator" and "agent_<name>").
    source: str
    # Category of the signal within its source, e.g. "macro_regime" or "macd".
    signal_type: str
    # Normalized signal value; intended range is [-1, 1]. Not validated here.
    value: float
    # Un-normalized input the value was derived from. Typed ``Any`` rather than
    # ``float`` because the collector also stores enum ``.value`` payloads
    # (regime / state names) in this field.
    raw_value: Any
    # Moment the signal was produced.
    timestamp: datetime
    # Free-form extra context; each instance gets its own dict.
    metadata: Dict[str, Any] = field(default_factory=dict)
class SignalCollector:
    """Collect first-level signals and normalize them to the [-1, 1] range.

    The original module notes list these signal sources:
    - trend quality score
    - Kalman-filter trend
    - RSRS resistance/support strength
    - HMM state probability
    - ecosystem recognition results

    NOTE(review): no method in this class collects an HMM signal, and none of
    the ``collect_*`` methods appends to ``signal_history``. Confirm whether
    callers are expected to populate the history themselves.
    """

    # Regime name -> directional score used by ``_encode_macro_regime``.
    _MACRO_REGIME_VALUES: Dict[str, float] = {
        "spring": 0.5,
        "summer": 1.0,
        "autumn": -0.5,
        "winter": -1.0,
        "unknown": 0.0,
    }

    # Micro state name -> directional score used by ``_encode_micro_state``.
    _MICRO_STATE_VALUES: Dict[str, float] = {
        "trending": 1.0,
        "ranging": 0.0,
        "reversing": -1.0,
    }

    # Agent direction name -> sign applied to the agent's confidence.
    _DIRECTION_VALUES: Dict[str, float] = {
        "long": 1.0,
        "short": -1.0,
        "neutral": 0.0,
    }

    # Fewest rows required before a MACD value is computed.
    _MACD_MIN_BARS = 35

    def __init__(self):
        """Create a collector with an empty signal history."""
        self.signal_history: List["PrimarySignal"] = []

    @staticmethod
    def _to_unit_interval(value: float) -> float:
        """Clamp *value* to [-1, 1] and return it as a built-in float."""
        return float(np.clip(value, -1.0, 1.0))

    def collect_from_ecosystem(self, ecosystem: Any) -> List["PrimarySignal"]:
        """Build signals from the macro / meso / micro layers of *ecosystem*.

        A layer is skipped when its attribute is missing or ``None``.

        Args:
            ecosystem: Object that may expose ``macro`` (read:
                ``regime.value``, ``confidence``), ``meso`` (read:
                ``health_score``, ``health_level.value``) and ``micro``
                (read: ``state.value``).

        Returns:
            Zero to three signals, in macro, meso, micro order.
        """
        signals: List["PrimarySignal"] = []
        # One timestamp per pass so signals collected together share it.
        now = datetime.now()

        # Macro regime signal.
        macro = getattr(ecosystem, "macro", None)
        if macro is not None:
            signals.append(PrimarySignal(
                source="ecosystem",
                signal_type="macro_regime",
                value=self._encode_macro_regime(macro),
                raw_value=macro.regime.value,
                timestamp=now,
                metadata={"confidence": macro.confidence},
            ))

        # Meso health signal. Assumes a 0-100 score centred on 50 (TODO
        # confirm); anything outside that scale is clamped to [-1, 1].
        meso = getattr(ecosystem, "meso", None)
        if meso is not None:
            signals.append(PrimarySignal(
                source="ecosystem",
                signal_type="health_score",
                value=self._to_unit_interval((meso.health_score - 50) / 50),
                raw_value=meso.health_score,
                timestamp=now,
                metadata={"level": meso.health_level.value},
            ))

        # Micro state signal.
        micro = getattr(ecosystem, "micro", None)
        if micro is not None:
            signals.append(PrimarySignal(
                source="ecosystem",
                signal_type="micro_state",
                value=self._encode_micro_state(micro),
                raw_value=micro.state.value,
                timestamp=now,
            ))

        return signals

    def collect_from_indicators(
        self,
        price_data: pd.DataFrame,
        trend_quality_score: Optional[float] = None,
        kalman_trend: Optional[float] = None,
        rsrs_score: Optional[float] = None
    ) -> List["PrimarySignal"]:
        """Build signals from technical indicators.

        Args:
            price_data: Frame with a ``close`` column, used for the MACD
                signal. No MACD signal is emitted when it is ``None`` or has
                fewer than 35 rows.
            trend_quality_score: Optional score, normalized as
                ``(score - 50) / 50`` (assumes a 0-100 scale, TODO confirm).
            kalman_trend: Optional trend value, scaled by 100.
            rsrs_score: Optional score, normalized as ``(score - 0.5) * 2``
                (the original comment says the input range is 0-1).

        Returns:
            One signal per available input, each clamped to [-1, 1].
        """
        signals: List["PrimarySignal"] = []
        now = datetime.now()

        # Trend quality signal.
        if trend_quality_score is not None:
            signals.append(PrimarySignal(
                source="indicator",
                signal_type="trend_quality",
                value=self._to_unit_interval((trend_quality_score - 50) / 50),
                raw_value=trend_quality_score,
                timestamp=now,
            ))

        # Kalman trend signal.
        if kalman_trend is not None:
            signals.append(PrimarySignal(
                source="indicator",
                signal_type="kalman_trend",
                value=self._to_unit_interval(kalman_trend * 100),
                raw_value=kalman_trend,
                timestamp=now,
            ))

        # RSRS signal; scores outside the expected 0-1 range are clamped.
        if rsrs_score is not None:
            signals.append(PrimarySignal(
                source="indicator",
                signal_type="rsrs",
                value=self._to_unit_interval((rsrs_score - 0.5) * 2),
                raw_value=rsrs_score,
                timestamp=now,
            ))

        # MACD signal. The helper only returns the normalized value, so
        # raw_value repeats it rather than holding the raw histogram.
        macd_signal = self._calculate_macd_signal(price_data)
        if macd_signal is not None:
            signals.append(PrimarySignal(
                source="indicator",
                signal_type="macd",
                value=macd_signal,
                raw_value=macd_signal,
                timestamp=now,
            ))

        return signals

    def collect_from_agents(self, agent_signals: Dict[str, Any]) -> List["PrimarySignal"]:
        """Build one direction signal per agent.

        Args:
            agent_signals: Mapping of agent name to a signal object exposing
                ``direction.value``, ``confidence`` and optionally
                ``strength.value``. ``None`` entries are skipped.

        Returns:
            One signal per non-``None`` entry; the value is the direction sign
            times the confidence, clamped to [-1, 1].
        """
        if not agent_signals:
            return []

        signals: List["PrimarySignal"] = []
        now = datetime.now()
        for agent_name, signal in agent_signals.items():
            if signal is None:
                continue

            # Unrecognized directions count as neutral.
            direction_value = self._DIRECTION_VALUES.get(signal.direction.value, 0.0)
            strength = getattr(signal, "strength", None)

            signals.append(PrimarySignal(
                source=f"agent_{agent_name}",
                signal_type="agent_direction",
                value=self._to_unit_interval(direction_value * signal.confidence),
                raw_value=direction_value,
                timestamp=now,
                metadata={
                    "confidence": signal.confidence,
                    "strength": strength.value if strength is not None else "medium",
                },
            ))

        return signals

    def _encode_macro_regime(self, macro: Any) -> float:
        """Map ``macro.regime.value`` to a score; unrecognized regimes give 0.0."""
        return self._MACRO_REGIME_VALUES.get(macro.regime.value, 0.0)

    def _encode_micro_state(self, micro: Any) -> float:
        """Map ``micro.state.value`` to a score; unrecognized states give 0.0."""
        return self._MICRO_STATE_VALUES.get(micro.state.value, 0.0)

    def _calculate_macd_signal(self, data: pd.DataFrame) -> Optional[float]:
        """Return the latest MACD histogram as a fraction of price, in [-1, 1].

        Uses the 12/26 EMA difference and its 9-period EMA. The latest
        histogram value is expressed as a percentage of the latest close and
        then clamped.

        Args:
            data: Frame with a ``close`` column.

        Returns:
            The clamped signal, or ``None`` when *data* is ``None``, has fewer
            than 35 rows, or the latest close / histogram is zero-priced or
            non-finite.

        Raises:
            KeyError: If *data* has no ``close`` column.
        """
        if data is None or len(data) < self._MACD_MIN_BARS:
            return None

        close = data['close']
        fast_ema = close.ewm(span=12, adjust=False).mean()
        slow_ema = close.ewm(span=26, adjust=False).mean()
        macd = fast_ema - slow_ema
        signal_line = macd.ewm(span=9, adjust=False).mean()
        histogram = macd - signal_line

        last_close = close.iloc[-1]
        last_hist = histogram.iloc[-1]
        # A zero or NaN price would turn the ratio into inf/NaN; emit nothing
        # instead of a meaningless value.
        if not (np.isfinite(last_close) and np.isfinite(last_hist)) or last_close == 0:
            return None

        return self._to_unit_interval(last_hist / last_close * 100)

    def get_collected_signals(self) -> List["PrimarySignal"]:
        """Return the ``signal_history`` list itself (not a copy)."""
        return self.signal_history
|