| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50市场状态分类器 - 30分钟级别(优化版)
- 专为30分钟交易策略优化,增强交易信号生成
- """
- import numpy as np
- import pandas as pd
- from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
- from sklearn.metrics import classification_report, confusion_matrix
- import warnings
- warnings.filterwarnings('ignore')
def load_5min_data(filepath='SZ#399673.txt'):
    """Load a tab-separated 5-minute bar export into a time-indexed DataFrame.

    The file is decoded as GBK, its first two lines are skipped and it has no
    header row.  Only rows whose ``date`` field starts with ``YYYY/MM/DD`` and
    that carry a time value are kept, which drops header/footer residue.

    Args:
        filepath: Path to the export file.

    Returns:
        DataFrame indexed by a sorted ``DatetimeIndex`` named ``datetime`` with
        the columns ``date``, ``time``, ``open``, ``high``, ``low``, ``close``,
        ``volume`` and ``amount``.  Price/volume columns are coerced to numeric
        (unparseable values become NaN).

    Raises:
        ValueError: if no usable bar rows remain after filtering.
    """
    print(f"加载5分钟数据: {filepath}")
    df = pd.read_csv(filepath, sep='\t', skiprows=2, encoding='gbk', header=None,
                     comment='#', on_bad_lines='skip')
    df.columns = ['date', 'time', 'open', 'high', 'low', 'close', 'volume', 'amount']

    # Keep only real bar rows: header/footer lines do not match YYYY/MM/DD.
    df['date'] = df['date'].astype(str)
    df = df[df['date'].str.match(r'\d{4}/\d{2}/\d{2}')]
    # A bar without a time cannot be placed inside the session; stamping it
    # 00:00 would create a spurious midnight bar once the data is resampled.
    df = df[df['time'].notna()].copy()
    if df.empty:
        raise ValueError(f"no 5-minute bars found in {filepath!r}")

    # Times are HHMM numbers (e.g. 935 -> "0935"); zero-pad to four digits.
    hhmm = df['time'].map(lambda t: f"{int(t):04d}")
    df['datetime'] = pd.to_datetime(df['date'] + ' ' + hhmm,
                                    format='%Y/%m/%d %H%M')
    df = df.set_index('datetime').sort_index()

    for col in ['open', 'high', 'low', 'close', 'volume', 'amount']:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    print(f"[OK] 加载成功: {len(df)}条5分钟数据")
    print(f" 日期范围: {df.index[0]} ~ {df.index[-1]}")
    return df
def resample_to_30min(df_5min):
    """Aggregate 5-minute OHLCV bars into 30-minute bars and add candle geometry.

    Bins with no data (lunch break, overnight, holidays) are dropped.

    Columns added after aggregation:
        return        -- close-to-close percentage change between 30-min bars
        body          -- close - open
        upper_shadow  -- high - max(open, close)
        lower_shadow  -- min(open, close) - low
        body_pct      -- |body| / (high - low), with 1e-10 guarding flat bars

    NOTE(review): pandas' default binning for '30min' is left-closed and
    left-labelled.  If the source 5-minute stamps mark the *end* of each bar
    (e.g. 09:35 for 09:30-09:35), a bar stamped 10:00 falls into the 10:00
    bin rather than the 09:30 one -- confirm the export's convention.
    """
    print("\n聚合成30分钟数据...")
    ohlcv_rules = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
        'amount': 'sum',
    }
    bars = df_5min.resample('30min').agg(ohlcv_rules).dropna()

    bars['return'] = bars['close'].pct_change()

    candle_top = bars[['open', 'close']].max(axis=1)
    candle_bottom = bars[['open', 'close']].min(axis=1)
    full_range = bars['high'] - bars['low']

    bars['body'] = bars['close'] - bars['open']
    bars['upper_shadow'] = bars['high'] - candle_top
    bars['lower_shadow'] = candle_bottom - bars['low']
    bars['body_pct'] = bars['body'].abs() / (full_range + 1e-10)

    print(f"[OK] 聚合完成: {len(bars)}条30分钟数据")
    return bars
def calculate_features_30min_v2(df):
    """Build the feature matrix used by the 30-minute regime classifier.

    Every feature at row ``t`` is computed from rows ``<= t`` (rolling windows,
    ``diff``/``pct_change``/``shift`` with positive lags, exponential means and
    cumulative streak counters), so no later bars are used.

    Args:
        df: 30-minute bars indexed by a ``DatetimeIndex`` with ``high``,
            ``low``, ``close``, ``volume``, ``return``, ``body_pct``,
            ``upper_shadow`` and ``lower_shadow`` columns.

    Returns:
        DataFrame on ``df.index`` with one column per feature (the raw
        ``close`` is the first column).  Infinite values are treated as
        missing; missing values are forward-filled and any remaining leading
        gaps are set to 0.
    """
    features = pd.DataFrame(index=df.index)
    close = df['close']
    ret = df['return']
    volume = df['volume']
    bar_range = df['high'] - df['low'] + 1e-10  # epsilon guards flat bars

    features['close'] = close

    # ========== 1. Returns over several horizons (counted in 30-min bars) ==========
    features['ret_1'] = ret                      # current bar
    features['ret_2'] = close.pct_change(2)      # 2 bars = 1 hour
    features['ret_4'] = close.pct_change(4)      # 4 bars = 2 hours
    features['ret_8'] = close.pct_change(8)      # 8 bars = 4 trading hours
    features['ret_16'] = close.pct_change(16)    # 16 bars = 8 trading hours
    # Same quantities as ret_8 / ret_16 on gap-free prices; kept so the
    # feature columns stay unchanged.
    features['cum_ret_4h'] = close / close.shift(8) - 1
    features['cum_ret_1d'] = close / close.shift(16) - 1

    # ========== 2. Volatility ==========
    # NOTE(review): rolling std of bar returns scaled by sqrt(48).  An A-share
    # session has 8 thirty-minute bars per day, so confirm the intended
    # horizon; the scale is kept because volatility_4 is consumed on it.
    vol_scale = np.sqrt(48)
    features['volatility_4'] = ret.rolling(4).std() * vol_scale
    features['volatility_8'] = ret.rolling(8).std() * vol_scale
    features['volatility_16'] = ret.rolling(16).std() * vol_scale
    features['vol_ratio'] = features['volatility_4'] / (features['volatility_16'] + 1e-10)
    features['vol_change'] = features['volatility_8'].diff()

    # ========== 3. Trend: moving averages (windows in bars) ==========
    features['ma4'] = close.rolling(4).mean()
    features['ma8'] = close.rolling(8).mean()
    features['ma16'] = close.rolling(16).mean()
    features['ma48'] = close.rolling(48).mean()

    features['ma4_above_ma8'] = (features['ma4'] > features['ma8']).astype(int)
    features['ma8_above_ma16'] = (features['ma8'] > features['ma16']).astype(int)
    features['ma_slope_4'] = features['ma4'].diff(4) / features['ma4'].shift(4) * 100

    # Percentage distance of the close from its moving averages.
    features['dist_to_ma4'] = (close - features['ma4']) / features['ma4'] * 100
    features['dist_to_ma16'] = (close - features['ma16']) / features['ma16'] * 100

    # ========== 4. Momentum: RSI (simple-moving-average variant) ==========
    delta = close.diff()
    gain = delta.where(delta > 0, 0).rolling(14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(14).mean()
    rs = gain / (loss + 1e-10)
    features['rsi_14'] = 100 - (100 / (1 + rs))
    features['rsi_change'] = features['rsi_14'].diff(2)

    features['rsi_overbought'] = (features['rsi_14'] > 70).astype(int)
    features['rsi_oversold'] = (features['rsi_14'] < 30).astype(int)
    features['rsi_neutral'] = ((features['rsi_14'] >= 40) & (features['rsi_14'] <= 60)).astype(int)

    # ========== 5. MACD (12/26/9) ==========
    ema12 = close.ewm(span=12).mean()
    ema26 = close.ewm(span=26).mean()
    features['macd'] = ema12 - ema26
    features['macd_signal'] = features['macd'].ewm(span=9).mean()
    features['macd_hist'] = features['macd'] - features['macd_signal']
    # 1 on the bar where MACD crosses above its signal line.
    features['macd_cross'] = ((features['macd'] > features['macd_signal']) &
                              (features['macd'].shift(1) <= features['macd_signal'].shift(1))).astype(int)

    # ========== 6. Bollinger bands (20 bars, 2 std) ==========
    features['bb_middle'] = close.rolling(20).mean()
    bb_std = close.rolling(20).std()
    features['bb_upper'] = features['bb_middle'] + 2 * bb_std
    features['bb_lower'] = features['bb_middle'] - 2 * bb_std
    features['bb_width'] = (features['bb_upper'] - features['bb_lower']) / features['bb_middle'] * 100
    features['bb_position'] = (close - features['bb_lower']) / (features['bb_upper'] - features['bb_lower'] + 1e-10)

    # Within 0.5% of a band.
    features['touch_upper'] = (close >= features['bb_upper'] * 0.995).astype(int)
    features['touch_lower'] = (close <= features['bb_lower'] * 1.005).astype(int)

    # ========== 7. Candle shape ==========
    features['body_pct'] = df['body_pct']
    features['upper_shadow_ratio'] = df['upper_shadow'] / bar_range
    features['lower_shadow_ratio'] = df['lower_shadow'] / bar_range

    # Small body with a long lower / upper shadow.
    features['hammer'] = ((features['lower_shadow_ratio'] > 0.6) &
                          (features['body_pct'] < 0.3)).astype(int)
    features['hanging_man'] = ((features['upper_shadow_ratio'] > 0.6) &
                               (features['body_pct'] < 0.3)).astype(int)

    # ========== 8. Volume ==========
    features['volume_ratio'] = volume / volume.rolling(16).mean()
    features['volume_spike'] = (features['volume_ratio'] > 2).astype(int)
    # Slope of a least-squares line through the last 8 volumes.
    features['volume_trend'] = volume.rolling(8).apply(
        lambda x: np.polyfit(range(len(x)), x, 1)[0] if len(x) == 8 else 0, raw=True)
    features['vol_price_corr'] = volume.rolling(8).corr(close)

    # ========== 9. ATR ==========
    high_low = df['high'] - df['low']
    high_close = abs(df['high'] - close.shift())
    low_close = abs(df['low'] - close.shift())
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    features['atr_14'] = tr.rolling(14).mean()
    features['atr_ratio'] = features['atr_14'] / close * 100

    # ========== 10. Time of day ==========
    hours = df.index.hour
    minutes = df.index.minute
    features['hour'] = hours
    features['minute'] = minutes
    features['is_open'] = ((hours == 9) & (minutes == 30)).astype(int)
    features['is_morning'] = ((hours >= 9) & (hours < 11)).astype(int)
    features['is_afternoon'] = ((hours >= 13) & (hours < 15)).astype(int)
    features['is_close'] = ((hours == 15) & (minutes == 0)).astype(int)

    # ========== 11. Price action ==========
    # Length of the current run of up (down) bars: a new group starts at each
    # non-up (non-down) bar, and cumsum counts within the group.
    features['consecutive_up'] = (ret > 0).astype(int).groupby(
        (ret <= 0).astype(int).cumsum()).cumsum()
    features['consecutive_down'] = (ret < 0).astype(int).groupby(
        (ret >= 0).astype(int).cumsum()).cumsum()

    features['price_accel'] = close.diff().diff()
    features['return_accel'] = ret.diff()

    # ========== 12. Proximity to recent extremes (8 bars, within 0.5%) ==========
    features['near_high_8'] = (close >= df['high'].rolling(8).max() * 0.995).astype(int)
    features['near_low_8'] = (close <= df['low'].rolling(8).min() * 1.005).astype(int)

    # Infinite values (e.g. a return off a zero price) survive ffill/fillna and
    # make sklearn estimators reject the matrix, so treat them as missing first.
    features = features.replace([np.inf, -np.inf], np.nan).ffill().fillna(0)
    return features
def define_market_regime_30min_v2(df, features, lookback=8, bars_per_day=8, trading_days=252):
    """Label each 30-minute bar with a rule-based market regime.

    Regimes:
        0 = ranging   -- stand aside or trade the range
        1 = trending  -- follow the trend
        2 = reversal  -- trade against the move or reduce exposure

    For bar ``i`` the rules use the ``lookback`` bars strictly before it
    (rows ``i - lookback`` .. ``i - 1``) for close/high/low, RSI and
    volatility, plus the bar-``i`` values of ``bb_position``,
    ``ma4_above_ma8`` and ``macd_hist``.  Reversal evidence is scored first;
    only if it is insufficient is trend evidence scored.  The first
    ``lookback`` bars are labelled 0.

    Args:
        df: DataFrame with ``close``, ``high`` and ``low`` columns.
        features: DataFrame aligned row-by-row with ``df`` containing
            ``rsi_14``, ``volatility_4``, ``bb_position``, ``ma4_above_ma8``
            and ``macd_hist``.
        lookback: Number of prior bars in the evaluation window (>= 1).
        bars_per_day: Bars per trading day, used to annualize volatility.
        trading_days: Trading days per year, used to annualize volatility.

    Returns:
        1-D integer numpy array with one label per row of ``df``.

    Raises:
        ValueError: if ``lookback < 1``.
    """
    if lookback < 1:
        raise ValueError("lookback must be >= 1")

    close = df['close'].to_numpy(dtype=float)
    high = df['high'].to_numpy(dtype=float)
    low = df['low'].to_numpy(dtype=float)
    rsi = features['rsi_14'].to_numpy(dtype=float)
    vol = features['volatility_4'].to_numpy(dtype=float)
    bb_position = features['bb_position'].to_numpy(dtype=float)
    ma4_above_ma8 = features['ma4_above_ma8'].to_numpy()
    macd_hist = features['macd_hist'].to_numpy(dtype=float)

    # volatility_4 (as built by calculate_features_30min_v2) is a fractional
    # std of bar returns times sqrt(48), while the "moderate volatility" band
    # below (10-30) is an annualized-percent range.  Compared unconverted, the
    # band is effectively never reached, so rescale to annualized percent.
    # NOTE(review): assumes bars_per_day=8 thirty-minute bars per session.
    vol_to_annual_pct = np.sqrt(bars_per_day * trading_days / 48.0) * 100.0

    n = len(df)
    labels = np.zeros(n, dtype=int)  # default 0 = ranging

    for i in range(lookback, n):
        lo = i - lookback
        w_close = close[lo:i]
        w_rsi = rsi[lo:i]

        start_price = w_close[0]
        period_return = (w_close[-1] / start_price - 1) * 100           # percent
        volatility = np.nanmean(vol[lo:i]) * vol_to_annual_pct          # annualized %
        price_range = (np.nanmax(high[lo:i]) - np.nanmin(low[lo:i])) / start_price * 100

        rsi_start = w_rsi[0]
        rsi_change = w_rsi[-1] - rsi_start
        rsi_max = np.nanmax(w_rsi)
        rsi_min = np.nanmin(w_rsi)

        # ===== Reversal evidence =====
        reversal_signals = 0
        if (rsi_start > 70 and rsi_change < -15) or (rsi_start < 30 and rsi_change > 15):
            reversal_signals += 2   # RSI leaves an extreme sharply
        elif rsi_max > 75 or rsi_min < 25:
            reversal_signals += 1   # RSI touched an extreme inside the window
        if price_range > 4 and abs(period_return) < 1:
            reversal_signals += 1   # wide range but little net progress
        if (period_return > 2 and rsi_change < -5) or (period_return < -2 and rsi_change > 5):
            reversal_signals += 2   # price/RSI divergence
        bb_pos = bb_position[i]
        if (bb_pos > 0.95 and period_return < 0) or (bb_pos < 0.05 and period_return > 0):
            reversal_signals += 1   # at a band edge, against the window's direction

        if reversal_signals >= 3:
            labels[i] = 2
            continue

        # ===== Trend evidence =====
        trend_signals = 0
        if abs(period_return) >= 2.5:
            trend_signals += 2
        elif abs(period_return) >= 1.5:
            trend_signals += 1
        if 10 < volatility < 30:
            trend_signals += 1      # moderate annualized volatility
        if (period_return > 0 and rsi_change > 5) or (period_return < 0 and rsi_change < -5):
            trend_signals += 1      # RSI moves with price
        if (ma4_above_ma8[i] == 1 and period_return > 0) or (ma4_above_ma8[i] == 0 and period_return < 0):
            trend_signals += 1      # short MA ordering agrees with the move
        if macd_hist[i] * period_return > 0:
            trend_signals += 1      # MACD histogram agrees with the move

        if trend_signals >= 4:
            labels[i] = 1

    return labels
def backtest_strategy(df_result, initial_capital=1000000):
    """Backtest a regime-following strategy on 30-minute bars.

    Rules, evaluated at each bar's close starting from the second row:
        * flat and ``state == 1`` (trend): open in the direction of the
          current bar's return (long if positive, otherwise short);
        * in a position: close on ``state`` 0 (ranging) or 2 (reversal), or
          when the position has lost more than 0.8% since entry (stop-loss);
        * a position still open after the last bar is closed at the last
          close, so the reported capital includes it.

    Trades execute at the bar close with no costs or slippage; each closed
    trade compounds capital by its simple return.

    Args:
        df_result: DataFrame with ``close`` and ``state`` columns.
        initial_capital: Starting capital.

    Returns:
        ``(trades, capital)``: ``trades`` is a list of dicts with keys
        ``time``, ``action`` ('OPEN'/'CLOSE'), ``position`` ('LONG'/'SHORT'),
        ``price`` and, for closes, ``pnl``; ``capital`` is the final capital.
    """
    print("\n" + "="*70)
    print("30分钟状态策略回测")
    print("="*70)

    df_result = df_result.copy()
    df_result['ret'] = df_result['close'].pct_change()

    stop_loss = 0.008                    # max adverse move from the entry price
    index = df_result.index
    prices = df_result['close'].to_numpy()
    states = df_result['state'].to_numpy()
    rets = df_result['ret'].to_numpy()

    capital = initial_capital
    position = 0                         # 0 = flat, 1 = long, -1 = short
    entry_price = 0
    trades = []

    def close_position(when, price):
        """Realize the open position at ``price`` and compound capital."""
        nonlocal capital, position
        pnl = (price / entry_price - 1) * position
        capital *= (1 + pnl)
        trades.append({
            'time': when,
            'action': 'CLOSE',
            'position': 'LONG' if position == 1 else 'SHORT',
            'price': price,
            'pnl': pnl,
        })
        position = 0

    for i in range(1, len(df_result)):
        state = int(states[i])
        price = prices[i]

        if position == 0:
            if state == 1:
                # Enter in the direction of the current bar's move.
                position = 1 if rets[i] > 0 else -1
                entry_price = price
                trades.append({
                    'time': index[i],
                    'action': 'OPEN',
                    'position': 'LONG' if position == 1 else 'SHORT',
                    'price': price,
                })
        else:
            # Signed return of the open position since entry (the stop-loss
            # is measured from the entry price, not from the previous bar).
            open_return = (price / entry_price - 1) * position
            if state in (0, 2) or open_return < -stop_loss:
                close_position(index[i], price)

    if position != 0:
        # Mark the still-open position to the last close.
        close_position(index[-1], prices[-1])

    total_return = (capital / initial_capital - 1) * 100
    closes = [t for t in trades if t['action'] == 'CLOSE']

    print(f"\n初始资金: {initial_capital:,.0f}")
    print(f"最终资金: {capital:,.0f}")
    print(f"总收益率: {total_return:+.2f}%")
    print(f"交易次数: {len(closes)}")

    if trades:
        wins = sum(1 for t in closes if t.get('pnl', 0) > 0)
        win_rate = wins / len(closes) * 100 if closes else 0
        print(f"胜率: {win_rate:.1f}%")

    return trades, capital
def train_and_evaluate(df_30min, features, labels):
    """Fit a gradient-boosting classifier on the regime labels and report scores.

    Rows are split chronologically: the first 80% train, the last 20% test
    (no shuffling).  The fitted model then predicts every row, so predictions
    for the training rows are in-sample.

    Args:
        df_30min: 30-minute bar DataFrame aligned row-by-row with ``features``.
        features: Feature DataFrame used as model input.
        labels: 1-D array of regime labels (0 ranging, 1 trend, 2 reversal);
            NaN entries, if any, are excluded.

    Returns:
        ``(clf, df_aligned, importance)``: the fitted classifier; a copy of the
        retained ``df_30min`` rows with ``state`` (predicted label) and
        ``prob_ranging``/``prob_trend``/``prob_reversal`` columns; and a
        DataFrame of feature importances sorted in descending order.
    """
    print("\n训练30分钟分类器...")

    labels = np.asarray(labels)
    valid_idx = ~np.isnan(labels)
    X = features[valid_idx]
    y = labels[valid_idx]
    df_aligned = df_30min.iloc[valid_idx].copy()

    # Chronological split (no shuffling) to avoid training on the future.
    split_idx = int(len(X) * 0.8)
    X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
    y_train, y_test = y[:split_idx], y[split_idx:]

    print(f"训练集: {len(X_train)}条")
    print(f"测试集: {len(X_test)}条")

    clf = GradientBoostingClassifier(
        n_estimators=150,
        max_depth=5,
        learning_rate=0.1,
        random_state=42
    )
    clf.fit(X_train, y_train)

    train_score = clf.score(X_train, y_train)
    test_score = clf.score(X_test, y_test)
    print(f"\n训练准确率: {train_score:.2%}")
    print(f"测试准确率: {test_score:.2%}")

    y_pred = clf.predict(X_test)
    print("\n分类报告:")
    # Pin the label set: without it, classification_report raises when a
    # regime is absent from the test split (target_names has 3 entries).
    print(classification_report(y_test, y_pred, labels=[0, 1, 2],
                                target_names=['震荡', '趋势', '反转'],
                                zero_division=0))

    all_pred = clf.predict(X)
    all_proba = clf.predict_proba(X)

    df_aligned['state'] = all_pred
    # predict_proba columns follow clf.classes_, which only lists classes seen
    # in training; look each regime up explicitly (probability 0 if unseen).
    for label, column in ((0, 'prob_ranging'), (1, 'prob_trend'), (2, 'prob_reversal')):
        hits = np.flatnonzero(clf.classes_ == label)
        df_aligned[column] = all_proba[:, hits[0]] if hits.size else 0.0

    importance = pd.DataFrame({
        'feature': X.columns,
        'importance': clf.feature_importances_
    }).sort_values('importance', ascending=False)

    print("\n特征重要性 TOP 15:")
    print(importance.head(15).to_string(index=False))

    return clf, df_aligned, importance
def main():
    """Run the pipeline: load, resample, featurize, label, train, backtest, save."""
    print("="*70)
    print("创业板50市场状态分类器 - 30分钟级别(优化版)")
    print("="*70)

    # 1. Load 5-minute bars.
    df_5min = load_5min_data('SZ#399673.txt')

    # 2. Aggregate to 30-minute bars.
    df_30min = resample_to_30min(df_5min)

    # 3. Features.
    print("\n计算30分钟特征(优化版)...")
    features = calculate_features_30min_v2(df_30min)
    print(f"特征数量: {features.shape[1]}")

    # 4. Rule-based regime labels.
    print("\n定义市场状态标签(优化版)...")
    labels = define_market_regime_30min_v2(df_30min, features, lookback=8)

    unique, counts = np.unique(labels, return_counts=True)
    print("\n标签分布:")
    state_names = ['震荡', '趋势', '反转']
    for u, c in zip(unique, counts):
        print(f" {state_names[u]}: {c}个周期 ({c/len(labels)*100:.1f}%)")

    # 5. Train the classifier.
    clf, df_result, _importance = train_and_evaluate(df_30min, features, labels)

    # 6. Backtest.  The full-sample run includes rows the model was trained
    # on (in-sample, optimistic), so also report the last 20% of rows, which
    # mirrors the chronological 80/20 split used in train_and_evaluate.
    _trades, _final_capital = backtest_strategy(df_result)
    test_start = int(len(df_result) * 0.8)
    print("\n[样本外] 仅测试集(后20%)回测:")
    backtest_strategy(df_result.iloc[test_start:])

    # 7. Save results.
    print("\n保存结果...")
    df_result.to_csv('cyb50_30min_regime_v2.csv')
    print("[OK] 结果已保存: cyb50_30min_regime_v2.csv")

    import pickle
    # NOTE: despite the "rf_" file name, clf is a GradientBoostingClassifier.
    # Only unpickle this file from a trusted source.
    with open('rf_classifier_30min_v2.pkl', 'wb') as f:
        pickle.dump(clf, f)
    print("[OK] 模型已保存: rf_classifier_30min_v2.pkl")

    print("\n" + "="*70)


if __name__ == "__main__":
    main()
|