| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- """
- 生态融合器
- 整合四层生态识别结果(宏观/中观/微观/瞬时)为统一的市场生态对象
- """
- from dataclasses import dataclass, field
- from datetime import datetime
- from typing import Dict, List, Optional, Any
- from enum import Enum
- import pandas as pd
- from .macro import MacroEcosystem, MacroRegime, MacroEcosystemIdentifier
- from .meso import MesoEcosystem, HealthLevel, MesoEcosystemIdentifier
- from .micro import MicroEcosystem, MicroState, MicroEcosystemIdentifier
- from .instant import InstantEcosystem, InstantEcosystemIdentifier
@dataclass
class UnifiedEcosystem:
    """Unified market-ecosystem snapshot.

    Bundles the macro, meso, micro and (optional) instant layer results
    together with the fused decision fields derived from them.
    """

    timestamp: datetime

    # Per-layer results (fields without defaults must come first).
    macro: MacroEcosystem
    meso: MesoEcosystem
    micro: MicroEcosystem

    # Fused outcome.
    overall_regime: str  # composite regime label
    confidence: float  # overall confidence (0-1)
    trading_bias: str  # "long" / "short" / "neutral"
    risk_level: str  # "low" / "medium" / "high"

    # Suggested parameters.
    suggested_position: float  # suggested position size (0-1)
    suggested_agents: List[str]  # agents suggested for activation

    # Optional fields (those carrying defaults go last).
    instant: Optional[InstantEcosystem] = None
    warnings: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of this snapshot.

        Enum-like members are flattened through their ``.value``; the
        ``"instant"`` entry is ``None`` when no instant layer is attached.
        """
        macro_view = {
            "regime": self.macro.regime.value,
            "confidence": self.macro.confidence,
            "description": self.macro.description,
        }

        meso_view = {
            "health_score": self.meso.health_score,
            "health_level": self.meso.health_level.value,
        }

        # The smart-money direction is only reported when a signal was detected.
        smart_money = self.micro.smart_money
        micro_view = {
            "state": self.micro.state.value,
            "toxicity": self.micro.flow_toxicity.value,
            "smart_money": smart_money.direction if smart_money.detected else None,
        }

        instant_view = None
        if self.instant:
            instant_view = {
                "imbalance": self.instant.imbalance_direction.value,
                "block_flow": self.instant.block_trade_flow,
            }

        return {
            "timestamp": self.timestamp.isoformat(),
            "overall_regime": self.overall_regime,
            "confidence": self.confidence,
            "trading_bias": self.trading_bias,
            "risk_level": self.risk_level,
            "suggested_position": self.suggested_position,
            "suggested_agents": self.suggested_agents,
            "warnings": self.warnings,
            "macro": macro_view,
            "meso": meso_view,
            "micro": micro_view,
            "instant": instant_view,
        }
class EcosystemFusion:
    """Ecosystem fusion engine.

    Runs the macro, meso, micro and instant identifiers and combines their
    outputs into a single ``UnifiedEcosystem`` carrying trading suggestions.
    """

    def __init__(
        self,
        macro_identifier: Optional[MacroEcosystemIdentifier] = None,
        meso_identifier: Optional[MesoEcosystemIdentifier] = None,
        micro_identifier: Optional[MicroEcosystemIdentifier] = None,
        instant_identifier: Optional[InstantEcosystemIdentifier] = None
    ):
        """Store the per-layer identifiers, building a default for any omitted one."""
        self.macro_id = macro_identifier or MacroEcosystemIdentifier()
        self.meso_id = meso_identifier or MesoEcosystemIdentifier()
        self.micro_id = micro_identifier or MicroEcosystemIdentifier()
        self.instant_id = instant_identifier or InstantEcosystemIdentifier()

    def fuse(
        self,
        price_data: pd.DataFrame,
        sector_data: Optional[pd.DataFrame] = None,
        tick_data: Optional[pd.DataFrame] = None,
        order_book_data: Optional[pd.DataFrame] = None,
        trade_data: Optional[pd.DataFrame] = None
    ) -> UnifiedEcosystem:
        """Identify all four layers and fuse them into one ecosystem object.

        Args:
            price_data: Daily / 30-minute price data.
            sector_data: Sector data (optional).
            tick_data: Minute-level tick data (optional). The instant layer
                is only evaluated when this is provided and non-empty.
            order_book_data: Order-book data (optional).
            trade_data: Trade data (optional).

        Returns:
            UnifiedEcosystem: the fused ecosystem, stamped with the current
            time as reported by ``pd.Timestamp.now()``.
        """
        timestamp = pd.Timestamp.now()

        # 1. Macro layer.
        macro = self.macro_id.identify(price_data, sector_data)

        # 2. Meso layer.
        # NOTE(review): meso receives (order_book, trade) while micro below
        # receives (trade, order_book) -- confirm against both signatures.
        meso = self.meso_id.identify(price_data, order_book_data, trade_data)

        # 3. Micro layer; fit the identifier first if it reports as unfitted.
        if not self.micro_id._is_fitted:
            self.micro_id.fit(price_data)
        micro = self.micro_id.identify(price_data, trade_data, order_book_data)

        # 4. Instant layer (optional, requires tick data).
        instant = None
        if tick_data is not None and not tick_data.empty:
            instant = self.instant_id.identify(
                tick_data, order_book_data, timestamp
            )

        # 5. Fused decision.
        overall_regime = self._determine_overall_regime(macro, meso, micro)
        confidence = self._calculate_confidence(macro, meso, micro)
        trading_bias = self._determine_trading_bias(macro, micro, instant)
        risk_level = self._determine_risk_level(meso, micro, instant)
        suggested_position = self._calculate_position(
            macro, meso, micro, risk_level
        )
        suggested_agents = self._recommend_agents(macro, micro)
        warnings = self._generate_warnings(macro, meso, micro, instant)

        return UnifiedEcosystem(
            timestamp=timestamp,
            macro=macro,
            meso=meso,
            micro=micro,
            overall_regime=overall_regime,
            confidence=confidence,
            trading_bias=trading_bias,
            risk_level=risk_level,
            suggested_position=suggested_position,
            suggested_agents=suggested_agents,
            instant=instant,
            warnings=warnings
        )

    def _determine_overall_regime(
        self,
        macro: MacroEcosystem,
        meso: MesoEcosystem,
        micro: MicroEcosystem
    ) -> str:
        """Build the composite regime label.

        Starts from the macro regime value and appends ``_fragile`` for low
        meso health, then ``_reversing`` or ``_trending`` for the micro state.
        """
        base_regime = macro.regime.value

        # Meso structural-health suffix.
        if meso.health_level == HealthLevel.LOW:
            base_regime += "_fragile"

        # Micro state suffix.
        if micro.state == MicroState.REVERSING:
            base_regime += "_reversing"
        elif micro.state == MicroState.TRENDING:
            base_regime += "_trending"

        return base_regime

    def _calculate_confidence(
        self,
        macro: MacroEcosystem,
        meso: MesoEcosystem,
        micro: MicroEcosystem
    ) -> float:
        """Return the overall confidence, clamped to [0, 1].

        Weighted sum: macro confidence (0.4), meso health score divided by
        100 (0.3), and the largest micro state probability (0.3).
        """
        macro_conf = macro.confidence
        meso_conf = meso.health_score / 100
        # An empty probability mapping contributes zero confidence instead
        # of raising ValueError from max() on an empty sequence.
        micro_conf = max(micro.state_probability.values(), default=0.0)

        confidence = (
            macro_conf * 0.4 +
            meso_conf * 0.3 +
            micro_conf * 0.3
        )
        return min(1.0, max(0.0, confidence))

    def _determine_trading_bias(
        self,
        macro: MacroEcosystem,
        micro: MicroEcosystem,
        instant: Optional[InstantEcosystem]
    ) -> str:
        """Return the trading bias: ``"long"``, ``"short"`` or ``"neutral"``.

        Later rules override earlier ones: macro regime, then micro reversal,
        then instant order imbalance, then the order-flow toxicity filter.
        """
        # Macro regime sets the starting bias.
        if macro.regime in [MacroRegime.SPRING, MacroRegime.SUMMER]:
            bias = "long"
        elif macro.regime == MacroRegime.WINTER:
            bias = "short"
        else:
            bias = "neutral"

        # A reversing micro state resets the bias to neutral.
        if micro.state == MicroState.REVERSING:
            bias = "neutral"

        # Instant order imbalance overrides whatever was chosen above.
        if instant is not None:
            if instant.imbalance_direction.value == "bid_dominant":
                bias = "long"
            elif instant.imbalance_direction.value == "ask_dominant":
                bias = "short"

        # Toxic order flow (medium or high) always forces neutral.
        if micro.flow_toxicity.value in ["high", "medium"]:
            bias = "neutral"

        return bias

    def _determine_risk_level(
        self,
        meso: MesoEcosystem,
        micro: MicroEcosystem,
        instant: Optional[InstantEcosystem]
    ) -> str:
        """Return the risk level: ``"low"``, ``"medium"`` or ``"high"``.

        Accumulates an integer score from meso health, micro flow toxicity,
        micro reversal and instant tick spikes; a score of 4 or more is high,
        2 or more is medium, anything else is low.
        """
        risk_score = 0

        # Meso structural health.
        if meso.health_level == HealthLevel.LOW:
            risk_score += 3
        elif meso.health_level == HealthLevel.MEDIUM:
            risk_score += 1

        # Micro order-flow toxicity.
        if micro.flow_toxicity.value == "high":
            risk_score += 2
        elif micro.flow_toxicity.value == "medium":
            risk_score += 1

        # Micro state.
        if micro.state == MicroState.REVERSING:
            risk_score += 1

        # Instant tick activity.
        if instant is not None:
            if instant.tick_activity.value == "spike":
                risk_score += 1

        if risk_score >= 4:
            return "high"
        elif risk_score >= 2:
            return "medium"
        else:
            return "low"

    def _calculate_position(
        self,
        macro: MacroEcosystem,
        meso: MesoEcosystem,
        micro: MicroEcosystem,
        risk_level: str
    ) -> float:
        """Return the suggested position size, clamped to [0, 1].

        The macro-regime base position is multiplied by a health factor
        (health score / 100), a toxicity factor and a risk factor.
        """
        # Base position per macro regime; unlisted regimes map to 0.0.
        base_position = {
            MacroRegime.SPRING: 0.6,
            MacroRegime.SUMMER: 1.0,
            MacroRegime.AUTUMN: 0.4,
            MacroRegime.WINTER: 0.2,
            MacroRegime.UNKNOWN: 0.0
        }.get(macro.regime, 0.0)

        # Meso adjustment.
        health_factor = meso.health_score / 100

        # Micro adjustment: shrink the position as flow toxicity rises.
        if micro.flow_toxicity.value == "high":
            micro_factor = 0.3
        elif micro.flow_toxicity.value == "medium":
            micro_factor = 0.6
        else:
            micro_factor = 1.0

        # Risk adjustment; an unrecognised level falls back to 0.5.
        risk_factor = {
            "low": 1.0,
            "medium": 0.7,
            "high": 0.4
        }.get(risk_level, 0.5)

        suggested = base_position * health_factor * micro_factor * risk_factor
        return min(1.0, max(0.0, suggested))

    def _recommend_agents(
        self,
        macro: MacroEcosystem,
        micro: MicroEcosystem
    ) -> List[str]:
        """Return the agent names suggested for activation, without duplicates.

        Starts from the macro-regime defaults and adds ``trend_hunter`` or
        ``mean_reversion`` based on the micro state and smart-money signal.
        """
        agents = []

        # Defaults per macro regime.
        regime_agents = {
            MacroRegime.SPRING: ["trend_hunter", "mean_reversion"],
            MacroRegime.SUMMER: ["trend_hunter", "momentum_surfer"],
            MacroRegime.AUTUMN: ["structure_arbitrage", "momentum_surfer"],
            MacroRegime.WINTER: ["mean_reversion", "volatility_seller"],
            MacroRegime.UNKNOWN: []
        }
        agents.extend(regime_agents.get(macro.regime, []))

        # Micro state adjustment.
        if micro.state == MicroState.TRENDING:
            if "trend_hunter" not in agents:
                agents.append("trend_hunter")
        elif micro.state == MicroState.RANGING:
            if "mean_reversion" not in agents:
                agents.append("mean_reversion")

        # Smart-money signal.
        if micro.smart_money.detected:
            if micro.smart_money.direction == "accumulate":
                if "trend_hunter" not in agents:
                    agents.append("trend_hunter")
            elif micro.smart_money.direction == "distribute":
                if "mean_reversion" not in agents:
                    agents.append("mean_reversion")

        return agents

    def _generate_warnings(
        self,
        macro: MacroEcosystem,
        meso: MesoEcosystem,
        micro: MicroEcosystem,
        instant: Optional[InstantEcosystem]
    ) -> List[str]:
        """Collect warning messages from every layer into one list."""
        warnings = []

        # Macro warning.
        if macro.regime == MacroRegime.UNKNOWN:
            warnings.append("宏观生态不明,建议观望")

        # Meso warning.
        if meso.health_level == HealthLevel.LOW:
            warnings.append(f"市场结构健康度低 ({meso.health_score:.1f}),流动性风险")

        # Micro warnings are forwarded as-is.
        warnings.extend(micro.warnings)

        # Instant warnings.
        if instant is not None:
            if instant.tick_activity.value == "spike":
                warnings.append("跳动率突变,市场活跃度异常")

            # Threshold 2000; presumably in units of 10,000 yuan, matching
            # the "万元" suffix in the message -- confirm against the producer.
            if abs(instant.block_trade_flow) > 2000:
                direction = "流入" if instant.block_trade_flow > 0 else "流出"
                warnings.append(f"大单巨额{direction}: {abs(instant.block_trade_flow):.0f}万元")

        return warnings
|