|
|
@@ -0,0 +1,435 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+市场环境识别器 (Market Regime Identifier)
|
|
|
+基于HMM隐马尔可夫模型的市场状态识别系统
|
|
|
+
|
|
|
+状态定义:
|
|
|
+- 状态0(震荡):价格波动大但无明显方向,Hurst指数≈0.5,自相关性低
|
|
|
+- 状态1(趋势):价格持续单向运动,Hurst指数>0.6,高自相关
|
|
|
+- 状态2(反转):超买/超卖后的V型反转,RSI极端值后的快速回归
|
|
|
+
|
|
|
+作者: OpenClaw
|
|
|
+日期: 2026-03-06
|
|
|
+"""
|
|
|
+
|
|
|
+import numpy as np
|
|
|
+import pandas as pd
|
|
|
+from hmmlearn.hmm import GaussianHMM
|
|
|
+from scipy import stats
|
|
|
+import warnings
|
|
|
+warnings.filterwarnings('ignore')
|
|
|
+
|
|
|
+# ==================== 特征工程 ====================
|
|
|
+
|
|
|
def calculate_hurst(prices, max_lag=100):
    """Estimate the Hurst exponent of a price series.

    For every lag the standard deviation of the lagged price differences is
    computed; the exponent is the slope of log(std) regressed on log(lag).

    Interpretation used by this module:
        H ~ 0.5: random walk (range-bound market)
        H > 0.6: trending
        H < 0.4: mean-reverting

    Args:
        prices: 1-D sequence of prices (list, ndarray or pandas Series).
        max_lag: Exclusive upper bound for the lags; additionally capped at
            ``len(prices) // 4``.

    Returns:
        The fitted slope as a float, or the neutral value 0.5 when fewer than
        two lags are available or any lagged standard deviation is zero or
        non-finite (the log-log regression is undefined in those cases).
    """
    # Work on a plain float array. Slices of a pandas Series keep their index,
    # so subtracting two shifted slices would align by label, not by position.
    values = np.asarray(prices, dtype=float).ravel()

    lags = np.arange(2, min(max_lag, len(values) // 4))
    if lags.size < 2:
        return 0.5

    tau = np.array([np.std(values[lag:] - values[:-lag]) for lag in lags])

    # A zero std has no logarithm; NaN/inf (e.g. gaps in the data) would
    # poison the least-squares fit.
    if not np.all(np.isfinite(tau)) or np.any(tau <= 0):
        return 0.5

    slope, _intercept = np.polyfit(np.log(lags), np.log(tau), 1)
    return float(slope)
|
|
|
+
|
|
|
def calculate_rsi(prices, period=14):
    """Compute a simple-moving-average RSI aligned with the input series.

    Gains and losses of consecutive price changes are averaged over a sliding
    window of ``period`` changes (plain mean, not Wilder smoothing).

    Args:
        prices: 1-D sequence of prices.
        period: Number of price changes per averaging window.

    Returns:
        A float ndarray with the same length as ``prices``. The first
        ``period`` entries, which have no complete window, hold the neutral
        value 50. Windows without any price movement are also reported as 50.

    Raises:
        ValueError: If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError("period must be a positive integer")

    values = np.asarray(prices, dtype=float).ravel()
    n = values.size

    # Without at least one full window, 'valid' convolution would swap its
    # operands and return an array whose length no longer matches the input.
    if n <= period:
        return np.full(n, 50.0)

    deltas = np.diff(values)
    gains = np.where(deltas > 0, deltas, 0.0)
    losses = np.where(deltas < 0, -deltas, 0.0)

    window = np.ones(period) / period
    avg_gains = np.convolve(gains, window, mode='valid')
    avg_losses = np.convolve(losses, window, mode='valid')

    # The epsilon keeps the ratio finite when a window has no losses.
    rs = avg_gains / (avg_losses + 1e-10)
    rsi = 100 - (100 / (1 + rs))

    # A completely flat window would otherwise evaluate to 0 and look like an
    # extreme oversold reading; report it as neutral, like the warm-up padding.
    rsi[(avg_gains == 0) & (avg_losses == 0)] = 50.0

    # Left-pad so that the result lines up index-for-index with `prices`.
    return np.concatenate([np.full(period, 50.0), rsi])
