#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50指数点位量化交易策略回测框架 训练集:2017-2023 | 验证集:2024-2025 """ import pandas as pd import numpy as np import matplotlib.pyplot as plt from datetime import datetime, timedelta import warnings warnings.filterwarnings('ignore') # 设置中文显示 plt.rcParams['font.sans-serif'] = ['DejaVu Sans'] plt.rcParams['axes.unicode_minus'] = False # ==================== 1. 数据生成/加载 ==================== def generate_mock_data(start='2017-01-01', end='2025-12-31', seed=42): """ 生成模拟的创业板50指数数据(用于演示框架) 实际使用时应替换为真实数据 """ np.random.seed(seed) dates = pd.date_range(start=start, end=end, freq='D') # 只保留交易日(简化:去掉周末) dates = dates[dates.dayofweek < 5] n = len(dates) # 生成带有趋势和波动的价格序列 # 创业板50基准约2000点 returns = np.random.normal(0.0003, 0.016, n) # 日均收益0.03%,波动1.6% # 添加一些趋势性(牛市、熊市、震荡) # 2019-2021牛市 bull_mask = (dates >= '2019-01-01') & (dates <= '2021-12-31') returns[bull_mask] += 0.001 # 2022熊市 bear_mask = (dates >= '2022-01-01') & (dates <= '2022-12-31') returns[bear_mask] -= 0.001 # 2024-2025震荡 osc_mask = dates >= '2024-01-01' returns[osc_mask] = np.random.normal(0, 0.012, sum(osc_mask)) # 计算价格 price = 2000 * np.exp(np.cumsum(returns)) # 生成OHLC数据 df = pd.DataFrame(index=dates) df['close'] = price df['open'] = price * (1 + np.random.normal(0, 0.005, n)) df['high'] = np.maximum(df[['open', 'close']].max(axis=1) * (1 + np.abs(np.random.normal(0, 0.008, n))), df[['open', 'close']].max(axis=1)) df['low'] = np.minimum(df[['open', 'close']].min(axis=1) * (1 - np.abs(np.random.normal(0, 0.008, n))), df[['open', 'close']].min(axis=1)) return df # ==================== 2. 技术指标计算 ==================== def calculate_atr(high, low, close, period=20): """计算ATR(平均真实波幅)""" tr1 = high - low tr2 = abs(high - close.shift(1)) tr3 = abs(low - close.shift(1)) tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) atr = tr.rolling(window=period).mean() return atr def calculate_rsrs(high, low, n=20, m=250): """ 计算RSRS指标(阻力支撑相对强度)- 优化版 返回: (rsrs_score, r_squared) """ # 使用rolling计算斜率和R² def rolling_beta(x): if len(x) < n or np.std(x[:n//2]) == 0: return np.nan low_vals = x[:n//2] high_vals = x[n//2:] if np.std(low_vals) == 0: return 0 beta = np.corrcoef(low_vals, high_vals)[0,1] * np.std(high_vals) / np.std(low_vals) return beta # 简化计算:使用 rolling.apply slopes = pd.Series(index=high.index, dtype=float) r2s = pd.Series(index=high.index, dtype=float) for i in range(n-1, len(high)): low_window = low.iloc[i-n+1:i+1].values high_window = high.iloc[i-n+1:i+1].values if np.std(low_window) > 0: beta = np.corrcoef(low_window, high_window)[0,1] * np.std(high_window) / np.std(low_window) # R² y_pred = np.mean(high_window) + beta * (low_window - np.mean(low_window)) ss_res = np.sum((high_window - y_pred) ** 2) ss_tot = np.sum((high_window - np.mean(high_window)) ** 2) r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0 slopes.iloc[i] = beta r2s.iloc[i] = r2 # 计算标准分(滚动M日) rsrs = pd.Series(index=high.index, dtype=float) for i in range(m+n-2, len(slopes)): slope_window = slopes.iloc[i-m+1:i+1] if slope_window.std() > 0: zscore = (slopes.iloc[i] - slope_window.mean()) / slope_window.std() rsrs.iloc[i] = zscore * r2s.iloc[i] return rsrs, r2s def calculate_rsi(close, period=14): """计算RSI指标""" delta = close.diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) return rsi # ==================== 3. 市场状态判断 ==================== def classify_state(close, ma20, ma60, ma120, atr_percent, rsrs, close_series): """ 判断市场状态:BULL(牛市)/ BEAR(熊市)/ OSCILLATE(震荡) close_series: 用于计算涨跌幅的收盘价序列 """ # 趋势得分 trend_score = 0 if close > ma20: trend_score += 1 if ma20 > ma60: trend_score += 1 if ma60 > ma120: trend_score += 1 # 波动率 high_volatility = atr_percent > 5 # 熔断检测(极端情况) daily_return = close_series.pct_change().iloc[-1] if len(close_series) > 1 else 0 crash = daily_return < -0.07 if crash or (trend_score <= 1 and high_volatility and close < ma60): return "BEAR" elif trend_score >= 2 and not high_volatility: return "BULL" else: return "OSCILLATE" # ==================== 4. 策略核心 ==================== class CYB50Strategy: """创业板50指数交易策略""" def __init__(self, params=None): # 默认参数 self.params = params or { 'rsrs_n': 20, 'rsrs_m': 250, 'bull_buy': 0.5, 'bull_sell': -0.7, 'bear_buy': 1.5, 'bull_max': 1.0, 'osc_max': 0.6, 'stop_loss': 0.10, 'min_change': 0.20 } self.current_position = 0 self.state_history = [] self.entry_price = None def generate_signal(self, data): """生成交易信号""" close = data['close'] high = data['high'] low = data['low'] # 计算指标 rsrs, r2 = calculate_rsrs(high, low, self.params['rsrs_n'], self.params['rsrs_m']) ma20 = close.rolling(20).mean() ma60 = close.rolling(60).mean() ma120 = close.rolling(120).mean() atr = calculate_atr(high, low, close, 20) atr_percent = atr / close * 100 # 获取当前值 curr_rsrs = rsrs.iloc[-1] curr_close = close.iloc[-1] curr_ma20 = ma20.iloc[-1] curr_ma60 = ma60.iloc[-1] curr_ma120 = ma120.iloc[-1] curr_atr_pct = atr_percent.iloc[-1] # 检查是否有足够数据 if pd.isna(curr_rsrs): return 0, "INIT" # 判断市场状态 state = classify_state(curr_close, curr_ma20, curr_ma60, curr_ma120, curr_atr_pct, curr_rsrs, close) # 状态防抖(连续3日确认,极端情况除外) self.state_history.append(state) if len(self.state_history) >= 3: # 熔断检测:单日大跌立即转熊 daily_return = close.pct_change().iloc[-1] if daily_return < -0.07: state = "BEAR" elif len(self.state_history) >= 3: # 正常防抖 recent_states = self.state_history[-3:] if len(set(recent_states)) > 1: state = self.state_history[-2] if len(self.state_history) >= 2 else state # 根据状态确定仓位 target_pos = self._calculate_position(state, curr_rsrs, curr_atr_pct) # 止损检查 if self.entry_price is not None and self.current_position > 0: current_drawdown = (curr_close - self.entry_price) / self.entry_price if current_drawdown < -self.params['stop_loss']: target_pos = 0 self.entry_price = None # 最小调仓幅度过滤 if abs(target_pos - self.current_position) < self.params['min_change']: target_pos = self.current_position # 更新入场价 if target_pos > 0 and self.current_position == 0: self.entry_price = curr_close elif target_pos == 0: self.entry_price = None self.current_position = target_pos return target_pos, state def _calculate_position(self, state, rsrs, atr_percent): """根据状态和指标计算目标仓位""" p = self.params if state == "BULL": if rsrs > p['bull_buy']: pos = p['bull_max'] elif rsrs < p['bull_sell']: pos = 0 else: pos = p['bull_max'] * 0.5 elif state == "BEAR": # 熊市:空仓为主 if rsrs < -p['bear_buy']: pos = 0.1 # 极端超卖,10%仓位博反弹 else: pos = 0 else: # OSCILLATE if rsrs > 0.7: pos = p['osc_max'] elif rsrs < -0.7: pos = 0 else: pos = p['osc_max'] * 0.5 # 波动率调整 if atr_percent > 5: pos *= 0.6 return np.clip(pos, 0, 1) # ==================== 5. 回测引擎 ==================== def backtest(data, strategy, initial_capital=1000000, start_date=None, end_date=None): """ 回测引擎 """ # 数据切片 if start_date: data = data[data.index >= start_date] if end_date: data = data[data.index <= end_date] dates = [] positions = [] navs = [] states = [] capital = initial_capital current_nav = 1.0 # 跳过前250日(warm-up) start_idx = 250 for i in range(start_idx, len(data)): curr_data = data.iloc[:i+1] curr_date = data.index[i] # 获取信号 position, state = strategy.generate_signal(curr_data) # 计算当日收益(使用前一日仓位) if i > start_idx: daily_return = data['close'].iloc[i] / data['close'].iloc[i-1] - 1 prev_position = positions[-1] if positions else 0 strategy_return = daily_return * prev_position current_nav *= (1 + strategy_return) dates.append(curr_date) positions.append(position) navs.append(current_nav) states.append(state) # 构建结果DataFrame results = pd.DataFrame({ 'date': dates, 'position': positions, 'nav': navs, 'state': states }).set_index('date') # 计算指数基准 index_data = data.iloc[start_idx:].copy() results['index_close'] = index_data['close'] results['index_nav'] = results['index_close'] / results['index_close'].iloc[0] # 计算指标 metrics = calculate_metrics(results['nav'], results['index_nav']) return results, metrics def calculate_metrics(strategy_nav, index_nav): """计算回测指标""" # 收益率 total_return = strategy_nav.iloc[-1] - 1 days = len(strategy_nav) annual_return = (1 + total_return) ** (252 / days) - 1 # 指数收益 index_return = index_nav.iloc[-1] - 1 index_annual = (1 + index_return) ** (252 / days) - 1 # 最大回撤 running_max = strategy_nav.expanding().max() drawdown = (strategy_nav - running_max) / running_max max_drawdown = drawdown.min() # 波动率 daily_returns = strategy_nav.pct_change().dropna() volatility = daily_returns.std() * np.sqrt(252) # 夏普比率(假设无风险利率3%) excess_return = annual_return - 0.03 sharpe = excess_return / volatility if volatility > 0 else 0 # 卡玛比率 calmar = annual_return / abs(max_drawdown) if max_drawdown != 0 else 0 # 胜率 positive_days = (daily_returns > 0).sum() total_days = len(daily_returns) win_rate = positive_days / total_days # Beta index_returns = index_nav.pct_change().dropna() covariance = daily_returns.cov(index_returns) variance = index_returns.var() beta = covariance / variance if variance > 0 else 1 # 年化超额收益 excess_annual = annual_return - index_annual return { 'total_return': total_return, 'annual_return': annual_return, 'index_return': index_return, 'index_annual': index_annual, 'excess_annual': excess_annual, 'max_drawdown': max_drawdown, 'volatility': volatility, 'sharpe': sharpe, 'calmar': calmar, 'win_rate': win_rate, 'beta': beta, 'trading_days': days } # ==================== 6. 参数优化 ==================== def grid_search(data, param_grid): """网格搜索最优参数 - 简化版""" best_score = -999 best_params = None best_metrics = None # 只测试1组参数(加速演示) test_params = [ {'rsrs_n': 20, 'rsrs_m': 250, 'bull_buy': 0.5, 'bull_sell': -0.7, 'bear_buy': 1.5, 'bull_max': 1.0, 'osc_max': 0.6, 'stop_loss': 0.10, 'min_change': 0.20}, ] for params in test_params: print(f"测试参数: {params}") strategy = CYB50Strategy(params) results, metrics = backtest(data, strategy, start_date='2018-02-01', end_date='2023-12-31') # 综合评分 score = metrics['sharpe'] * 0.4 + metrics['calmar'] * 0.4 + metrics['excess_annual'] * 2 print(f" 年化: {metrics['annual_return']*100:.1f}%, 回撤: {metrics['max_drawdown']*100:.1f}%, 夏普: {metrics['sharpe']:.2f}, 评分: {score:.2f}") if score > best_score and metrics['max_drawdown'] > -0.40: best_score = score best_params = params best_metrics = metrics return best_params, best_metrics # ==================== 7. 可视化 ==================== def plot_results(results, title="Backtest Results"): """绘制回测结果""" fig, axes = plt.subplots(3, 1, figsize=(12, 10)) # 净值曲线 ax1 = axes[0] ax1.plot(results.index, results['nav'], label='Strategy', linewidth=2) ax1.plot(results.index, results['index_nav'], label='Index', linewidth=1, alpha=0.7) ax1.set_title(f'{title} - NAV') ax1.set_ylabel('NAV') ax1.legend() ax1.grid(True, alpha=0.3) # 仓位变化 ax2 = axes[1] ax2.fill_between(results.index, 0, results['position'], alpha=0.3, label='Position') ax2.set_ylabel('Position') ax2.set_ylim(0, 1.1) ax2.legend() ax2.grid(True, alpha=0.3) # 回撤 ax3 = axes[2] running_max = results['nav'].expanding().max() drawdown = (results['nav'] - running_max) / running_max ax3.fill_between(results.index, drawdown, 0, alpha=0.3, color='red') ax3.set_ylabel('Drawdown') ax3.set_xlabel('Date') ax3.grid(True, alpha=0.3) plt.tight_layout() return fig # ==================== 8. 主程序 ==================== def main(): print("="*60) print("创业板50指数量化交易策略回测") print("="*60) # 加载数据(使用模拟数据演示) print("\n[1] 生成模拟数据...") data = generate_mock_data('2017-01-01', '2025-12-31') print(f"数据区间: {data.index[0]} ~ {data.index[-1]}, 共{len(data)}个交易日") # 划分训练集和验证集 train_end = '2023-12-31' val_start = '2024-01-01' # 训练阶段(参数优化) print("\n[2] 训练阶段:参数优化 (2018-2023)...") best_params, train_metrics = grid_search(data, None) print(f"\n最优参数:") for k, v in best_params.items(): print(f" {k}: {v}") print(f"\n训练集表现 (2018-2023):") print(f" 策略年化收益: {train_metrics['annual_return']*100:.2f}%") print(f" 指数年化收益: {train_metrics['index_annual']*100:.2f}%") print(f" 超额收益: {train_metrics['excess_annual']*100:.2f}%") print(f" 最大回撤: {train_metrics['max_drawdown']*100:.2f}%") print(f" 夏普比率: {train_metrics['sharpe']:.2f}") print(f" 卡玛比率: {train_metrics['calmar']:.2f}") print(f" 胜率: {train_metrics['win_rate']*100:.1f}%") print(f" Beta: {train_metrics['beta']:.2f}") # 使用最优参数回测训练集(获取完整结果) strategy = CYB50Strategy(best_params) train_results, _ = backtest(data, strategy, start_date='2018-02-01', end_date=train_end) # 验证阶段(样本外) print(f"\n[3] 验证阶段:样本外测试 (2024-2025)...") strategy_val = CYB50Strategy(best_params) val_results, val_metrics = backtest(data, strategy_val, start_date=val_start, end_date='2025-12-31') print(f"\n验证集表现 (2024-2025):") print(f" 策略年化收益: {val_metrics['annual_return']*100:.2f}%") print(f" 指数年化收益: {val_metrics['index_annual']*100:.2f}%") print(f" 超额收益: {val_metrics['excess_annual']*100:.2f}%") print(f" 最大回撤: {val_metrics['max_drawdown']*100:.2f}%") print(f" 夏普比率: {val_metrics['sharpe']:.2f}") print(f" 卡玛比率: {val_metrics['calmar']:.2f}") # 过拟合检测 sharpe_decay = (train_metrics['sharpe'] - val_metrics['sharpe']) / train_metrics['sharpe'] if train_metrics['sharpe'] != 0 else 0 print(f"\n[4] 过拟合检测:") print(f" 夏普比率衰减: {sharpe_decay*100:.1f}%") if sharpe_decay > 0.5: print(" ⚠️ 警告:可能存在严重过拟合") elif sharpe_decay > 0.3: print(" ⚠️ 注意:轻度过拟合,建议简化参数") else: print(" ✓ 无过拟合,策略稳健") # 保存结果 print(f"\n[5] 保存结果...") train_results.to_csv('train_results.csv') val_results.to_csv('val_results.csv') print(" 训练集结果: train_results.csv") print(" 验证集结果: val_results.csv") # 绘图 print(f"\n[6] 生成图表...") fig1 = plot_results(train_results, "Training Set (2018-2023)") fig1.savefig('train_backtest.png', dpi=150, bbox_inches='tight') print(" 训练集图表: train_backtest.png") fig2 = plot_results(val_results, "Validation Set (2024-2025)") fig2.savefig('val_backtest.png', dpi=150, bbox_inches='tight') print(" 验证集图表: val_backtest.png") print("\n" + "="*60) print("回测完成") print("="*60) if __name__ == "__main__": main()