#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Trend-Mix: a composite strategy built from several objective market-regime
detectors, implemented for the ChiNext 50 index (399673).

Methods:
1. Volatility percentile
2. Variance ratio test
3. Hurst exponent (R/S analysis)
4. ADX + price momentum
5. Bollinger band width + volatility squeeze
6. Markov regime-switching model (MS-AR)
7. Composite state machine (hard-coded decision tree)

NOTE: method 6 (MS-AR) is listed for completeness but has no implementation
in this module; ``GaussianMixture`` is imported but not used.
"""

import warnings
from pathlib import Path

import numpy as np
import pandas as pd
from scipy import stats

# baostock is only needed by ``TrendMixStrategy.fetch_data``; keeping it
# optional lets the indicator methods run on externally supplied data.
try:
    import baostock as bs
except ImportError:
    bs = None

# Imported by the original module but not used by any code here.
try:
    from sklearn.mixture import GaussianMixture
except ImportError:
    GaussianMixture = None

warnings.filterwarnings('ignore')


class TrendMixStrategy:
    """Composite regime-detection strategy.

    ``self.data`` holds a DataFrame with the columns ``open``, ``high``,
    ``low``, ``close``, ``volume`` and ``return``; it is populated by
    :meth:`fetch_data` and read by every ``calc_*`` method.
    """

    def __init__(self):
        self.data = None

    # ============================================
    # Shared helpers
    # ============================================
    @staticmethod
    def _true_range(high, low, close):
        """Return the per-row true range.

        The true range is the largest of ``high - low``,
        ``|high - previous close|`` and ``|low - previous close|``.
        NaNs are skipped by the row-wise max, so the first row falls back
        to ``high - low``.
        """
        prev_close = close.shift()
        candidates = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        )
        return candidates.max(axis=1)

    @staticmethod
    def _rolling_percentile(series, lookback):
        """Return the percentile rank (0-100) of each value in its window.

        For every full ``lookback`` window the newest value is ranked
        against the window using average ranks for ties, which matches
        ``Series.rank(pct=True)`` on that window.
        """
        def last_value_percentile(window):
            last = window[-1]
            below = np.count_nonzero(window < last)
            ties = np.count_nonzero(window == last)
            # Average rank of the tied group, as a fraction of window size.
            return (below + (ties + 1) / 2.0) / window.size * 100

        # raw=True hands over plain ndarrays, avoiding a Series per window.
        return series.rolling(lookback).apply(last_value_percentile, raw=True)

    def fetch_data(self, symbol="399673", start_date="2017-01-01", end_date="2026-03-06"):
        """Download daily bars through baostock and store them in ``self.data``.

        Args:
            symbol: Security code. A leading ``3`` is prefixed with ``sz.``,
                a leading ``6`` with ``sh.``; anything else is passed through
                unchanged.
            start_date: First date to request, ``YYYY-MM-DD``.
            end_date: Last date to request, ``YYYY-MM-DD``.

        Returns:
            The date-indexed DataFrame (also stored in ``self.data``), or
            ``None`` when no rows were returned.

        Raises:
            ImportError: If baostock is not installed.
        """
        if bs is None:
            raise ImportError("baostock is required for fetch_data()")

        print(f"获取 {symbol} 数据...")

        if symbol.startswith('3'):
            code = f"sz.{symbol}"
        elif symbol.startswith('6'):
            code = f"sh.{symbol}"
        else:
            code = symbol

        records = []
        bs.login()
        try:
            rs = bs.query_history_k_data_plus(
                code,
                "date,open,high,low,close,volume",
                start_date=start_date,
                end_date=end_date,
                frequency="d",
                adjustflag="3"
            )
            while rs.error_code == '0' and rs.next():
                row = rs.get_row_data()
                # Defensive: float('') raises, so skip rows with a blank
                # price field instead of aborting the whole download.
                if any(field == '' for field in row[1:5]):
                    continue
                records.append({
                    'date': row[0],
                    'open': float(row[1]),
                    'high': float(row[2]),
                    'low': float(row[3]),
                    'close': float(row[4]),
                    'volume': int(float(row[5])) if row[5] != '' else 0
                })
        finally:
            # Always release the session, even if the query or parsing fails.
            bs.logout()

        if not records:
            return None

        df = pd.DataFrame(records)
        df['date'] = pd.to_datetime(df['date'])
        df = df.set_index('date').sort_index()
        df['return'] = df['close'].pct_change()

        self.data = df
        print(f"✓ 获取成功: {len(df)}条数据")
        return df

    # ============================================
    # Method 1: volatility percentile
    # ============================================
    def calc_volatility_percentile(self, lookback=252):
        """Classify volatility by the percentile of the 20-day ATR.

        The 20-day ATR is a simple rolling mean of the true range; its
        percentile is taken over the trailing ``lookback`` rows.
        Above 70 is high volatility, below 30 is low volatility, anything
        else is normal; rows without a percentile are labelled unknown.

        Returns:
            DataFrame with ``ATR_20``, ``Vol_Percentile`` and ``Vol_State``.
        """
        df = self.data.copy()

        tr = self._true_range(df['high'], df['low'], df['close'])
        df['ATR_20'] = tr.rolling(20).mean()
        df['Vol_Percentile'] = self._rolling_percentile(df['ATR_20'], lookback)

        def classify_vol(pct):
            if pd.isna(pct):
                return '未知'
            if pct > 70:
                return '高波动'
            if pct < 30:
                return '低波动'
            return '常态'

        df['Vol_State'] = df['Vol_Percentile'].apply(classify_vol)
        return df[['ATR_20', 'Vol_Percentile', 'Vol_State']]

    # ============================================
    # Method 2: variance ratio test
    # ============================================
    def calc_variance_ratio(self, k=5, window=120):
        """Rolling variance-ratio test.

        ``VR(k) = Var(k-period overlapping return sums) / (k * Var(returns))``
        computed over the ``window`` returns strictly before each row, so a
        row's value never uses that row's own return.

        - VR above ``1 + critical`` is labelled trend
        - VR below ``1 - critical`` is labelled reversal
        - anything in between is labelled range-bound

        Args:
            k: Aggregation horizon in periods.
            window: Number of trailing returns per estimate; also used as
                the sample size in the critical value.

        Returns:
            DataFrame with ``VR``, ``VR_Upper``, ``VR_Lower``, ``VR_State``.
        """
        df = self.data.copy()
        returns = df['return']

        # Fill a positional array instead of writing through df.loc per row.
        vr_values = np.full(len(df), np.nan)
        for i in range(window + k, len(df)):
            r_window = returns.iloc[i - window:i].dropna()
            # Require at least 80% of the window to be populated.
            if len(r_window) < window * 0.8:
                continue
            k_ret = r_window.rolling(k).sum().dropna()
            if len(k_ret) <= k:
                continue
            var_1 = r_window.var()
            if var_1 > 0:
                vr_values[i] = k_ret.var() / (k * var_1)
        df['VR'] = vr_values

        # Two-sided 95% band: 1.96 standard errors, with the variance term
        # 2(2k-1)(k-1) / (3kn) where n is the window length.
        critical_value = 1.96 * np.sqrt(2 * (2 * k - 1) * (k - 1) / (3 * k * window))
        df['VR_Upper'] = 1 + critical_value
        df['VR_Lower'] = 1 - critical_value

        def classify_vr(vr):
            if pd.isna(vr):
                return '未知'
            if vr > 1 + critical_value:
                return '趋势'
            if vr < 1 - critical_value:
                return '反转'
            return '震荡'

        df['VR_State'] = df['VR'].apply(classify_vr)
        return df[['VR', 'VR_Upper', 'VR_Lower', 'VR_State']]

    # ============================================
    # Method 3: Hurst exponent (R/S analysis)
    # ============================================
    def calc_hurst(self, max_lag=50):
        """Rolling Hurst exponent from R/S analysis.

        Each row uses the 200 closes strictly before it.

        - H > 0.55: trend (persistence)
        - 0.45 <= H <= 0.55: random walk
        - H < 0.45: reversal (mean reversion)

        Returns:
            DataFrame with ``Hurst`` and ``Hurst_State``.
        """
        df = self.data.copy()
        window = 200

        closes = df['close'].values
        hurst_values = np.full(len(df), np.nan)
        for i in range(window, len(df)):
            h = self._compute_hurst_rs(closes[i - window:i], max_lag)
            if h is not None:
                hurst_values[i] = h
        df['Hurst'] = hurst_values

        def classify_hurst(h):
            if pd.isna(h):
                return '未知'
            if h > 0.55:
                return '趋势'
            if h < 0.45:
                return '反转'
            return '随机'

        df['Hurst_State'] = df['Hurst'].apply(classify_hurst)
        return df[['Hurst', 'Hurst_State']]

    def _compute_hurst_rs(self, prices, max_lag):
        """Estimate the Hurst exponent of ``prices`` by R/S analysis.

        Log returns are cut into non-overlapping segments for each even lag
        from 10 up to ``min(max_lag, n // 4)`` (exclusive); the mean rescaled
        range per lag is regressed on the lag in log-log space and the slope
        is the estimate.

        Returns:
            ``None`` when there are fewer than ``2 * max_lag`` returns;
            ``0.5`` when fewer than five lags produce a value or the
            computation fails; otherwise the slope clipped to [0.1, 0.9].
        """
        try:
            returns = np.diff(np.log(prices))
            n = len(returns)

            if n < max_lag * 2:
                return None

            rs_values = []
            lag_values = []
            for lag in range(10, min(max_lag, n // 4), 2):
                n_segments = n // lag
                if n_segments < 2:
                    continue

                rs_segments = []
                for seg_idx in range(n_segments):
                    segment = returns[seg_idx * lag:(seg_idx + 1) * lag]
                    # Range of the cumulative deviation from the segment mean.
                    cumdev = np.cumsum(segment - np.mean(segment))
                    value_range = np.max(cumdev) - np.min(cumdev)
                    scale = np.std(segment)
                    # NaN or zero scale fails this test and is skipped.
                    if scale > 0:
                        rs_segments.append(value_range / scale)

                if rs_segments:
                    rs_values.append(np.mean(rs_segments))
                    lag_values.append(lag)

            if len(lag_values) < 5:
                return 0.5

            # log(R/S) = log(c) + H * log(lag); the slope is H.
            slope = stats.linregress(np.log(lag_values), np.log(rs_values))[0]

            # Clip to a plausible range.
            return max(0.1, min(0.9, slope))

        except (ValueError, TypeError, ArithmeticError):
            # Best-effort fallback: treat a failed estimate as random walk.
            return 0.5

    # ============================================
    # Method 4: ADX + price momentum
    # ============================================
    def calc_adx_momentum(self):
        """Combine ADX (trend strength) with deviation from the 20-day mean.

        Directional movement follows the usual definition: +DM is the rise
        in the high and -DM is the *fall* in the low, each counted only when
        it is positive and larger than the other. All smoothing here uses
        simple 14-row rolling means.

        Returns:
            DataFrame with ``ADX``, ``MA20``, ``Deviation`` (percent) and
            ``ADX_State``.
        """
        df = self.data.copy()
        high, low, close = df['high'], df['low'], df['close']

        up_move = high.diff()
        # Signed: positive only when the low actually moved down. Taking the
        # absolute value here would count a rising low as downward movement.
        down_move = -low.diff()

        plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0)
        minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0)

        atr = self._true_range(high, low, close).rolling(14).mean()

        plus_di = 100 * (plus_dm.rolling(14).mean() / atr)
        minus_di = 100 * (minus_dm.rolling(14).mean() / atr)
        # The small epsilon avoids 0/0 when both DIs are zero.
        dx = (abs(plus_di - minus_di) / (plus_di + minus_di + 1e-10)) * 100
        df['ADX'] = dx.rolling(14).mean()

        df['MA20'] = df['close'].rolling(20).mean()
        df['Deviation'] = (df['close'] - df['MA20']) / df['MA20'] * 100

        def classify_adx_dev(adx, deviation):
            dev = abs(deviation)
            if pd.isna(adx) or pd.isna(dev):
                return '未知'
            if adx > 30 and dev > 2:
                return '强趋势'
            if adx > 25 and dev > 1:
                return '趋势初期'
            if adx > 20 and dev < 1:
                return '盘整观望'
            if adx < 20 and dev > 2:
                return '假突破'
            return '震荡整理'

        df['ADX_State'] = [
            classify_adx_dev(adx, deviation)
            for adx, deviation in zip(df['ADX'], df['Deviation'])
        ]
        return df[['ADX', 'MA20', 'Deviation', 'ADX_State']]

    # ============================================
    # Method 5: Bollinger band width + squeeze
    # ============================================
    def calc_bollinger_squeeze(self, lookback=120):
        """Classify the percentile of the Bollinger band width.

        Bands are the 20-row mean +/- 2 rolling standard deviations; the
        width is expressed as a percent of the mean and ranked over the
        trailing ``lookback`` rows.

        - < 10: extreme squeeze
        - > 90: extreme expansion
        - < 30 / > 70: contracting / expanding
        - otherwise: normal

        Returns:
            DataFrame with ``Bandwidth``, ``BB_Percentile``, ``BB_State``.
        """
        df = self.data.copy()

        df['MA20'] = df['close'].rolling(20).mean()
        df['STD20'] = df['close'].rolling(20).std()
        df['Upper'] = df['MA20'] + 2 * df['STD20']
        df['Lower'] = df['MA20'] - 2 * df['STD20']

        df['Bandwidth'] = (df['Upper'] - df['Lower']) / df['MA20'] * 100
        df['BB_Percentile'] = self._rolling_percentile(df['Bandwidth'], lookback)

        def classify_bb(pct):
            if pd.isna(pct):
                return '未知'
            if pct < 10:
                return '极度收缩(即将爆发)'
            if pct > 90:
                return '极度扩张(即将收敛)'
            if pct < 30:
                return '收缩中'
            if pct > 70:
                return '扩张中'
            return '常态'

        df['BB_State'] = df['BB_Percentile'].apply(classify_bb)
        return df[['Bandwidth', 'BB_Percentile', 'BB_State']]

    # ============================================
    # Composite state machine
    # ============================================
    @staticmethod
    def _classify_composite(vol, vr, hurst, adx_state, bb, adx, dev, vol_pct):
        """Map one row of indicator states to a composite state label.

        Missing numeric inputs fall back to neutral values (ADX 0,
        deviation 0, volatility percentile 50). Rules are evaluated in
        order: strong trend, trend, potential breakout, reversal, and
        finally range-bound as the default.
        """
        adx = 0 if pd.isna(adx) else adx
        dev = 0 if pd.isna(dev) else dev
        vol_pct = 50 if pd.isna(vol_pct) else vol_pct

        # Strong trend: every key indicator must agree.
        if (vr == '趋势' and hurst == '趋势' and adx_state == '强趋势'
                and adx > 40 and abs(dev) > 3 and vol == '常态'):
            return '强趋势'

        # Trend: at least four of five supporting conditions.
        trend_score = sum([
            vr == '趋势',
            hurst == '趋势',
            adx_state in ['强趋势', '趋势初期'],
            adx > 35 and abs(dev) > 2.5,
            vol in ['常态', '低波动']
        ])
        if trend_score >= 4:
            return '趋势'

        # Potential breakout: low volatility combined with a squeeze.
        squeeze_score = sum([
            bb == '极度收缩(即将爆发)',
            vol_pct < 25,
            adx_state == '盘整观望',
            vol == '低波动'
        ])
        if squeeze_score >= 3:
            return '潜在爆发'

        # Reversal: several indicators pointing to mean reversion.
        reversal_score = sum([
            vr == '反转',
            hurst == '反转',
            adx_state == '假突破',
            abs(dev) > 4 and adx < 20,
            bb == '极度扩张(即将收敛)'
        ])
        if reversal_score >= 3:
            return '反转'

        # Default: range-bound.
        return '震荡'

    def calc_composite_state(self):
        """Run all indicators and combine them with a hard-coded decision tree.

        Returns:
            DataFrame with the five per-method state columns, ``ADX``,
            ``Deviation``, ``Vol_Pct`` and ``Composite_State``.
        """
        vol_df = self.calc_volatility_percentile()
        vr_df = self.calc_variance_ratio()
        hurst_df = self.calc_hurst()
        adx_df = self.calc_adx_momentum()
        bb_df = self.calc_bollinger_squeeze()

        df = self.data.copy()
        df['Vol_State'] = vol_df['Vol_State']
        df['VR_State'] = vr_df['VR_State']
        df['Hurst_State'] = hurst_df['Hurst_State']
        df['ADX_State'] = adx_df['ADX_State']
        df['BB_State'] = bb_df['BB_State']

        # Raw values used for the finer-grained thresholds.
        df['ADX'] = adx_df['ADX']
        df['Deviation'] = adx_df['Deviation']
        df['Vol_Pct'] = vol_df['Vol_Percentile']

        columns = ['Vol_State', 'VR_State', 'Hurst_State', 'ADX_State', 'BB_State',
                   'ADX', 'Deviation', 'Vol_Pct']
        df['Composite_State'] = [
            self._classify_composite(*values)
            for values in zip(*(df[name] for name in columns))
        ]

        return df[columns + ['Composite_State']]

    # ============================================
    # Backtest
    # ============================================
    def backtest(self):
        """Print forward-return statistics per composite state.

        Forward returns are the percent change in close over the next 5,
        10 and 20 rows. The trend win rate is computed only over rows whose
        20-row forward return is known, so the most recent rows (which have
        no future data yet) are not counted as losses.

        Returns:
            A copy of ``self.data`` with ``State`` and the three
            ``future_*d_return`` columns added.
        """
        print("\n" + "="*70)
        print("开始回测验证...")
        print("="*70)

        states_df = self.calc_composite_state()

        df = self.data.copy()
        df['State'] = states_df['Composite_State']

        for horizon in (5, 10, 20):
            df[f'future_{horizon}d_return'] = (
                df['close'].pct_change(horizon).shift(-horizon) * 100
            )

        print("\n【各状态表现统计】")
        print("-"*70)
        print(f"{'状态':<15} {'天数':<8} {'5日收益':<12} {'10日收益':<12} {'20日收益':<12}")
        print("-"*70)

        for state in df['State'].unique():
            if pd.isna(state):
                continue
            mask = df['State'] == state
            count = mask.sum()
            r5 = df.loc[mask, 'future_5d_return'].mean()
            r10 = df.loc[mask, 'future_10d_return'].mean()
            r20 = df.loc[mask, 'future_20d_return'].mean()
            print(f"{state:<15} {count:<8} {r5:>+10.2f}% {r10:>+10.2f}% {r20:>+10.2f}%")

        print("\n【趋势信号验证】")
        print("-"*70)

        trend_mask = df['State'] == '趋势'
        reversal_mask = df['State'] == '反转'

        if trend_mask.sum() > 0:
            trend_returns = df.loc[trend_mask, 'future_20d_return']
            # NaN > 0 is False, so drop unknown outcomes before counting wins.
            realized = trend_returns.dropna()
            print(f"趋势信号天数: {trend_mask.sum()}")
            print(f"趋势信号20日收益: {trend_returns.mean():+.2f}%")
            print(f"趋势信号胜率: {(realized > 0).mean()*100:.1f}%")

        if reversal_mask.sum() > 0:
            print(f"\n反转信号天数: {reversal_mask.sum()}")
            print(f"反转信号20日收益: {df.loc[reversal_mask, 'future_20d_return'].mean():+.2f}%")

        latest = df.iloc[-1]
        print("\n【最新状态】")
        print("-"*70)
        print(f"日期: {df.index[-1].strftime('%Y-%m-%d')}")
        print(f"收盘价: {latest['close']:.2f}")
        print(f"综合状态: {latest['State']}")

        return df


def main():
    """Fetch the index data, run the backtest and save the result as CSV."""
    print("="*70)
    print("Trend-Mix: 6种市场状态识别方法综合策略")
    print("针对创业板50指数的完整实现")
    print("="*70)

    strategy = TrendMixStrategy()

    df = strategy.fetch_data("399673", "2017-01-01", "2026-03-06")
    if df is None:
        print("数据获取失败")
        return

    result_df = strategy.backtest()

    print("\n" + "="*70)
    print("回测完成!")
    print("="*70)

    output_path = Path('/root/.openclaw/workspace/trend-mix/backtest_result.csv')
    # Create the target directory so the save cannot fail after the full run
    # just because the folder is missing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    result_df.to_csv(output_path)
    print("\n✓ 结果已保存: backtest_result.csv")


if __name__ == "__main__":
    main()