|
|
|
+
|
|
|
def extract_features(df):
    """Build the per-row feature table used for regime detection.

    Columns produced, in this order:
        ret_std_5      - 5-row rolling std of close-to-close returns, scaled by sqrt(252)
        momentum_10    - percentage change of close versus 10 rows earlier
        vol_ratio      - 5-row rolling return std divided by the 20-row one
        volume_change  - percentage change of volume versus the previous row
        intraday_trend - (close - open) / (high - low), in percent
        hurst          - Hurst exponent over a rolling 100-row window of closes
        rsi            - RSI of the close series
        autocorr       - lag-1 autocorrelation of returns over 20 rows

    Args:
        df: DataFrame with 'open', 'high', 'low', 'close' and 'volume' columns.

    Returns:
        A DataFrame indexed like ``df``. Infinite values are treated as
        missing; missing values are forward-filled and any that remain
        (the warm-up rows of the rolling windows) are set to 0.
    """
    features = pd.DataFrame(index=df.index)

    close = df['close']
    returns = close.pct_change()

    # Both the volatility level and the volatility ratio need the short-window
    # std, so compute it once.
    vol_short = returns.rolling(5).std()
    vol_long = returns.rolling(20).std()

    # 1. Return standard deviation (5 rows)
    features['ret_std_5'] = vol_short * np.sqrt(252)

    # 2. Price momentum (10 rows)
    features['momentum_10'] = (close / close.shift(10) - 1) * 100

    # 3. Volatility ratio (short / long); epsilon avoids division by zero
    features['vol_ratio'] = vol_short / (vol_long + 1e-10)

    # 4. Volume change rate
    features['volume_change'] = df['volume'].pct_change() * 100

    # 5. Intraday trend strength; epsilon guards rows where high == low
    features['intraday_trend'] = ((close - df['open']) / (df['high'] - df['low'] + 1e-10)) * 100

    # 6. Hurst exponent (extra feature)
    features['hurst'] = close.rolling(100).apply(calculate_hurst, raw=True)

    # 7. RSI
    features['rsi'] = calculate_rsi(close.values)

    # 8. Autocorrelation of returns
    features['autocorr'] = returns.rolling(20).apply(lambda x: x.autocorr(lag=1) if len(x) > 1 else 0)

    # A percentage change from a zero base (e.g. a zero-volume row) is +/-inf.
    # Filling only handles NaN, so convert infinities first; otherwise they
    # would flow into the model as non-finite observations.
    features = features.replace([np.inf, -np.inf], np.nan)

    # Fill missing values
    return features.ffill().fillna(0)
