cyb50_strategy.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 创业板50指数点位量化交易策略回测框架
  5. 训练集:2017-2023 | 验证集:2024-2025
  6. """
  7. import pandas as pd
  8. import numpy as np
  9. import matplotlib.pyplot as plt
  10. from datetime import datetime, timedelta
  11. import warnings
  12. warnings.filterwarnings('ignore')
  13. # 设置中文显示
  14. plt.rcParams['font.sans-serif'] = ['DejaVu Sans']
  15. plt.rcParams['axes.unicode_minus'] = False
  16. # ==================== 1. 数据生成/加载 ====================
  17. def generate_mock_data(start='2017-01-01', end='2025-12-31', seed=42):
  18. """
  19. 生成模拟的创业板50指数数据(用于演示框架)
  20. 实际使用时应替换为真实数据
  21. """
  22. np.random.seed(seed)
  23. dates = pd.date_range(start=start, end=end, freq='D')
  24. # 只保留交易日(简化:去掉周末)
  25. dates = dates[dates.dayofweek < 5]
  26. n = len(dates)
  27. # 生成带有趋势和波动的价格序列
  28. # 创业板50基准约2000点
  29. returns = np.random.normal(0.0003, 0.016, n) # 日均收益0.03%,波动1.6%
  30. # 添加一些趋势性(牛市、熊市、震荡)
  31. # 2019-2021牛市
  32. bull_mask = (dates >= '2019-01-01') & (dates <= '2021-12-31')
  33. returns[bull_mask] += 0.001
  34. # 2022熊市
  35. bear_mask = (dates >= '2022-01-01') & (dates <= '2022-12-31')
  36. returns[bear_mask] -= 0.001
  37. # 2024-2025震荡
  38. osc_mask = dates >= '2024-01-01'
  39. returns[osc_mask] = np.random.normal(0, 0.012, sum(osc_mask))
  40. # 计算价格
  41. price = 2000 * np.exp(np.cumsum(returns))
  42. # 生成OHLC数据
  43. df = pd.DataFrame(index=dates)
  44. df['close'] = price
  45. df['open'] = price * (1 + np.random.normal(0, 0.005, n))
  46. df['high'] = np.maximum(df[['open', 'close']].max(axis=1) * (1 + np.abs(np.random.normal(0, 0.008, n))),
  47. df[['open', 'close']].max(axis=1))
  48. df['low'] = np.minimum(df[['open', 'close']].min(axis=1) * (1 - np.abs(np.random.normal(0, 0.008, n))),
  49. df[['open', 'close']].min(axis=1))
  50. return df
  51. # ==================== 2. 技术指标计算 ====================
  52. def calculate_atr(high, low, close, period=20):
  53. """计算ATR(平均真实波幅)"""
  54. tr1 = high - low
  55. tr2 = abs(high - close.shift(1))
  56. tr3 = abs(low - close.shift(1))
  57. tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
  58. atr = tr.rolling(window=period).mean()
  59. return atr
  60. def calculate_rsrs(high, low, n=20, m=250):
  61. """
  62. 计算RSRS指标(阻力支撑相对强度)- 优化版
  63. 返回: (rsrs_score, r_squared)
  64. """
  65. # 使用rolling计算斜率和R²
  66. def rolling_beta(x):
  67. if len(x) < n or np.std(x[:n//2]) == 0:
  68. return np.nan
  69. low_vals = x[:n//2]
  70. high_vals = x[n//2:]
  71. if np.std(low_vals) == 0:
  72. return 0
  73. beta = np.corrcoef(low_vals, high_vals)[0,1] * np.std(high_vals) / np.std(low_vals)
  74. return beta
  75. # 简化计算:使用 rolling.apply
  76. slopes = pd.Series(index=high.index, dtype=float)
  77. r2s = pd.Series(index=high.index, dtype=float)
  78. for i in range(n-1, len(high)):
  79. low_window = low.iloc[i-n+1:i+1].values
  80. high_window = high.iloc[i-n+1:i+1].values
  81. if np.std(low_window) > 0:
  82. beta = np.corrcoef(low_window, high_window)[0,1] * np.std(high_window) / np.std(low_window)
  83. # R²
  84. y_pred = np.mean(high_window) + beta * (low_window - np.mean(low_window))
  85. ss_res = np.sum((high_window - y_pred) ** 2)
  86. ss_tot = np.sum((high_window - np.mean(high_window)) ** 2)
  87. r2 = 1 - ss_res / ss_tot if ss_tot > 0 else 0
  88. slopes.iloc[i] = beta
  89. r2s.iloc[i] = r2
  90. # 计算标准分(滚动M日)
  91. rsrs = pd.Series(index=high.index, dtype=float)
  92. for i in range(m+n-2, len(slopes)):
  93. slope_window = slopes.iloc[i-m+1:i+1]
  94. if slope_window.std() > 0:
  95. zscore = (slopes.iloc[i] - slope_window.mean()) / slope_window.std()
  96. rsrs.iloc[i] = zscore * r2s.iloc[i]
  97. return rsrs, r2s
  98. def calculate_rsi(close, period=14):
  99. """计算RSI指标"""
  100. delta = close.diff()
  101. gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
  102. loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
  103. rs = gain / loss
  104. rsi = 100 - (100 / (1 + rs))
  105. return rsi
  106. # ==================== 3. 市场状态判断 ====================
  107. def classify_state(close, ma20, ma60, ma120, atr_percent, rsrs, close_series):
  108. """
  109. 判断市场状态:BULL(牛市)/ BEAR(熊市)/ OSCILLATE(震荡)
  110. close_series: 用于计算涨跌幅的收盘价序列
  111. """
  112. # 趋势得分
  113. trend_score = 0
  114. if close > ma20: trend_score += 1
  115. if ma20 > ma60: trend_score += 1
  116. if ma60 > ma120: trend_score += 1
  117. # 波动率
  118. high_volatility = atr_percent > 5
  119. # 熔断检测(极端情况)
  120. daily_return = close_series.pct_change().iloc[-1] if len(close_series) > 1 else 0
  121. crash = daily_return < -0.07
  122. if crash or (trend_score <= 1 and high_volatility and close < ma60):
  123. return "BEAR"
  124. elif trend_score >= 2 and not high_volatility:
  125. return "BULL"
  126. else:
  127. return "OSCILLATE"
  128. # ==================== 4. 策略核心 ====================
  129. class CYB50Strategy:
  130. """创业板50指数交易策略"""
  131. def __init__(self, params=None):
  132. # 默认参数
  133. self.params = params or {
  134. 'rsrs_n': 20,
  135. 'rsrs_m': 250,
  136. 'bull_buy': 0.5,
  137. 'bull_sell': -0.7,
  138. 'bear_buy': 1.5,
  139. 'bull_max': 1.0,
  140. 'osc_max': 0.6,
  141. 'stop_loss': 0.10,
  142. 'min_change': 0.20
  143. }
  144. self.current_position = 0
  145. self.state_history = []
  146. self.entry_price = None
  147. def generate_signal(self, data):
  148. """生成交易信号"""
  149. close = data['close']
  150. high = data['high']
  151. low = data['low']
  152. # 计算指标
  153. rsrs, r2 = calculate_rsrs(high, low,
  154. self.params['rsrs_n'],
  155. self.params['rsrs_m'])
  156. ma20 = close.rolling(20).mean()
  157. ma60 = close.rolling(60).mean()
  158. ma120 = close.rolling(120).mean()
  159. atr = calculate_atr(high, low, close, 20)
  160. atr_percent = atr / close * 100
  161. # 获取当前值
  162. curr_rsrs = rsrs.iloc[-1]
  163. curr_close = close.iloc[-1]
  164. curr_ma20 = ma20.iloc[-1]
  165. curr_ma60 = ma60.iloc[-1]
  166. curr_ma120 = ma120.iloc[-1]
  167. curr_atr_pct = atr_percent.iloc[-1]
  168. # 检查是否有足够数据
  169. if pd.isna(curr_rsrs):
  170. return 0, "INIT"
  171. # 判断市场状态
  172. state = classify_state(curr_close, curr_ma20, curr_ma60,
  173. curr_ma120, curr_atr_pct, curr_rsrs, close)
  174. # 状态防抖(连续3日确认,极端情况除外)
  175. self.state_history.append(state)
  176. if len(self.state_history) >= 3:
  177. # 熔断检测:单日大跌立即转熊
  178. daily_return = close.pct_change().iloc[-1]
  179. if daily_return < -0.07:
  180. state = "BEAR"
  181. elif len(self.state_history) >= 3:
  182. # 正常防抖
  183. recent_states = self.state_history[-3:]
  184. if len(set(recent_states)) > 1:
  185. state = self.state_history[-2] if len(self.state_history) >= 2 else state
  186. # 根据状态确定仓位
  187. target_pos = self._calculate_position(state, curr_rsrs, curr_atr_pct)
  188. # 止损检查
  189. if self.entry_price is not None and self.current_position > 0:
  190. current_drawdown = (curr_close - self.entry_price) / self.entry_price
  191. if current_drawdown < -self.params['stop_loss']:
  192. target_pos = 0
  193. self.entry_price = None
  194. # 最小调仓幅度过滤
  195. if abs(target_pos - self.current_position) < self.params['min_change']:
  196. target_pos = self.current_position
  197. # 更新入场价
  198. if target_pos > 0 and self.current_position == 0:
  199. self.entry_price = curr_close
  200. elif target_pos == 0:
  201. self.entry_price = None
  202. self.current_position = target_pos
  203. return target_pos, state
  204. def _calculate_position(self, state, rsrs, atr_percent):
  205. """根据状态和指标计算目标仓位"""
  206. p = self.params
  207. if state == "BULL":
  208. if rsrs > p['bull_buy']:
  209. pos = p['bull_max']
  210. elif rsrs < p['bull_sell']:
  211. pos = 0
  212. else:
  213. pos = p['bull_max'] * 0.5
  214. elif state == "BEAR":
  215. # 熊市:空仓为主
  216. if rsrs < -p['bear_buy']:
  217. pos = 0.1 # 极端超卖,10%仓位博反弹
  218. else:
  219. pos = 0
  220. else: # OSCILLATE
  221. if rsrs > 0.7:
  222. pos = p['osc_max']
  223. elif rsrs < -0.7:
  224. pos = 0
  225. else:
  226. pos = p['osc_max'] * 0.5
  227. # 波动率调整
  228. if atr_percent > 5:
  229. pos *= 0.6
  230. return np.clip(pos, 0, 1)
  231. # ==================== 5. 回测引擎 ====================
  232. def backtest(data, strategy, initial_capital=1000000, start_date=None, end_date=None):
  233. """
  234. 回测引擎
  235. """
  236. # 数据切片
  237. if start_date:
  238. data = data[data.index >= start_date]
  239. if end_date:
  240. data = data[data.index <= end_date]
  241. dates = []
  242. positions = []
  243. navs = []
  244. states = []
  245. capital = initial_capital
  246. current_nav = 1.0
  247. # 跳过前250日(warm-up)
  248. start_idx = 250
  249. for i in range(start_idx, len(data)):
  250. curr_data = data.iloc[:i+1]
  251. curr_date = data.index[i]
  252. # 获取信号
  253. position, state = strategy.generate_signal(curr_data)
  254. # 计算当日收益(使用前一日仓位)
  255. if i > start_idx:
  256. daily_return = data['close'].iloc[i] / data['close'].iloc[i-1] - 1
  257. prev_position = positions[-1] if positions else 0
  258. strategy_return = daily_return * prev_position
  259. current_nav *= (1 + strategy_return)
  260. dates.append(curr_date)
  261. positions.append(position)
  262. navs.append(current_nav)
  263. states.append(state)
  264. # 构建结果DataFrame
  265. results = pd.DataFrame({
  266. 'date': dates,
  267. 'position': positions,
  268. 'nav': navs,
  269. 'state': states
  270. }).set_index('date')
  271. # 计算指数基准
  272. index_data = data.iloc[start_idx:].copy()
  273. results['index_close'] = index_data['close']
  274. results['index_nav'] = results['index_close'] / results['index_close'].iloc[0]
  275. # 计算指标
  276. metrics = calculate_metrics(results['nav'], results['index_nav'])
  277. return results, metrics
  278. def calculate_metrics(strategy_nav, index_nav):
  279. """计算回测指标"""
  280. # 收益率
  281. total_return = strategy_nav.iloc[-1] - 1
  282. days = len(strategy_nav)
  283. annual_return = (1 + total_return) ** (252 / days) - 1
  284. # 指数收益
  285. index_return = index_nav.iloc[-1] - 1
  286. index_annual = (1 + index_return) ** (252 / days) - 1
  287. # 最大回撤
  288. running_max = strategy_nav.expanding().max()
  289. drawdown = (strategy_nav - running_max) / running_max
  290. max_drawdown = drawdown.min()
  291. # 波动率
  292. daily_returns = strategy_nav.pct_change().dropna()
  293. volatility = daily_returns.std() * np.sqrt(252)
  294. # 夏普比率(假设无风险利率3%)
  295. excess_return = annual_return - 0.03
  296. sharpe = excess_return / volatility if volatility > 0 else 0
  297. # 卡玛比率
  298. calmar = annual_return / abs(max_drawdown) if max_drawdown != 0 else 0
  299. # 胜率
  300. positive_days = (daily_returns > 0).sum()
  301. total_days = len(daily_returns)
  302. win_rate = positive_days / total_days
  303. # Beta
  304. index_returns = index_nav.pct_change().dropna()
  305. covariance = daily_returns.cov(index_returns)
  306. variance = index_returns.var()
  307. beta = covariance / variance if variance > 0 else 1
  308. # 年化超额收益
  309. excess_annual = annual_return - index_annual
  310. return {
  311. 'total_return': total_return,
  312. 'annual_return': annual_return,
  313. 'index_return': index_return,
  314. 'index_annual': index_annual,
  315. 'excess_annual': excess_annual,
  316. 'max_drawdown': max_drawdown,
  317. 'volatility': volatility,
  318. 'sharpe': sharpe,
  319. 'calmar': calmar,
  320. 'win_rate': win_rate,
  321. 'beta': beta,
  322. 'trading_days': days
  323. }
  324. # ==================== 6. 参数优化 ====================
  325. def grid_search(data, param_grid):
  326. """网格搜索最优参数 - 简化版"""
  327. best_score = -999
  328. best_params = None
  329. best_metrics = None
  330. # 只测试1组参数(加速演示)
  331. test_params = [
  332. {'rsrs_n': 20, 'rsrs_m': 250, 'bull_buy': 0.5, 'bull_sell': -0.7,
  333. 'bear_buy': 1.5, 'bull_max': 1.0, 'osc_max': 0.6, 'stop_loss': 0.10, 'min_change': 0.20},
  334. ]
  335. for params in test_params:
  336. print(f"测试参数: {params}")
  337. strategy = CYB50Strategy(params)
  338. results, metrics = backtest(data, strategy, start_date='2018-02-01', end_date='2023-12-31')
  339. # 综合评分
  340. score = metrics['sharpe'] * 0.4 + metrics['calmar'] * 0.4 + metrics['excess_annual'] * 2
  341. print(f" 年化: {metrics['annual_return']*100:.1f}%, 回撤: {metrics['max_drawdown']*100:.1f}%, 夏普: {metrics['sharpe']:.2f}, 评分: {score:.2f}")
  342. if score > best_score and metrics['max_drawdown'] > -0.40:
  343. best_score = score
  344. best_params = params
  345. best_metrics = metrics
  346. return best_params, best_metrics
  347. # ==================== 7. 可视化 ====================
  348. def plot_results(results, title="Backtest Results"):
  349. """绘制回测结果"""
  350. fig, axes = plt.subplots(3, 1, figsize=(12, 10))
  351. # 净值曲线
  352. ax1 = axes[0]
  353. ax1.plot(results.index, results['nav'], label='Strategy', linewidth=2)
  354. ax1.plot(results.index, results['index_nav'], label='Index', linewidth=1, alpha=0.7)
  355. ax1.set_title(f'{title} - NAV')
  356. ax1.set_ylabel('NAV')
  357. ax1.legend()
  358. ax1.grid(True, alpha=0.3)
  359. # 仓位变化
  360. ax2 = axes[1]
  361. ax2.fill_between(results.index, 0, results['position'], alpha=0.3, label='Position')
  362. ax2.set_ylabel('Position')
  363. ax2.set_ylim(0, 1.1)
  364. ax2.legend()
  365. ax2.grid(True, alpha=0.3)
  366. # 回撤
  367. ax3 = axes[2]
  368. running_max = results['nav'].expanding().max()
  369. drawdown = (results['nav'] - running_max) / running_max
  370. ax3.fill_between(results.index, drawdown, 0, alpha=0.3, color='red')
  371. ax3.set_ylabel('Drawdown')
  372. ax3.set_xlabel('Date')
  373. ax3.grid(True, alpha=0.3)
  374. plt.tight_layout()
  375. return fig
  376. # ==================== 8. 主程序 ====================
  377. def main():
  378. print("="*60)
  379. print("创业板50指数量化交易策略回测")
  380. print("="*60)
  381. # 加载数据(使用模拟数据演示)
  382. print("\n[1] 生成模拟数据...")
  383. data = generate_mock_data('2017-01-01', '2025-12-31')
  384. print(f"数据区间: {data.index[0]} ~ {data.index[-1]}, 共{len(data)}个交易日")
  385. # 划分训练集和验证集
  386. train_end = '2023-12-31'
  387. val_start = '2024-01-01'
  388. # 训练阶段(参数优化)
  389. print("\n[2] 训练阶段:参数优化 (2018-2023)...")
  390. best_params, train_metrics = grid_search(data, None)
  391. print(f"\n最优参数:")
  392. for k, v in best_params.items():
  393. print(f" {k}: {v}")
  394. print(f"\n训练集表现 (2018-2023):")
  395. print(f" 策略年化收益: {train_metrics['annual_return']*100:.2f}%")
  396. print(f" 指数年化收益: {train_metrics['index_annual']*100:.2f}%")
  397. print(f" 超额收益: {train_metrics['excess_annual']*100:.2f}%")
  398. print(f" 最大回撤: {train_metrics['max_drawdown']*100:.2f}%")
  399. print(f" 夏普比率: {train_metrics['sharpe']:.2f}")
  400. print(f" 卡玛比率: {train_metrics['calmar']:.2f}")
  401. print(f" 胜率: {train_metrics['win_rate']*100:.1f}%")
  402. print(f" Beta: {train_metrics['beta']:.2f}")
  403. # 使用最优参数回测训练集(获取完整结果)
  404. strategy = CYB50Strategy(best_params)
  405. train_results, _ = backtest(data, strategy, start_date='2018-02-01', end_date=train_end)
  406. # 验证阶段(样本外)
  407. print(f"\n[3] 验证阶段:样本外测试 (2024-2025)...")
  408. strategy_val = CYB50Strategy(best_params)
  409. val_results, val_metrics = backtest(data, strategy_val, start_date=val_start, end_date='2025-12-31')
  410. print(f"\n验证集表现 (2024-2025):")
  411. print(f" 策略年化收益: {val_metrics['annual_return']*100:.2f}%")
  412. print(f" 指数年化收益: {val_metrics['index_annual']*100:.2f}%")
  413. print(f" 超额收益: {val_metrics['excess_annual']*100:.2f}%")
  414. print(f" 最大回撤: {val_metrics['max_drawdown']*100:.2f}%")
  415. print(f" 夏普比率: {val_metrics['sharpe']:.2f}")
  416. print(f" 卡玛比率: {val_metrics['calmar']:.2f}")
  417. # 过拟合检测
  418. sharpe_decay = (train_metrics['sharpe'] - val_metrics['sharpe']) / train_metrics['sharpe'] if train_metrics['sharpe'] != 0 else 0
  419. print(f"\n[4] 过拟合检测:")
  420. print(f" 夏普比率衰减: {sharpe_decay*100:.1f}%")
  421. if sharpe_decay > 0.5:
  422. print(" ⚠️ 警告:可能存在严重过拟合")
  423. elif sharpe_decay > 0.3:
  424. print(" ⚠️ 注意:轻度过拟合,建议简化参数")
  425. else:
  426. print(" ✓ 无过拟合,策略稳健")
  427. # 保存结果
  428. print(f"\n[5] 保存结果...")
  429. train_results.to_csv('train_results.csv')
  430. val_results.to_csv('val_results.csv')
  431. print(" 训练集结果: train_results.csv")
  432. print(" 验证集结果: val_results.csv")
  433. # 绘图
  434. print(f"\n[6] 生成图表...")
  435. fig1 = plot_results(train_results, "Training Set (2018-2023)")
  436. fig1.savefig('train_backtest.png', dpi=150, bbox_inches='tight')
  437. print(" 训练集图表: train_backtest.png")
  438. fig2 = plot_results(val_results, "Validation Set (2024-2025)")
  439. fig2.savefig('val_backtest.png', dpi=150, bbox_inches='tight')
  440. print(" 验证集图表: val_backtest.png")
  441. print("\n" + "="*60)
  442. print("回测完成")
  443. print("="*60)
  444. if __name__ == "__main__":
  445. main()