| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- """
- 贝叶斯信号融合引擎
- 使用贝叶斯网络进行非线性信号融合
- P(上涨|信号,生态) ∝ P(生态) × ∏ P(信号i|上涨,生态) / P(信号i|生态)
- """
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Tuple, Any
- from datetime import datetime
- import numpy as np
- import pandas as pd
- from scipy import stats
@dataclass
class FusedSignal:
    """Result of fusing several primary signals into one directional estimate.

    The three probability fields describe the same distribution over the
    market direction (up / down / neutral); the remaining fields carry the
    derived grading and free-form diagnostics.
    """

    up_probability: float  # probability of an upward move
    down_probability: float  # probability of a downward move
    neutral_probability: float  # probability of a sideways market
    overall_confidence: float  # overall confidence in the fused estimate
    signal_grade: str  # signal grade: strong / medium / weak / none
    recommended_action: str  # suggested action for the consumer of the signal
    # Free-form diagnostics; default_factory gives every instance its own dict.
    fusion_metadata: Dict[str, Any] = field(default_factory=dict)
    # Creation time, taken from datetime.now() when the instance is built.
    timestamp: datetime = field(default_factory=datetime.now)
class BayesianSignalFusion:
    """
    Fuse primary signals into a probability distribution over market direction.

    Fusion logic:
        1. Each primary signal is turned into a likelihood over up/down/neutral.
        2. The market-ecosystem regime rescales those likelihoods.
        3. The prior is updated signal by signal into a posterior distribution.

    When the estimated historical sample size is below ``min_sample_size``,
    ``fuse`` can fall back to a confidence-weighted average of the signals.
    """

    # Directions of the three-way outcome, in the order they are reported.
    DIRECTIONS = ("up", "down", "neutral")

    # Per-regime multipliers applied to the raw likelihoods before they are
    # renormalised. Unknown regimes get no adjustment.
    _REGIME_BOOST = {
        "summer": {"up": 1.2, "down": 0.8, "neutral": 0.9},
        "winter": {"up": 0.8, "down": 1.2, "neutral": 0.9},
        "spring": {"up": 1.1, "down": 0.9, "neutral": 0.95},
        "autumn": {"up": 0.9, "down": 0.9, "neutral": 1.1},
    }
    _NO_BOOST = {"up": 1, "down": 1, "neutral": 1}

    def __init__(
        self,
        prior_up: float = 0.33,
        prior_down: float = 0.33,
        prior_neutral: float = 0.34,
        min_sample_size: int = 1000
    ):
        """
        Args:
            prior_up: Prior probability of an upward move.
            prior_down: Prior probability of a downward move.
            prior_neutral: Prior probability of a sideways market.
            min_sample_size: Sample size below which ``fuse`` may fall back to
                weighted averaging.

        Raises:
            ValueError: If any prior is negative or the priors do not have a
                positive, finite sum.
        """
        prior = {
            "up": prior_up,
            "down": prior_down,
            "neutral": prior_neutral
        }
        if any(p < 0 for p in prior.values()):
            raise ValueError("prior probabilities must be non-negative")
        total = sum(prior.values())
        if not (total > 0 and np.isfinite(total)):
            raise ValueError("prior probabilities must have a positive, finite sum")
        # Rescale only when the priors are genuinely off, so that priors that
        # already sum to 1 are stored exactly as given.
        if abs(total - 1.0) > 1e-9:
            prior = {k: v / total for k, v in prior.items()}

        self.prior = prior
        self.min_sample_size = min_sample_size
        # Historical conditional probability table (simplified implementation);
        # nothing in this class reads it yet.
        self.conditional_probs: Dict[str, Dict] = {}

    def fuse(
        self,
        primary_signals: List[Any],
        ecosystem: Any,
        fallback_to_weighted: bool = True
    ) -> "FusedSignal":
        """
        Run the fusion and return the fused signal.

        Args:
            primary_signals: First-level signals; each is read through its
                ``value`` attribute (and ``confidence`` in the fallback path).
            ecosystem: Market ecosystem; ``ecosystem.macro.regime`` is used
                when present.
            fallback_to_weighted: Whether to fall back to weighted averaging
                when the estimated sample size is below ``min_sample_size``.

        Returns:
            FusedSignal: The fused signal. With no input signals the prior is
            returned with zero confidence and a "hold" recommendation.
        """
        if not primary_signals:
            return self._create_neutral_signal("no_signals")

        # Small-sample check.
        if fallback_to_weighted:
            sample_size = self._estimate_sample_size(ecosystem)
            if sample_size < self.min_sample_size:
                return self._weighted_fusion(primary_signals, ecosystem, sample_size)

        # Bayesian inference, then grading.
        posterior = self._bayesian_inference(primary_signals, ecosystem)
        grade, action = self._determine_signal_grade(posterior)

        return FusedSignal(
            up_probability=posterior["up"],
            down_probability=posterior["down"],
            neutral_probability=posterior["neutral"],
            overall_confidence=self._calculate_confidence(posterior, primary_signals),
            signal_grade=grade,
            recommended_action=action,
            fusion_metadata={
                "method": "bayesian",
                "signal_count": len(primary_signals),
                "prior": self.prior.copy(),
                "posterior": posterior
            }
        )

    @staticmethod
    def _normalize(dist: Dict[str, float]) -> Dict[str, float]:
        """Scale ``dist`` to sum to 1; fall back to uniform if the sum is not positive."""
        if not dist:
            return {}
        total = sum(dist.values())
        if total > 0:
            return {k: v / total for k, v in dist.items()}
        return {k: 1.0 / len(dist) for k in dist}

    def _bayesian_inference(
        self,
        signals: List[Any],
        ecosystem: Any
    ) -> Dict[str, float]:
        """
        Update the prior with every signal and return the normalised posterior.

        Args:
            signals: Signals to fold into the posterior.
            ecosystem: Market ecosystem, or None; a missing ``macro`` or
                ``regime`` attribute simply means "no regime adjustment".

        Returns:
            Mapping of "up"/"down"/"neutral" to posterior probability.
        """
        posterior = self.prior.copy()

        # Regime lookup tolerates ecosystems without a `macro` attribute.
        macro = getattr(ecosystem, "macro", None) if ecosystem else None
        regime = getattr(macro, "regime", None)

        for signal in signals:
            likelihood = self._calculate_likelihood(signal, regime)

            # Bayesian update: P(H|D) ∝ P(D|H) * P(H)
            for direction in self.DIRECTIONS:
                posterior[direction] *= likelihood.get(direction, 0.33)

            # Renormalise after every update: the result is mathematically the
            # same as normalising once at the end, but the running product can
            # no longer underflow to all zeros for long signal lists.
            posterior = self._normalize(posterior)

        return self._normalize(posterior)

    def _calculate_likelihood(
        self,
        signal: Any,
        regime: Any
    ) -> Dict[str, float]:
        """
        Compute the likelihood P(signal | direction, regime).

        Simplified model: the more positive the signal value, the more the
        likelihood favours "up". Each raw term is clipped to [0.1, 0.9] before
        the regime multipliers are applied and the result is renormalised.

        Args:
            signal: Object whose ``value`` attribute is read (0 if missing).
            regime: Either an object exposing ``.value`` or the regime key
                itself; falsy means no adjustment.

        Returns:
            Mapping of "up"/"down"/"neutral" to a likelihood summing to 1.
        """
        value = getattr(signal, 'value', 0)

        likelihood = {
            "up": max(0.1, min(0.9, 0.5 + value * 0.4)),
            "down": max(0.1, min(0.9, 0.5 - value * 0.4)),
            "neutral": max(0.1, min(0.9, 1 - abs(value) * 0.5))
        }

        if regime:
            # Accept both enum-like regimes (with `.value`) and plain keys.
            regime_key = getattr(regime, "value", regime)
            boost = self._REGIME_BOOST.get(regime_key, self._NO_BOOST)
            for direction in likelihood:
                likelihood[direction] *= boost[direction]

        return self._normalize(likelihood)

    def _weighted_fusion(
        self,
        signals: List[Any],
        ecosystem: Any,
        sample_size: int
    ) -> "FusedSignal":
        """
        Small-sample fallback: confidence-weighted average of the signal values.

        Args:
            signals: Signals to average; ``value`` defaults to 0 and
                ``confidence`` to 0.5 when an attribute is missing.
            ecosystem: Unused here; kept for a signature parallel to the
                Bayesian path.
            sample_size: Estimated historical sample size, used to scale the
                reported confidence.

        Returns:
            FusedSignal: Fused signal tagged with method "weighted_fallback",
            or a neutral signal when there are no signals or zero total weight.
        """
        if not signals:
            return self._create_neutral_signal("no_signals")

        weights = [getattr(s, 'confidence', 0.5) for s in signals]
        total_weight = sum(weights)
        if total_weight == 0:
            return self._create_neutral_signal("zero_weight")

        weighted_sum = sum(
            getattr(s, 'value', 0) * w for s, w in zip(signals, weights)
        )
        # Clamp to [-1, 1]: outside that range `1 - abs(avg)` would be
        # negative, producing invalid probabilities or a zero normaliser.
        avg_signal = max(-1.0, min(1.0, weighted_sum / total_weight))

        # Convert the average into an (unnormalised) distribution.
        up_prob = max(0, min(1, 0.5 + avg_signal * 0.5))
        down_prob = max(0, min(1, 0.5 - avg_signal * 0.5))
        neutral_prob = 1 - abs(avg_signal)

        probs = self._normalize({
            "up": up_prob,
            "down": down_prob,
            "neutral": neutral_prob
        })

        grade, action = self._determine_signal_grade(probs)

        # Scale confidence by how much of the required sample is available.
        if self.min_sample_size > 0:
            coverage = sample_size / self.min_sample_size
        else:
            coverage = 1.0
        confidence = max(0.0, min(1.0, abs(avg_signal) * coverage))

        return FusedSignal(
            up_probability=probs["up"],
            down_probability=probs["down"],
            neutral_probability=probs["neutral"],
            overall_confidence=confidence,
            signal_grade=grade,
            recommended_action=action,
            fusion_metadata={
                "method": "weighted_fallback",
                "reason": f"sample_size {sample_size} < {self.min_sample_size}",
                "signal_count": len(signals)
            }
        )

    def _determine_signal_grade(self, probs: Dict[str, float]) -> Tuple[str, str]:
        """
        Map a direction distribution to a (grade, action) pair.

        Grade thresholds on the largest probability: > 0.7 with neutral < 0.3
        is "strong", > 0.55 "medium", > 0.45 "weak", otherwise "none". The
        action is "buy"/"sell" only for a strong or medium up/down signal and
        "hold" in every other case.
        """
        max_prob = max(probs.values())
        dominant = max(probs, key=probs.get)

        if max_prob > 0.7 and probs["neutral"] < 0.3:
            grade = "strong"
        elif max_prob > 0.55:
            grade = "medium"
        elif max_prob > 0.45:
            grade = "weak"
        else:
            grade = "none"

        actionable = grade in ["strong", "medium"]
        if dominant == "up" and actionable:
            action = "buy"
        elif dominant == "down" and actionable:
            action = "sell"
        else:
            action = "hold"

        return grade, action

    def _calculate_confidence(
        self,
        posterior: Dict[str, float],
        signals: List[Any]
    ) -> float:
        """
        Compute the overall confidence as a plain float in [0, 1].

        Confidence is one minus the posterior's entropy relative to the
        maximum entropy of a three-way distribution, plus a bonus of 0.02 per
        signal capped at 0.2.
        """
        # The 1e-10 offset keeps log() finite for zero probabilities.
        entropy = -sum(p * np.log(p + 1e-10) for p in posterior.values())
        max_entropy = np.log(3)  # maximum entropy of three classes
        confidence = 1 - (entropy / max_entropy)

        signal_bonus = min(0.2, len(signals) * 0.02)

        return float(max(0.0, min(1.0, confidence + signal_bonus)))

    def _estimate_sample_size(self, ecosystem: Any) -> int:
        """Estimate the historical sample size for the current ecosystem.

        Simplified: returns a fixed value; a real implementation should query
        the data store.
        """
        return 2000  # assume a sufficient sample

    def _create_neutral_signal(self, reason: str) -> "FusedSignal":
        """Build a zero-confidence "hold" signal carrying the prior and ``reason``."""
        return FusedSignal(
            up_probability=self.prior["up"],
            down_probability=self.prior["down"],
            neutral_probability=self.prior["neutral"],
            overall_confidence=0.0,
            signal_grade="none",
            recommended_action="hold",
            fusion_metadata={"reason": reason}
        )
|