| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
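- """
- Trend-Mix: a composite strategy built from objective market-regime detection methods.
- Complete implementation targeting the ChiNext 50 index (399673).
- Methods:
- 1. Volatility percentile
- 2. Variance ratio test
- 3. Hurst exponent (R/S analysis)
- 4. ADX + price momentum combination
- 5. Bollinger band width + volatility contraction (Bollinger Bands squeeze)
- 6. Markov regime-switching model (MS-AR) -- listed as part of the design, but no implementation of it is present in this file
- 7. Composite state machine (hard-coded decision tree) that combines methods 1-5
- """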
- import numpy as np
- import pandas as pd
- import baostock as bs
- from scipy import stats
- from sklearn.mixture import GaussianMixture
- import warnings
- warnings.filterwarnings('ignore')
class TrendMixStrategy:
    """Rule-based market-regime classifier built from five indicators.

    The indicators are: ATR volatility percentile, rolling variance ratio,
    rolling Hurst exponent (R/S analysis), ADX combined with deviation from
    the 20-day mean, and Bollinger bandwidth percentile.  Their per-day
    labels are merged by ``calc_composite_state`` into one composite label,
    and ``backtest`` prints forward-return statistics per composite label.

    ``self.data`` must hold a date-indexed frame with ``open``, ``high``,
    ``low``, ``close``, ``volume`` and ``return`` columns before any
    ``calc_*`` method is called; ``fetch_data`` populates it.
    """

    def __init__(self):
        # Price frame produced by fetch_data(); None until data is loaded.
        self.data = None

    def fetch_data(self, symbol="399673", start_date="2017-01-01", end_date="2026-03-06"):
        """Download daily bars through baostock and store them on ``self.data``.

        Args:
            symbol: Security code.  Codes starting with '3' get the ``sz.``
                prefix, codes starting with '6' get ``sh.``; anything else is
                passed to baostock unchanged.
            start_date: First date to request (``YYYY-MM-DD``).
            end_date: Last date to request (``YYYY-MM-DD``).

        Returns:
            The date-indexed DataFrame, or ``None`` when login fails or no
            usable rows were returned.
        """
        print(f"获取 {symbol} 数据...")
        login = bs.login()
        if getattr(login, 'error_code', '0') != '0':
            print(f"登录失败: {getattr(login, 'error_msg', '')}")
            return None

        if symbol.startswith('3'):
            code = f"sz.{symbol}"
        elif symbol.startswith('6'):
            code = f"sh.{symbol}"
        else:
            code = symbol

        rows = []
        try:
            rs = bs.query_history_k_data_plus(
                code, "date,open,high,low,close,volume",
                start_date=start_date, end_date=end_date,
                frequency="d", adjustflag="3"
            )
            while rs.error_code == '0' and rs.next():
                rows.append(rs.get_row_data())
            if rs.error_code != '0':
                print(f"查询失败: {getattr(rs, 'error_msg', '')}")
        finally:
            # Always release the session, even when the query raises.
            bs.logout()

        df = self._rows_to_frame(rows)
        if df is None:
            return None

        self.data = df
        print(f"✓ 获取成功: {len(df)}条数据")
        return df

    @staticmethod
    def _rows_to_frame(rows):
        """Convert raw ``[date, open, high, low, close, volume]`` string rows to a frame.

        Rows whose price fields cannot be parsed (for example blank strings)
        are dropped instead of raising; an unparseable volume becomes 0.

        Returns:
            A frame indexed by ascending date with float prices, integer
            volume and a ``return`` column (close-to-close percentage change
            as a fraction), or ``None`` when no usable row remains.
        """
        if not rows:
            return None

        columns = ['date', 'open', 'high', 'low', 'close', 'volume']
        price_cols = ['open', 'high', 'low', 'close']
        frame = pd.DataFrame([list(row)[:6] for row in rows], columns=columns)

        for col in price_cols:
            frame[col] = pd.to_numeric(frame[col], errors='coerce').astype(float)
        frame['volume'] = (
            pd.to_numeric(frame['volume'], errors='coerce').fillna(0).astype('int64')
        )

        frame = frame.dropna(subset=price_cols)
        if frame.empty:
            return None

        frame['date'] = pd.to_datetime(frame['date'])
        frame = frame.set_index('date').sort_index()
        frame['return'] = frame['close'].pct_change()
        return frame

    # ============================================
    # Shared helpers
    # ============================================
    @staticmethod
    def _true_range(high, low, close):
        """Return the true range: max of high-low, |high-prev close|, |low-prev close|."""
        prev_close = close.shift()
        candidates = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        )
        return candidates.max(axis=1)

    @staticmethod
    def _rolling_percentile_rank(series, lookback):
        """Percentile rank (0-100] of each value within its trailing ``lookback`` window.

        Ties receive the average rank, which matches
        ``Series.rank(pct=True)`` evaluated on the window.  Windows that are
        incomplete or contain NaN yield NaN (default ``rolling`` behaviour).
        """
        def last_rank_pct(window):
            last = window[-1]
            below = np.count_nonzero(window < last)
            ties = np.count_nonzero(window == last)
            return (below + (ties + 1) / 2.0) / len(window) * 100

        return series.rolling(lookback).apply(last_rank_pct, raw=True)

    @staticmethod
    def _win_rate(returns):
        """Percentage of strictly positive values among the non-NaN entries.

        Rows without a realised forward return (NaN) are excluded rather than
        counted as losses.  Returns NaN when no valid entry exists.
        """
        realised = returns.dropna()
        if realised.empty:
            return float('nan')
        return float((realised > 0).mean() * 100)

    # ============================================
    # Method 1: volatility percentile
    # ============================================
    def calc_volatility_percentile(self, lookback=252):
        """Classify volatility from the percentile of the 20-day ATR.

        The ATR here is the simple 20-day mean of the true range.  Its
        percentile rank inside the trailing ``lookback`` window gives the
        label: above 70 is '高波动', below 30 is '低波动', otherwise '常态';
        rows without a percentile are '未知'.

        Returns:
            DataFrame with ``ATR_20``, ``Vol_Percentile`` and ``Vol_State``.
        """
        df = self.data.copy()

        tr = self._true_range(df['high'], df['low'], df['close'])
        df['ATR_20'] = tr.rolling(20).mean()
        df['Vol_Percentile'] = self._rolling_percentile_rank(df['ATR_20'], lookback)

        pct = df['Vol_Percentile']
        df['Vol_State'] = np.select(
            [pct.isna(), pct > 70, pct < 30],
            ['未知', '高波动', '低波动'],
            default='常态',
        )

        return df[['ATR_20', 'Vol_Percentile', 'Vol_State']]

    # ============================================
    # Method 2: variance ratio test
    # ============================================
    def calc_variance_ratio(self, k=5, window=120):
        """Rolling variance ratio with a 95% band around 1.

        VR(k) = Var(sum of k consecutive returns) / (k * Var(single return)),
        computed with overlapping k-period sums over the ``window`` returns
        that end on the bar *before* each row (the row's own return is not
        included).  The band half-width is
        ``1.96 * sqrt(2 * (2k - 1) * (k - 1) / (3 * k * window))``.

        Labels: above the band '趋势', below the band '反转', inside '震荡',
        no value '未知'.

        Args:
            k: Aggregation horizon in bars (must be >= 1).
            window: Number of trailing returns per estimate; also used as the
                sample size in the band formula.

        Returns:
            DataFrame with ``VR``, ``VR_Upper``, ``VR_Lower`` and ``VR_State``.

        Raises:
            ValueError: If ``k`` or ``window`` is smaller than 1.
        """
        if k < 1 or window < 1:
            raise ValueError("k and window must both be >= 1")

        df = self.data.copy()
        returns = df['return'].to_numpy(dtype=float)
        vr_values = np.full(len(df), np.nan)
        kernel = np.ones(k)

        for i in range(window + k, len(df)):
            sample = returns[i - window:i]
            sample = sample[~np.isnan(sample)]
            # Require at least 80% of the window to be populated.
            if len(sample) < window * 0.8 or len(sample) < k:
                continue
            # Overlapping k-period cumulative returns inside the window.
            k_sums = np.convolve(sample, kernel, mode='valid')
            if len(k_sums) <= k:
                continue
            var_1 = sample.var(ddof=1)
            if var_1 > 0:
                vr_values[i] = k_sums.var(ddof=1) / (k * var_1)

        # Collected in an array and assigned once; avoids per-row .loc writes.
        df['VR'] = vr_values

        critical_value = 1.96 * np.sqrt(2 * (2 * k - 1) * (k - 1) / (3 * k * window))
        df['VR_Upper'] = 1 + critical_value
        df['VR_Lower'] = 1 - critical_value

        vr = df['VR']
        df['VR_State'] = np.select(
            [vr.isna(), vr > 1 + critical_value, vr < 1 - critical_value],
            ['未知', '趋势', '反转'],
            default='震荡',
        )

        return df[['VR', 'VR_Upper', 'VR_Lower', 'VR_State']]

    # ============================================
    # Method 3: Hurst exponent (R/S analysis)
    # ============================================
    def calc_hurst(self, max_lag=50, window=200):
        """Rolling Hurst exponent estimated by rescaled-range analysis.

        Each row uses the ``window`` closes that end on the bar before it.
        Labels: H > 0.55 '趋势', H < 0.45 '反转', otherwise '随机'; rows
        where no estimate could be produced are '未知'.

        Args:
            max_lag: Upper bound (exclusive) for the segment lengths used in
                the R/S regression.
            window: Number of trailing closes per estimate.

        Returns:
            DataFrame with ``Hurst`` and ``Hurst_State``.
        """
        df = self.data.copy()
        closes = df['close'].to_numpy(dtype=float)
        hurst_values = np.full(len(df), np.nan)

        for i in range(window, len(df)):
            estimate = self._compute_hurst_rs(closes[i - window:i], max_lag)
            if estimate is not None:
                hurst_values[i] = estimate

        df['Hurst'] = hurst_values

        h = df['Hurst']
        df['Hurst_State'] = np.select(
            [h.isna(), h > 0.55, h < 0.45],
            ['未知', '趋势', '反转'],
            default='随机',
        )

        return df[['Hurst', 'Hurst_State']]

    def _compute_hurst_rs(self, prices, max_lag):
        """Estimate the Hurst exponent of ``prices`` via R/S analysis.

        Log returns are cut into non-overlapping segments of each lag in
        ``range(10, min(max_lag, n // 4), 2)``; the mean R/S per lag is
        regressed on the lag in log-log space and the slope is the estimate,
        clipped to [0.1, 0.9].

        Returns:
            The clipped slope, or ``None`` when the input is unusable
            (non-finite or non-positive prices, fewer than ``2 * max_lag``
            returns, fewer than five usable lags, or a non-finite slope).
            ``None`` is returned instead of a made-up neutral 0.5 so that
            callers can label such rows as unknown.
        """
        prices = np.asarray(prices, dtype=float)
        if prices.size < 2 or not np.all(np.isfinite(prices)) or np.any(prices <= 0):
            return None

        returns = np.diff(np.log(prices))
        n = len(returns)
        if n < max_lag * 2:
            return None

        lag_values = []
        rs_values = []
        for lag in range(10, min(max_lag, n // 4), 2):
            n_segments = n // lag
            if n_segments < 2:
                continue

            # One row per non-overlapping segment of length `lag`.
            segments = returns[:n_segments * lag].reshape(n_segments, lag)
            centred = segments - segments.mean(axis=1, keepdims=True)
            cumdev = np.cumsum(centred, axis=1)
            spread = cumdev.max(axis=1) - cumdev.min(axis=1)   # R
            sigma = segments.std(axis=1)                       # S
            usable = sigma > 0
            if usable.any():
                rs_values.append(np.mean(spread[usable] / sigma[usable]))
                lag_values.append(lag)

        if len(lag_values) < 5:
            return None

        try:
            # log(R/S) = log(c) + H * log(lag); the slope is H.
            slope = stats.linregress(np.log(lag_values), np.log(rs_values)).slope
        except ValueError:
            return None

        if not np.isfinite(slope):
            return None
        return float(max(0.1, min(0.9, slope)))

    # ============================================
    # Method 4: ADX + price momentum
    # ============================================
    def calc_adx_momentum(self):
        """Classify trend strength from ADX and deviation from the 20-day mean.

        Directional movement follows the usual definition: with
        ``up = high - prev high`` and ``down = prev low - low``, +DM is
        ``up`` when ``up > down`` and ``up > 0`` (else 0) and -DM is ``down``
        when ``down > up`` and ``down > 0`` (else 0).  Both comparisons use
        the raw moves.  Smoothing uses simple 14-day means.

        Labels (``dev`` is the absolute percentage deviation of close from
        its 20-day mean), evaluated in this order:
        ADX>30 & dev>2 '强趋势'; ADX>25 & dev>1 '趋势初期';
        ADX>20 & dev<1 '盘整观望'; ADX<20 & dev>2 '假突破';
        otherwise '震荡整理'; missing inputs '未知'.

        Returns:
            DataFrame with ``ADX``, ``MA20``, ``Deviation`` and ``ADX_State``.
        """
        df = self.data.copy()
        high, low, close = df['high'], df['low'], df['close']

        up_move = high.diff()
        down_move = -low.diff()   # positive only when the low actually falls
        plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
        minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

        atr = self._true_range(high, low, close).rolling(14).mean()

        plus_di = 100 * (plus_dm.rolling(14).mean() / atr)
        minus_di = 100 * (minus_dm.rolling(14).mean() / atr)
        # The small constant avoids 0/0 when both DI values are zero.
        dx = ((plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)) * 100
        df['ADX'] = dx.rolling(14).mean()

        df['MA20'] = close.rolling(20).mean()
        df['Deviation'] = (close - df['MA20']) / df['MA20'] * 100

        adx = df['ADX']
        dev = df['Deviation'].abs()
        df['ADX_State'] = np.select(
            [
                adx.isna() | dev.isna(),
                (adx > 30) & (dev > 2),
                (adx > 25) & (dev > 1),
                (adx > 20) & (dev < 1),
                (adx < 20) & (dev > 2),
            ],
            ['未知', '强趋势', '趋势初期', '盘整观望', '假突破'],
            default='震荡整理',
        )

        return df[['ADX', 'MA20', 'Deviation', 'ADX_State']]

    # ============================================
    # Method 5: Bollinger bandwidth + volatility contraction
    # ============================================
    def calc_bollinger_squeeze(self, lookback=120):
        """Classify the percentile of the 20-day, 2-sigma Bollinger bandwidth.

        Bandwidth is (upper - lower) / MA20 * 100 and is ranked inside the
        trailing ``lookback`` window.  Labels, evaluated in this order:
        <10 '极度收缩(即将爆发)'; >90 '极度扩张(即将收敛)'; <30 '收缩中';
        >70 '扩张中'; otherwise '常态'; no percentile '未知'.

        Returns:
            DataFrame with ``Bandwidth``, ``BB_Percentile`` and ``BB_State``.
        """
        df = self.data.copy()

        df['MA20'] = df['close'].rolling(20).mean()
        df['STD20'] = df['close'].rolling(20).std()
        df['Upper'] = df['MA20'] + 2 * df['STD20']
        df['Lower'] = df['MA20'] - 2 * df['STD20']

        df['Bandwidth'] = (df['Upper'] - df['Lower']) / df['MA20'] * 100
        df['BB_Percentile'] = self._rolling_percentile_rank(df['Bandwidth'], lookback)

        pct = df['BB_Percentile']
        df['BB_State'] = np.select(
            [pct.isna(), pct < 10, pct > 90, pct < 30, pct > 70],
            ['未知', '极度收缩(即将爆发)', '极度扩张(即将收敛)', '收缩中', '扩张中'],
            default='常态',
        )

        return df[['Bandwidth', 'BB_Percentile', 'BB_State']]

    # ============================================
    # Method 6: composite state machine
    # ============================================
    def calc_composite_state(self):
        """Merge the five indicator labels into one composite label per row.

        Rules are checked in priority order; the first match wins:

        1. '强趋势': VR and Hurst both '趋势', ADX state '强趋势', ADX > 40,
           |deviation| > 3 and volatility '常态'.
        2. '趋势': at least 4 of 5 trend conditions hold.
        3. '潜在爆发': at least 3 of 4 squeeze conditions hold.
        4. '反转': at least 3 of 5 reversal conditions hold.
        5. '震荡' otherwise.

        Missing ADX / deviation count as 0 and a missing volatility
        percentile counts as 50 for the numeric checks.  Rows whose
        indicators are still '未知' (warm-up period) therefore normally end
        up as '震荡'.

        Returns:
            DataFrame with the five indicator labels, ``ADX``, ``Deviation``,
            ``Vol_Pct`` and ``Composite_State``.
        """
        vol_df = self.calc_volatility_percentile()
        vr_df = self.calc_variance_ratio()
        hurst_df = self.calc_hurst()
        adx_df = self.calc_adx_momentum()
        bb_df = self.calc_bollinger_squeeze()

        df = self.data.copy()
        df['Vol_State'] = vol_df['Vol_State']
        df['VR_State'] = vr_df['VR_State']
        df['Hurst_State'] = hurst_df['Hurst_State']
        df['ADX_State'] = adx_df['ADX_State']
        df['BB_State'] = bb_df['BB_State']

        df['ADX'] = adx_df['ADX']
        df['Deviation'] = adx_df['Deviation']
        df['Vol_Pct'] = vol_df['Vol_Percentile']

        vol = df['Vol_State']
        vr = df['VR_State']
        hurst = df['Hurst_State']
        adx_state = df['ADX_State']
        bb = df['BB_State']

        # Neutral substitutes are used only for the comparisons below; the
        # returned columns keep their NaN values.
        adx = df['ADX'].fillna(0)
        dev_abs = df['Deviation'].fillna(0).abs()
        vol_pct = df['Vol_Pct'].fillna(50)

        def score(*flags):
            # Boolean Series must be cast first: bool + bool would be a logical OR.
            return sum(flag.astype(int) for flag in flags)

        strong_trend = (
            (vr == '趋势')
            & (hurst == '趋势')
            & (adx_state == '强趋势')
            & (adx > 40)
            & (dev_abs > 3)
            & (vol == '常态')
        )

        trend_score = score(
            vr == '趋势',
            hurst == '趋势',
            adx_state.isin(['强趋势', '趋势初期']),
            (adx > 35) & (dev_abs > 2.5),
            vol.isin(['常态', '低波动']),
        )

        squeeze_score = score(
            bb == '极度收缩(即将爆发)',
            vol_pct < 25,
            adx_state == '盘整观望',
            vol == '低波动',
        )

        reversal_score = score(
            vr == '反转',
            hurst == '反转',
            adx_state == '假突破',
            (dev_abs > 4) & (adx < 20),
            bb == '极度扩张(即将收敛)',
        )

        df['Composite_State'] = np.select(
            [strong_trend, trend_score >= 4, squeeze_score >= 3, reversal_score >= 3],
            ['强趋势', '趋势', '潜在爆发', '反转'],
            default='震荡',
        )

        return df[['Vol_State', 'VR_State', 'Hurst_State', 'ADX_State', 'BB_State',
                   'ADX', 'Deviation', 'Vol_Pct', 'Composite_State']]

    # ============================================
    # Backtest
    # ============================================
    def backtest(self):
        """Print forward-return statistics for each composite state.

        Adds the composite state and the 5/10/20-day forward percentage
        returns to a copy of ``self.data``, prints the mean forward returns
        per state, a summary for the '趋势' and '反转' states, and the most
        recent row.  The win rate is computed only over rows whose 20-day
        forward return is known.

        Returns:
            The augmented DataFrame (``State`` plus ``future_*_return`` columns).
        """
        print("\n" + "="*70)
        print("开始回测验证...")
        print("="*70)

        states_df = self.calc_composite_state()

        df = self.data.copy()
        df['State'] = states_df['Composite_State']

        # Forward return over h bars: close[t + h] / close[t] - 1, in percent.
        for horizon in (5, 10, 20):
            df[f'future_{horizon}d_return'] = (
                df['close'].pct_change(horizon).shift(-horizon) * 100
            )

        print("\n【各状态表现统计】")
        print("-"*70)
        print(f"{'状态':<15} {'天数':<8} {'5日收益':<12} {'10日收益':<12} {'20日收益':<12}")
        print("-"*70)

        for state in df['State'].unique():
            if pd.isna(state):
                continue
            subset = df.loc[df['State'] == state]
            count = len(subset)
            r5 = subset['future_5d_return'].mean()
            r10 = subset['future_10d_return'].mean()
            r20 = subset['future_20d_return'].mean()
            print(f"{state:<15} {count:<8} {r5:>+10.2f}% {r10:>+10.2f}% {r20:>+10.2f}%")

        print("\n【趋势信号验证】")
        print("-"*70)
        trend_mask = df['State'] == '趋势'
        reversal_mask = df['State'] == '反转'

        if trend_mask.sum() > 0:
            trend_returns = df.loc[trend_mask, 'future_20d_return']
            print(f"趋势信号天数: {trend_mask.sum()}")
            print(f"趋势信号20日收益: {trend_returns.mean():+.2f}%")
            print(f"趋势信号胜率: {self._win_rate(trend_returns):.1f}%")

        if reversal_mask.sum() > 0:
            reversal_returns = df.loc[reversal_mask, 'future_20d_return']
            print(f"\n反转信号天数: {reversal_mask.sum()}")
            print(f"反转信号20日收益: {reversal_returns.mean():+.2f}%")

        latest = df.iloc[-1]
        print("\n【最新状态】")
        print("-"*70)
        print(f"日期: {df.index[-1].strftime('%Y-%m-%d')}")
        print(f"收盘价: {latest['close']:.2f}")
        print(f"综合状态: {latest['State']}")

        return df
def main():
    """Fetch the index data, run the backtest and save the result as CSV.

    Prints a banner, downloads daily bars for 399673, runs
    ``TrendMixStrategy.backtest`` and writes the returned frame to a fixed
    CSV path.  The target directory is created when it does not exist so the
    final write cannot fail on a missing folder.  Returns early (after
    printing a message) when no data could be fetched.
    """
    # Local import: keeps the new dependency inside this block.
    from pathlib import Path

    print("="*70)
    print("Trend-Mix: 6种市场状态识别方法综合策略")
    print("针对创业板50指数的完整实现")
    print("="*70)

    strategy = TrendMixStrategy()

    # Load the price history; fetch_data returns None on failure.
    df = strategy.fetch_data("399673", "2017-01-01", "2026-03-06")
    if df is None:
        print("数据获取失败")
        return

    result_df = strategy.backtest()

    print("\n" + "="*70)
    print("回测完成!")
    print("="*70)

    # Persist the augmented frame; make sure the folder exists first.
    output_path = Path('/root/.openclaw/workspace/trend-mix/backtest_result.csv')
    output_path.parent.mkdir(parents=True, exist_ok=True)
    result_df.to_csv(output_path)
    print("\n✓ 结果已保存: backtest_result.csv")


if __name__ == "__main__":
    main()
|