#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ChiNext 50 (SZ 399673) market-regime classifier on 30-minute bars.

Loads a local 5-minute bar export, aggregates it into 30-minute candles,
derives technical-indicator features, assigns rule-based regime labels
(0 = ranging, 1 = trending, 2 = reversal) and trains a random forest on them.
"""

import pickle
import warnings

import numpy as np
import pandas as pd

warnings.filterwarnings('ignore')

# Display names for regime labels 0 / 1 / 2 (ranging, trending, reversal).
STATE_NAMES = ['震荡', '趋势', '反转']

# Scale factor applied to the std of 30-minute returns, shared by the features
# and the labelling rules so both use the same volatility scale.
# NOTE(review): an A-share trading day has 8 thirty-minute bars, so sqrt(48) is
# neither a daily nor an annual scaling. It is kept unchanged because the
# volatility thresholds in define_market_regime_30min are expressed on it.
VOL_SCALE = np.sqrt(48)


def load_5min_data(filepath='SZ#399673.txt'):
    """Load a tab-separated 5-minute bar export into a DataFrame.

    The file is read as GBK text with the first two lines skipped; everything
    after a ``#`` on a line is ignored by the parser. Only rows whose first
    column looks like ``YYYY/MM/DD`` are kept, which drops footer lines.

    Args:
        filepath: path of the export file.

    Returns:
        DataFrame indexed by a sorted DatetimeIndex built from the ``date``
        and ``time`` (HHMM) columns, with numeric ``open, high, low, close,
        volume, amount`` columns (unparseable values become NaN) plus the raw
        ``date`` and ``time`` columns.

    Raises:
        ValueError: if no valid data rows remain after filtering.
    """
    print(f"加载5分钟数据: {filepath}")

    df = pd.read_csv(filepath, sep='\t', skiprows=2, encoding='gbk',
                     header=None, comment='#', on_bad_lines='skip')
    df.columns = ['date', 'time', 'open', 'high', 'low', 'close', 'volume', 'amount']

    # Keep only real data rows (drops trailing text such as a data-source footer).
    df = df[df['date'].astype(str).str.match(r'\d{4}/\d{2}/\d{2}')].copy()
    if df.empty:
        raise ValueError(f"no valid 5-minute rows found in {filepath}")

    def format_time(t):
        # Time may be parsed as int, float (when the column had NaNs) or str;
        # normalise it to a zero-padded HHMM string.
        if pd.isna(t):
            return '0000'
        return f"{int(t):04d}"

    df['time_str'] = df['time'].apply(format_time)
    df['datetime'] = pd.to_datetime(df['date'].astype(str) + ' ' + df['time_str'],
                                    format='%Y/%m/%d %H%M')
    df = df.set_index('datetime').sort_index()
    df = df.drop('time_str', axis=1)

    for col in ['open', 'high', 'low', 'close', 'volume', 'amount']:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    print(f"[OK] 加载成功: {len(df)}条5分钟数据")
    print(f" 日期范围: {df.index[0]} ~ {df.index[-1]}")
    print(f" 价格范围: {df['close'].min():.2f} ~ {df['close'].max():.2f}")

    return df


def resample_to_30min(df_5min, bar_end_labeled=None):
    """Aggregate 5-minute bars into 30-minute OHLCV bars.

    Args:
        df_5min: 5-minute bars with a DatetimeIndex and columns
            ``open, high, low, close, volume, amount``.
        bar_end_labeled: whether each 5-minute timestamp marks the *end* of
            its bar (e.g. 09:35 for 09:30-09:35). ``None`` (default) infers it:
            if no bar is stamped exactly 09:30 the data is treated as
            end-labelled.

    Returns:
        30-minute bars (empty bins dropped) with an extra ``return`` column
        holding the close-to-close percentage change.
    """
    print("\n聚合成30分钟数据...")

    if bar_end_labeled is None:
        minutes_of_day = df_5min.index.hour * 60 + df_5min.index.minute
        bar_end_labeled = not (minutes_of_day == 9 * 60 + 30).any()

    # With end-stamped bars, left-closed bins split each session unevenly
    # (5-bar first bins plus lone 11:30 / 15:00 bars). Right-closed,
    # right-labelled bins group (09:30, 10:00] etc. and stamp each 30-minute
    # bar with its end time.
    side = 'right' if bar_end_labeled else 'left'
    df_30min = df_5min.resample('30min', closed=side, label=side).agg({
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
        'amount': 'sum'
    }).dropna()

    df_30min['return'] = df_30min['close'].pct_change()

    print(f"[OK] 聚合完成: {len(df_30min)}条30分钟数据")

    return df_30min


def _rsi(close, window=14):
    """Return an RSI of *close* using simple rolling means of gains/losses."""
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(window).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window).mean()
    rs = gain / (loss + 1e-10)
    return 100 - (100 / (1 + rs))


def calculate_features_30min(df):
    """Compute technical-indicator features for 30-minute bars.

    Window lengths are in bars; an A-share trading day (09:30-11:30,
    13:00-15:00) contains 8 thirty-minute bars.

    Args:
        df: 30-minute bars as produced by ``resample_to_30min`` (needs
            ``high, low, close, volume, return`` and a DatetimeIndex).

    Returns:
        Feature DataFrame aligned with ``df.index``. Infinite values are
        treated as missing, gaps are forward-filled and any leading NaNs left
        by rolling windows become 0.
    """
    close = df['close']
    ret = df['return']
    features = pd.DataFrame(index=df.index)

    # Raw price level.
    features['close'] = close

    # 1. Returns over 1, 4, 8 and 16 bars (30 min, 2 h, 1 day, 2 days).
    features['ret_1'] = ret
    features['ret_4'] = close.pct_change(4)
    features['ret_8'] = close.pct_change(8)
    features['ret_16'] = close.pct_change(16)

    # 2. Rolling volatility of bar returns (std scaled by VOL_SCALE).
    features['volatility_4'] = ret.rolling(4).std() * VOL_SCALE
    features['volatility_16'] = ret.rolling(16).std() * VOL_SCALE
    features['volatility_ratio'] = features['volatility_4'] / (features['volatility_16'] + 1e-10)

    # 3. Momentum over 8 and 16 bars.
    features['momentum_8'] = close / close.shift(8) - 1
    features['momentum_16'] = close / close.shift(16) - 1

    # 4. Moving averages over 4, 16 and 48 bars (2 h, 2 days, 6 days).
    features['ma4'] = close.rolling(4).mean()
    features['ma16'] = close.rolling(16).mean()
    features['ma48'] = close.rolling(48).mean()
    features['ma4_above_ma16'] = (features['ma4'] > features['ma16']).astype(int)

    # 5. 14-bar RSI and overbought / oversold flags.
    features['rsi_14'] = _rsi(close, 14)
    features['rsi_overbought'] = (features['rsi_14'] > 70).astype(int)
    features['rsi_oversold'] = (features['rsi_14'] < 30).astype(int)

    # 6. MACD (12/26 EMA difference, 9-period signal line).
    ema12 = close.ewm(span=12).mean()
    ema26 = close.ewm(span=26).mean()
    features['macd'] = ema12 - ema26
    features['macd_signal'] = features['macd'].ewm(span=9).mean()
    features['macd_hist'] = features['macd'] - features['macd_signal']

    # 7. Bollinger bands (20 bars, 2 standard deviations).
    features['bb_middle'] = close.rolling(20).mean()
    bb_std = close.rolling(20).std()
    features['bb_upper'] = features['bb_middle'] + 2 * bb_std
    features['bb_lower'] = features['bb_middle'] - 2 * bb_std
    features['bb_position'] = (close - features['bb_lower']) / (features['bb_upper'] - features['bb_lower'] + 1e-10)

    # 8. Volume relative to its 16-bar mean.
    features['volume_ratio'] = df['volume'] / df['volume'].rolling(16).mean()
    features['volume_spike'] = (features['volume_ratio'] > 2).astype(int)

    # 9. 14-bar Average True Range: true range also covers gaps against the
    #    previous close (e.g. overnight), which plain high-low would miss.
    prev_close = close.shift(1)
    true_range = pd.concat([
        df['high'] - df['low'],
        (df['high'] - prev_close).abs(),
        (df['low'] - prev_close).abs(),
    ], axis=1).max(axis=1)
    features['atr_14'] = true_range.rolling(14).mean()
    features['atr_ratio'] = features['atr_14'] / close

    # 10. Trading-session flags. Bars may be stamped by start (09:30..11:00,
    #     13:00..14:30) or end (10:00..11:30, 13:30..15:00); these bounds
    #     classify both conventions correctly.
    features['hour'] = df.index.hour
    features['is_morning'] = ((features['hour'] >= 9) & (features['hour'] < 12)).astype(int)
    features['is_afternoon'] = ((features['hour'] >= 13) & (features['hour'] <= 15)).astype(int)

    # 11. Second difference of price, raw and in units of 1% of price.
    features['price_accel'] = close.diff().diff()
    features['price_accel_normalized'] = features['price_accel'] / (close * 0.01)

    # 12. Length of the current run of up / down bars (reset on a non-matching bar).
    features['consecutive_up'] = (ret > 0).astype(int).groupby((ret <= 0).astype(int).cumsum()).cumsum()
    features['consecutive_down'] = (ret < 0).astype(int).groupby((ret >= 0).astype(int).cumsum()).cumsum()

    # Infinities (e.g. a zero-volume rolling mean) would make the forest fail.
    features = features.replace([np.inf, -np.inf], np.nan).ffill().fillna(0)

    return features


def define_market_regime_30min(df, lookback=8):
    """Assign a rule-based regime label to every 30-minute bar.

    The label at row ``i`` summarises the *preceding* ``lookback`` bars
    (rows ``i-lookback`` .. ``i-1``); the first ``lookback`` rows get 0.

    Labels: 2 (reversal) when at least two reversal conditions hold,
    else 1 (trend) when the window return is at least 2.5% in magnitude
    with scaled volatility below 35, else 0 (ranging).

    Args:
        df: 30-minute bars with a ``close`` column.
        lookback: window length in bars (default 8 = 4 hours = one trading day).

    Returns:
        Integer numpy array of labels, one per row of ``df``.
    """
    close = df['close']
    rsi = _rsi(close, 14)
    mid = lookback // 2

    labels = []
    for i in range(len(df)):
        if i < lookback:
            labels.append(0)
            continue

        window = slice(i - lookback, i)
        period_close = close.iloc[window]
        period_rsi = rsi.iloc[window]

        start_price = period_close.iloc[0]
        end_price = period_close.iloc[-1]
        period_return = (end_price / start_price - 1) * 100

        bar_returns = period_close.pct_change().dropna()
        volatility = bar_returns.std() * VOL_SCALE * 100

        first_half_return = (period_close.iloc[mid] / start_price - 1) * 100
        second_half_return = (end_price / period_close.iloc[mid] - 1) * 100

        rsi_start = period_rsi.iloc[0]
        rsi_change = period_rsi.iloc[-1] - rsi_start

        # Reversal evidence: RSI swinging out of an extreme, the two halves of
        # the window moving in opposite directions, an RSI extreme inside the
        # window, and moderate volatility.
        condition_1 = (rsi_start > 68 and rsi_change < -15) or (rsi_start < 32 and rsi_change > 15)
        condition_2 = (first_half_return * second_half_return < 0
                       and abs(first_half_return) > 1.5
                       and abs(second_half_return) > 1.0)
        condition_3 = (period_rsi.max() > 72 or period_rsi.min() < 28)
        condition_4 = 12 < volatility < 40
        reversal_score = sum([condition_1, condition_2, condition_3, condition_4])

        if reversal_score >= 2:
            label = 2
        elif abs(period_return) >= 2.5 and volatility < 35:
            label = 1
        else:
            label = 0

        labels.append(label)

    return np.array(labels)


def train_and_predict(df_30min, features, labels):
    """Train a random forest on the rule-based labels and predict every bar.

    The first 80% of rows (chronological order) train the model; the last 20%
    are used for evaluation.

    Args:
        df_30min: 30-minute bars aligned row-for-row with *features*.
        features: feature DataFrame from ``calculate_features_30min``.
        labels: array of regime labels (0/1/2) aligned with *features*.

    Returns:
        ``(clf, df_aligned, feature_importance)``. ``df_aligned`` is a copy of
        the bars with ``state`` and ``prob_ranging`` / ``prob_trend`` /
        ``prob_reversal`` columns; predictions also cover the training rows,
        so they are in-sample there.
    """
    # Imported here so the loading / feature / labelling helpers remain usable
    # without scikit-learn installed.
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import classification_report

    print("\n训练30分钟级别分类器...")

    labels = np.asarray(labels)
    # Integer labels never contain NaN; the mask only matters when a float
    # array with NaN (unlabelled rows) is passed in.
    valid_idx = ~np.isnan(labels)
    X = features[valid_idx]
    y = labels[valid_idx]
    df_aligned = df_30min.iloc[valid_idx].copy()

    # Chronological 80/20 split (no shuffling, to avoid look-ahead).
    split_idx = int(len(X) * 0.8)
    X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    print(f"训练集: {len(X_train)}条")
    print(f"测试集: {len(X_test)}条")

    # Only weight classes present in training; older scikit-learn versions
    # reject class_weight keys that do not occur in y.
    base_weights = {0: 1.0, 1: 1.2, 2: 2.0}
    present = set(np.unique(y_train).tolist())
    clf = RandomForestClassifier(
        n_estimators=200,
        max_depth=15,
        min_samples_split=10,
        min_samples_leaf=5,
        random_state=42,
        class_weight={c: w for c, w in base_weights.items() if c in present},
    )
    clf.fit(X_train, y_train)

    train_score = clf.score(X_train, y_train)
    test_score = clf.score(X_test, y_test)
    print(f"\n训练准确率: {train_score:.2%}")
    print(f"测试准确率: {test_score:.2%}")

    y_pred = clf.predict(X_test)
    print("\n分类报告:")
    # Explicit labels keep target_names aligned even when a class is missing
    # from both y_test and y_pred (otherwise sklearn raises ValueError).
    print(classification_report(y_test, y_pred, labels=[0, 1, 2], target_names=STATE_NAMES))

    all_pred = clf.predict(X)
    all_proba = clf.predict_proba(X)

    df_aligned['state'] = all_pred
    # predict_proba columns follow clf.classes_, which omits classes unseen in
    # training, so map by class label rather than by column position.
    column_of = {cls: j for j, cls in enumerate(clf.classes_)}
    for cls, name in ((0, 'prob_ranging'), (1, 'prob_trend'), (2, 'prob_reversal')):
        j = column_of.get(cls)
        df_aligned[name] = all_proba[:, j] if j is not None else 0.0

    feature_importance = pd.DataFrame({
        'feature': X.columns,
        'importance': clf.feature_importances_
    }).sort_values('importance', ascending=False)

    print("\n特征重要性 TOP 10:")
    print(feature_importance.head(10).to_string(index=False))

    return clf, df_aligned, feature_importance


def analyze_regime_distribution(df_result):
    """Print overall, recent-day and latest-bar regime statistics.

    Args:
        df_result: DataFrame with a DatetimeIndex and ``close``, ``state``,
            ``prob_ranging``, ``prob_trend`` and ``prob_reversal`` columns.
            It is not modified.
    """
    print("\n" + "="*70)
    print("30分钟市场状态分析")
    print("="*70)

    print("\n【整体分布】")
    for i, name in enumerate(STATE_NAMES):
        count = (df_result['state'] == i).sum()
        pct = count / len(df_result) * 100
        print(f" {name}: {count}个周期 ({pct:.1f}%)")

    print("\n【最近5个交易日状态分布】")
    # Calendar date per bar, computed locally so the caller's frame (which
    # main() later writes to CSV) is not given an extra column.
    dates = df_result.index.date
    recent_dates = pd.unique(dates)[-5:]
    for date in recent_dates:
        day_data = df_result[dates == date]
        print(f"\n {date}:")
        for i, name in enumerate(STATE_NAMES):
            count = (day_data['state'] == i).sum()
            print(f" {name}: {count}个30分钟周期")

    latest = df_result.iloc[-1]
    current_state = STATE_NAMES[int(latest['state'])]

    print("\n【当前状态】")
    print(f" 时间: {df_result.index[-1]}")
    print(f" 收盘价: {latest['close']:.2f}")
    print(f" 市场状态: {current_state}")
    print(f" 置信度: {latest[['prob_ranging', 'prob_trend', 'prob_reversal']].max():.2%}")
    print(f" 概率分布: 震荡{latest['prob_ranging']:.1%} / 趋势{latest['prob_trend']:.1%} / 反转{latest['prob_reversal']:.1%}")


def main():
    """Run the full pipeline: load, aggregate, featurise, label, train, report, save."""
    print("="*70)
    print("创业板50市场状态分类器 - 30分钟级别")
    print("="*70)

    # 1. Load 5-minute data.
    df_5min = load_5min_data('SZ#399673.txt')

    # 2. Aggregate to 30-minute bars.
    df_30min = resample_to_30min(df_5min)

    # 3. Features.
    print("\n计算30分钟技术指标...")
    features = calculate_features_30min(df_30min)
    print(f"特征数量: {features.shape[1]}")

    # 4. Rule-based labels.
    print("\n定义市场状态标签...")
    labels = define_market_regime_30min(df_30min, lookback=8)

    unique, counts = np.unique(labels, return_counts=True)
    print("\n标签分布:")
    for u, c in zip(unique, counts):
        print(f" {STATE_NAMES[u]}: {c}个周期 ({c/len(labels)*100:.1f}%)")

    # 5. Train and predict.
    clf, df_result, importance = train_and_predict(df_30min, features, labels)

    # 6. Report.
    analyze_regime_distribution(df_result)

    # 7. Persist results and model.
    print("\n保存结果...")
    df_result.to_csv('cyb50_30min_regime_result.csv')
    print("[OK] 结果已保存: cyb50_30min_regime_result.csv")

    # NOTE: pickle files execute code when loaded; only load trusted ones.
    with open('rf_classifier_30min.pkl', 'wb') as f:
        pickle.dump(clf, f)
    print("[OK] 模型已保存: rf_classifier_30min.pkl")

    print("\n" + "="*70)


if __name__ == "__main__":
    main()