| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415 |
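"""
Micro-ecosystem identifier (extended edition).

Built on a three-state HMM, order-flow toxicity detection and
smart-money (large participant) detection.
"""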
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Tuple
- from enum import Enum
- import numpy as np
- import pandas as pd
- from hmmlearn import hmm
class MicroState(Enum):
    """Micro-ecosystem (short-term market regime) state."""

    # Sideways, range-bound market.
    RANGING = "ranging"
    # Directional trend.
    TRENDING = "trending"
    # Reversal.
    REVERSING = "reversing"
class FlowToxicity(Enum):
    """Order-flow toxicity level, from none to high."""

    # No toxicity.
    NONE = "none"
    # Mild toxicity.
    LOW = "low"
    # Moderate toxicity.
    MEDIUM = "medium"
    # Severe toxicity.
    HIGH = "high"
@dataclass
class SmartMoneySignal:
    """Result of the smart-money (large participant) check."""

    # True when an accumulation or distribution pattern was found.
    detected: bool
    # One of "accumulate", "distribute", "neutral".
    direction: str
    # Strength of the signal.
    confidence: float
    # Number of large orders that were considered.
    large_order_count: int
    # Average size of those large orders.
    avg_order_size: float
@dataclass
class MicroEcosystem:
    """Container for one micro-ecosystem identification result."""

    # Most likely regime.
    state: MicroState
    # Probability per regime.
    state_probability: Dict[MicroState, float]
    # Order-flow toxicity level.
    flow_toxicity: FlowToxicity
    # Smart-money check result.
    smart_money: SmartMoneySignal
    # Named feature values used for the HMM.
    hmm_features: Dict[str, float]
    # Human-readable warnings; each instance gets its own fresh list.
    warnings: List[str] = field(default_factory=list)
class MicroEcosystemIdentifier:
    """Identify the micro-ecosystem of a market (extended edition).

    Combines three checks:

    1. A Gaussian HMM whose hidden states are labelled as
       ranging / trending / reversing.
    2. An order-flow toxicity score.
    3. A smart-money (large participant) accumulation/distribution check.
    """

    # Cut-offs used by the smart-money heuristic.
    _LARGE_ORDER_MULTIPLE = 3   # "large" = volume above this multiple of the recent mean
    _MIN_LARGE_ORDERS = 3       # fewer large orders than this -> no signal
    _FLAT_PRICE_BAND = 0.02     # |relative price change| below this counts as "flat"
    _BUY_DOMINANCE = 0.6        # buy share of large volume above this -> accumulation
    _SELL_DOMINANCE = 0.4       # buy share of large volume below this -> distribution

    def __init__(
        self,
        n_components: int = 3,
        covariance_type: str = "full",
        n_iter: int = 100,
        # Toxic order-flow detection parameters
        toxic_price_reversal: float = 0.003,
        toxic_depth_ratio: float = 0.8,
        toxic_duration: int = 10,
        # Smart-money detection parameters
        smart_money_threshold: float = 1_000_000,
        accumulation_period: int = 5
    ):
        """Store the configuration; no model is trained here.

        Args:
            n_components: Number of hidden HMM states.
            covariance_type: Covariance type passed to ``GaussianHMM``.
            n_iter: Maximum number of training iterations for the HMM.
            toxic_price_reversal: Relative price move after a large trade
                above which the trade adds to the toxicity score.
            toxic_depth_ratio: Bid/ask depth ratio below which (or above
                whose reciprocal) a row counts as imbalanced.
            toxic_duration: Number of imbalanced rows that must be exceeded
                before depth imbalance adds to the toxicity score.
            smart_money_threshold: Stored on the instance.
                NOTE(review): no method in this class reads it; large
                orders are selected relative to the recent mean volume.
            accumulation_period: Look-back used for the price-flatness test;
                ten times this many trades are scanned for large orders.
        """
        self.n_components = n_components
        self.covariance_type = covariance_type
        self.n_iter = n_iter
        # Toxic order-flow parameters
        self.toxic_price_reversal = toxic_price_reversal
        self.toxic_depth_ratio = toxic_depth_ratio
        self.toxic_duration = toxic_duration
        # Smart-money parameters
        self.smart_money_threshold = smart_money_threshold
        self.accumulation_period = accumulation_period
        # HMM model and the mapping from hidden-state index to market regime
        self.hmm_model: Optional[hmm.GaussianHMM] = None
        self.state_labels: Dict[int, MicroState] = {}
        self._is_fitted = False

    def fit(self, price_data: pd.DataFrame):
        """Train the HMM on historical prices and label its hidden states.

        Args:
            price_data: Historical price data with ``close``, ``high``,
                ``low`` and ``volume`` columns.

        Raises:
            ValueError: If ``price_data`` yields no usable feature rows
                (the rolling features use a 20-row window).
        """
        features = self._extract_hmm_features(price_data)
        if len(features) == 0:
            raise ValueError(
                "price_data does not contain enough rows to build HMM features"
            )

        self.hmm_model = hmm.GaussianHMM(
            n_components=self.n_components,
            covariance_type=self.covariance_type,
            n_iter=self.n_iter,
            random_state=42
        )
        self.hmm_model.fit(features)
        self._is_fitted = True
        # Work out which market regime each hidden state corresponds to.
        self._label_states(price_data, features)

    def identify(
        self,
        price_data: pd.DataFrame,
        trade_data: Optional[pd.DataFrame] = None,
        order_book_data: Optional[pd.DataFrame] = None
    ) -> MicroEcosystem:
        """Identify the current micro-ecosystem.

        If the model has not been trained yet it is trained on
        ``price_data`` first, provided that data yields feature rows.

        Args:
            price_data: Price data.
            trade_data: Trade data (optional).
            order_book_data: Order-book data (optional).

        Returns:
            MicroEcosystem: The identification result, including any
            warnings raised by the individual checks.
        """
        warnings: List[str] = []

        # 1. HMM regime identification
        if not self._is_fitted:
            if len(self._extract_hmm_features(price_data)) > 0:
                self.fit(price_data)
            else:
                # Not enough history to train; the predictor falls back to
                # its default result instead of crashing inside the HMM.
                warnings.append(
                    "Insufficient price history to fit HMM; using default state"
                )
        hmm_state, state_probs, features = self._predict_hmm_state(price_data)

        # 2. Order-flow toxicity
        toxicity = self._detect_flow_toxicity(
            price_data, trade_data, order_book_data
        )
        if toxicity in (FlowToxicity.HIGH, FlowToxicity.MEDIUM):
            warnings.append(f"Detected {toxicity.value} toxic order flow")

        # 3. Smart-money detection
        smart_money = self._detect_smart_money(price_data, trade_data)
        if smart_money.detected:
            warnings.append(
                f"Smart money {smart_money.direction} detected "
                f"(confidence: {smart_money.confidence:.2f})"
            )

        return MicroEcosystem(
            state=hmm_state,
            state_probability=state_probs,
            flow_toxicity=toxicity,
            smart_money=smart_money,
            hmm_features=features,
            warnings=warnings
        )

    def _extract_hmm_features(self, data: pd.DataFrame) -> np.ndarray:
        """Build the HMM feature matrix.

        Columns, in order: log return, 20-row rolling volatility of the log
        return, position of the close inside the 20-row high/low range, and
        relative volume change.

        Rows containing NaN or +/-inf are dropped. Infinite values arise,
        for example, when the previous volume is zero, and would otherwise
        be passed to the HMM.

        Returns:
            Array of shape ``(n_valid_rows, 4)``.
        """
        close = data['close']

        # Feature 1: log return
        log_returns = np.log(close / close.shift(1))
        # Feature 2: rolling volatility (20 rows)
        volatility = log_returns.rolling(20).std()
        # Feature 3: position of the close within the 20-row range
        rolling_low = data['low'].rolling(20).min()
        rolling_high = data['high'].rolling(20).max()
        # The small epsilon avoids division by zero on a flat range.
        price_position = (close - rolling_low) / (rolling_high - rolling_low + 1e-10)
        # Feature 4: relative volume change
        volume_change = data['volume'].pct_change()

        features = pd.DataFrame({
            'log_returns': log_returns,
            'volatility': volatility,
            'price_position': price_position,
            'volume_change': volume_change
        })
        features = features.replace([np.inf, -np.inf], np.nan).dropna()
        return features.values

    def _label_states(self, price_data: pd.DataFrame, features: np.ndarray):
        """Map each hidden HMM state to a market regime.

        Labelling rule (volatility cut at the 66th percentile and trend
        strength at the 50th percentile, both across the observed states):

        * high volatility + strong trend -> reversing
        * high volatility + weak trend   -> ranging
        * low volatility  + strong trend -> trending
        * low volatility  + weak trend   -> ranging

        The result is stored in ``self.state_labels``.
        """
        hidden_states = self.hmm_model.predict(features)
        returns = features[:, 0]
        volatility = features[:, 1]

        # Per-state statistics; states that never occur are skipped.
        state_stats = {}
        for state in range(self.n_components):
            mask = hidden_states == state
            if mask.any():
                state_stats[state] = {
                    'volatility': float(volatility[mask].mean()),
                    'trend_strength': float(abs(returns[mask].mean()))
                }

        labels: Dict[int, MicroState] = {}
        if state_stats:
            # Thresholds depend only on the full set of states, so compute
            # them once rather than per state.
            vol_cut = np.percentile(
                [s['volatility'] for s in state_stats.values()], 66
            )
            trend_cut = np.percentile(
                [s['trend_strength'] for s in state_stats.values()], 50
            )
            for state, stats in state_stats.items():
                high_vol = stats['volatility'] > vol_cut
                strong_trend = stats['trend_strength'] > trend_cut
                if not strong_trend:
                    labels[state] = MicroState.RANGING
                elif high_vol:
                    labels[state] = MicroState.REVERSING
                else:
                    labels[state] = MicroState.TRENDING
        self.state_labels = labels

    def _predict_hmm_state(
        self,
        price_data: pd.DataFrame
    ) -> Tuple[MicroState, Dict[MicroState, float], Dict[str, float]]:
        """Predict the current regime from the latest observation.

        The whole feature window is scored so that the posterior of the last
        row reflects the preceding observations and the transition matrix,
        rather than only the model's start probabilities.

        Returns:
            ``(state, probabilities, latest_features)``. ``probabilities``
            has one entry per ``MicroState``. When no feature rows are
            available or the model is not trained, the result is
            ``RANGING`` with uniform probabilities and empty features.
        """
        features = self._extract_hmm_features(price_data)
        if len(features) == 0 or not self._is_fitted or self.hmm_model is None:
            return MicroState.RANGING, {s: 1/3 for s in MicroState}, {}

        # Posterior state probabilities for every row; keep the last row.
        _, posteriors = self.hmm_model.score_samples(features)
        latest_posterior = posteriors[-1]

        # Latest feature values, as plain floats
        latest_features = {
            'log_returns': float(features[-1, 0]),
            'volatility': float(features[-1, 1]),
            'price_position': float(features[-1, 2]),
            'volume_change': float(features[-1, 3])
        }

        # Several hidden states may share one label, so sum their
        # probabilities; unlabelled states count as ranging.
        probs_by_state: Dict[MicroState, float] = {s: 0.0 for s in MicroState}
        for state_idx, prob in enumerate(latest_posterior):
            labeled_state = self.state_labels.get(state_idx, MicroState.RANGING)
            probs_by_state[labeled_state] += float(prob)

        # Most likely regime
        current_state = max(probs_by_state, key=probs_by_state.get)
        return current_state, probs_by_state, latest_features

    def _detect_flow_toxicity(
        self,
        price_data: pd.DataFrame,
        trade_data: Optional[pd.DataFrame] = None,
        order_book_data: Optional[pd.DataFrame] = None
    ) -> FlowToxicity:
        """Score order-flow toxicity and map the score to a level.

        Score contributions:

        * +0.3 for every large trade followed by a relative price move
          larger than ``toxic_price_reversal`` (needs ``large_trade`` and
          ``price`` columns in ``trade_data``).
        * +0.1 per high-volume / low-move row among the last 20 price rows,
          capped at 0.4.
        * +0.3 when more than ``toxic_duration`` order-book rows have an
          imbalanced bid/ask depth ratio.

        Levels: ``>= 0.6`` HIGH, ``>= 0.4`` MEDIUM, ``>= 0.2`` LOW,
        otherwise NONE.
        """
        toxicity_score = 0.0

        # Check 1: price move after large trades.
        # Assumes trade_data and price_data share an integer index in which
        # label ``idx + 1`` is the row following the trade.
        # NOTE(review): this measures the absolute move from the trade
        # price, not whether the move is against the trade's direction.
        if trade_data is not None and 'large_trade' in trade_data.columns:
            # Element-wise comparison on a Series, hence ``== True``.
            large_trades = trade_data[trade_data['large_trade'] == True]  # noqa: E712
            for idx in large_trades.index:
                if idx + 1 not in price_data.index:
                    continue
                trade_price = large_trades.loc[idx, 'price']
                future_price = price_data.loc[idx + 1, 'close']
                move = abs(future_price - trade_price) / trade_price
                if move > self.toxic_price_reversal:
                    toxicity_score += 0.3

        # Simplified check based on the price/volume relationship:
        # heavy volume while the price barely moves is treated as toxic
        # (e.g. wash trading).
        returns = price_data['close'].pct_change().abs().iloc[-20:]
        volumes = price_data['volume'].iloc[-20:]
        high_volume_periods = volumes > volumes.mean() * 1.5
        low_volatility_periods = returns < returns.mean() * 0.5
        toxic_periods = (high_volume_periods & low_volatility_periods).sum()
        toxicity_score += min(0.4, toxic_periods * 0.1)

        # Check 2: bid/ask depth imbalance
        if (
            order_book_data is not None
            and 'bid_depth' in order_book_data
            and 'ask_depth' in order_book_data
        ):
            depth_ratio = (
                order_book_data['bid_depth'] /
                (order_book_data['ask_depth'] + 1e-10)
            )
            imbalanced_periods = (
                (depth_ratio < self.toxic_depth_ratio) |
                (depth_ratio > 1 / self.toxic_depth_ratio)
            ).sum()
            if imbalanced_periods > self.toxic_duration:
                toxicity_score += 0.3

        # Map the score to a level
        if toxicity_score >= 0.6:
            return FlowToxicity.HIGH
        if toxicity_score >= 0.4:
            return FlowToxicity.MEDIUM
        if toxicity_score >= 0.2:
            return FlowToxicity.LOW
        return FlowToxicity.NONE

    @staticmethod
    def _build_smart_money_signal(
        large_orders: pd.DataFrame,
        detected: bool = False,
        direction: str = "neutral",
        confidence: float = 0.0
    ) -> SmartMoneySignal:
        """Create a signal whose count/size fields describe ``large_orders``."""
        count = len(large_orders)
        avg_size = float(large_orders['volume'].mean()) if count > 0 else 0.0
        return SmartMoneySignal(
            detected=detected,
            direction=direction,
            confidence=float(confidence),
            large_order_count=count,
            avg_order_size=avg_size
        )

    def _detect_smart_money(
        self,
        price_data: pd.DataFrame,
        trade_data: Optional[pd.DataFrame] = None
    ) -> SmartMoneySignal:
        """Detect large-participant accumulation or distribution.

        Looks at the last ``accumulation_period * 10`` trades, treats trades
        whose volume exceeds three times their mean as large, and splits
        them into buys and sells (by the ``side`` column when present,
        otherwise by the sign of the close-price change at the same index
        label). A signal is raised when one side dominates the large volume
        while the price stayed within 2% over the look-back.

        Returns:
            SmartMoneySignal: ``detected`` is False with direction
            ``"neutral"`` when there is no trade data, fewer than three
            large orders, no classifiable volume, no price data, or no
            dominant side with a flat price.
        """
        if trade_data is None or trade_data.empty:
            return SmartMoneySignal(
                detected=False,
                direction="neutral",
                confidence=0.0,
                large_order_count=0,
                avg_order_size=0.0
            )

        # Select large orders among the most recent trades
        recent_data = trade_data.iloc[-self.accumulation_period * 10:]
        avg_volume = recent_data['volume'].mean()
        large_orders = recent_data[
            recent_data['volume'] > avg_volume * self._LARGE_ORDER_MULTIPLE
        ]
        if len(large_orders) < self._MIN_LARGE_ORDERS:
            return self._build_smart_money_signal(large_orders)

        # Classify large orders as buys or sells
        if 'side' in large_orders.columns:
            is_buy = (large_orders['side'] == 'buy').to_numpy()
            is_sell = (large_orders['side'] == 'sell').to_numpy()
        else:
            # Infer direction from the price change at the same label.
            # reindex() yields NaN for labels missing from price_data, so
            # those orders count as neither buy nor sell.
            price_change = price_data['close'].diff().reindex(large_orders.index)
            is_buy = (price_change > 0).to_numpy()
            is_sell = (price_change < 0).to_numpy()

        buy_volume = large_orders.loc[is_buy, 'volume'].sum()
        sell_volume = large_orders.loc[is_sell, 'volume'].sum()
        total_large_volume = buy_volume + sell_volume
        if total_large_volume == 0:
            return self._build_smart_money_signal(large_orders)
        buy_ratio = buy_volume / total_large_volume

        # Relative price change over the look-back, clamped to the history
        # that is actually available.
        close = price_data['close']
        if close.empty:
            return self._build_smart_money_signal(large_orders)
        base_price = close.iloc[-min(self.accumulation_period, len(close))]
        price_trend = (close.iloc[-1] - base_price) / base_price
        price_is_flat = abs(price_trend) < self._FLAT_PRICE_BAND

        # Accumulation: large buying while the price stays flat
        if price_is_flat and buy_ratio > self._BUY_DOMINANCE:
            return self._build_smart_money_signal(
                large_orders,
                detected=True,
                direction="accumulate",
                confidence=buy_ratio
            )
        # Distribution: large selling while the price stays flat
        if price_is_flat and buy_ratio < self._SELL_DOMINANCE:
            return self._build_smart_money_signal(
                large_orders,
                detected=True,
                direction="distribute",
                confidence=1 - buy_ratio
            )
        return self._build_smart_money_signal(large_orders)
|