|
|
|
+
|
|
|
+# ==================== HMM模型 ====================
|
|
|
+
|
|
|
class MarketRegimeHMM:
    """Gaussian HMM wrapper that labels each observation with a market regime.

    NOTE(review): the integer states learned by EM carry no inherent meaning,
    so state 0/1/2 are not guaranteed to correspond to the semantics in
    STATE_NAMES. Confirm the mapping (e.g. against per-state feature means)
    before acting on the names.
    """

    # Display names of the states
    STATE_NAMES = {
        0: '震荡',
        1: '趋势',
        2: '反转'
    }

    # Prior transition probability matrix (each row sums to 1)
    PRIOR_TRANSITION = np.array([
        [0.85, 0.10, 0.05],  # range-bound -> range-bound / trend / reversal
        [0.15, 0.80, 0.05],  # trend       -> range-bound / trend / reversal
        [0.20, 0.10, 0.70]   # reversal    -> range-bound / trend / reversal
    ])

    def __init__(self, n_components=3, n_iter=100):
        """Create the underlying GaussianHMM.

        Args:
            n_components: Number of hidden states.
            n_iter: Maximum number of EM iterations.
        """
        # The prior only fits a model with a matching number of states. When
        # it applies, 't' is left out of init_params so that fit() keeps the
        # transition matrix we supply instead of re-initialising it.
        use_prior = n_components == self.PRIOR_TRANSITION.shape[0]
        self.model = GaussianHMM(
            n_components=n_components,
            covariance_type='full',
            n_iter=n_iter,
            random_state=42,
            init_params='smc' if use_prior else 'stmc'
        )
        self.is_fitted = False

    def fit(self, features):
        """Train the HMM on a feature table.

        Args:
            features: Object exposing the observations as a 2-D array via
                ``.values`` (e.g. a DataFrame), one row per time step.

        Returns:
            self, to allow call chaining.
        """
        print("训练HMM模型...")

        # Seed EM with the prior transition matrix. A copy is used so the
        # class-level constant is never shared with the fitted model.
        if 't' not in self.model.init_params:
            self.model.transmat_ = self.PRIOR_TRANSITION.copy()

        X = features.values
        self.model.fit(X)
        self.is_fitted = True

        monitor = self.model.monitor_
        print(f"模型收敛: {monitor.converged}")
        # Report the iterations actually run, not the configured maximum.
        print(f"迭代次数: {monitor.iter}")
        print("\n学习到的转移概率矩阵:")
        print(self.model.transmat_.round(3))

        return self

    def predict(self, features):
        """Decode the state sequence and the per-step state probabilities.

        Args:
            features: Object exposing the observations via ``.values``.

        Returns:
            A tuple ``(states, state_probs)`` as returned by the underlying
            model's ``predict`` and ``predict_proba``.

        Raises:
            ValueError: If the model has not been fitted yet.
        """
        if not self.is_fitted:
            raise ValueError("模型尚未训练,请先调用fit()")

        X = features.values
        states = self.model.predict(X)
        state_probs = self.model.predict_proba(X)

        return states, state_probs

    def get_current_regime(self, features):
        """Describe the regime of the last observation.

        Args:
            features: Object exposing the observations via ``.values``.

        Returns:
            A dict with the keys 'state' (int), 'state_name', 'probabilities'
            (state name -> probability) and 'confidence' (probability of the
            decoded state).

        Raises:
            ValueError: If the model has not been fitted yet.
        """
        states, probs = self.predict(features)
        current_state = int(states[-1])
        current_prob = probs[-1]

        # Iterate over the probabilities the model actually returned, so a
        # model with a different number of states still gets a complete
        # report; states without a configured name get a generic label.
        names = [
            self.STATE_NAMES.get(i, f'状态{i}')
            for i in range(len(current_prob))
        ]

        return {
            'state': current_state,
            'state_name': names[current_state],
            'probabilities': {
                name: float(prob) for name, prob in zip(names, current_prob)
            },
            'confidence': float(current_prob[current_state])
        }
|
|
|
+
|
|
|
+# ==================== 策略切换逻辑 ====================
|
|
|
+
|
|
|
class StrategySelector:
    """Maps a detected market state to a strategy profile and a trade signal."""

    # One profile per state id; state 0 doubles as the fallback profile.
    STRATEGY_CONFIG = {
        0: {  # range-bound
            'name': '均值回归',
            'action': 'RSI超买超卖交易',
            'position_size': 0.5,  # reduced position
            'stop_loss': '2N',
            'description': '关闭趋势策略,使用RSI超买(>70)超卖(<30)信号'
        },
        1: {  # trend
            'name': '海龟趋势',
            'action': '全速运行',
            'position_size': 1.0,  # full position
            'stop_loss': '2N',
            'description': '增加仓位,突破20日高低点交易'
        },
        2: {  # reversal
            'name': '反向/观望',
            'action': '反向信号或空仓',
            'position_size': 0.3,  # smallest position
            'stop_loss': '1N',  # tightened stop
            'description': '反向信号或观望,收紧止损'
        }
    }

    @classmethod
    def get_strategy(cls, state):
        """Return the profile for ``state``; unknown states get profile 0."""
        if state in cls.STRATEGY_CONFIG:
            return cls.STRATEGY_CONFIG[state]
        return cls.STRATEGY_CONFIG[0]

    @classmethod
    def generate_signal(cls, state, rsi_value, price, ma20):
        """Derive a BUY / SELL / HOLD signal for the given state.

        Args:
            state: Detected market state id.
            rsi_value: Latest RSI reading.
            price: Latest price.
            ma20: Latest 20-period moving average of the price.

        Returns:
            A dict with 'state', 'strategy', 'position_size' and 'action',
            plus 'reason' whenever a rule produced one.
        """
        profile = cls.get_strategy(state)
        action = 'HOLD'
        reason = None

        if state == 1:
            # Trend: act on a 2% break away from the moving average.
            if price > ma20 * 1.02:
                action, reason = 'BUY', '突破20日均线2%'
            elif price < ma20 * 0.98:
                action, reason = 'SELL', '跌破20日均线2%'
        elif state == 0:
            # Range-bound: fade RSI extremes.
            if rsi_value < 30:
                action, reason = 'BUY', 'RSI超卖'
            elif rsi_value > 70:
                action, reason = 'SELL', 'RSI超买'
        elif state == 2:
            # Reversal: trade the extreme, otherwise explicitly stand aside.
            if rsi_value > 70:
                action, reason = 'SELL', '超买后反转'
            elif rsi_value < 30:
                action, reason = 'BUY', '超卖后反转'
            else:
                reason = '观望'

        signal = {
            'state': state,
            'strategy': profile['name'],
            'position_size': profile['position_size'],
            'action': action
        }
        if reason is not None:
            signal['reason'] = reason
        return signal
