| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50市场状态分类器 - 30分钟级别
- 基于本地5分钟数据文件,聚合成30分钟K线
- """
- import numpy as np
- import pandas as pd
- from sklearn.ensemble import RandomForestClassifier
- from sklearn.metrics import classification_report, confusion_matrix
- import warnings
- warnings.filterwarnings('ignore')
def load_5min_data(filepath='SZ#399673.txt'):
    """Load a tab-separated 5-minute bar export into a time-indexed DataFrame.

    The file is read as GBK text with its first two lines skipped, ``#``
    treated as a comment marker and malformed lines dropped. Rows whose first
    field does not start with a ``YYYY/MM/DD`` date are discarded.

    Args:
        filepath: Path of the export file to read.

    Returns:
        DataFrame sorted by a ``datetime`` index built from the date and time
        fields, with columns ``date``, ``time``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``amount``. The six price/volume columns
        are coerced to numeric; values that cannot be parsed become NaN.
    """
    print(f"加载5分钟数据: {filepath}")

    column_names = ['date', 'time', 'open', 'high', 'low', 'close', 'volume', 'amount']
    numeric_columns = ['open', 'high', 'low', 'close', 'volume', 'amount']

    raw = pd.read_csv(
        filepath,
        sep='\t',
        skiprows=2,
        encoding='gbk',
        header=None,
        comment='#',
        on_bad_lines='skip',
    )
    raw.columns = column_names

    # Keep only rows that really start with a date; this drops trailer lines.
    looks_like_date = raw['date'].astype(str).str.match(r'\d{4}/\d{2}/\d{2}')
    bars = raw[looks_like_date].copy()

    def _as_hhmm(value):
        # Numeric times lose their leading zero on parsing; pad back to HHMM.
        # A missing time is mapped to midnight.
        if pd.isna(value):
            return '0000'
        return f"{int(value):04d}"

    stamp_text = bars['date'] + ' ' + bars['time'].apply(_as_hhmm)
    bars['datetime'] = pd.to_datetime(stamp_text, format='%Y/%m/%d %H%M')
    bars = bars.set_index('datetime').sort_index()

    for name in numeric_columns:
        bars[name] = pd.to_numeric(bars[name], errors='coerce')

    first_stamp = bars.index[0]
    last_stamp = bars.index[-1]
    lowest_close = bars['close'].min()
    highest_close = bars['close'].max()
    print(f"[OK] 加载成功: {len(bars)}条5分钟数据")
    print(f" 日期范围: {first_stamp} ~ {last_stamp}")
    print(f" 价格范围: {lowest_close:.2f} ~ {highest_close:.2f}")
    return bars
def resample_to_30min(df_5min, closed=None, label=None):
    """Aggregate 5-minute OHLCV bars into 30-minute bars.

    Each 30-minute bin takes the first open, highest high, lowest low, last
    close and the summed volume and amount. Bins containing any missing
    aggregate (for example bins with no source bars) are dropped.

    Args:
        df_5min: DataFrame with a DatetimeIndex and ``open``, ``high``,
            ``low``, ``close``, ``volume`` and ``amount`` columns.
        closed: Which bin edge is inclusive, ``'left'`` or ``'right'``.
            ``None`` keeps the pandas default for this rule, so existing
            callers see unchanged results.
        label: Which bin edge labels the output row, ``'left'`` or
            ``'right'``. ``None`` keeps the pandas default.

    Returns:
        DataFrame of 30-minute bars with an extra ``return`` column holding
        the bar-to-bar percentage change of ``close`` (NaN on the first row).

    NOTE(review): if the source timestamps mark the END of each 5-minute bar
    (e.g. the first bar of a session is stamped 09:35), the default
    left-closed bins put the 10:00 bar into the next bin. In that case call
    with ``closed='right', label='right'`` - confirm against the data feed.
    """
    print("\n聚合成30分钟数据...")

    aggregation = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
        'amount': 'sum',
    }
    resampler = df_5min.resample('30min', closed=closed, label=label)
    df_30min = resampler.agg(aggregation).dropna()

    # Simple return between consecutive surviving 30-minute bars.
    df_30min['return'] = df_30min['close'].pct_change()

    print(f"[OK] 聚合完成: {len(df_30min)}条30分钟数据")
    return df_30min
def calculate_features_30min(df):
    """Build the technical-indicator feature matrix for 30-minute bars.

    Args:
        df: Bar DataFrame with a DatetimeIndex and ``close``, ``high``,
            ``low``, ``volume`` and ``return`` columns.

    Returns:
        DataFrame on the same index with 35 feature columns (returns,
        volatility, momentum, moving averages, RSI, MACD, Bollinger bands,
        volume, range, time of day, acceleration and streak counters).
        Missing values are forward-filled and any remaining leading gaps
        are set to 0.
    """
    close = df['close']
    bar_return = df['return']
    volume = df['volume']
    eps = 1e-10
    # Scaling factor the original applies to rolling std of returns.
    vol_scale = np.sqrt(48)

    features = pd.DataFrame(index=df.index)

    # Raw price level.
    features['close'] = close

    # 1. Returns over 1, 4, 8 and 16 bars (30 min, 2 h, 4 h, 8 h of bars).
    features['ret_1'] = bar_return
    for span in (4, 8, 16):
        features[f'ret_{span}'] = close.pct_change(span)

    # 2. Rolling volatility of bar returns, plus short/long ratio.
    for window in (4, 16):
        features[f'volatility_{window}'] = bar_return.rolling(window).std() * vol_scale
    features['volatility_ratio'] = features['volatility_4'] / (features['volatility_16'] + eps)

    # 3. Momentum: price relative to the price 8 / 16 bars earlier.
    for lag in (8, 16):
        features[f'momentum_{lag}'] = close / close.shift(lag) - 1

    # 4. Simple moving averages and a fast-above-slow flag.
    for window in (4, 16, 48):
        features[f'ma{window}'] = close.rolling(window).mean()
    fast_above_slow = features['ma4'] > features['ma16']
    features['ma4_above_ma16'] = fast_above_slow.astype(int)

    # 5. RSI over 14 bars, using simple rolling means of gains and losses.
    price_change = close.diff()
    avg_gain = (price_change.where(price_change > 0, 0)).rolling(14).mean()
    avg_loss = (-price_change.where(price_change < 0, 0)).rolling(14).mean()
    relative_strength = avg_gain / (avg_loss + eps)
    features['rsi_14'] = 100 - (100 / (1 + relative_strength))
    features['rsi_overbought'] = (features['rsi_14'] > 70).astype(int)
    features['rsi_oversold'] = (features['rsi_14'] < 30).astype(int)

    # 6. MACD (12/26 EMA difference) with a 9-span signal line.
    fast_ema = close.ewm(span=12).mean()
    slow_ema = close.ewm(span=26).mean()
    features['macd'] = fast_ema - slow_ema
    features['macd_signal'] = features['macd'].ewm(span=9).mean()
    features['macd_hist'] = features['macd'] - features['macd_signal']

    # 7. Bollinger bands (20 bars, 2 standard deviations) and band position.
    band_width = 2 * close.rolling(20).std()
    features['bb_middle'] = close.rolling(20).mean()
    features['bb_upper'] = features['bb_middle'] + band_width
    features['bb_lower'] = features['bb_middle'] - band_width
    features['bb_position'] = (
        (close - features['bb_lower'])
        / (features['bb_upper'] - features['bb_lower'] + eps)
    )

    # 8. Volume relative to its 16-bar mean, and a spike flag above 2x.
    features['volume_ratio'] = volume / volume.rolling(16).mean()
    features['volume_spike'] = (features['volume_ratio'] > 2).astype(int)

    # 9. Average high-low range over 14 bars (a simplified ATR), also
    #    expressed relative to price.
    bar_range = df['high'] - df['low']
    features['atr_14'] = bar_range.rolling(14).mean()
    features['atr_ratio'] = features['atr_14'] / close

    # 10. Time-of-day flags derived from the bar timestamp's hour.
    features['hour'] = df.index.hour
    hour = features['hour']
    features['is_morning'] = ((hour >= 9) & (hour < 11)).astype(int)
    features['is_afternoon'] = ((hour >= 13) & (hour < 15)).astype(int)

    # 11. Second difference of price, raw and scaled by 1% of price.
    features['price_accel'] = close.diff().diff()
    features['price_accel_normalized'] = features['price_accel'] / (close * 0.01)

    # 12. Length of the current run of rising / falling bars. A run id is the
    #     cumulative count of bars that break the run, so a cumulative sum
    #     within each id counts the streak.
    rising = (bar_return > 0).astype(int)
    rising_run_id = (bar_return <= 0).astype(int).cumsum()
    features['consecutive_up'] = rising.groupby(rising_run_id).cumsum()

    falling = (bar_return < 0).astype(int)
    falling_run_id = (bar_return >= 0).astype(int).cumsum()
    features['consecutive_down'] = falling.groupby(falling_run_id).cumsum()

    # Fill warm-up gaps: carry values forward, then zero what is left.
    return features.ffill().fillna(0)
def define_market_regime_30min(df, lookback=8):
    """Label each 30-minute bar with a rule-based market regime.

    For bar ``i`` the rules look only at the ``lookback`` bars immediately
    before it (bar ``i`` itself is excluded). Labels are:

    * ``0`` - ranging (default; also used for the first ``lookback`` bars)
    * ``1`` - trend: absolute window return >= 2.5% with scaled
      volatility < 35
    * ``2`` - reversal: at least two of the four reversal conditions hold

    Reversal takes precedence over trend.

    Args:
        df: Bar DataFrame with a ``close`` column.
        lookback: Number of preceding bars examined per label
            (default 8, i.e. 4 hours of 30-minute bars).

    Returns:
        NumPy array with one integer label per row of ``df``.
    """
    close = df['close']

    # 14-bar RSI from simple rolling means of gains and losses.
    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
    rs = gain / (loss + 1e-10)
    rsi = 100 - (100 / (1 + rs))

    mid = lookback // 2
    labels = []
    for i in range(len(df)):
        if i < lookback:
            # Not enough history yet: fall back to the default label.
            labels.append(0)
            continue

        period_close = close.iloc[i - lookback:i]
        period_rsi = rsi.iloc[i - lookback:i]

        start_price = period_close.iloc[0]
        mid_price = period_close.iloc[mid]
        end_price = period_close.iloc[-1]

        # Returns are expressed in percent.
        period_return = (end_price / start_price - 1) * 100
        first_half_return = (mid_price / start_price - 1) * 100
        second_half_return = (end_price / mid_price - 1) * 100

        # Std of bar-to-bar returns inside the window, scaled by sqrt(48).
        bar_returns = period_close.pct_change().dropna()
        volatility = bar_returns.std() * np.sqrt(48) * 100

        rsi_start = period_rsi.iloc[0]
        rsi_change = period_rsi.iloc[-1] - rsi_start

        # Reversal evidence. Comparisons against NaN RSI values (warm-up
        # period) evaluate to False, so they never count as evidence.
        rsi_swing = ((rsi_start > 68 and rsi_change < -15)
                     or (rsi_start < 32 and rsi_change > 15))
        halves_oppose = (first_half_return * second_half_return < 0
                         and abs(first_half_return) > 1.5
                         and abs(second_half_return) > 1.0)
        rsi_extreme = period_rsi.max() > 72 or period_rsi.min() < 28
        moderate_volatility = 12 < volatility < 40
        reversal_score = sum(
            [rsi_swing, halves_oppose, rsi_extreme, moderate_volatility]
        )

        if reversal_score >= 2:
            label = 2
        elif abs(period_return) >= 2.5 and volatility < 35:
            label = 1
        else:
            label = 0
        labels.append(label)

    return np.array(labels)
def train_and_predict(df_30min, features, labels):
    """Train a random-forest regime classifier and score every bar.

    The rows are split chronologically (first 80% train, last 20% test), the
    model is fitted on the training part, accuracy and a classification
    report are printed, and predictions are then produced for ALL rows
    (so the predictions on the training rows are in-sample).

    Args:
        df_30min: 30-minute bar DataFrame, row-aligned with ``features``.
        features: Feature DataFrame, one row per bar.
        labels: NumPy array of regime labels (0, 1, 2), one per bar.
            Entries that are NaN are excluded.

    Returns:
        Tuple ``(clf, df_aligned, feature_importance)``: the fitted
        classifier; a copy of the bars with ``state``, ``prob_ranging``,
        ``prob_trend`` and ``prob_reversal`` columns added; and a DataFrame
        of feature importances sorted in descending order.
    """
    print("\n训练30分钟级别分类器...")

    state_codes = [0, 1, 2]
    state_names = ['震荡', '趋势', '反转']
    proba_columns = ['prob_ranging', 'prob_trend', 'prob_reversal']

    # Keep only rows that have a label.
    valid_idx = ~np.isnan(labels)
    X = features[valid_idx]
    y = labels[valid_idx]
    df_aligned = df_30min.iloc[valid_idx].copy()

    # Chronological split: no shuffling, so the test set is the latest 20%.
    split_idx = int(len(X) * 0.8)
    X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]
    print(f"训练集: {len(X_train)}条")
    print(f"测试集: {len(X_test)}条")

    # Only weight classes that actually occur in the training labels; some
    # scikit-learn versions reject weights for absent classes.
    base_weights = {0: 1.0, 1: 1.2, 2: 2.0}
    present = set(np.unique(y_train).tolist())
    class_weight = {code: w for code, w in base_weights.items() if code in present}

    clf = RandomForestClassifier(
        n_estimators=200,
        max_depth=15,
        min_samples_split=10,
        min_samples_leaf=5,
        random_state=42,
        class_weight=class_weight,
    )
    clf.fit(X_train, y_train)

    train_score = clf.score(X_train, y_train)
    test_score = clf.score(X_test, y_test)
    print(f"\n训练准确率: {train_score:.2%}")
    print(f"测试准确率: {test_score:.2%}")

    # Pass the full label set explicitly so the report still works when a
    # regime is missing from the test window or from the predictions.
    y_pred = clf.predict(X_test)
    print("\n分类报告:")
    print(classification_report(
        y_test,
        y_pred,
        labels=state_codes,
        target_names=state_names,
        zero_division=0,
    ))

    all_pred = clf.predict(X)
    all_proba = clf.predict_proba(X)

    # predict_proba has one column per class seen in training, ordered as
    # clf.classes_; map by class code and use 0.0 for unseen classes instead
    # of indexing fixed column positions.
    proba_by_class = {
        int(code): all_proba[:, position]
        for position, code in enumerate(clf.classes_)
    }
    df_aligned['state'] = all_pred
    for code, column in zip(state_codes, proba_columns):
        df_aligned[column] = proba_by_class.get(code, 0.0)

    feature_importance = pd.DataFrame({
        'feature': X.columns,
        'importance': clf.feature_importances_,
    }).sort_values('importance', ascending=False)
    print("\n特征重要性 TOP 10:")
    print(feature_importance.head(10).to_string(index=False))

    return clf, df_aligned, feature_importance
def analyze_regime_distribution(df_result):
    """Print a summary of the predicted market states.

    Reports the overall share of each state, the per-state bar counts for
    the last five distinct dates in the index, and the state and class
    probabilities of the most recent bar.

    Args:
        df_result: DataFrame with a DatetimeIndex and ``close``, ``state``,
            ``prob_ranging``, ``prob_trend`` and ``prob_reversal`` columns.

    Side effects:
        Adds (or overwrites) a ``date`` column on ``df_result`` in place.

    Returns:
        None.
    """
    banner = "=" * 70
    state_names = ['震荡', '趋势', '反转']
    proba_columns = ['prob_ranging', 'prob_trend', 'prob_reversal']

    print("\n" + banner)
    print("30分钟市场状态分析")
    print(banner)

    # Overall distribution across the whole frame.
    print("\n【整体分布】")
    for code, name in enumerate(state_names):
        count = (df_result['state'] == code).sum()
        pct = count / len(df_result) * 100
        print(f" {name}: {count}个周期 ({pct:.1f}%)")

    # Per-day breakdown for the five most recent dates.
    print("\n【最近5个交易日状态分布】")
    df_result['date'] = df_result.index.date
    for day in df_result['date'].unique()[-5:]:
        day_rows = df_result[df_result['date'] == day]
        print(f"\n {day}:")
        for code, name in enumerate(state_names):
            count = (day_rows['state'] == code).sum()
            print(f" {name}: {count}个30分钟周期")

    # Latest bar: state, confidence (largest class probability) and the
    # full probability split.
    latest = df_result.iloc[-1]
    current_state = state_names[int(latest['state'])]
    confidence = latest[proba_columns].max()
    print("\n【当前状态】")
    print(f" 时间: {df_result.index[-1]}")
    print(f" 收盘价: {latest['close']:.2f}")
    print(f" 市场状态: {current_state}")
    print(f" 置信度: {confidence:.2%}")
    print(f" 概率分布: 震荡{latest['prob_ranging']:.1%} / 趋势{latest['prob_trend']:.1%} / 反转{latest['prob_reversal']:.1%}")
def main():
    """Run the full pipeline: load, resample, featurize, label, train, report, save.

    Reads ``SZ#399673.txt`` from the working directory, prints progress and
    analysis to stdout, and writes the per-bar results to
    ``cyb50_30min_regime_result.csv`` and the fitted classifier to
    ``rf_classifier_30min.pkl``.
    """
    import pickle

    rule = "=" * 70
    state_names = ['震荡', '趋势', '反转']

    print(rule)
    print("创业板50市场状态分类器 - 30分钟级别")
    print(rule)

    # Steps 1-2: load the 5-minute bars and aggregate them to 30 minutes.
    bars_5min = load_5min_data('SZ#399673.txt')
    bars_30min = resample_to_30min(bars_5min)

    # Step 3: technical-indicator features.
    print("\n计算30分钟技术指标...")
    features = calculate_features_30min(bars_30min)
    print(f"特征数量: {features.shape[1]}")

    # Step 4: rule-based regime labels and their distribution.
    print("\n定义市场状态标签...")
    labels = define_market_regime_30min(bars_30min, lookback=8)

    codes, counts = np.unique(labels, return_counts=True)
    print("\n标签分布:")
    for code, count in zip(codes, counts):
        share = count / len(labels) * 100
        print(f" {state_names[code]}: {count}个周期 ({share:.1f}%)")

    # Steps 5-6: fit the classifier, then print the state analysis.
    clf, df_result, _importance = train_and_predict(bars_30min, features, labels)
    analyze_regime_distribution(df_result)

    # Step 7: persist the per-bar results and the fitted model.
    print("\n保存结果...")
    df_result.to_csv('cyb50_30min_regime_result.csv')
    print("[OK] 结果已保存: cyb50_30min_regime_result.csv")

    with open('rf_classifier_30min.pkl', 'wb') as model_file:
        pickle.dump(clf, model_file)
    print("[OK] 模型已保存: rf_classifier_30min.pkl")

    print("\n" + rule)


if __name__ == "__main__":
    main()
|