""" 宏观生态识别器 识别市场的"季节":春季(复苏)、夏季(繁荣)、秋季(分化)、冬季(萧条) """ from dataclasses import dataclass from typing import Dict, List, Optional, Tuple from enum import Enum import numpy as np import pandas as pd class MacroRegime(Enum): """宏观生态类型""" SPRING = "spring" # 春季(复苏) SUMMER = "summer" # 夏季(繁荣) AUTUMN = "autumn" # 秋季(分化) WINTER = "winter" # 冬季(萧条) UNKNOWN = "unknown" # 未知/过渡 @dataclass class MacroEcosystem: """宏观生态数据结构""" regime: MacroRegime confidence: float volatility_trend: float # 波动率趋势 volume_trend: float # 成交量趋势 dispersion: float # 板块离散度 adx_value: float # ADX值 description: str class MacroEcosystemIdentifier: """ 宏观生态识别器 基于以下因子识别市场季节: 1. 波动率期限结构(近月波动率变化) 2. 板块离散度(成分股收益率标准差的历史分位) 3. 资金流向广度(涨跌家数比的熵值) 4. ADX趋势强度 """ def __init__( self, lookback_days: int = 252, spring_vol_rebound: float = 0.20, spring_volume_ratio: float = 1.2, summer_adx_threshold: float = 25, summer_adx_duration: int = 5, autumn_dispersion_pct: float = 70, winter_volume_pct: float = 50, winter_vol_compression: float = 20 ): self.lookback_days = lookback_days self.spring_vol_rebound = spring_vol_rebound self.spring_volume_ratio = spring_volume_ratio self.summer_adx_threshold = summer_adx_threshold self.summer_adx_duration = summer_adx_duration self.autumn_dispersion_pct = autumn_dispersion_pct self.winter_volume_pct = winter_volume_pct self.winter_vol_compression = winter_vol_compression def identify( self, price_data: pd.DataFrame, sector_data: Optional[pd.DataFrame] = None ) -> MacroEcosystem: """ 识别当前宏观生态 Args: price_data: 价格数据,包含 ['open', 'high', 'low', 'close', 'volume'] sector_data: 板块数据(可选),成分股收益率数据 Returns: MacroEcosystem: 宏观生态识别结果 """ # 计算技术指标 volatility = self._calculate_volatility(price_data) volume_ma5 = price_data['volume'].rolling(5).mean().iloc[-1] volume_ma20 = price_data['volume'].rolling(20).mean().iloc[-1] volume_ratio = volume_ma5 / volume_ma20 if volume_ma20 > 0 else 1.0 adx_value = self._calculate_adx(price_data) dispersion = self._calculate_dispersion(sector_data) if sector_data is not None else 50.0 # 计算历史分位数 vol_percentile = self._get_percentile(volatility, price_data) vol_low_30 = np.percentile(volatility.iloc[-60:], 30) vol_rebound = (volatility.iloc[-1] - vol_low_30) / vol_low_30 if vol_low_30 > 0 else 0 # 四季识别逻辑 regime_scores = {} # 春季:波动率从低位回升,成交量温和放大 spring_score = self._score_spring(vol_rebound, volume_ratio, dispersion) regime_scores[MacroRegime.SPRING] = spring_score # 夏季:趋势明确,波动率稳定,情绪高涨 summer_score = self._score_summer(adx_value, volatility, price_data, dispersion) regime_scores[MacroRegime.SUMMER] = summer_score # 秋季:板块离散度大,轮动加速 autumn_score = self._score_autumn(dispersion, volatility, price_data) regime_scores[MacroRegime.AUTUMN] = autumn_score # 冬季:成交量萎缩,波动率压缩 winter_score = self._score_winter(volume_ratio, vol_percentile, adx_value) regime_scores[MacroRegime.WINTER] = winter_score # 选择最高分的生态 best_regime = max(regime_scores, key=regime_scores.get) best_score = regime_scores[best_regime] # 如果最高分低于阈值,标记为未知 if best_score < 0.3: best_regime = MacroRegime.UNKNOWN best_score = 0.0 return MacroEcosystem( regime=best_regime, confidence=best_score, volatility_trend=vol_rebound, volume_trend=volume_ratio - 1.0, dispersion=dispersion, adx_value=adx_value, description=self._get_description(best_regime) ) def _calculate_volatility(self, data: pd.DataFrame, period: int = 20) -> pd.Series: """计算波动率(20日收益率标准差年化)""" returns = data['close'].pct_change() volatility = returns.rolling(period).std() * np.sqrt(252) return volatility def _calculate_adx(self, data: pd.DataFrame, period: int = 14) -> float: """计算ADX(平均趋向指数)""" high = data['high'] low = data['low'] close = data['close'] # +DM和-DM plus_dm = high.diff() minus_dm = -low.diff() plus_dm[plus_dm < 0] = 0 minus_dm[minus_dm < 0] = 0 # TR(真实波幅) tr1 = high - low tr2 = abs(high - close.shift(1)) tr3 = abs(low - close.shift(1)) tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) # ATR atr = tr.rolling(period).mean() # +DI和-DI plus_di = 100 * (plus_dm.rolling(period).mean() / atr) minus_di = 100 * (minus_dm.rolling(period).mean() / atr) # DX和ADX dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di) adx = dx.rolling(period).mean() return adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 20.0 def _calculate_dispersion(self, sector_data: pd.DataFrame) -> float: """计算板块离散度(成分股收益率标准差的历史分位)""" if sector_data is None or sector_data.empty: return 50.0 # 计算各成分股近期收益率 returns = sector_data.pct_change().iloc[-20:] cross_sectional_std = returns.std(axis=1).mean() # 转换为历史分位数(简化处理) historical_std = returns.std(axis=1).rolling(60).mean() if len(historical_std) > 0 and not pd.isna(historical_std.iloc[-1]): percentile = 50 + (cross_sectional_std - historical_std.mean()) / historical_std.std() * 25 return max(0, min(100, percentile)) return 50.0 def _get_percentile(self, series: pd.Series, data: pd.DataFrame) -> float: """获取当前值的历史分位数""" if len(series) < 60: return 50.0 current = series.iloc[-1] historical = series.iloc[-252:-1].dropna() if len(historical) == 0: return 50.0 return (historical < current).mean() * 100 def _score_spring(self, vol_rebound: float, volume_ratio: float, dispersion: float) -> float: """评分:春季(复苏)""" # 波动率从低位回升 vol_score = min(1.0, vol_rebound / self.spring_vol_rebound) # 成交量放大 volume_score = min(1.0, max(0, (volume_ratio - 1.0) / (self.spring_volume_ratio - 1.0))) # 板块离散度中等(30-60分位) dispersion_score = 1.0 - abs(dispersion - 45) / 15 if 30 <= dispersion <= 60 else 0.0 return vol_score * 0.4 + volume_score * 0.4 + dispersion_score * 0.2 def _score_summer(self, adx: float, volatility: pd.Series, data: pd.DataFrame, dispersion: float) -> float: """评分:夏季(繁荣)""" # ADX高于阈值 adx_score = min(1.0, adx / self.summer_adx_threshold) if adx > 20 else 0.0 # 波动率稳定(历史40-70分位) vol_percentile = self._get_percentile(volatility, data) vol_score = 1.0 - abs(vol_percentile - 55) / 15 if 40 <= vol_percentile <= 70 else 0.0 # 板块离散度低(同涨同跌) dispersion_score = 1.0 - dispersion / 40 if dispersion < 40 else 0.0 return adx_score * 0.5 + vol_score * 0.3 + dispersion_score * 0.2 def _score_autumn(self, dispersion: float, volatility: pd.Series, data: pd.DataFrame) -> float: """评分:秋季(分化)""" # 板块离散度高 dispersion_score = min(1.0, (dispersion - 50) / 30) if dispersion > 50 else 0.0 # 波动率上升 vol_recent = volatility.iloc[-5:].mean() vol_prev = volatility.iloc[-20:-5].mean() vol_increase = (vol_recent - vol_prev) / vol_prev if vol_prev > 0 else 0 vol_score = min(1.0, vol_increase / 0.15) return dispersion_score * 0.6 + vol_score * 0.4 def _score_winter(self, volume_ratio: float, vol_percentile: float, adx: float) -> float: """评分:冬季(萧条)""" # 成交量萎缩 volume_score = 1.0 - min(1.0, volume_ratio) if volume_ratio < 1.0 else 0.0 # 波动率压缩至历史低位 vol_score = 1.0 - vol_percentile / self.winter_vol_compression if vol_percentile < self.winter_vol_compression else 0.0 # 无明确趋势 adx_score = 1.0 - min(1.0, adx / 20) if adx < 25 else 0.0 return volume_score * 0.3 + vol_score * 0.4 + adx_score * 0.3 def _get_description(self, regime: MacroRegime) -> str: """获取生态描述""" descriptions = { MacroRegime.SPRING: "春季复苏:波动率回升,成交温和放大,机构左侧布局", MacroRegime.SUMMER: "夏季繁荣:趋势明确,情绪高涨,散户机构共振", MacroRegime.AUTUMN: "秋季分化:板块离散,轮动加速,量化游资主导", MacroRegime.WINTER: "冬季萧条:成交萎缩,波动压缩,空头主导", MacroRegime.UNKNOWN: "过渡状态:生态不明,建议观望" } return descriptions.get(regime, "未知状态")