""" 贝叶斯信号融合引擎 使用贝叶斯网络进行非线性信号融合 P(上涨|信号,生态) ∝ P(生态) × ∏ P(信号i|上涨,生态) / P(信号i|生态) """ from dataclasses import dataclass, field from typing import Dict, List, Optional, Tuple, Any from datetime import datetime import numpy as np import pandas as pd from scipy import stats @dataclass class FusedSignal: """融合后的信号""" up_probability: float # 上涨概率 down_probability: float # 下跌概率 neutral_probability: float # 横盘概率 overall_confidence: float # 整体置信度 signal_grade: str # 信号等级:strong/medium/weak/none recommended_action: str # 建议动作 fusion_metadata: Dict[str, Any] = field(default_factory=dict) timestamp: datetime = field(default_factory=datetime.now) class BayesianSignalFusion: """ 贝叶斯信号融合器 融合逻辑: 1. 一级信号 → 贝叶斯网络输入 2. 生态作为隐变量调节条件概率 3. 输出后验概率分布 """ def __init__( self, prior_up: float = 0.33, prior_down: float = 0.33, prior_neutral: float = 0.34, min_sample_size: int = 1000 ): self.prior = { "up": prior_up, "down": prior_down, "neutral": prior_neutral } self.min_sample_size = min_sample_size # 历史条件概率表(简化实现) self.conditional_probs: Dict[str, Dict] = {} def fuse( self, primary_signals: List[Any], ecosystem: Any, fallback_to_weighted: bool = True ) -> FusedSignal: """ 执行贝叶斯融合 Args: primary_signals: 一级信号列表 ecosystem: 市场生态 fallback_to_weighted: 小样本时是否回退到加权平均 Returns: FusedSignal: 融合后的信号 """ if not primary_signals: return FusedSignal( up_probability=self.prior["up"], down_probability=self.prior["down"], neutral_probability=self.prior["neutral"], overall_confidence=0.0, signal_grade="none", recommended_action="hold" ) # 检查样本量 if fallback_to_weighted: sample_size = self._estimate_sample_size(ecosystem) if sample_size < self.min_sample_size: return self._weighted_fusion(primary_signals, ecosystem, sample_size) # 贝叶斯推断 posterior = self._bayesian_inference(primary_signals, ecosystem) # 确定信号等级和建议 grade, action = self._determine_signal_grade(posterior) return FusedSignal( up_probability=posterior["up"], down_probability=posterior["down"], neutral_probability=posterior["neutral"], overall_confidence=self._calculate_confidence(posterior, primary_signals), signal_grade=grade, recommended_action=action, fusion_metadata={ "method": "bayesian", "signal_count": len(primary_signals), "prior": self.prior.copy(), "posterior": posterior } ) def _bayesian_inference( self, signals: List[Any], ecosystem: Any ) -> Dict[str, float]: """执行贝叶斯推断""" # 初始化后验为 prior posterior = self.prior.copy() # 获取生态条件 regime = getattr(ecosystem.macro, 'regime', None) if ecosystem else None # 对每个信号更新后验 for signal in signals: likelihood = self._calculate_likelihood(signal, regime) # 贝叶斯更新: P(H|D) ∝ P(D|H) * P(H) for direction in ["up", "down", "neutral"]: posterior[direction] *= likelihood.get(direction, 0.33) # 归一化 total = sum(posterior.values()) if total > 0: posterior = {k: v/total for k, v in posterior.items()} return posterior def _calculate_likelihood( self, signal: Any, regime: Any ) -> Dict[str, float]: """计算似然函数 P(信号|方向,生态)""" value = getattr(signal, 'value', 0) # 基于信号值和生态计算似然 # 简化模型:信号值越正,上涨概率越高 likelihood = { "up": max(0.1, min(0.9, 0.5 + value * 0.4)), "down": max(0.1, min(0.9, 0.5 - value * 0.4)), "neutral": max(0.1, min(0.9, 1 - abs(value) * 0.5)) } # 生态调节 if regime: regime_boost = { "summer": {"up": 1.2, "down": 0.8, "neutral": 0.9}, "winter": {"up": 0.8, "down": 1.2, "neutral": 0.9}, "spring": {"up": 1.1, "down": 0.9, "neutral": 0.95}, "autumn": {"up": 0.9, "down": 0.9, "neutral": 1.1} }.get(regime.value, {"up": 1, "down": 1, "neutral": 1}) for direction in likelihood: likelihood[direction] *= regime_boost[direction] # 重新归一化 total = sum(likelihood.values()) return {k: v/total for k, v in likelihood.items()} def _weighted_fusion( self, signals: List[Any], ecosystem: Any, sample_size: int ) -> FusedSignal: """小样本回退:加权平均融合""" if not signals: return self._create_neutral_signal("no_signals") # 加权平均 weighted_sum = sum(s.value * getattr(s, 'confidence', 0.5) for s in signals) total_weight = sum(getattr(s, 'confidence', 0.5) for s in signals) if total_weight == 0: return self._create_neutral_signal("zero_weight") avg_signal = weighted_sum / total_weight # 转换为概率 up_prob = max(0, min(1, 0.5 + avg_signal * 0.5)) down_prob = max(0, min(1, 0.5 - avg_signal * 0.5)) neutral_prob = 1 - abs(avg_signal) # 归一化 total = up_prob + down_prob + neutral_prob probs = { "up": up_prob / total, "down": down_prob / total, "neutral": neutral_prob / total } grade, action = self._determine_signal_grade(probs) return FusedSignal( up_probability=probs["up"], down_probability=probs["down"], neutral_probability=probs["neutral"], overall_confidence=abs(avg_signal) * (sample_size / self.min_sample_size), signal_grade=grade, recommended_action=action, fusion_metadata={ "method": "weighted_fallback", "reason": f"sample_size {sample_size} < {self.min_sample_size}", "signal_count": len(signals) } ) def _determine_signal_grade(self, probs: Dict[str, float]) -> Tuple[str, str]: """确定信号等级""" max_prob = max(probs.values()) dominant = max(probs, key=probs.get) if max_prob > 0.7 and probs["neutral"] < 0.3: grade = "strong" elif max_prob > 0.55: grade = "medium" elif max_prob > 0.45: grade = "weak" else: grade = "none" # 建议动作 if dominant == "up" and grade in ["strong", "medium"]: action = "buy" elif dominant == "down" and grade in ["strong", "medium"]: action = "sell" else: action = "hold" return grade, action def _calculate_confidence( self, posterior: Dict[str, float], signals: List[Any] ) -> float: """计算整体置信度""" # 基于概率分布的熵计算置信度 entropy = -sum(p * np.log(p + 1e-10) for p in posterior.values()) max_entropy = np.log(3) # 三分类最大熵 confidence = 1 - (entropy / max_entropy) # 信号数量加成 signal_bonus = min(0.2, len(signals) * 0.02) return min(1.0, confidence + signal_bonus) def _estimate_sample_size(self, ecosystem: Any) -> int: """估算当前生态的历史样本量""" # 简化:返回固定值,实际应从数据库查询 return 2000 # 假设充足样本 def _create_neutral_signal(self, reason: str) -> FusedSignal: """创建中性信号""" return FusedSignal( up_probability=self.prior["up"], down_probability=self.prior["down"], neutral_probability=self.prior["neutral"], overall_confidence=0.0, signal_grade="none", recommended_action="hold", fusion_metadata={"reason": reason} )