| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290 |
- """
- 中观生态识别器
- 计算市场结构健康度评分(0-100)
- 基于五个维度:价格冲击、订单流平衡、流动性深度、波动率效率、信息冲击响应
- """
- from dataclasses import dataclass
- from typing import Dict, Optional
- from enum import Enum
- import numpy as np
- import pandas as pd
- class HealthLevel(Enum):
- """结构健康度分级"""
- HIGH = "high" # > 70
- MEDIUM = "medium" # 40-70
- LOW = "low" # < 40
- @dataclass
- class MesoEcosystem:
- """中观生态数据结构"""
- health_score: float # 总健康度评分 (0-100)
- health_level: HealthLevel
- price_impact: float # 价格冲击系数
- order_flow: float # 订单流平衡
- liquidity_depth: float # 流动性深度
- volatility_efficiency: float # 波动率效率
- info_response: float # 信息冲击响应
- components: Dict[str, float] # 各维度明细
- class MesoEcosystemIdentifier:
- """
- 中观生态识别器
- 计算市场微观结构健康度:
- 1. 价格冲击系数:大单对价格的影响程度
- 2. 订单流平衡:买卖盘力量对比
- 3. 流动性深度:买卖盘深度
- 4. 波动率效率:波动率与信息到达的关系
- 5. 信息冲击响应:价格对新信息的反应速度
- """
- def __init__(
- self,
- lookback_days: int = 60,
- weights: Optional[Dict[str, float]] = None,
- high_threshold: float = 70,
- medium_threshold: float = 40
- ):
- self.lookback_days = lookback_days
- self.weights = weights or {
- "price_impact": 0.30,
- "order_flow": 0.25,
- "liquidity_depth": 0.20,
- "volatility_efficiency": 0.15,
- "info_response": 0.10
- }
- self.high_threshold = high_threshold
- self.medium_threshold = medium_threshold
- def identify(
- self,
- price_data: pd.DataFrame,
- order_book_data: Optional[pd.DataFrame] = None,
- trade_data: Optional[pd.DataFrame] = None
- ) -> MesoEcosystem:
- """
- 识别中观生态结构健康度
- Args:
- price_data: 价格数据
- order_book_data: 订单簿数据(可选)
- trade_data: 成交数据(可选)
- Returns:
- MesoEcosystem: 中观生态识别结果
- """
- # 计算五个维度
- price_impact = self._calculate_price_impact(price_data, trade_data)
- order_flow = self._calculate_order_flow(price_data, order_book_data, trade_data)
- liquidity_depth = self._calculate_liquidity_depth(order_book_data, price_data)
- volatility_efficiency = self._calculate_volatility_efficiency(price_data)
- info_response = self._calculate_info_response(price_data)
- # 标准化到0-100分
- components = {
- "price_impact": self._normalize_price_impact(price_impact),
- "order_flow": self._normalize_order_flow(order_flow),
- "liquidity_depth": self._normalize_liquidity_depth(liquidity_depth),
- "volatility_efficiency": self._normalize_vol_efficiency(volatility_efficiency),
- "info_response": self._normalize_info_response(info_response)
- }
- # 计算加权总分
- health_score = sum(
- components[key] * self.weights[key]
- for key in components
- )
- # 确定健康等级
- if health_score >= self.high_threshold:
- health_level = HealthLevel.HIGH
- elif health_score >= self.medium_threshold:
- health_level = HealthLevel.MEDIUM
- else:
- health_level = HealthLevel.LOW
- return MesoEcosystem(
- health_score=health_score,
- health_level=health_level,
- price_impact=price_impact,
- order_flow=order_flow,
- liquidity_depth=liquidity_depth,
- volatility_efficiency=volatility_efficiency,
- info_response=info_response,
- components=components
- )
- def _calculate_price_impact(
- self,
- price_data: pd.DataFrame,
- trade_data: Optional[pd.DataFrame] = None
- ) -> float:
- """
- 计算价格冲击系数
- 基于Kyle的lambda:价格变化 / 交易量
- 值越小表示流动性越好
- """
- if trade_data is not None and not trade_data.empty:
- # 使用成交数据计算
- price_changes = price_data['close'].diff().abs()
- volumes = trade_data['volume'] if 'volume' in trade_data else price_data['volume']
- # 计算日内价格冲击
- impact = price_changes / np.sqrt(volumes)
- impact = impact.replace([np.inf, -np.inf], np.nan).dropna()
- if len(impact) > 0:
- return impact.iloc[-20:].mean()
- # 简化计算:使用价格波动率与成交量比率
- returns = price_data['close'].pct_change().abs()
- volume_ma = price_data['volume'].rolling(20).mean()
- impact = returns / np.log(volume_ma + 1)
- impact = impact.replace([np.inf, -np.inf], np.nan).dropna()
- return impact.iloc[-20:].mean() if len(impact) > 0 else 0.001
- def _calculate_order_flow(
- self,
- price_data: pd.DataFrame,
- order_book_data: Optional[pd.DataFrame] = None,
- trade_data: Optional[pd.DataFrame] = None
- ) -> float:
- """
- 计算订单流平衡
- 正值表示买盘占优,负值表示卖盘占优
- 使用买卖盘深度比或主动买卖比例
- """
- if order_book_data is not None and not order_book_data.empty:
- # 使用订单簿数据
- if 'bid_depth' in order_book_data and 'ask_depth' in order_book_data:
- bid_depth = order_book_data['bid_depth'].iloc[-1]
- ask_depth = order_book_data['ask_depth'].iloc[-1]
- if ask_depth > 0:
- return (bid_depth - ask_depth) / (bid_depth + ask_depth)
- # 使用价格-成交量关系估计
- # 上涨日成交量 vs 下跌日成交量
- returns = price_data['close'].pct_change()
- volumes = price_data['volume']
- up_volume = volumes[returns > 0].iloc[-20:].mean()
- down_volume = volumes[returns < 0].iloc[-20:].mean()
- if pd.isna(up_volume) or pd.isna(down_volume) or (up_volume + down_volume) == 0:
- return 0.0
- return (up_volume - down_volume) / (up_volume + down_volume)
- def _calculate_liquidity_depth(
- self,
- order_book_data: Optional[pd.DataFrame] = None,
- price_data: Optional[pd.DataFrame] = None
- ) -> float:
- """
- 计算流动性深度
- 单位价格变动所需的成交量
- """
- if order_book_data is not None and not order_book_data.empty:
- # 使用订单簿深度
- if 'bid_depth' in order_book_data:
- return order_book_data['bid_depth'].iloc[-20:].mean()
- # 使用Amihud非流动性指标倒数
- if price_data is not None:
- returns = price_data['close'].pct_change().abs()
- volumes = price_data['volume']
- illiquidity = returns / (volumes * price_data['close'])
- illiquidity = illiquidity.replace([np.inf, -np.inf], np.nan).dropna()
- if len(illiquidity) > 0:
- # 返回倒数,值越大流动性越好
- return 1.0 / (illiquidity.iloc[-20:].mean() + 1e-10)
- return 1.0
- def _calculate_volatility_efficiency(self, price_data: pd.DataFrame) -> float:
- """
- 计算波动率效率
- 实际波动率与信息到达率的比率
- 衡量波动率是否有效反映信息
- """
- # 计算已实现波动率
- returns = price_data['close'].pct_change().dropna()
- realized_vol = returns.iloc[-20:].std() * np.sqrt(252)
- # 计算信息到达率(使用成交量变化作为代理)
- volume_changes = price_data['volume'].pct_change().abs().iloc[-20:].mean()
- # 效率 = 波动率 / 信息到达率
- if volume_changes > 0:
- efficiency = realized_vol / (volume_changes + 0.01)
- else:
- efficiency = realized_vol
- return efficiency
- def _calculate_info_response(self, price_data: pd.DataFrame) -> float:
- """
- 计算信息冲击响应
- 价格对新信息的反应速度和程度
- 使用价格自相关性衡量
- """
- returns = price_data['close'].pct_change().dropna()
- if len(returns) < 20:
- return 0.5
- # 计算1阶和5阶自相关系数
- autocorr_1 = returns.iloc[-20:].autocorr(lag=1)
- autocorr_5 = returns.iloc[-20:].autocorr(lag=5)
- # 负自相关表示快速反转(信息快速被吸收)
- # 正自相关表示动量延续
- response = -(autocorr_1 + autocorr_5 / 5) / 2
- return response
- # 标准化方法
- def _normalize_price_impact(self, value: float) -> float:
- """标准化价格冲击系数 (0-100)"""
- # 值越小越好
- # 0.0001 = 优秀 (100分), 0.01 = 差 (0分)
- score = 100 * (1 - min(1, max(0, (value - 0.0001) / 0.0099)))
- return max(0, min(100, score))
- def _normalize_order_flow(self, value: float) -> float:
- """标准化订单流平衡 (0-100)"""
- # -1到1标准化到0-100,0为中性(50分)
- return 50 + value * 50
- def _normalize_liquidity_depth(self, value: float) -> float:
- """标准化流动性深度 (0-100)"""
- # 使用对数变换
- score = 50 + 50 * np.log(value + 1) / np.log(100)
- return max(0, min(100, score))
- def _normalize_vol_efficiency(self, value: float) -> float:
- """标准化波动率效率 (0-100)"""
- # 1-5之间为合理范围
- score = 100 - abs(value - 3) * 25
- return max(0, min(100, score))
- def _normalize_info_response(self, value: float) -> float:
- """标准化信息冲击响应 (0-100)"""
- # -0.5到0.5标准化到0-100
- score = 50 + value * 100
- return max(0, min(100, score))
|