- """
- ChiNext 50 Index market regime detection
- Identifies market regimes from volatility and trend strength
- """
- import pandas as pd
- import numpy as np
- from enum import Enum
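-
-
- class RegimeType(Enum):
-     """Market regime types."""
-     STRONG_BULL = "Strong uptrend"      # high volatility + uptrend
-     WEAK_BULL = "Weak uptrend"          # low volatility + uptrend
-     STRONG_BEAR = "Strong downtrend"    # high volatility + downtrend
-     WEAK_BEAR = "Weak downtrend"        # low volatility + downtrend
-     CONSOLIDATION = "Consolidation"     # no clear trend
-     UNKNOWN = "Unknown"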
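-
-
- class RegimeDetector:
-     """
-     Market regime detector based on volatility and trend.
-
-     ChiNext 50 characteristics:
-     - Growth style; volatility is higher than the main board's
-     - Trends are strong but reverse quickly
-     - Well suited to two-factor (volatility + trend) detection
-     """
-
-     def __init__(self,
-                  vol_short=20,           # short-term volatility window (trading days)
-                  vol_long=60,            # long-term volatility window (trading days)
-                  trend_window=20,        # trend window (trading days)
-                  vol_percentile=60,      # volatility percentile threshold (0-100)
-                  trend_threshold=0.05):  # trend strength threshold
-         self.vol_short = vol_short
-         self.vol_long = vol_long
-         self.trend_window = trend_window
-         self.vol_percentile = vol_percentile
-         self.trend_threshold = trend_threshold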
-
-     def calculate_volatility(self, prices):
-         """Compute annualized short- and long-window volatility."""
-         returns = prices.pct_change().dropna()
-         vol_short = returns.rolling(self.vol_short).std() * np.sqrt(252)
-         vol_long = returns.rolling(self.vol_long).std() * np.sqrt(252)
-         return vol_short, vol_long
-
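-     def calculate_trend(self, prices):
-         """Compute signed trend strength (positive = up, negative = down)."""
-         # Moving average used as the trend baseline
-         ma = prices.rolling(self.trend_window).mean()
-         # Relative deviation of price from its moving average
-         deviation = (prices - ma) / ma
-         # Trend strength: the deviation averaged over the window, so the
-         # sign gives direction and the size reflects persistence
-         trend_strength = deviation.rolling(self.trend_window).mean()
-         return trend_strength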
-
-     def detect_regime(self, prices):
-         """
-         Classify the market regime for each date in `prices`.
-
-         Returns: DataFrame with price, volatility, trend and regime columns
-         """
-         df = pd.DataFrame(index=prices.index)
-         df['close'] = prices
-
-         # Volatility
-         vol_short, vol_long = self.calculate_volatility(prices)
-         df['vol_short'] = vol_short
-         df['vol_long'] = vol_long
-
-         # Percentile rank of the latest short-term volatility within its
-         # trailing 252-trading-day window (NaN until the window is full)
-         df['vol_percentile'] = vol_short.rolling(252).apply(
-             lambda x: x.rank(pct=True).iloc[-1]
-         )
-
-         # Trend strength
-         df['trend'] = self.calculate_trend(prices)
-
-         # Short-term momentum (stored for reference; the classification
-         # below takes direction from the sign of 'trend')
-         df['momentum'] = prices.pct_change(self.trend_window)
-
-         # Classify regimes; every row starts as UNKNOWN
-         df['regime'] = RegimeType.UNKNOWN.value
-
-         # Volatility state. NaN percentiles (warm-up period) match neither
-         # mask, so those rows are not mislabeled as low volatility.
-         vol_cut = self.vol_percentile / 100
-         high_vol = df['vol_percentile'] > vol_cut
-         low_vol = df['vol_percentile'] <= vol_cut
-         # Trend state
-         strong_trend_up = df['trend'] > self.trend_threshold
-         strong_trend_down = df['trend'] < -self.trend_threshold
-
-         # Strong uptrend (high volatility + up)
-         mask = high_vol & strong_trend_up
-         df.loc[mask, 'regime'] = RegimeType.STRONG_BULL.value
-
-         # Weak uptrend (low volatility + up)
-         mask = low_vol & strong_trend_up
-         df.loc[mask, 'regime'] = RegimeType.WEAK_BULL.value
-
-         # Strong downtrend (high volatility + down)
-         mask = high_vol & strong_trend_down
-         df.loc[mask, 'regime'] = RegimeType.STRONG_BEAR.value
-
-         # Weak downtrend (low volatility + down)
-         mask = low_vol & strong_trend_down
-         df.loc[mask, 'regime'] = RegimeType.WEAK_BEAR.value
-
-         # Consolidation (no clear trend). Rows whose trend is still NaN
-         # fail this comparison and stay UNKNOWN.
-         mask = df['trend'].abs() <= self.trend_threshold
-         df.loc[mask, 'regime'] = RegimeType.CONSOLIDATION.value
-
-         return df
-
-     def get_regime_stats(self, df):
-         """Summarize each regime's share of days and its return statistics."""
-         # Daily returns on the full series, so that a regime's returns never
-         # span the gaps between its separate episodes
-         daily_returns = df['close'].pct_change()
-         stats = []
-         for regime in df['regime'].unique():
-             if pd.isna(regime):
-                 continue
-             mask = df['regime'] == regime
-             regime_data = df[mask]
-
-             # Daily returns realized on days labeled with this regime
-             returns = daily_returns[mask].dropna()
-
-             stats.append({
-                 'regime': regime,
-                 'days': len(regime_data),
-                 'pct': len(regime_data) / len(df) * 100,
-                 'avg_return': returns.mean() * 100 if len(returns) > 0 else 0,
-                 'volatility': returns.std() * np.sqrt(252) * 100 if len(returns) > 0 else 0,
-                 'sharpe': (returns.mean() / returns.std() * np.sqrt(252)) if len(returns) > 0 and returns.std() > 0 else 0,
-                 'max_return': returns.max() * 100 if len(returns) > 0 else 0,
-                 'min_return': returns.min() * 100 if len(returns) > 0 else 0,
-             })
-
-         return pd.DataFrame(stats)
-
-
- def analyze_chinext50_regimes(csv_path="chinext50.csv"):
-     """Analyze the historical regime distribution of the ChiNext 50."""
-     df = pd.read_csv(csv_path, parse_dates=['datetime'], index_col='datetime')
-
-     detector = RegimeDetector(
-         vol_short=20,
-         vol_long=60,
-         trend_window=20,
-         vol_percentile=60,
-         trend_threshold=0.03  # ChiNext is volatile; threshold loosened from the 0.05 default
-     )
-
-     regimes = detector.detect_regime(df['close'])
-     stats = detector.get_regime_stats(regimes)
-
-     print("=" * 60)
-     print("ChiNext 50 Index market regime analysis")
-     print("=" * 60)
-     print(f"\nDate range: {regimes.index[0].date()} to {regimes.index[-1].date()}")
-     print(f"Total trading days: {len(regimes)}")
-     print("\nRegime distribution:")
-     print(stats.to_string(index=False))
-
-     # Save results
-     regimes.to_csv("regimes.csv")
-     stats.to_csv("regime_stats.csv", index=False)
-     print("\nDetailed data saved: regimes.csv")
-     print("Summary statistics saved: regime_stats.csv")
-
-     return regimes, stats
-
-
- if __name__ == "__main__":
-     analyze_chinext50_regimes()
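The script labels regimes by combining a rolling volatility percentile with a smoothed deviation from the moving average. As a rough illustration only, the same two-factor rule can be sketched on synthetic prices, with `np.select` doing the labeling in one pass. The helper `label_regimes`, its parameter names, the English label strings, and the random-walk data are all assumptions made for this example and are not part of the script.

```python
import numpy as np
import pandas as pd


def label_regimes(prices, vol_window=20, trend_window=20,
                  rank_window=252, vol_cut=0.60, trend_threshold=0.03):
    """Hypothetical helper: label each date with a regime string."""
    returns = prices.pct_change()
    # Annualized rolling volatility
    vol = returns.rolling(vol_window).std() * np.sqrt(252)
    # Percentile rank of the latest volatility within its trailing window
    vol_rank = vol.rolling(rank_window).apply(
        lambda w: w.rank(pct=True).iloc[-1]
    )
    # Smoothed relative deviation of price from its moving average
    ma = prices.rolling(trend_window).mean()
    trend = ((prices - ma) / ma).rolling(trend_window).mean()

    # Comparisons against NaN are False, so warm-up rows match nothing
    up = (trend > trend_threshold).to_numpy()
    down = (trend < -trend_threshold).to_numpy()
    flat = (trend.abs() <= trend_threshold).to_numpy()
    high = (vol_rank > vol_cut).to_numpy()
    low = (vol_rank <= vol_cut).to_numpy()

    conditions = [high & up, low & up, high & down, low & down, flat]
    labels = ["Strong uptrend", "Weak uptrend",
              "Strong downtrend", "Weak downtrend", "Consolidation"]
    regime = np.select(conditions, labels, default="Unknown")
    return pd.Series(regime, index=prices.index, name="regime")


# Synthetic geometric random walk standing in for index closes (assumed data)
rng = np.random.default_rng(0)
dates = pd.bdate_range("2020-01-01", periods=600)
log_returns = rng.normal(0.0003, 0.02, size=600)
prices = pd.Series(1000 * np.exp(np.cumsum(log_returns)), index=dates)

regimes = label_regimes(prices)
print(regimes.value_counts())
```

With these defaults the first 38 rows have no trend value and stay "Unknown". Until the 252-day ranking window fills, only "Unknown" or "Consolidation" can appear, because the four bull and bear labels need a volatility percentile.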