#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""HMM model diagnostic script.

Builds a synthetic, labelled price series made of three regimes
(0 = range-bound, 1 = trend, 2 = reversal), fits ``MarketRegimeHMM`` on the
features produced by ``extract_features`` and prints a report comparing the
learned model with the generating process:

* learned transition matrix vs. a hard-coded prior matrix,
* true vs. predicted state distribution,
* per-state return mean / standard deviation,
* a simple rule-based check of each state's characteristics,
* the match rate between predicted states and the generated labels.

The script runs top to bottom at import time and only writes to stdout.
"""
import sys
import warnings
from pathlib import Path

import numpy as np
import pandas as pd

warnings.filterwarnings('ignore')

# Make the sibling module importable no matter where the script is run from.
PROJECT_DIR = Path(__file__).resolve().parent
if str(PROJECT_DIR) not in sys.path:
    sys.path.insert(0, str(PROJECT_DIR))

from market_regime_hmm import MarketRegimeHMM, extract_features

print("="*70)
print("HMM模型诊断报告")
print("="*70)

# 1. Generate labelled test data
print("\n[1] 生成测试数据...")
np.random.seed(42)
n_days = 800

# Eight consecutive 100-day segments, cycling through the three states.
segments = []
true_states = []

for i in range(8):
    state = i % 3
    seg_prices = []
    price = 1000 + i * 100

    for day in range(100):
        if state == 0:
            # Range-bound: zero mean, medium volatility.
            ret = np.random.normal(0, 0.015)
        elif state == 1:
            # Trend: positive drift, low volatility.
            ret = np.random.normal(0.001, 0.010)
        else:
            # Reversal: one-directional first half, opposite second half,
            # which creates a real turning point at day 50.
            first_leg = 1 if (i % 2 == 0) else -1
            direction = first_leg if day < 50 else -first_leg
            ret = np.random.normal(direction * 0.0018, 0.018)

        price *= (1 + ret)
        seg_prices.append(price)
        true_states.append(state)

    segments.extend(seg_prices)

# Spell out the reversal definition in the report.
print(" 反转段定义: 前50天单边运行,后50天反向运行")

dates = pd.date_range('2020-01-01', periods=n_days, freq='B')
# NOTE: the order of the random draws below is kept as-is so that the
# seeded data set stays reproducible.
df = pd.DataFrame({
    'open': np.array(segments) + np.random.normal(0, 2, n_days),
    'high': np.array(segments) + np.abs(np.random.normal(5, 2, n_days)),
    'low': np.array(segments) - np.abs(np.random.normal(5, 2, n_days)),
    'close': segments,
    'volume': np.random.randint(1000000, 5000000, n_days),
    'true_state': true_states
}, index=dates)

print(f"数据天数: {n_days}")
print("真实状态分布:")
for i in range(3):
    count = sum(1 for s in true_states if s == i)
    print(f" 状态{i}: {count}天 ({count/n_days*100:.1f}%)")

# 2. Feature extraction
print("\n[2] 特征提取...")
features = extract_features(df)
feature_cols = ['ret_std_5', 'momentum_10', 'vol_ratio', 'volume_change', 'intraday_trend']
# Rows with missing feature values are dropped before fitting.
X = features[feature_cols].dropna()
print(f"特征维度: {X.shape}")

# 3. Train the model
print("\n[3] 训练HMM模型...")
hmm = MarketRegimeHMM(n_components=3, n_iter=100)
hmm.fit(X)

# 4. Predict states
states, probs = hmm.predict(X)
# Normalise to an ndarray so the element-wise comparisons below work even if
# predict() hands back a plain sequence.
states = np.asarray(states)
# Align the raw frame with the predictions by taking the trailing rows.
# NOTE(review): this assumes dropna() only removed leading warm-up rows -
# confirm against extract_features.
df_aligned = df.iloc[-len(states):].copy()
df_aligned['predicted_state'] = states
df_aligned['return'] = df_aligned['close'].pct_change()

# 5. Diagnostics
print("\n" + "="*70)
print("诊断结果")
print("="*70)

# 5.1 Transition matrix comparison
print("\n[5.1] 转移矩阵对比")
print("\n先验矩阵 (设定):")
prior = np.array([
    [0.85, 0.10, 0.05],
    [0.15, 0.80, 0.05],
    [0.20, 0.10, 0.70]
])
print(prior.round(3))

print("\n学习到的矩阵:")
learned = hmm.model.transmat_
print(learned.round(3))

print("\n差异:")
diff = np.abs(learned - prior)
print(diff.round(3))
print(f"平均绝对差异: {diff.mean():.3f}")

# 5.2 State distribution comparison
print("\n[5.2] 状态分布对比")
print(f"{'状态':<10} {'真实占比':<15} {'预测占比':<15} {'差异':<10}")
print("-"*50)
for i in range(3):
    true_pct = sum(1 for s in true_states if s == i) / n_days * 100
    pred_pct = int(np.sum(states == i)) / len(states) * 100
    diff_pct = abs(true_pct - pred_pct)
    print(f"状态{i:<5} {true_pct:>6.1f}%{' '*8} {pred_pct:>6.1f}%{' '*8} {diff_pct:>5.1f}%")

# 5.3 Per-state price behaviour
print("\n[5.3] 各状态的价格行为特征")
print(f"{'状态':<8} {'收益率均值':<12} {'收益率标准差':<15} {'样本数':<10}")
print("-"*50)

for i in range(3):
    mask = states == i
    if mask.any():
        rets = df_aligned.loc[mask, 'return'].dropna()
        mean_ret = rets.mean() * 100
        std_ret = rets.std() * 100
        count = mask.sum()
        print(f"状态{i:<5} {mean_ret:>+8.3f}%{' '*4} {std_ret:>8.3f}%{' '*6} {count:>5}天")

# 5.4 Expected vs. actual state characteristics
print("\n[5.4] 状态定义验证")
state_names = ['震荡', '趋势', '反转']
expected = {
    0: {'vol': '中高', 'ret': '接近0'},
    1: {'vol': '低', 'ret': '单边正/负漂移'},
    2: {'vol': '较高', 'ret': '阶段内先同向后反向'}
}

for i in range(3):
    mask = states == i
    if mask.any():
        rets = df_aligned.loc[mask, 'return'].dropna()
        mean_ret = rets.mean() * 100
        std_ret = rets.std() * 100

        print(f"\n状态{i} ({state_names[i]}):")
        print(f" 预期: 波动{expected[i]['vol']}, 收益{expected[i]['ret']}")
        print(f" 实际: 波动{std_ret:.2f}%, 收益{mean_ret:+.3f}%")

        # Simple threshold rules, in percent of daily return.
        if i == 0 and abs(mean_ret) < 0.1 and std_ret > 1.0:
            print(" ✓ 符合震荡特征")
        elif i == 1 and mean_ret > 0.05 and std_ret < 1.5:
            print(" ✓ 符合趋势特征")
        elif i == 2 and std_ret > 1.8:
            print(" ✓ 符合反转特征")
        else:
            print(" ✗ 特征不匹配")

# 5.5 Accuracy estimate
print("\n[5.5] 状态识别准确率估算")
# Compare each prediction with the label of the SAME row. The predictions
# cover only the trailing rows kept after dropna(), so the labels must come
# from df_aligned rather than from the start of true_states; every
# prediction, including the last one, is counted.
# NOTE(review): a raw equality check assumes MarketRegimeHMM numbers its
# states like the generator (0 range-bound, 1 trend, 2 reversal) - confirm.
aligned_true = df_aligned['true_state'].to_numpy()
matches = int(np.sum(states == aligned_true))

accuracy = matches / len(states) * 100
print(f"与生成标签匹配率: {accuracy:.1f}%")

if accuracy >= 72:
    print("✓ 达到目标准确率 (>72%)")
else:
    print("✗ 未达到目标准确率,需要优化")

print("\n" + "="*70)
print("诊断结论")
print("="*70)
print(f"1. 转移矩阵与先验差异: {'可接受' if diff.mean() < 0.3 else '较大'}")
print(f"2. 状态识别准确率: {accuracy:.1f}%")
print("3. 状态特征一致性: 见上文分析")
print("\n建议:")
if diff.mean() > 0.3:
    print("- 转移矩阵与先验差异较大,建议检查数据特征或调整模型参数")
if accuracy < 72:
    print("- 准确率不足,建议增加特征维度或使用更长的训练数据")
print("="*70)