|
|
|
+
|
|
|
+# ==================== 模型评估 ====================
|
|
|
+
|
|
|
def evaluate_model(hmm, features, true_states=None):
    """Print and return goodness-of-fit diagnostics for a fitted regime model.

    Because the true states are unknown, the model is judged by:
        1. log-likelihood of the observations
        2. AIC / BIC
        3. how often each state occurs
        4. how long each state persists on average

    Args:
        hmm: Fitted model wrapper exposing ``model`` (with ``score``,
            ``n_components`` and ``n_features``), ``predict(features)`` and
            ``STATE_NAMES``.
        features: Object exposing the observations via ``.values``.
        true_states: Accepted for interface compatibility; currently unused.

    Returns:
        A dict with 'log_likelihood', 'aic', 'bic', 'state_distribution'
        (state id -> count), 'states' and 'state_probs'.
    """
    from itertools import groupby

    X = features.values
    model = hmm.model

    # Log-likelihood of the data under the fitted model
    log_likelihood = model.score(X)

    # Free parameters of a full-covariance Gaussian HMM. Probability vectors
    # sum to 1, so each one contributes one parameter less than its length.
    k = model.n_components
    d = model.n_features
    n_params = (
        (k - 1)                    # initial state distribution
        + k * (k - 1)              # transition matrix rows
        + k * d                    # state means
        + k * d * (d + 1) // 2     # symmetric covariance matrices
    )
    n_samples = len(X)
    aic = -2 * log_likelihood + 2 * n_params
    bic = -2 * log_likelihood + n_params * np.log(n_samples)

    print("\n模型评估指标:")
    print(f"对数似然: {log_likelihood:.2f}")
    print(f"AIC: {aic:.2f}")
    print(f"BIC: {bic:.2f}")

    # Decode the state sequence
    states, probs = hmm.predict(features)

    # State occupancy
    state_counts = pd.Series(states).value_counts().sort_index()
    state_pct = (state_counts / len(states) * 100).round(2)

    print("\n状态分布:")
    for state_id, state_name in hmm.STATE_NAMES.items():
        count = state_counts.get(state_id, 0)
        pct = state_pct.get(state_id, 0)
        print(f" {state_name}: {count}天 ({pct}%)")

    # Collapse the sequence into (state, run length) pairs
    runs = [(state, sum(1 for _ in group)) for state, group in groupby(states)]

    print("\n平均状态持续时间:")
    for state_id, state_name in hmm.STATE_NAMES.items():
        durations = [length for state, length in runs if state == state_id]
        if durations:
            print(f" {state_name}: {np.mean(durations):.1f}天")

    return {
        'log_likelihood': log_likelihood,
        'aic': aic,
        'bic': bic,
        'state_distribution': state_counts.to_dict(),
        'states': states,
        'state_probs': probs
    }
