""" 创业板50指数市场状态识别 (Regime Detection) 基于波动率和趋势强度识别不同市场状态 """ import pandas as pd import numpy as np from enum import Enum class RegimeType(Enum): """市场状态类型""" STRONG_BULL = "强趋势上涨" # 高波动+上涨趋势 WEAK_BULL = "弱趋势上涨" # 低波动+上涨趋势 STRONG_BEAR = "强趋势下跌" # 高波动+下跌趋势 WEAK_BEAR = "弱趋势下跌" # 低波动+下跌趋势 CONSOLIDATION = "震荡整理" # 无明显趋势 UNKNOWN = "未知" class RegimeDetector: """ 基于波动率和趋势的市场状态识别器 创业板50特性: - 成长风格, 波动率高于主板 - 趋势性强但反转快 - 适合波动率+趋势双因子识别 """ def __init__(self, vol_short=20, # 短期波动率窗口 vol_long=60, # 长期波动率窗口 trend_window=20, # 趋势判断窗口 vol_percentile=60, # 波动率分位数阈值 trend_threshold=0.05): # 趋势强度阈值 self.vol_short = vol_short self.vol_long = vol_long self.trend_window = trend_window self.vol_percentile = vol_percentile self.trend_threshold = trend_threshold def calculate_volatility(self, prices): """计算年化波动率""" returns = prices.pct_change().dropna() vol_short = returns.rolling(self.vol_short).std() * np.sqrt(252) vol_long = returns.rolling(self.vol_long).std() * np.sqrt(252) return vol_short, vol_long def calculate_trend(self, prices): """计算趋势强度和方向""" # 使用均线斜率判断趋势 ma = prices.rolling(self.trend_window).mean() # 价格相对均线的偏离 deviation = (prices - ma) / ma # 趋势强度: 斜率方向 + 持续性 trend_strength = deviation.rolling(self.trend_window).mean() return trend_strength def detect_regime(self, prices): """ 识别当前市场状态 返回: DataFrame with regime info """ df = pd.DataFrame(index=prices.index) df['close'] = prices # 计算波动率 vol_short, vol_long = self.calculate_volatility(prices) df['vol_short'] = vol_short df['vol_long'] = vol_long # 波动率分位数 (基于长期历史) df['vol_percentile'] = vol_short.rolling(252).apply( lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5 ) # 趋势强度 df['trend'] = self.calculate_trend(prices) # 趋势方向 (使用短期动量) df['momentum'] = prices.pct_change(self.trend_window) # 识别状态 df['regime'] = RegimeType.UNKNOWN.value # 高波动 high_vol = df['vol_percentile'] > self.vol_percentile / 100 # 强趋势 strong_trend_up = df['trend'] > self.trend_threshold strong_trend_down = df['trend'] < -self.trend_threshold # 强趋势上涨 (高波动+上涨) mask = high_vol & strong_trend_up df.loc[mask, 'regime'] = RegimeType.STRONG_BULL.value # 弱趋势上涨 (低波动+上涨) mask = (~high_vol) & strong_trend_up df.loc[mask, 'regime'] = RegimeType.WEAK_BULL.value # 强趋势下跌 (高波动+下跌) mask = high_vol & strong_trend_down df.loc[mask, 'regime'] = RegimeType.STRONG_BEAR.value # 弱趋势下跌 (低波动+下跌) mask = (~high_vol) & strong_trend_down df.loc[mask, 'regime'] = RegimeType.WEAK_BEAR.value # 震荡 (无明显趋势) mask = (~strong_trend_up) & (~strong_trend_down) df.loc[mask, 'regime'] = RegimeType.CONSOLIDATION.value return df def get_regime_stats(self, df): """统计各状态占比和表现""" stats = [] for regime in df['regime'].unique(): if pd.isna(regime): continue mask = df['regime'] == regime regime_data = df[mask] # 计算该状态下的收益统计 returns = regime_data['close'].pct_change().dropna() stats.append({ 'regime': regime, 'days': len(regime_data), 'pct': len(regime_data) / len(df) * 100, 'avg_return': returns.mean() * 100 if len(returns) > 0 else 0, 'volatility': returns.std() * np.sqrt(252) * 100 if len(returns) > 0 else 0, 'sharpe': (returns.mean() / returns.std() * np.sqrt(252)) if len(returns) > 0 and returns.std() > 0 else 0, 'max_return': returns.max() * 100 if len(returns) > 0 else 0, 'min_return': returns.min() * 100 if len(returns) > 0 else 0, }) return pd.DataFrame(stats) def analyze_chinext50_regimes(csv_path="chinext50.csv"): """分析创业板50的历史状态分布""" df = pd.read_csv(csv_path, parse_dates=['datetime'], index_col='datetime') detector = RegimeDetector( vol_short=20, vol_long=60, trend_window=20, vol_percentile=60, trend_threshold=0.03 # 创业板波动大,阈值放宽 ) regimes = detector.detect_regime(df['close']) stats = detector.get_regime_stats(regimes) print("=" * 60) print("创业板50指数市场状态分析") print("=" * 60) print(f"\n数据区间: {regimes.index[0].date()} 至 {regimes.index[-1].date()}") print(f"总交易日: {len(regimes)}") print("\n各状态分布:") print(stats.to_string(index=False)) # 保存结果 regimes.to_csv("regimes.csv") stats.to_csv("regime_stats.csv", index=False) print("\n详细数据已保存: regimes.csv") print("统计结果已保存: regime_stats.csv") return regimes, stats if __name__ == "__main__": analyze_chinext50_regimes()