#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ChiNext 50 (创业板50) market-regime classifier on 30-minute bars (optimized version).

Pipeline: load a 5-minute bar export -> aggregate to 30-minute bars ->
compute technical features -> derive rule-based regime labels
(0 = ranging, 1 = trending, 2 = reversal) -> train a gradient-boosting
classifier on those labels -> backtest a simple regime-driven strategy.
"""
import pickle
import warnings

import numpy as np
import pandas as pd

try:
    from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
    from sklearn.metrics import classification_report, confusion_matrix
except ImportError:
    # Only train_and_evaluate() needs scikit-learn. Keeping the import
    # optional lets the loading, feature, labelling and backtest helpers be
    # used without it; train_and_evaluate() raises a clear error instead.
    RandomForestClassifier = GradientBoostingClassifier = None
    classification_report = confusion_matrix = None

# NOTE: this silences every warning process-wide (pandas/sklearn
# deprecations, sklearn undefined-metric warnings, ...).
warnings.filterwarnings('ignore')

# Display names for regime labels 0, 1, 2 (ranging, trending, reversal).
STATE_NAMES = ['震荡', '趋势', '反转']


def load_5min_data(filepath='SZ#399673.txt'):
    """Load a tab-separated 5-minute bar export and index it by timestamp.

    The file is read as GBK text with the first two lines skipped. Rows whose
    ``date`` does not look like ``YYYY/MM/DD`` (e.g. trailer lines) are
    dropped. ``time`` is an ``HHMM`` value that is zero-padded to 4 digits.

    Args:
        filepath: Path to the export file.

    Returns:
        DataFrame indexed by ``datetime`` (sorted ascending) with numeric
        ``open/high/low/close/volume/amount`` columns; the raw ``date`` and
        ``time`` columns are kept as read.
    """
    print(f"加载5分钟数据: {filepath}")
    df = pd.read_csv(filepath, sep='\t', skiprows=2, encoding='gbk', header=None,
                     comment='#', on_bad_lines='skip')
    df.columns = ['date', 'time', 'open', 'high', 'low', 'close', 'volume', 'amount']
    # Keep only data rows; non-data lines do not start with a date.
    df = df[df['date'].astype(str).str.match(r'\d{4}/\d{2}/\d{2}')].copy()

    def format_time(t):
        # Missing times fall back to '0000' (midnight).
        if pd.isna(t):
            return '0000'
        return f"{int(t):04d}"

    df['time_str'] = df['time'].apply(format_time)
    df['datetime'] = pd.to_datetime(df['date'] + ' ' + df['time_str'], format='%Y/%m/%d %H%M')
    df = df.set_index('datetime').sort_index()
    df = df.drop('time_str', axis=1)

    for col in ['open', 'high', 'low', 'close', 'volume', 'amount']:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    print(f"[OK] 加载成功: {len(df)}条5分钟数据")
    print(f" 日期范围: {df.index[0]} ~ {df.index[-1]}")
    return df


def resample_to_30min(df_5min, closed='left', label='left'):
    """Aggregate 5-minute bars into 30-minute OHLCV bars plus candle metrics.

    Args:
        df_5min: Timestamp-indexed DataFrame with open/high/low/close/volume/
            amount columns (as returned by ``load_5min_data``).
        closed: Which side of each 30-minute bin is closed; forwarded to
            ``DataFrame.resample``. The default ``'left'`` is pandas' default.
        label: Which bin edge labels the 30-minute bar; forwarded likewise.

    NOTE(review): if the source stamps each 5-minute bar with its END time
    (first bar 09:35, last bars 11:30 / 15:00), the default left-closed bins
    put the 11:30 and 15:00 bars into one-bar buckets of their own, and the
    09:30 bucket holds only five bars. ``closed='right', label='right'``
    groups them with their half hour instead. Confirm the data convention
    before switching: the time features in calculate_features_30min_v2
    (``is_open``/``is_close``) are written against the current labels.

    Returns:
        30-minute DataFrame (bins with no data, e.g. lunch break and
        overnight, are dropped) with added ``return``, ``body``,
        ``upper_shadow``, ``lower_shadow`` and ``body_pct`` columns.
    """
    print("\n聚合成30分钟数据...")
    df_30min = df_5min.resample('30min', closed=closed, label=label).agg({
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
        'amount': 'sum'
    }).dropna()

    df_30min['return'] = df_30min['close'].pct_change()

    # Candle body and shadows.
    body_top = df_30min[['open', 'close']].max(axis=1)
    body_bottom = df_30min[['open', 'close']].min(axis=1)
    df_30min['body'] = df_30min['close'] - df_30min['open']
    df_30min['upper_shadow'] = df_30min['high'] - body_top
    df_30min['lower_shadow'] = body_bottom - df_30min['low']
    # |body| as a fraction of the bar's high-low range (epsilon avoids /0).
    df_30min['body_pct'] = df_30min['body'].abs() / (df_30min['high'] - df_30min['low'] + 1e-10)

    print(f"[OK] 聚合完成: {len(df_30min)}条30分钟数据")
    return df_30min


def calculate_features_30min_v2(df):
    """Compute the 30-minute feature matrix used for labelling and training.

    Window lengths below are in bars (one bar = 30 minutes).

    Args:
        df: 30-minute bars as returned by ``resample_to_30min`` (needs
            open/high/low/close/volume plus ``return``, ``body_pct``,
            ``upper_shadow`` and ``lower_shadow``) with a DatetimeIndex.

    Returns:
        DataFrame on the same index. NaNs are forward-filled and the leading
        warm-up NaNs that remain are set to 0.

    NOTE(review): several columns are raw price levels (close, ma*, bb_*,
    macd, atr_14, price_accel), and some are exact duplicates
    (ret_8 == cum_ret_4h, ret_16 == cum_ret_1d). They are kept unchanged
    because the trained model's feature set and column order depend on them.
    """
    features = pd.DataFrame(index=df.index)
    close = df['close']
    ret = df['return']
    features['close'] = close

    # ---- 1. Returns over N bars ----
    features['ret_1'] = ret
    features['ret_2'] = close.pct_change(2)
    features['ret_4'] = close.pct_change(4)
    features['ret_8'] = close.pct_change(8)
    features['ret_16'] = close.pct_change(16)
    # Cumulative returns (numerically identical to ret_8 / ret_16).
    features['cum_ret_4h'] = close / close.shift(8) - 1
    features['cum_ret_1d'] = close / close.shift(16) - 1

    # ---- 2. Volatility: rolling std of bar returns scaled by sqrt(48) ----
    # The regime labeller's 10..30 "moderate volatility" band is expressed in
    # this scale, so the factor must not change independently of it.
    for window in (4, 8, 16):
        features[f'volatility_{window}'] = ret.rolling(window).std() * np.sqrt(48)
    features['vol_ratio'] = features['volatility_4'] / (features['volatility_16'] + 1e-10)
    features['vol_change'] = features['volatility_8'].diff()

    # ---- 3. Trend: moving averages and their relationships ----
    for window in (4, 8, 16, 48):
        features[f'ma{window}'] = close.rolling(window).mean()
    features['ma4_above_ma8'] = (features['ma4'] > features['ma8']).astype(int)
    features['ma8_above_ma16'] = (features['ma8'] > features['ma16']).astype(int)
    features['ma_slope_4'] = features['ma4'].diff(4) / features['ma4'].shift(4) * 100
    # Percentage distance of the close from its moving averages.
    features['dist_to_ma4'] = (close - features['ma4']) / features['ma4'] * 100
    features['dist_to_ma16'] = (close - features['ma16']) / features['ma16'] * 100

    # ---- 4. Momentum: 14-bar RSI from simple means of gains / losses ----
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
    rs = gain / (loss + 1e-10)
    features['rsi_14'] = 100 - (100 / (1 + rs))
    features['rsi_change'] = features['rsi_14'].diff(2)
    features['rsi_overbought'] = (features['rsi_14'] > 70).astype(int)
    features['rsi_oversold'] = (features['rsi_14'] < 30).astype(int)
    features['rsi_neutral'] = ((features['rsi_14'] >= 40) & (features['rsi_14'] <= 60)).astype(int)

    # ---- 5. MACD (12/26 EMAs, 9-span signal) ----
    ema12 = close.ewm(span=12).mean()
    ema26 = close.ewm(span=26).mean()
    features['macd'] = ema12 - ema26
    features['macd_signal'] = features['macd'].ewm(span=9).mean()
    features['macd_hist'] = features['macd'] - features['macd_signal']
    # 1 on the bar where MACD crosses above its signal line.
    features['macd_cross'] = ((features['macd'] > features['macd_signal']) &
                              (features['macd'].shift(1) <= features['macd_signal'].shift(1))).astype(int)

    # ---- 6. Bollinger bands (20 bars, 2 standard deviations) ----
    features['bb_middle'] = close.rolling(20).mean()
    bb_std = close.rolling(20).std()
    features['bb_upper'] = features['bb_middle'] + 2 * bb_std
    features['bb_lower'] = features['bb_middle'] - 2 * bb_std
    band = features['bb_upper'] - features['bb_lower']
    features['bb_width'] = band / features['bb_middle'] * 100
    features['bb_position'] = (close - features['bb_lower']) / (band + 1e-10)
    # Close within 0.5% of the upper / lower band.
    features['touch_upper'] = (close >= features['bb_upper'] * 0.995).astype(int)
    features['touch_lower'] = (close <= features['bb_lower'] * 1.005).astype(int)

    # ---- 7. Candle shape ----
    bar_range = df['high'] - df['low'] + 1e-10
    features['body_pct'] = df['body_pct']
    features['upper_shadow_ratio'] = df['upper_shadow'] / bar_range
    features['lower_shadow_ratio'] = df['lower_shadow'] / bar_range
    # Long lower shadow with a small body.
    features['hammer'] = ((features['lower_shadow_ratio'] > 0.6) & (features['body_pct'] < 0.3)).astype(int)
    # NOTE(review): this tests the UPPER shadow (a shooting-star / inverted-
    # hammer shape) rather than a classic hanging man; name kept as-is.
    features['hanging_man'] = ((features['upper_shadow_ratio'] > 0.6) & (features['body_pct'] < 0.3)).astype(int)

    # ---- 8. Volume ----
    volume = df['volume']
    features['volume_ratio'] = volume / volume.rolling(16).mean()
    features['volume_spike'] = (features['volume_ratio'] > 2).astype(int)
    # Least-squares slope through the last 8 volumes. With the default
    # min_periods the callback only ever sees full 8-value windows; raw=True
    # passes them as ndarrays (same values, far less overhead).
    bar_pos = np.arange(8)
    features['volume_trend'] = volume.rolling(8).apply(lambda x: np.polyfit(bar_pos, x, 1)[0], raw=True)
    # Rolling volume/price correlation.
    features['vol_price_corr'] = volume.rolling(8).corr(close)

    # ---- 9. ATR: 14-bar mean of the true range ----
    prev_close = close.shift()
    true_range = pd.concat([df['high'] - df['low'],
                            (df['high'] - prev_close).abs(),
                            (df['low'] - prev_close).abs()], axis=1).max(axis=1)
    features['atr_14'] = true_range.rolling(14).mean()
    features['atr_ratio'] = features['atr_14'] / close * 100

    # ---- 10. Time of day, taken from the bar's index label ----
    hour = df.index.hour
    minute = df.index.minute
    features['hour'] = hour
    features['minute'] = minute
    features['is_open'] = ((hour == 9) & (minute == 30)).astype(int)
    features['is_morning'] = ((hour >= 9) & (hour < 11)).astype(int)
    features['is_afternoon'] = ((hour >= 13) & (hour < 15)).astype(int)
    features['is_close'] = ((hour == 15) & (minute == 0)).astype(int)

    # ---- 11. Price action ----
    # Length of the current run of up (down) bars: every bar that breaks the
    # streak starts a new group, and the cumulative sum restarts inside it.
    features['consecutive_up'] = (ret > 0).astype(int).groupby((ret <= 0).astype(int).cumsum()).cumsum()
    features['consecutive_down'] = (ret < 0).astype(int).groupby((ret >= 0).astype(int).cumsum()).cumsum()
    # Second differences of price / first difference of return.
    features['price_accel'] = close.diff().diff()
    features['return_accel'] = ret.diff()

    # ---- 12. Support / resistance: close within 0.5% of the 8-bar extreme ----
    features['near_high_8'] = (close >= df['high'].rolling(8).max() * 0.995).astype(int)
    features['near_low_8'] = (close <= df['low'].rolling(8).min() * 1.005).astype(int)

    # Fill rolling-window warm-up gaps.
    return features.ffill().fillna(0)


def define_market_regime_30min_v2(df, features, lookback=8):
    """Derive rule-based regime labels for each 30-minute bar.

    Labels:
        0 = ranging   - stand aside or trade the range
        1 = trending  - follow the trend
        2 = reversal  - trade against the move or reduce exposure

    For bar ``i`` the return, range, volatility and RSI statistics come from
    the ``lookback`` bars BEFORE it (``i - lookback .. i - 1``), while
    ``bb_position``, ``ma4_above_ma8`` and ``macd_hist`` are read at bar ``i``.
    A reversal score >= 3 wins; otherwise a trend score >= 4 gives label 1;
    otherwise 0. The first ``lookback`` bars have no full window and get 0.

    Args:
        df: 30-minute bars with close/high/low columns.
        features: Output of ``calculate_features_30min_v2`` aligned
            row-by-row with ``df``.
        lookback: Window length in bars.

    Returns:
        Integer numpy array with one label per row of ``df``.
    """
    # Plain arrays avoid per-iteration pandas indexing; the nan-aware
    # reductions below match pandas' skipna max/min/mean.
    close = df['close'].to_numpy(dtype=float)
    high = df['high'].to_numpy(dtype=float)
    low = df['low'].to_numpy(dtype=float)
    rsi = features['rsi_14'].to_numpy(dtype=float)
    vol4 = features['volatility_4'].to_numpy(dtype=float)
    bb_position = features['bb_position'].to_numpy(dtype=float)
    ma4_above_ma8 = features['ma4_above_ma8'].to_numpy()
    macd_hist = features['macd_hist'].to_numpy(dtype=float)

    n = len(df)
    labels = np.zeros(n, dtype=int)  # default: 0 = ranging

    for i in range(lookback, n):
        start = i - lookback
        start_price = close[start]
        end_price = close[i - 1]
        period_return = (end_price / start_price - 1) * 100  # percent

        volatility = np.nanmean(vol4[start:i])
        # High-low range of the window as a percent of the starting close.
        price_range = (np.nanmax(high[start:i]) - np.nanmin(low[start:i])) / start_price * 100

        window_rsi = rsi[start:i]
        rsi_start = window_rsi[0]
        rsi_change = window_rsi[-1] - rsi_start
        rsi_max = np.nanmax(window_rsi)
        rsi_min = np.nanmin(window_rsi)

        # ===== Reversal score =====
        reversal_signals = 0
        # RSI leaving an extreme quickly, or merely visiting one.
        if (rsi_start > 70 and rsi_change < -15) or (rsi_start < 30 and rsi_change > 15):
            reversal_signals += 2
        elif rsi_max > 75 or rsi_min < 25:
            reversal_signals += 1
        # Wide range but little net move: price probed an extreme and came back.
        if price_range > 4 and abs(period_return) < 1:
            reversal_signals += 1
        # Price / RSI divergence.
        if period_return > 2 and rsi_change < -5:
            reversal_signals += 2
        elif period_return < -2 and rsi_change > 5:
            reversal_signals += 2
        # At a Bollinger band while the window moved the other way.
        bb_pos = bb_position[i]
        if (bb_pos > 0.95 and period_return < 0) or (bb_pos < 0.05 and period_return > 0):
            reversal_signals += 1

        if reversal_signals >= 3:
            labels[i] = 2
            continue

        # ===== Trend score (only when not a reversal) =====
        trend_signals = 0
        # Clear directional move.
        if abs(period_return) >= 2.5:
            trend_signals += 2
        elif abs(period_return) >= 1.5:
            trend_signals += 1
        # Moderate volatility band (in the sqrt(48)-scaled units of volatility_4).
        if 10 < volatility < 30:
            trend_signals += 1
        # RSI moving with price.
        if period_return > 0 and rsi_change > 5:
            trend_signals += 1
        elif period_return < 0 and rsi_change < -5:
            trend_signals += 1
        # Short-MA ordering agrees with the move.
        if ma4_above_ma8[i] == 1 and period_return > 0:
            trend_signals += 1
        elif ma4_above_ma8[i] == 0 and period_return < 0:
            trend_signals += 1
        # MACD histogram has the same sign as the move.
        if macd_hist[i] * period_return > 0:
            trend_signals += 1

        if trend_signals >= 4:
            labels[i] = 1

    return labels


def _close_trade(time, position, entry_price, price):
    """Return ``(pnl, trade_record)`` for closing *position* at *price*."""
    pnl = (price / entry_price - 1) * position
    trade = {
        'time': time,
        'action': 'CLOSE',
        'position': 'LONG' if position == 1 else 'SHORT',
        'price': price,
        'pnl': pnl
    }
    return pnl, trade


def backtest_strategy(df_result, initial_capital=1000000, stop_loss=0.008):
    """Backtest a long/short strategy driven by the predicted regime.

    Rules, evaluated at each bar's close from the second bar on:
      * flat and state 1 (trend): open LONG if the bar's return is positive,
        otherwise SHORT, at that bar's close;
      * in a position: close on state 2 (reversal) or state 0 (ranging), or
        when the loss measured from the entry price exceeds ``stop_loss``;
      * a position still open after the last bar is closed at the last close
        so the reported capital includes it.
    Each trade compounds the full capital; no fees or slippage are modelled.

    Args:
        df_result: Time-ordered DataFrame with ``close`` and integer
            ``state`` columns.
        initial_capital: Starting capital.
        stop_loss: Maximum tolerated loss since entry, as a fraction
            (0.008 = 0.8%).

    Returns:
        ``(trades, capital)``: list of OPEN/CLOSE trade dicts and final capital.
    """
    print("\n" + "="*70)
    print("30分钟状态策略回测")
    print("="*70)

    df_result = df_result.copy()
    df_result['ret'] = df_result['close'].pct_change()

    times = df_result.index
    prices = df_result['close'].to_numpy()
    rets = df_result['ret'].to_numpy()
    states = df_result['state'].to_numpy()

    capital = initial_capital
    position = 0  # 0 = flat, 1 = long, -1 = short
    entry_price = 0
    trades = []

    for i in range(1, len(df_result)):
        state = int(states[i])
        price = prices[i]

        if position == 0:
            if state == 1:
                # Trade in the direction of the current bar's move.
                position = 1 if rets[i] > 0 else -1
                entry_price = price
                trades.append({
                    'time': times[i],
                    'action': 'OPEN',
                    'position': 'LONG' if position == 1 else 'SHORT',
                    'price': price
                })
            continue

        # Signed return of the open position since entry (negative = loss).
        unrealized = (price / entry_price - 1) * position
        if state in (0, 2) or unrealized < -stop_loss:
            pnl, trade = _close_trade(times[i], position, entry_price, price)
            capital *= (1 + pnl)
            trades.append(trade)
            position = 0

    if position != 0:
        # Mark the final open position to market by closing it on the last bar.
        pnl, trade = _close_trade(times[-1], position, entry_price, prices[-1])
        capital *= (1 + pnl)
        trades.append(trade)

    total_return = (capital / initial_capital - 1) * 100
    closes = [t for t in trades if t['action'] == 'CLOSE']

    print(f"\n初始资金: {initial_capital:,.0f}")
    print(f"最终资金: {capital:,.0f}")
    print(f"总收益率: {total_return:+.2f}%")
    print(f"交易次数: {len(closes)}")
    if closes:
        wins = sum(1 for t in closes if t['pnl'] > 0)
        print(f"胜率: {wins / len(closes) * 100:.1f}%")

    return trades, capital


def train_and_evaluate(df_30min, features, labels, train_ratio=0.8):
    """Train a gradient-boosting regime classifier and predict every row.

    Rows are split chronologically (first ``train_ratio`` for training, the
    rest for testing, no shuffling).

    Args:
        df_30min: 30-minute bars aligned row-by-row with ``features``.
        features: Feature DataFrame.
        labels: Array of regime labels (0/1/2); NaN entries are skipped.
        train_ratio: Fraction of rows used for training.

    Returns:
        ``(clf, df_aligned, importance)``. ``df_aligned`` is ``df_30min``
        restricted to labelled rows, with predicted ``state``,
        ``prob_ranging``/``prob_trend``/``prob_reversal`` and a boolean
        ``is_test`` column marking the held-out rows.

    Raises:
        ImportError: if scikit-learn is not installed.
    """
    if GradientBoostingClassifier is None:
        raise ImportError("train_and_evaluate() requires scikit-learn")

    print("\n训练30分钟分类器...")

    labels = np.asarray(labels)
    valid_idx = ~np.isnan(labels)
    X = features[valid_idx]
    y = labels[valid_idx]
    df_aligned = df_30min.iloc[valid_idx].copy()

    # Chronological split: the test rows come strictly after training rows.
    split_idx = int(len(X) * train_ratio)
    X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    print(f"训练集: {len(X_train)}条")
    print(f"测试集: {len(X_test)}条")

    clf = GradientBoostingClassifier(
        n_estimators=150,
        max_depth=5,
        learning_rate=0.1,
        random_state=42
    )
    clf.fit(X_train, y_train)

    train_score = clf.score(X_train, y_train)
    test_score = clf.score(X_test, y_test)
    print(f"\n训练准确率: {train_score:.2%}")
    print(f"测试准确率: {test_score:.2%}")

    y_pred = clf.predict(X_test)
    print("\n分类报告:")
    # Passing labels= keeps the report aligned with the three names even when
    # a class is absent from y_test / y_pred (otherwise sklearn raises).
    print(classification_report(y_test, y_pred, labels=[0, 1, 2], target_names=STATE_NAMES))

    all_pred = clf.predict(X)
    all_proba = clf.predict_proba(X)
    df_aligned['state'] = all_pred
    # predict_proba's columns follow clf.classes_; a class that never appeared
    # in training has no column and gets probability 0.
    class_to_col = {cls: k for k, cls in enumerate(clf.classes_)}
    for col_name, cls in (('prob_ranging', 0), ('prob_trend', 1), ('prob_reversal', 2)):
        k = class_to_col.get(cls)
        df_aligned[col_name] = all_proba[:, k] if k is not None else 0.0
    df_aligned['is_test'] = np.arange(len(df_aligned)) >= split_idx

    importance = pd.DataFrame({
        'feature': X.columns,
        'importance': clf.feature_importances_
    }).sort_values('importance', ascending=False)

    print("\n特征重要性 TOP 15:")
    print(importance.head(15).to_string(index=False))

    return clf, df_aligned, importance


def main():
    """Run the full pipeline end to end and write the results to disk."""
    print("="*70)
    print("创业板50市场状态分类器 - 30分钟级别(优化版)")
    print("="*70)

    # 1. Load 5-minute data.
    df_5min = load_5min_data('SZ#399673.txt')

    # 2. Aggregate to 30-minute bars.
    df_30min = resample_to_30min(df_5min)

    # 3. Features.
    print("\n计算30分钟特征(优化版)...")
    features = calculate_features_30min_v2(df_30min)
    print(f"特征数量: {features.shape[1]}")

    # 4. Rule-based labels.
    print("\n定义市场状态标签(优化版)...")
    labels = define_market_regime_30min_v2(df_30min, features, lookback=8)

    unique, counts = np.unique(labels, return_counts=True)
    print("\n标签分布:")
    for u, c in zip(unique, counts):
        print(f" {STATE_NAMES[u]}: {c}个周期 ({c/len(labels)*100:.1f}%)")

    # 5. Train the classifier.
    clf, df_result, importance = train_and_evaluate(df_30min, features, labels)

    # 6. Backtest. The full-sample run is mostly in-sample (the model was fit
    # on the first 80% of rows), so also report the held-out rows alone.
    trades, final_capital = backtest_strategy(df_result)
    print("\n[样本外] 仅测试集回测:")
    oos_trades, oos_capital = backtest_strategy(df_result[df_result['is_test']])

    # 7. Save results.
    print("\n保存结果...")
    df_result.to_csv('cyb50_30min_regime_v2.csv')
    print("[OK] 结果已保存: cyb50_30min_regime_v2.csv")

    # NOTE: the pickled model is a GradientBoostingClassifier despite the
    # "rf_" file name; the name is kept so existing consumers still find it.
    with open('rf_classifier_30min_v2.pkl', 'wb') as f:
        pickle.dump(clf, f)
    print("[OK] 模型已保存: rf_classifier_30min_v2.pkl")

    print("\n" + "="*70)


if __name__ == "__main__":
    main()