| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- """
- 宏观生态识别器
- 识别市场的"季节":春季(复苏)、夏季(繁荣)、秋季(分化)、冬季(萧条)
- """
- from dataclasses import dataclass
- from typing import Dict, List, Optional, Tuple
- from enum import Enum
- import numpy as np
- import pandas as pd
- class MacroRegime(Enum):
- """宏观生态类型"""
- SPRING = "spring" # 春季(复苏)
- SUMMER = "summer" # 夏季(繁荣)
- AUTUMN = "autumn" # 秋季(分化)
- WINTER = "winter" # 冬季(萧条)
- UNKNOWN = "unknown" # 未知/过渡
- @dataclass
- class MacroEcosystem:
- """宏观生态数据结构"""
- regime: MacroRegime
- confidence: float
- volatility_trend: float # 波动率趋势
- volume_trend: float # 成交量趋势
- dispersion: float # 板块离散度
- adx_value: float # ADX值
- description: str
- class MacroEcosystemIdentifier:
- """
- 宏观生态识别器
- 基于以下因子识别市场季节:
- 1. 波动率期限结构(近月波动率变化)
- 2. 板块离散度(成分股收益率标准差的历史分位)
- 3. 资金流向广度(涨跌家数比的熵值)
- 4. ADX趋势强度
- """
- def __init__(
- self,
- lookback_days: int = 252,
- spring_vol_rebound: float = 0.20,
- spring_volume_ratio: float = 1.2,
- summer_adx_threshold: float = 25,
- summer_adx_duration: int = 5,
- autumn_dispersion_pct: float = 70,
- winter_volume_pct: float = 50,
- winter_vol_compression: float = 20
- ):
- self.lookback_days = lookback_days
- self.spring_vol_rebound = spring_vol_rebound
- self.spring_volume_ratio = spring_volume_ratio
- self.summer_adx_threshold = summer_adx_threshold
- self.summer_adx_duration = summer_adx_duration
- self.autumn_dispersion_pct = autumn_dispersion_pct
- self.winter_volume_pct = winter_volume_pct
- self.winter_vol_compression = winter_vol_compression
- def identify(
- self,
- price_data: pd.DataFrame,
- sector_data: Optional[pd.DataFrame] = None
- ) -> MacroEcosystem:
- """
- 识别当前宏观生态
- Args:
- price_data: 价格数据,包含 ['open', 'high', 'low', 'close', 'volume']
- sector_data: 板块数据(可选),成分股收益率数据
- Returns:
- MacroEcosystem: 宏观生态识别结果
- """
- # 计算技术指标
- volatility = self._calculate_volatility(price_data)
- volume_ma5 = price_data['volume'].rolling(5).mean().iloc[-1]
- volume_ma20 = price_data['volume'].rolling(20).mean().iloc[-1]
- volume_ratio = volume_ma5 / volume_ma20 if volume_ma20 > 0 else 1.0
- adx_value = self._calculate_adx(price_data)
- dispersion = self._calculate_dispersion(sector_data) if sector_data is not None else 50.0
- # 计算历史分位数
- vol_percentile = self._get_percentile(volatility, price_data)
- vol_low_30 = np.percentile(volatility.iloc[-60:], 30)
- vol_rebound = (volatility.iloc[-1] - vol_low_30) / vol_low_30 if vol_low_30 > 0 else 0
- # 四季识别逻辑
- regime_scores = {}
- # 春季:波动率从低位回升,成交量温和放大
- spring_score = self._score_spring(vol_rebound, volume_ratio, dispersion)
- regime_scores[MacroRegime.SPRING] = spring_score
- # 夏季:趋势明确,波动率稳定,情绪高涨
- summer_score = self._score_summer(adx_value, volatility, price_data, dispersion)
- regime_scores[MacroRegime.SUMMER] = summer_score
- # 秋季:板块离散度大,轮动加速
- autumn_score = self._score_autumn(dispersion, volatility, price_data)
- regime_scores[MacroRegime.AUTUMN] = autumn_score
- # 冬季:成交量萎缩,波动率压缩
- winter_score = self._score_winter(volume_ratio, vol_percentile, adx_value)
- regime_scores[MacroRegime.WINTER] = winter_score
- # 选择最高分的生态
- best_regime = max(regime_scores, key=regime_scores.get)
- best_score = regime_scores[best_regime]
- # 如果最高分低于阈值,标记为未知
- if best_score < 0.3:
- best_regime = MacroRegime.UNKNOWN
- best_score = 0.0
- return MacroEcosystem(
- regime=best_regime,
- confidence=best_score,
- volatility_trend=vol_rebound,
- volume_trend=volume_ratio - 1.0,
- dispersion=dispersion,
- adx_value=adx_value,
- description=self._get_description(best_regime)
- )
- def _calculate_volatility(self, data: pd.DataFrame, period: int = 20) -> pd.Series:
- """计算波动率(20日收益率标准差年化)"""
- returns = data['close'].pct_change()
- volatility = returns.rolling(period).std() * np.sqrt(252)
- return volatility
- def _calculate_adx(self, data: pd.DataFrame, period: int = 14) -> float:
- """计算ADX(平均趋向指数)"""
- high = data['high']
- low = data['low']
- close = data['close']
- # +DM和-DM
- plus_dm = high.diff()
- minus_dm = -low.diff()
- plus_dm[plus_dm < 0] = 0
- minus_dm[minus_dm < 0] = 0
- # TR(真实波幅)
- tr1 = high - low
- tr2 = abs(high - close.shift(1))
- tr3 = abs(low - close.shift(1))
- tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
- # ATR
- atr = tr.rolling(period).mean()
- # +DI和-DI
- plus_di = 100 * (plus_dm.rolling(period).mean() / atr)
- minus_di = 100 * (minus_dm.rolling(period).mean() / atr)
- # DX和ADX
- dx = 100 * abs(plus_di - minus_di) / (plus_di + minus_di)
- adx = dx.rolling(period).mean()
- return adx.iloc[-1] if not pd.isna(adx.iloc[-1]) else 20.0
- def _calculate_dispersion(self, sector_data: pd.DataFrame) -> float:
- """计算板块离散度(成分股收益率标准差的历史分位)"""
- if sector_data is None or sector_data.empty:
- return 50.0
- # 计算各成分股近期收益率
- returns = sector_data.pct_change().iloc[-20:]
- cross_sectional_std = returns.std(axis=1).mean()
- # 转换为历史分位数(简化处理)
- historical_std = returns.std(axis=1).rolling(60).mean()
- if len(historical_std) > 0 and not pd.isna(historical_std.iloc[-1]):
- percentile = 50 + (cross_sectional_std - historical_std.mean()) / historical_std.std() * 25
- return max(0, min(100, percentile))
- return 50.0
- def _get_percentile(self, series: pd.Series, data: pd.DataFrame) -> float:
- """获取当前值的历史分位数"""
- if len(series) < 60:
- return 50.0
- current = series.iloc[-1]
- historical = series.iloc[-252:-1].dropna()
- if len(historical) == 0:
- return 50.0
- return (historical < current).mean() * 100
- def _score_spring(self, vol_rebound: float, volume_ratio: float, dispersion: float) -> float:
- """评分:春季(复苏)"""
- # 波动率从低位回升
- vol_score = min(1.0, vol_rebound / self.spring_vol_rebound)
- # 成交量放大
- volume_score = min(1.0, max(0, (volume_ratio - 1.0) / (self.spring_volume_ratio - 1.0)))
- # 板块离散度中等(30-60分位)
- dispersion_score = 1.0 - abs(dispersion - 45) / 15 if 30 <= dispersion <= 60 else 0.0
- return vol_score * 0.4 + volume_score * 0.4 + dispersion_score * 0.2
- def _score_summer(self, adx: float, volatility: pd.Series, data: pd.DataFrame, dispersion: float) -> float:
- """评分:夏季(繁荣)"""
- # ADX高于阈值
- adx_score = min(1.0, adx / self.summer_adx_threshold) if adx > 20 else 0.0
- # 波动率稳定(历史40-70分位)
- vol_percentile = self._get_percentile(volatility, data)
- vol_score = 1.0 - abs(vol_percentile - 55) / 15 if 40 <= vol_percentile <= 70 else 0.0
- # 板块离散度低(同涨同跌)
- dispersion_score = 1.0 - dispersion / 40 if dispersion < 40 else 0.0
- return adx_score * 0.5 + vol_score * 0.3 + dispersion_score * 0.2
- def _score_autumn(self, dispersion: float, volatility: pd.Series, data: pd.DataFrame) -> float:
- """评分:秋季(分化)"""
- # 板块离散度高
- dispersion_score = min(1.0, (dispersion - 50) / 30) if dispersion > 50 else 0.0
- # 波动率上升
- vol_recent = volatility.iloc[-5:].mean()
- vol_prev = volatility.iloc[-20:-5].mean()
- vol_increase = (vol_recent - vol_prev) / vol_prev if vol_prev > 0 else 0
- vol_score = min(1.0, vol_increase / 0.15)
- return dispersion_score * 0.6 + vol_score * 0.4
- def _score_winter(self, volume_ratio: float, vol_percentile: float, adx: float) -> float:
- """评分:冬季(萧条)"""
- # 成交量萎缩
- volume_score = 1.0 - min(1.0, volume_ratio) if volume_ratio < 1.0 else 0.0
- # 波动率压缩至历史低位
- vol_score = 1.0 - vol_percentile / self.winter_vol_compression if vol_percentile < self.winter_vol_compression else 0.0
- # 无明确趋势
- adx_score = 1.0 - min(1.0, adx / 20) if adx < 25 else 0.0
- return volume_score * 0.3 + vol_score * 0.4 + adx_score * 0.3
- def _get_description(self, regime: MacroRegime) -> str:
- """获取生态描述"""
- descriptions = {
- MacroRegime.SPRING: "春季复苏:波动率回升,成交温和放大,机构左侧布局",
- MacroRegime.SUMMER: "夏季繁荣:趋势明确,情绪高涨,散户机构共振",
- MacroRegime.AUTUMN: "秋季分化:板块离散,轮动加速,量化游资主导",
- MacroRegime.WINTER: "冬季萧条:成交萎缩,波动压缩,空头主导",
- MacroRegime.UNKNOWN: "过渡状态:生态不明,建议观望"
- }
- return descriptions.get(regime, "未知状态")
|