| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50指数点位量化交易策略回测框架
- 训练集:2017-2023 | 验证集:2024-2025
- """
- import pandas as pd
- import numpy as np
- import matplotlib.pyplot as plt
- from datetime import datetime, timedelta
- import warnings
- # Silence every warning category for the whole process.
- warnings.filterwarnings('ignore')
- # Matplotlib font configuration.
- # NOTE(review): the original comment said this enables Chinese text, but the
- # selected family is DejaVu Sans -- presumably not a CJK font; confirm intent.
- plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
- plt.rcParams['axes.unicode_minus'] = False
- # ==================== 1. 数据生成/加载 ====================
def generate_mock_data(start='2017-01-01', end='2025-12-31', seed=42):
    """Generate synthetic daily OHLC data resembling an equity index.

    The data is for demonstrating the framework only; replace it with real
    market data for actual research.

    Args:
        start: First calendar date (inclusive), anything ``pd.date_range`` accepts.
        end: Last calendar date (inclusive).
        seed: Seed for the private random generator.

    Returns:
        DataFrame indexed by weekday dates with columns ``close``, ``open``,
        ``high`` and ``low``.
    """
    # A private generator gives the same stream as np.random.seed(seed) would,
    # but leaves the caller's global NumPy random state untouched.
    rng = np.random.RandomState(seed)

    # Simplified trading calendar: every weekday, holidays are not removed.
    dates = pd.date_range(start=start, end=end, freq='D')
    dates = dates[dates.dayofweek < 5]
    n = len(dates)

    # Base log-returns: 0.03% daily drift, 1.6% daily volatility.
    returns = rng.normal(0.0003, 0.016, n)

    # Regime overlays: 2019-2021 bull market, 2022 bear market.
    bull_mask = (dates >= '2019-01-01') & (dates <= '2021-12-31')
    returns[bull_mask] += 0.001
    bear_mask = (dates >= '2022-01-01') & (dates <= '2022-12-31')
    returns[bear_mask] -= 0.001

    # From 2024 on the returns are replaced by a driftless, lower-volatility
    # process (range-bound market).
    osc_mask = dates >= '2024-01-01'
    returns[osc_mask] = rng.normal(0, 0.012, int(osc_mask.sum()))

    # Price path starting around 2000 points.
    price = 2000 * np.exp(np.cumsum(returns))

    df = pd.DataFrame(index=dates)
    df['close'] = price
    df['open'] = price * (1 + rng.normal(0, 0.005, n))

    # High/low extend the open-close body by a non-negative random fraction;
    # the outer max/min keeps them from ever crossing into the body.
    body_top = df[['open', 'close']].max(axis=1)
    body_bottom = df[['open', 'close']].min(axis=1)
    df['high'] = np.maximum(body_top * (1 + np.abs(rng.normal(0, 0.008, n))), body_top)
    df['low'] = np.minimum(body_bottom * (1 - np.abs(rng.normal(0, 0.008, n))), body_bottom)

    return df
- # ==================== 2. 技术指标计算 ====================
def calculate_atr(high, low, close, period=20):
    """Return the Average True Range as a simple rolling mean of the true range.

    The true range of a bar is the largest of: high - low,
    |high - previous close| and |low - previous close|.
    """
    prev_close = close.shift(1)
    candidates = pd.DataFrame({
        'high_low': high - low,
        'high_prev': (high - prev_close).abs(),
        'low_prev': (low - prev_close).abs(),
    })
    # Row-wise max skips NaN, so the first bar falls back to high - low.
    true_range = candidates.max(axis=1)
    return true_range.rolling(window=period).mean()
def calculate_rsrs(high, low, n=20, m=250):
    """Compute the RSRS (resistance-support relative strength) indicator.

    For every bar, the highs of the last ``n`` bars are regressed on the lows
    of the same bars. The regression slope is standardised against the last
    ``m`` slopes (z-score) and multiplied by the regression R-squared.

    Args:
        high: Series of bar highs.
        low: Series of bar lows, positionally aligned with ``high``.
        n: Regression window length.
        m: Window length used to standardise the slope.

    Returns:
        Tuple ``(rsrs_score, r_squared)`` of Series indexed like ``high``.
        Entries without enough history, or whose window has zero variance,
        are NaN.
    """
    high_arr = np.asarray(high, dtype=float)
    low_arr = np.asarray(low, dtype=float)
    size = len(high_arr)

    # Fill plain NumPy buffers in the loops; element-wise Series.iloc
    # assignment is far slower and gives the same values.
    slope_arr = np.full(size, np.nan)
    r2_arr = np.full(size, np.nan)

    for i in range(n - 1, size):
        low_window = low_arr[i - n + 1:i + 1]
        high_window = high_arr[i - n + 1:i + 1]

        low_std = np.std(low_window)
        if not low_std > 0:
            # Flat (or NaN) lows: the slope is undefined, leave NaN.
            continue

        # OLS slope of high on low: corr * std(high) / std(low).
        beta = np.corrcoef(low_window, high_window)[0, 1] * np.std(high_window) / low_std

        # R-squared of that regression line.
        fitted = np.mean(high_window) + beta * (low_window - np.mean(low_window))
        ss_res = np.sum((high_window - fitted) ** 2)
        ss_tot = np.sum((high_window - np.mean(high_window)) ** 2)

        slope_arr[i] = beta
        r2_arr[i] = 1 - ss_res / ss_tot if ss_tot > 0 else 0

    slopes = pd.Series(slope_arr, index=high.index)
    r2s = pd.Series(r2_arr, index=high.index)

    # Standardise each slope against the trailing m slopes. The first index
    # with a full window of slopes is m + n - 2.
    score_arr = np.full(size, np.nan)
    for i in range(m + n - 2, size):
        slope_window = slopes.iloc[i - m + 1:i + 1]
        spread = slope_window.std()
        if spread > 0:
            zscore = (slope_arr[i] - slope_window.mean()) / spread
            score_arr[i] = zscore * r2_arr[i]

    rsrs = pd.Series(score_arr, index=high.index)
    return rsrs, r2s
- def calculate_rsi(close, period=14):
- """计算RSI指标"""
- delta = close.diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
- rs = gain / loss
- rsi = 100 - (100 / (1 + rs))
- return rsi
- # ==================== 3. 市场状态判断 ====================
def classify_state(close, ma20, ma60, ma120, atr_percent, rsrs, close_series):
    """Classify the market regime as "BULL", "BEAR" or "OSCILLATE".

    Args:
        close: Latest close.
        ma20, ma60, ma120: Latest 20/60/120-bar moving averages of the close.
        atr_percent: Latest ATR as a percentage of the close.
        rsrs: Latest RSRS score (accepted but not used by the rules below).
        close_series: Close history; its last one-bar change is used to detect
            a crash day.

    Returns:
        The regime label as a string.
    """
    # One point for each level of the moving-average ladder that is in order.
    trend_score = sum((close > ma20, ma20 > ma60, ma60 > ma120))
    is_volatile = atr_percent > 5

    # A single-bar drop of more than 7% is treated as a crash.
    if len(close_series) > 1:
        last_change = close_series.pct_change().iloc[-1]
    else:
        last_change = 0

    if last_change < -0.07:
        return "BEAR"
    if trend_score <= 1 and is_volatile and close < ma60:
        return "BEAR"
    if trend_score >= 2 and not is_volatile:
        return "BULL"
    return "OSCILLATE"
- # ==================== 4. 策略核心 ====================
class CYB50Strategy:
    """Regime-aware position-sizing strategy for the ChiNext 50 index.

    Each call to ``generate_signal`` classifies the market regime, debounces
    it, maps regime + RSRS score to a target position in [0, 1], and then
    applies the stop-loss and minimum-rebalance rules.
    """

    # Defaults used for every key the caller does not override.
    DEFAULT_PARAMS = {
        'rsrs_n': 20,
        'rsrs_m': 250,
        'bull_buy': 0.5,
        'bull_sell': -0.7,
        'bear_buy': 1.5,
        'bull_max': 1.0,
        'osc_max': 0.6,
        'stop_loss': 0.10,
        'min_change': 0.20,
    }

    def __init__(self, params=None):
        """Create a strategy.

        Args:
            params: Optional dict of parameter overrides. Missing keys fall
                back to ``DEFAULT_PARAMS``, so a partial dict is accepted.
        """
        self.params = {**self.DEFAULT_PARAMS, **(params or {})}
        self.current_position = 0
        self.state_history = []      # raw (undebounced) regime per signal
        self.confirmed_state = None  # regime currently acted upon
        self.entry_price = None      # close at which the open position began

    def generate_signal(self, data):
        """Produce the target position for the latest bar of ``data``.

        Args:
            data: DataFrame with ``close``, ``high`` and ``low`` columns; the
                last row is the current bar.

        Returns:
            Tuple ``(target_position, state)``. While the RSRS score is not
            yet available the result is ``(0, "INIT")``.
        """
        p = self.params

        # Only the trailing look-back window influences the latest indicator
        # values (RSRS needs n + m - 1 bars, MA120 needs 120, ATR needs 21).
        # Trimming avoids recomputing the whole history on every call.
        lookback = max(p['rsrs_n'] + p['rsrs_m'], 121)
        window = data.iloc[-lookback:]
        close = window['close']
        high = window['high']
        low = window['low']

        rsrs, _ = calculate_rsrs(high, low, p['rsrs_n'], p['rsrs_m'])
        ma20 = close.rolling(20).mean()
        ma60 = close.rolling(60).mean()
        ma120 = close.rolling(120).mean()
        atr_percent = calculate_atr(high, low, close, 20) / close * 100

        curr_rsrs = rsrs.iloc[-1]
        curr_close = close.iloc[-1]
        curr_atr_pct = atr_percent.iloc[-1]

        # Not enough history for the RSRS score yet.
        if pd.isna(curr_rsrs):
            return 0, "INIT"

        raw_state = classify_state(curr_close, ma20.iloc[-1], ma60.iloc[-1],
                                   ma120.iloc[-1], curr_atr_pct, curr_rsrs, close)

        # A one-day drop of more than 7% switches to BEAR without confirmation.
        daily_return = close.pct_change().iloc[-1]
        state = self._debounce_state(raw_state, crash=daily_return < -0.07)

        target_pos = self._calculate_position(state, curr_rsrs, curr_atr_pct)
        target_pos = self._finalize_position(target_pos, curr_close)
        return target_pos, state

    def _debounce_state(self, raw_state, crash=False):
        """Record ``raw_state`` and return the regime to act on.

        The acted-upon regime changes only after three identical raw states
        in a row; until then the previously confirmed regime is kept. During
        the first two signals the raw state is used directly, and a crash
        forces "BEAR" immediately.
        """
        self.state_history.append(raw_state)
        recent = self.state_history[-3:]

        if crash:
            self.confirmed_state = "BEAR"
        elif len(recent) < 3 or len(set(recent)) == 1:
            self.confirmed_state = raw_state
        # Otherwise the last three raw states disagree: keep the confirmed one.
        return self.confirmed_state

    def _finalize_position(self, target_pos, curr_close):
        """Apply stop-loss and min-rebalance rules, then commit the position.

        Updates ``current_position`` and ``entry_price`` and returns the
        position actually taken.
        """
        stopped_out = False
        if self.entry_price is not None and self.current_position > 0:
            change_since_entry = (curr_close - self.entry_price) / self.entry_price
            if change_since_entry < -self.params['stop_loss']:
                target_pos = 0
                stopped_out = True

        # Ignore small adjustments to limit turnover. A stop-loss exit must
        # never be filtered out, otherwise a position smaller than min_change
        # could not be closed.
        if not stopped_out and abs(target_pos - self.current_position) < self.params['min_change']:
            target_pos = self.current_position

        # The entry price is set when moving from flat to invested and
        # cleared when going flat.
        if target_pos > 0 and self.current_position == 0:
            self.entry_price = curr_close
        elif target_pos == 0:
            self.entry_price = None

        self.current_position = target_pos
        return target_pos

    def _calculate_position(self, state, rsrs, atr_percent):
        """Map regime, RSRS score and ATR% to a target position in [0, 1]."""
        p = self.params

        if state == "BULL":
            if rsrs > p['bull_buy']:
                pos = p['bull_max']
            elif rsrs < p['bull_sell']:
                pos = 0
            else:
                pos = p['bull_max'] * 0.5
        elif state == "BEAR":
            # Mostly flat; a 10% position is taken only on an extreme
            # negative RSRS score to play a rebound.
            pos = 0.1 if rsrs < -p['bear_buy'] else 0
        else:  # OSCILLATE
            if rsrs > 0.7:
                pos = p['osc_max']
            elif rsrs < -0.7:
                pos = 0
            else:
                pos = p['osc_max'] * 0.5

        # Scale exposure down when volatility is high.
        if atr_percent > 5:
            pos *= 0.6

        return np.clip(pos, 0, 1)
- # ==================== 5. 回测引擎 ====================
def backtest(data, strategy, initial_capital=1000000, start_date=None, end_date=None,
             warmup=250):
    """Run a daily close-to-close backtest of ``strategy`` on ``data``.

    The position chosen at the close of day t earns the index return from
    day t to day t+1. NAV starts at 1.0 on the first evaluated day.

    Args:
        data: DataFrame indexed by date with at least ``close``, ``high`` and
            ``low`` columns, sorted ascending.
        strategy: Object exposing ``generate_signal(data) -> (position, state)``.
        initial_capital: Kept for interface compatibility; results are
            reported as NAV normalised to 1.0, so the value is not used.
        start_date: First date to evaluate. Rows before it are kept as
            indicator history instead of being discarded.
        end_date: Last date to evaluate (inclusive).
        warmup: Minimum number of rows that must precede the first evaluated
            day.

    Returns:
        Tuple ``(results, metrics)``: a DataFrame indexed by date with columns
        ``position``, ``nav``, ``state``, ``index_close`` and ``index_nav``,
        and the dict produced by ``calculate_metrics``.

    Raises:
        ValueError: If no rows remain to evaluate after the warm-up.
    """
    if end_date:
        data = data[data.index <= end_date]

    # Locate the first row on or after start_date but keep earlier rows, so
    # the indicators are already warmed up when evaluation begins.
    first_eval = 0
    if start_date:
        in_range = np.asarray(data.index >= start_date)
        first_eval = int(in_range.argmax()) if in_range.any() else len(data)
    start_idx = max(first_eval, warmup)

    if start_idx >= len(data):
        raise ValueError(
            f"not enough data to backtest: {len(data)} rows available, "
            f"evaluation would start at row {start_idx}"
        )

    close = data['close']
    dates = []
    positions = []
    navs = []
    states = []
    current_nav = 1.0

    for i in range(start_idx, len(data)):
        # The strategy only ever sees data up to and including day i.
        position, state = strategy.generate_signal(data.iloc[:i + 1])

        # Today's P&L comes from the position decided on the previous day.
        if i > start_idx:
            daily_return = close.iloc[i] / close.iloc[i - 1] - 1
            current_nav *= 1 + daily_return * positions[-1]

        dates.append(data.index[i])
        positions.append(position)
        navs.append(current_nav)
        states.append(state)

    results = pd.DataFrame({
        'date': dates,
        'position': positions,
        'nav': navs,
        'state': states,
    }).set_index('date')

    # Buy-and-hold benchmark over the same evaluated days.
    results['index_close'] = close.iloc[start_idx:]
    results['index_nav'] = results['index_close'] / results['index_close'].iloc[0]

    metrics = calculate_metrics(results['nav'], results['index_nav'])
    return results, metrics
def calculate_metrics(strategy_nav, index_nav):
    """Compute performance statistics for a strategy NAV against a benchmark.

    Args:
        strategy_nav: Series of strategy net asset values, starting at 1.0.
        index_nav: Series of benchmark net asset values, starting at 1.0.

    Returns:
        Dict with ``total_return``, ``annual_return``, ``index_return``,
        ``index_annual``, ``excess_annual``, ``max_drawdown`` (<= 0),
        ``volatility``, ``sharpe``, ``calmar``, ``win_rate``, ``beta`` and
        ``trading_days``. Ratios whose denominator is zero or undefined fall
        back to 0 (``beta`` falls back to 1).
    """
    days = len(strategy_nav)

    # Total and annualised return (252 trading days per year).
    total_return = strategy_nav.iloc[-1] - 1
    annual_return = (1 + total_return) ** (252 / days) - 1

    index_return = index_nav.iloc[-1] - 1
    index_annual = (1 + index_return) ** (252 / days) - 1

    # Maximum drawdown relative to the running peak.
    running_max = strategy_nav.expanding().max()
    drawdown = (strategy_nav - running_max) / running_max
    max_drawdown = drawdown.min()

    # Annualised volatility of daily returns.
    daily_returns = strategy_nav.pct_change().dropna()
    volatility = daily_returns.std() * np.sqrt(252)

    # Sharpe ratio with a 3% annual risk-free rate.
    excess_return = annual_return - 0.03
    sharpe = excess_return / volatility if volatility > 0 else 0

    # Calmar ratio.
    calmar = annual_return / abs(max_drawdown) if max_drawdown != 0 else 0

    # Share of days with a positive return; guarded like the other ratios so
    # a one-point NAV series yields 0 rather than NaN.
    positive_days = (daily_returns > 0).sum()
    total_days = len(daily_returns)
    win_rate = positive_days / total_days if total_days > 0 else 0

    # Beta against the benchmark.
    index_returns = index_nav.pct_change().dropna()
    covariance = daily_returns.cov(index_returns)
    variance = index_returns.var()
    beta = covariance / variance if variance > 0 else 1

    excess_annual = annual_return - index_annual

    return {
        'total_return': total_return,
        'annual_return': annual_return,
        'index_return': index_return,
        'index_annual': index_annual,
        'excess_annual': excess_annual,
        'max_drawdown': max_drawdown,
        'volatility': volatility,
        'sharpe': sharpe,
        'calmar': calmar,
        'win_rate': win_rate,
        'beta': beta,
        'trading_days': days,
    }
- # ==================== 6. 参数优化 ====================
def expand_param_grid(param_grid, base_params):
    """Expand a grid specification into a list of complete parameter dicts.

    Args:
        param_grid: ``None``/empty (use ``base_params`` alone), a dict mapping
            parameter names to a value or a list/tuple of values (Cartesian
            product), or an iterable of override dicts.
        base_params: Defaults applied to every candidate.

    Returns:
        List of parameter dicts, each containing every key of ``base_params``.
    """
    from itertools import product

    if not param_grid:
        return [dict(base_params)]

    if isinstance(param_grid, dict):
        names = list(param_grid)
        # Accept a bare scalar as a one-element axis.
        axes = [
            list(param_grid[name]) if isinstance(param_grid[name], (list, tuple))
            else [param_grid[name]]
            for name in names
        ]
        return [{**base_params, **dict(zip(names, combo))} for combo in product(*axes)]

    return [{**base_params, **overrides} for overrides in param_grid]


def grid_search(data, param_grid, start_date='2018-02-01', end_date='2023-12-31',
                max_drawdown_limit=-0.40):
    """Search parameter combinations and return the best-scoring one.

    Args:
        data: Price DataFrame passed to ``backtest``.
        param_grid: Grid specification, see ``expand_param_grid``. ``None``
            evaluates only the default parameter set.
        start_date: First date of the training backtest.
        end_date: Last date of the training backtest.
        max_drawdown_limit: Candidates whose max drawdown is not above this
            (negative) value are rejected.

    Returns:
        Tuple ``(best_params, best_metrics)``; both are ``None`` when no
        candidate satisfies the drawdown limit.
    """
    base_params = {
        'rsrs_n': 20, 'rsrs_m': 250, 'bull_buy': 0.5, 'bull_sell': -0.7,
        'bear_buy': 1.5, 'bull_max': 1.0, 'osc_max': 0.6, 'stop_loss': 0.10,
        'min_change': 0.20,
    }

    best_score = float('-inf')
    best_params = None
    best_metrics = None

    for params in expand_param_grid(param_grid, base_params):
        print(f"测试参数: {params}")
        # A fresh strategy per candidate so no state leaks between runs.
        strategy = CYB50Strategy(params)
        _, metrics = backtest(data, strategy, start_date=start_date, end_date=end_date)

        # Composite score: Sharpe and Calmar weighted 0.4 each, plus twice
        # the annualised excess return.
        score = metrics['sharpe'] * 0.4 + metrics['calmar'] * 0.4 + metrics['excess_annual'] * 2

        print(f" 年化: {metrics['annual_return']*100:.1f}%, 回撤: {metrics['max_drawdown']*100:.1f}%, 夏普: {metrics['sharpe']:.2f}, 评分: {score:.2f}")

        if score > best_score and metrics['max_drawdown'] > max_drawdown_limit:
            best_score = score
            best_params = params
            best_metrics = metrics

    return best_params, best_metrics
- # ==================== 7. 可视化 ====================
def plot_results(results, title="Backtest Results"):
    """Draw NAV, position and drawdown panels for a backtest and return the figure.

    ``results`` must provide ``nav``, ``index_nav`` and ``position`` columns;
    its index is used as the x axis.
    """
    fig, (nav_ax, pos_ax, dd_ax) = plt.subplots(3, 1, figsize=(12, 10))
    x = results.index
    nav = results['nav']

    # Panel 1: strategy NAV against the index NAV.
    nav_ax.plot(x, nav, label='Strategy', linewidth=2)
    nav_ax.plot(x, results['index_nav'], label='Index', linewidth=1, alpha=0.7)
    nav_ax.set_title(f'{title} - NAV')
    nav_ax.set_ylabel('NAV')
    nav_ax.legend()
    nav_ax.grid(True, alpha=0.3)

    # Panel 2: position over time.
    pos_ax.fill_between(x, 0, results['position'], alpha=0.3, label='Position')
    pos_ax.set_ylabel('Position')
    pos_ax.set_ylim(0, 1.1)
    pos_ax.legend()
    pos_ax.grid(True, alpha=0.3)

    # Panel 3: drawdown from the running NAV peak.
    peak = nav.expanding().max()
    underwater = (nav - peak) / peak
    dd_ax.fill_between(x, underwater, 0, alpha=0.3, color='red')
    dd_ax.set_ylabel('Drawdown')
    dd_ax.set_xlabel('Date')
    dd_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    return fig
- # ==================== 8. 主程序 ====================
def main():
    """Run the full demo: mock data, training search, validation, report, plots.

    Writes ``train_results.csv``, ``val_results.csv``, ``train_backtest.png``
    and ``val_backtest.png`` to the current directory.

    Raises:
        SystemExit: If the parameter search finds no acceptable candidate.
    """
    print("="*60)
    print("创业板50指数量化交易策略回测")
    print("="*60)

    # Load data (mock data for the demo).
    print("\n[1] 生成模拟数据...")
    data = generate_mock_data('2017-01-01', '2025-12-31')
    print(f"数据区间: {data.index[0]} ~ {data.index[-1]}, 共{len(data)}个交易日")

    # Train / validation split.
    train_end = '2023-12-31'
    val_start = '2024-01-01'

    # Training stage: parameter search.
    print("\n[2] 训练阶段:参数优化 (2018-2023)...")
    best_params, train_metrics = grid_search(data, None)

    # grid_search returns (None, None) when every candidate breaks the
    # drawdown limit; stop with a clear message instead of an AttributeError.
    if best_params is None:
        raise SystemExit("参数优化失败:没有参数组合满足最大回撤约束")

    print(f"\n最优参数:")
    for k, v in best_params.items():
        print(f" {k}: {v}")

    print(f"\n训练集表现 (2018-2023):")
    print(f" 策略年化收益: {train_metrics['annual_return']*100:.2f}%")
    print(f" 指数年化收益: {train_metrics['index_annual']*100:.2f}%")
    print(f" 超额收益: {train_metrics['excess_annual']*100:.2f}%")
    print(f" 最大回撤: {train_metrics['max_drawdown']*100:.2f}%")
    print(f" 夏普比率: {train_metrics['sharpe']:.2f}")
    print(f" 卡玛比率: {train_metrics['calmar']:.2f}")
    print(f" 胜率: {train_metrics['win_rate']*100:.1f}%")
    print(f" Beta: {train_metrics['beta']:.2f}")

    # Re-run the training backtest with the best parameters to obtain the
    # full per-day results (grid_search only returns the metrics).
    strategy = CYB50Strategy(best_params)
    train_results, _ = backtest(data, strategy, start_date='2018-02-01', end_date=train_end)

    # Validation stage: out-of-sample test with a fresh strategy instance.
    print(f"\n[3] 验证阶段:样本外测试 (2024-2025)...")
    strategy_val = CYB50Strategy(best_params)
    val_results, val_metrics = backtest(data, strategy_val, start_date=val_start, end_date='2025-12-31')

    print(f"\n验证集表现 (2024-2025):")
    print(f" 策略年化收益: {val_metrics['annual_return']*100:.2f}%")
    print(f" 指数年化收益: {val_metrics['index_annual']*100:.2f}%")
    print(f" 超额收益: {val_metrics['excess_annual']*100:.2f}%")
    print(f" 最大回撤: {val_metrics['max_drawdown']*100:.2f}%")
    print(f" 夏普比率: {val_metrics['sharpe']:.2f}")
    print(f" 卡玛比率: {val_metrics['calmar']:.2f}")

    # Overfitting check: relative Sharpe decay from training to validation.
    # Dividing by the absolute value keeps "positive = got worse" even when
    # the training Sharpe is negative.
    train_sharpe = train_metrics['sharpe']
    if train_sharpe != 0:
        sharpe_decay = (train_sharpe - val_metrics['sharpe']) / abs(train_sharpe)
    else:
        sharpe_decay = 0
    print(f"\n[4] 过拟合检测:")
    print(f" 夏普比率衰减: {sharpe_decay*100:.1f}%")
    if sharpe_decay > 0.5:
        print(" ⚠️ 警告:可能存在严重过拟合")
    elif sharpe_decay > 0.3:
        print(" ⚠️ 注意:轻度过拟合,建议简化参数")
    else:
        print(" ✓ 无过拟合,策略稳健")

    # Save results.
    print(f"\n[5] 保存结果...")
    train_results.to_csv('train_results.csv')
    val_results.to_csv('val_results.csv')
    print(" 训练集结果: train_results.csv")
    print(" 验证集结果: val_results.csv")

    # Plots; each figure is closed after saving to release its memory.
    print(f"\n[6] 生成图表...")
    fig1 = plot_results(train_results, "Training Set (2018-2023)")
    fig1.savefig('train_backtest.png', dpi=150, bbox_inches='tight')
    plt.close(fig1)
    print(" 训练集图表: train_backtest.png")

    fig2 = plot_results(val_results, "Validation Set (2024-2025)")
    fig2.savefig('val_backtest.png', dpi=150, bbox_inches='tight')
    plt.close(fig2)
    print(" 验证集图表: val_backtest.png")

    print("\n" + "="*60)
    print("回测完成")
    print("="*60)


if __name__ == "__main__":
    main()
|