|
|
|
+
|
|
|
+# ==================== 主程序 ====================
|
|
|
+
|
|
|
def main():
    """Run an end-to-end demo on synthetic data.

    Simulates 500 business days of prices with four drift/volatility phases,
    extracts features, fits the regime model, prints the evaluation, the
    regime of the last day and the resulting strategy suggestion.
    """
    print("="*70)
    print("市场环境识别器 (Market Regime Identifier)")
    print("基于HMM隐马尔可夫模型")
    print("="*70)

    # Demo only: everything below runs on random data
    print("\n注意:这是演示版本,请使用真实数据运行")
    print("数据格式要求:DataFrame包含 'open', 'high', 'low', 'close', 'volume' 列")

    # Generate sample data (fixed seed for reproducible output)
    np.random.seed(42)
    n_days = 500
    dates = pd.date_range('2023-01-01', periods=n_days, freq='B')

    # Simulated path: up-drift, driftless high-vol, down-drift, then up-drift
    price = 100
    prices = []
    for i in range(n_days):
        if i < 150:      # trend
            drift, vol = 0.001, 0.01
        elif i < 300:    # range-bound
            drift, vol = 0, 0.015
        elif i < 375:    # reversal, falling leg
            drift, vol = -0.002, 0.012
        else:            # reversal, rising leg
            drift, vol = 0.002, 0.012
        price *= (1 + np.random.normal(drift, vol))
        prices.append(price)

    close = np.array(prices)
    df = pd.DataFrame({
        'open': close + np.random.normal(0, 0.5, n_days),
        'high': close + np.abs(np.random.normal(1, 0.5, n_days)),
        'low': close - np.abs(np.random.normal(1, 0.5, n_days)),
        'close': close,
        'volume': np.random.randint(1000000, 5000000, n_days)
    }, index=dates)

    print(f"\n示例数据: {len(df)}天")
    print(f"日期范围: {df.index[0].date()} ~ {df.index[-1].date()}")

    # Feature extraction
    print("\n提取特征...")
    features = extract_features(df)

    # Train on the five core features only
    feature_cols = ['ret_std_5', 'momentum_10', 'vol_ratio', 'volume_change', 'intraday_trend']
    X_train = features[feature_cols].dropna()

    print(f"特征矩阵: {X_train.shape}")

    # Train the HMM
    hmm = MarketRegimeHMM(n_components=3, n_iter=100)
    hmm.fit(X_train)

    # Evaluate the model; it decodes the state sequence itself, so no
    # separate predict() pass is needed here.
    evaluate_model(hmm, X_train)

    # Regime of the most recent observation
    current_regime = hmm.get_current_regime(X_train)

    print("\n" + "="*70)
    print("当前市场状态识别")
    print("="*70)
    print(f"状态: {current_regime['state_name']} (状态{current_regime['state']})")
    print(f"置信度: {current_regime['confidence']:.2%}")
    print("\n状态概率分布:")
    for name, prob in current_regime['probabilities'].items():
        bar = '█' * int(prob * 20)
        print(f" {name:6s}: {prob:.2%} {bar}")

    # Strategy suggestion
    strategy = StrategySelector.get_strategy(current_regime['state'])
    current_rsi = features['rsi'].iloc[-1]
    current_price = df['close'].iloc[-1]
    current_ma20 = df['close'].rolling(20).mean().iloc[-1]

    signal = StrategySelector.generate_signal(
        current_regime['state'],
        current_rsi,
        current_price,
        current_ma20
    )

    print("\n" + "="*70)
    print("策略建议")
    print("="*70)
    print(f"推荐策略: {strategy['name']}")
    print(f"操作策略: {strategy['action']}")
    print(f"仓位建议: {strategy['position_size']*100:.0f}%")
    print(f"止损设置: {strategy['stop_loss']}")
    print(f"描述: {strategy['description']}")

    print("\n交易信号:")
    print(f" 动作: {signal['action']}")
    if 'reason' in signal:
        print(f" 原因: {signal['reason']}")

    print("\n" + "="*70)
    print("使用说明:")
    print("="*70)
    print("1. 准备真实市场数据(2017-2025年)")
    print("2. 调用 extract_features(df) 提取特征")
    print("3. 使用 MarketRegimeHMM 训练模型")
    print("4. 根据 get_current_regime() 结果切换策略")
    print("\n验证要求: 状态识别准确率 > 72%")
    print("="*70)
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|