#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Market Regime Identifier.

Classifies market conditions with a Gaussian hidden Markov model (HMM).

Intended regimes:
- State 0 (range-bound, "震荡"): large price fluctuations without a clear
  direction, Hurst exponent ~ 0.5, low autocorrelation.
- State 1 (trending, "趋势"): sustained one-directional moves, Hurst
  exponent > 0.6, high autocorrelation.
- State 2 (reversal, "反转"): V-shaped reversal after overbought/oversold
  extremes, fast reversion after extreme RSI readings.

Author: OpenClaw
Date: 2026-03-06
"""

import itertools
import warnings

import numpy as np
import pandas as pd
from scipy import stats  # noqa: F401  (kept: other code may rely on it)

try:
    from hmmlearn.hmm import GaussianHMM
except ImportError:  # hmmlearn is only needed by MarketRegimeHMM itself
    GaussianHMM = None

# NOTE: this silences *all* warnings process-wide (original behaviour, kept).
warnings.filterwarnings('ignore')


# ==================== Feature engineering ====================

def calculate_hurst(prices, max_lag=100):
    """
    Estimate the Hurst exponent of a price series.

    The standard deviation of lagged differences is regressed against the
    lag on a log-log scale; the slope is the estimate.

    H ~ 0.5: random walk (range-bound)
    H > 0.6: trending
    H < 0.4: mean reverting

    Args:
        prices: 1-D sequence of prices.
        max_lag: exclusive upper bound for the lags; the lags actually used
            are ``range(2, min(max_lag, len(prices) // 4))``.

    Returns:
        The estimated exponent, or the neutral value 0.5 when fewer than two
        lags are available or any lagged standard deviation is zero,
        negative, NaN or infinite (the log-log fit would be meaningless).
    """
    prices = np.asarray(prices, dtype=float)
    lags = np.arange(2, min(max_lag, len(prices) // 4))
    if lags.size < 2:
        return 0.5

    tau = np.array([np.std(prices[lag:] - prices[:-lag]) for lag in lags])
    # NaN compares False against 0, so non-finite values need their own check.
    if not np.all(np.isfinite(tau)) or np.any(tau <= 0):
        return 0.5

    slope, _intercept = np.polyfit(np.log(lags), np.log(tau), 1)
    return slope


def calculate_rsi(prices, period=14):
    """
    Compute a simple-moving-average RSI.

    Args:
        prices: 1-D sequence of prices.
        period: number of price changes per averaging window.

    Returns:
        A float array with the same length as ``prices``. The first
        ``period`` entries (no full window yet) are padded with the neutral
        value 50; inputs with ``len(prices) <= period`` are therefore all 50.
        A window without any price movement also yields 50.
    """
    prices = np.asarray(prices, dtype=float)
    n_prices = len(prices)
    if n_prices <= period:
        # Not a single full window: np.convolve(mode='valid') would swap its
        # operands and return an array of the wrong length (or raise on
        # empty input), so answer with neutral padding only.
        return np.full(n_prices, 50.0)

    deltas = np.diff(prices)
    gains = np.where(deltas > 0, deltas, 0.0)
    losses = np.where(deltas < 0, -deltas, 0.0)

    kernel = np.ones(period) / period
    avg_gains = np.convolve(gains, kernel, mode='valid')
    avg_losses = np.convolve(losses, kernel, mode='valid')

    # 100 - 100 / (1 + gain / loss) == 100 * gain / (gain + loss); this form
    # needs no epsilon and lets a flat window map to 50 instead of 0.
    total_move = avg_gains + avg_losses
    rsi = np.full(total_move.shape, 50.0)
    has_move = total_move > 0
    rsi[has_move] = 100.0 * avg_gains[has_move] / total_move[has_move]

    # Left-pad so the result lines up with ``prices``.
    return np.concatenate([np.full(period, 50.0), rsi])


def extract_features(df, fill_missing=True):
    """
    Build the feature matrix X_t from OHLCV data.

    Columns, in order: ``ret_std_5`` (annualised 5-day return std),
    ``momentum_10`` (10-day momentum, %), ``vol_ratio`` (5-day / 20-day
    volatility), ``volume_change`` (volume change, %), ``intraday_trend``
    ((close - open) / (high - low), %), ``hurst`` (100-day rolling Hurst
    exponent), ``rsi`` and ``autocorr`` (20-day rolling lag-1
    autocorrelation of returns).

    Args:
        df: DataFrame with 'open', 'high', 'low', 'close', 'volume' columns.
        fill_missing: when True (default) missing values are forward-filled
            and the remaining leading gaps are set to 0. Pass False to keep
            NaN in the warm-up rows so the caller can drop them.

    Returns:
        DataFrame of features indexed like ``df``. Infinite values (e.g. the
        volume change after a zero-volume bar) are treated as missing.
    """
    features = pd.DataFrame(index=df.index)
    close = df['close']
    returns = close.pct_change()

    vol_short = returns.rolling(5).std()
    vol_long = returns.rolling(20).std()

    # 1. Return standard deviation (5 days), annualised
    features['ret_std_5'] = vol_short * np.sqrt(252)

    # 2. Price momentum (10 days)
    features['momentum_10'] = (close / close.shift(10) - 1) * 100

    # 3. Volatility ratio (short / long)
    features['vol_ratio'] = vol_short / (vol_long + 1e-10)

    # 4. Volume change rate
    features['volume_change'] = df['volume'].pct_change() * 100

    # 5. Intraday trend strength
    bar_range = df['high'] - df['low'] + 1e-10
    features['intraday_trend'] = (close - df['open']) / bar_range * 100

    # 6. Hurst exponent (extra feature)
    features['hurst'] = close.rolling(100).apply(calculate_hurst, raw=True)

    # 7. RSI
    features['rsi'] = calculate_rsi(close.values)

    # 8. Autocorrelation
    features['autocorr'] = returns.rolling(20).apply(
        lambda window: window.autocorr(lag=1) if len(window) > 1 else 0
    )

    # +/-inf would survive ffill/fillna and break Gaussian fitting downstream.
    features = features.replace([np.inf, -np.inf], np.nan)

    if fill_missing:
        features = features.ffill().fillna(0)
    return features


# ==================== HMM model ====================

class MarketRegimeHMM:
    """
    Market regime model backed by ``hmmlearn.hmm.GaussianHMM``.

    NOTE(review): an HMM is fitted without labels, so nothing in this class
    forces learned state ``k`` to behave like ``STATE_NAMES[k]``. Confirm the
    mapping against the fitted means before trusting the regime names.
    """

    # Display names of the states
    STATE_NAMES = {
        0: '震荡',
        1: '趋势',
        2: '反转'
    }

    # Prior transition matrix; row i holds P(next state | current state i)
    PRIOR_TRANSITION = np.array([
        [0.85, 0.10, 0.05],  # range-bound -> range-bound / trend / reversal
        [0.15, 0.80, 0.05],  # trend       -> range-bound / trend / reversal
        [0.20, 0.10, 0.70]   # reversal    -> range-bound / trend / reversal
    ])

    def __init__(self, n_components=3, n_iter=100):
        """
        Create an unfitted model.

        Args:
            n_components: number of hidden states.
            n_iter: maximum number of EM iterations.

        Raises:
            ImportError: if hmmlearn is not installed.
        """
        if GaussianHMM is None:
            raise ImportError("未安装hmmlearn,无法使用MarketRegimeHMM")

        # The prior only fits a model with a matching number of states.
        self._use_prior = n_components == self.PRIOR_TRANSITION.shape[0]
        # Leaving 't' out of init_params stops fit() from re-initialising
        # transmat_, which would silently discard the prior.
        init_params = 'smc' if self._use_prior else 'stmc'

        self.model = GaussianHMM(
            n_components=n_components,
            covariance_type='full',
            n_iter=n_iter,
            random_state=42,
            init_params=init_params
        )
        self.is_fitted = False

    @classmethod
    def _state_name(cls, state):
        """Return the display name of ``state``, with a generic fallback."""
        return cls.STATE_NAMES.get(state, f'状态{state}')

    def fit(self, features):
        """
        Train the HMM and print a short convergence report.

        Args:
            features: DataFrame or 2-D array of observations (rows = time).

        Returns:
            self, to allow call chaining.
        """
        print("训练HMM模型...")

        if self._use_prior:
            # Copy so training can never touch the class-level constant.
            self.model.transmat_ = self.PRIOR_TRANSITION.copy()

        X = np.asarray(features)
        self.model.fit(X)
        self.is_fitted = True

        print(f"模型收敛: {self.model.monitor_.converged}")
        # monitor_.iter is the number of iterations actually run;
        # model.n_iter is only the configured maximum.
        print(f"迭代次数: {self.model.monitor_.iter}")
        print("\n学习到的转移概率矩阵:")
        print(self.model.transmat_.round(3))

        return self

    def predict(self, features):
        """
        Decode the state sequence for ``features``.

        Returns:
            Tuple ``(states, state_probs)``: the decoded state per row and
            the per-row posterior probability of every state.

        Raises:
            ValueError: if the model has not been fitted yet.
        """
        if not self.is_fitted:
            raise ValueError("模型尚未训练,请先调用fit()")

        X = np.asarray(features)
        states = self.model.predict(X)
        state_probs = self.model.predict_proba(X)

        return states, state_probs

    def get_current_regime(self, features):
        """
        Describe the regime of the last row of ``features``.

        Returns:
            Dict with 'state' (int), 'state_name', 'probabilities'
            (state name -> probability) and 'confidence' (probability of
            the decoded state).
        """
        states, probs = self.predict(features)

        current_state = int(states[-1])
        current_prob = probs[-1]

        return {
            'state': current_state,
            'state_name': self._state_name(current_state),
            # Iterate over the model's own states so any n_components works.
            'probabilities': {
                self._state_name(i): float(p)
                for i, p in enumerate(current_prob)
            },
            'confidence': float(current_prob[current_state])
        }


# ==================== Strategy switching ====================

class StrategySelector:
    """Chooses a trading strategy from the identified market state."""

    STRATEGY_CONFIG = {
        0: {  # range-bound
            'name': '均值回归',
            'action': 'RSI超买超卖交易',
            'position_size': 0.5,  # reduced position
            'stop_loss': '2N',
            'description': '关闭趋势策略,使用RSI超买(>70)超卖(<30)信号'
        },
        1: {  # trending
            'name': '海龟趋势',
            'action': '全速运行',
            'position_size': 1.0,  # full position
            'stop_loss': '2N',
            'description': '增加仓位,突破20日高低点交易'
        },
        2: {  # reversal
            'name': '反向/观望',
            'action': '反向信号或空仓',
            'position_size': 0.3,  # minimum position
            'stop_loss': '1N',  # tighter stop
            'description': '反向信号或观望,收紧止损'
        }
    }

    @classmethod
    def get_strategy(cls, state):
        """
        Return the strategy configuration for ``state``.

        Unknown states fall back to the state-0 configuration. A copy is
        returned so callers cannot modify the shared class-level table.
        """
        return dict(cls.STRATEGY_CONFIG.get(state, cls.STRATEGY_CONFIG[0]))

    @classmethod
    def generate_signal(cls, state, rsi_value, price, ma20):
        """
        Produce a trading signal for the given state and indicators.

        Args:
            state: market state id (0, 1 or 2).
            rsi_value: current RSI reading.
            price: current price.
            ma20: current 20-day moving average of the price.

        Returns:
            Dict with 'state', 'strategy', 'position_size', 'action'
            ('BUY' / 'SELL' / 'HOLD') and, when a rule fired, 'reason'.
        """
        strategy = cls.get_strategy(state)

        signal = {
            'state': state,
            'strategy': strategy['name'],
            'position_size': strategy['position_size'],
            'action': 'HOLD'
        }

        if state == 0:  # range-bound: RSI mean reversion
            if rsi_value < 30:
                signal['action'] = 'BUY'
                signal['reason'] = 'RSI超卖'
            elif rsi_value > 70:
                signal['action'] = 'SELL'
                signal['reason'] = 'RSI超买'

        elif state == 1:  # trending: breakout system
            if price > ma20 * 1.02:
                signal['action'] = 'BUY'
                signal['reason'] = '突破20日均线2%'
            elif price < ma20 * 0.98:
                signal['action'] = 'SELL'
                signal['reason'] = '跌破20日均线2%'

        elif state == 2:  # reversal: fade the extreme or stand aside
            if rsi_value > 70:
                signal['action'] = 'SELL'
                signal['reason'] = '超买后反转'
            elif rsi_value < 30:
                signal['action'] = 'BUY'
                signal['reason'] = '超卖后反转'
            else:
                signal['action'] = 'HOLD'
                signal['reason'] = '观望'

        return signal


# ==================== Model evaluation ====================

def _count_free_parameters(n_components, n_features):
    """Count the free parameters of a full-covariance Gaussian HMM."""
    means = n_components * n_features
    covariances = n_components * n_features * (n_features + 1) // 2
    # Every transition row and the start distribution sum to 1, which
    # removes one degree of freedom each.
    transitions = n_components * (n_components - 1)
    start_probs = n_components - 1
    return means + covariances + transitions + start_probs


def _run_lengths(states):
    """Collapse a state sequence into (state, run length) pairs."""
    return [
        (int(state), sum(1 for _ in run))
        for state, run in itertools.groupby(states)
    ]


def evaluate_model(hmm, features, true_states=None):
    """
    Evaluate a fitted model and print a report.

    The true states are unknown, so these diagnostics are used:
    1. log-likelihood
    2. AIC / BIC
    3. plausibility of state durations
    4. correspondence between states and price behaviour

    Args:
        hmm: fitted ``MarketRegimeHMM``.
        features: observations to evaluate on (DataFrame or 2-D array).
        true_states: currently unused; kept for interface compatibility.

    Returns:
        Dict with 'log_likelihood', 'aic', 'bic', 'state_distribution',
        'states' and 'state_probs'.
    """
    X = np.asarray(features)
    n_components = hmm.model.n_components

    log_likelihood = hmm.model.score(X)

    n_params = _count_free_parameters(n_components, hmm.model.n_features)
    n_samples = len(X)

    aic = -2 * log_likelihood + 2 * n_params
    bic = -2 * log_likelihood + n_params * np.log(n_samples)

    print("\n模型评估指标:")
    print(f"对数似然: {log_likelihood:.2f}")
    print(f"AIC: {aic:.2f}")
    print(f"BIC: {bic:.2f}")

    states, probs = hmm.predict(features)

    def name_of(state_id):
        return hmm.STATE_NAMES.get(state_id, f'状态{state_id}')

    # State distribution
    state_counts = pd.Series(states).value_counts().sort_index()
    state_pct = (state_counts / len(states) * 100).round(2)

    print("\n状态分布:")
    for state_id in range(n_components):
        count = state_counts.get(state_id, 0)
        pct = state_pct.get(state_id, 0)
        print(f" {name_of(state_id)}: {count}天 ({pct}%)")

    # Average state duration
    state_durations = _run_lengths(states)

    print("\n平均状态持续时间:")
    for state_id in range(n_components):
        durations = [d for s, d in state_durations if s == state_id]
        if durations:
            avg_duration = np.mean(durations)
            print(f" {name_of(state_id)}: {avg_duration:.1f}天")

    return {
        'log_likelihood': log_likelihood,
        'aic': aic,
        'bic': bic,
        'state_distribution': state_counts.to_dict(),
        'states': states,
        'state_probs': probs
    }


# ==================== Main program ====================

def _make_demo_data(n_days):
    """Simulate OHLCV data with a trend, a range-bound and a reversal phase."""
    dates = pd.date_range('2023-01-01', periods=n_days, freq='B')

    price = 100
    prices = []
    for i in range(n_days):
        if i < 150:  # trend
            price *= (1 + np.random.normal(0.001, 0.01))
        elif i < 300:  # range-bound
            price *= (1 + np.random.normal(0, 0.015))
        elif i < 375:  # reversal, falling leg
            price *= (1 + np.random.normal(-0.002, 0.012))
        else:  # reversal, rising leg
            price *= (1 + np.random.normal(0.002, 0.012))
        prices.append(price)

    close = np.array(prices)
    open_ = close + np.random.normal(0, 0.5, n_days)
    # Anchor high/low on both open and close so every bar is consistent
    # (low <= open, close <= high).
    high = np.maximum(open_, close) + np.abs(np.random.normal(1, 0.5, n_days))
    low = np.minimum(open_, close) - np.abs(np.random.normal(1, 0.5, n_days))
    volume = np.random.randint(1000000, 5000000, n_days)

    return pd.DataFrame({
        'open': open_,
        'high': high,
        'low': low,
        'close': close,
        'volume': volume
    }, index=dates)


def main():
    """Run the end-to-end demo on simulated data."""
    print("="*70)
    print("市场环境识别器 (Market Regime Identifier)")
    print("基于HMM隐马尔可夫模型")
    print("="*70)

    # Example: demonstrate with random data
    print("\n注意:这是演示版本,请使用真实数据运行")
    print("数据格式要求:DataFrame包含 'open', 'high', 'low', 'close', 'volume' 列")

    np.random.seed(42)
    df = _make_demo_data(500)

    print(f"\n示例数据: {len(df)}天")
    print(f"日期范围: {df.index[0].date()} ~ {df.index[-1].date()}")

    # Feature extraction; keep NaN so the warm-up rows can really be dropped
    # instead of being fed to the HMM as fabricated all-zero observations.
    print("\n提取特征...")
    features = extract_features(df, fill_missing=False)

    # Training features (the 5 core ones)
    feature_cols = ['ret_std_5', 'momentum_10', 'vol_ratio',
                    'volume_change', 'intraday_trend']
    X_train = features[feature_cols].dropna()

    print(f"特征矩阵: {X_train.shape}")

    hmm = MarketRegimeHMM(n_components=3, n_iter=100)
    hmm.fit(X_train)

    evaluate_model(hmm, X_train)

    current_regime = hmm.get_current_regime(X_train)

    print("\n" + "="*70)
    print("当前市场状态识别")
    print("="*70)
    print(f"状态: {current_regime['state_name']} (状态{current_regime['state']})")
    print(f"置信度: {current_regime['confidence']:.2%}")
    print("\n状态概率分布:")
    for name, prob in current_regime['probabilities'].items():
        bar = '█' * int(prob * 20)
        print(f" {name:6s}: {prob:.2%} {bar}")

    # Strategy recommendation
    strategy = StrategySelector.get_strategy(current_regime['state'])
    current_rsi = features['rsi'].iloc[-1]
    current_price = df['close'].iloc[-1]
    current_ma20 = df['close'].rolling(20).mean().iloc[-1]

    signal = StrategySelector.generate_signal(
        current_regime['state'],
        current_rsi,
        current_price,
        current_ma20
    )

    print("\n" + "="*70)
    print("策略建议")
    print("="*70)
    print(f"推荐策略: {strategy['name']}")
    print(f"操作策略: {strategy['action']}")
    print(f"仓位建议: {strategy['position_size']*100:.0f}%")
    print(f"止损设置: {strategy['stop_loss']}")
    print(f"描述: {strategy['description']}")
    print("\n交易信号:")
    print(f" 动作: {signal['action']}")
    if 'reason' in signal:
        print(f" 原因: {signal['reason']}")

    print("\n" + "="*70)
    print("使用说明:")
    print("="*70)
    print("1. 准备真实市场数据(2017-2025年)")
    print("2. 调用 extract_features(df) 提取特征")
    print("3. 使用 MarketRegimeHMM 训练模型")
    print("4. 根据 get_current_regime() 结果切换策略")
    print("\n验证要求: 状态识别准确率 > 72%")
    print("="*70)


if __name__ == "__main__":
    main()