#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
cyb50_all_strategies_real_data.py

ChiNext 50 Index (CYB50) - unified backtest validation of all strategies on real data.
Data source: cyb50_baostock.csv (real data, 2017-2025)
"""
import warnings

import matplotlib
matplotlib.use('Agg')  # must be selected before pyplot is imported
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

warnings.filterwarnings('ignore')
  14. print("="*80)
  15. print("创业板50指数 - 全策略真实数据回测验证")
  16. print("="*80)
# ==================== 1. Load real data ====================
  18. def load_real_data():
  19. """加载真实数据 - 唯一数据源"""
  20. df = pd.read_csv('cyb50_baostock.csv')
  21. df['date'] = pd.to_datetime(df['date'])
  22. df = df.set_index('date').sort_index()
  23. # 转换数据类型
  24. for col in ['open', 'high', 'low', 'close', 'volume']:
  25. df[col] = pd.to_numeric(df[col], errors='coerce')
  26. print("\n【数据验证】")
  27. print(f" ✅ 真实数据源: cyb50_baostock.csv")
  28. print(f" ✅ 数据区间: {df.index[0].date()} ~ {df.index[-1].date()}")
  29. print(f" ✅ 总交易日: {len(df)} 天")
  30. print(f" ✅ 价格范围: {df['close'].min():.0f} ~ {df['close'].max():.0f}")
  31. # 验证数据完整性
  32. null_count = df.isnull().sum().sum()
  33. if null_count > 0:
  34. print(f" ⚠️ 空值数量: {null_count}")
  35. else:
  36. print(f" ✅ 数据完整性: 无空值")
  37. # 统计特征
  38. returns = df['close'].pct_change().dropna()
  39. print(f"\n【数据统计特征】")
  40. print(f" 日收益均值: {returns.mean()*100:.4f}%")
  41. print(f" 日收益标准差: {returns.std()*100:.2f}%")
  42. print(f" 年化收益: {returns.mean()*252*100:.1f}%")
  43. print(f" 年化波动: {returns.std()*np.sqrt(252)*100:.1f}%")
  44. print(f" 最大单日涨幅: {returns.max()*100:.2f}%")
  45. print(f" 最大单日跌幅: {returns.min()*100:.2f}%")
  46. return df
# ==================== 2. Backtest engine ====================
  48. def backtest_engine(data, strategy_func, start_date, end_date, warmup=60, strategy_name="Strategy"):
  49. """统一回测引擎"""
  50. data = data[(data.index >= start_date) & (data.index <= end_date)].copy()
  51. if len(data) == 0:
  52. print(f" ❌ {strategy_name}: 无数据")
  53. return None, None
  54. results = []
  55. nav = 1.0
  56. position = 0
  57. for i in range(warmup, len(data)):
  58. curr_data = data.iloc[:i+1]
  59. try:
  60. target_pos, state = strategy_func(curr_data, position)
  61. except Exception as e:
  62. print(f" ⚠️ 策略错误: {e}")
  63. target_pos, state = 0, "ERROR"
  64. # 计算收益
  65. if i > warmup:
  66. daily_ret = data['close'].iloc[i] / data['close'].iloc[i-1] - 1
  67. strategy_ret = daily_ret * position
  68. nav *= (1 + strategy_ret)
  69. results.append({
  70. 'date': data.index[i],
  71. 'pos': target_pos,
  72. 'nav': nav,
  73. 'state': state,
  74. 'close': data['close'].iloc[i]
  75. })
  76. position = target_pos
  77. df = pd.DataFrame(results).set_index('date')
  78. df['index_nav'] = df['close'] / df['close'].iloc[0]
  79. return df
  80. def calc_metrics(nav, index_nav):
  81. """计算绩效指标"""
  82. s_ret = nav.pct_change().dropna()
  83. total = nav.iloc[-1] - 1
  84. days = len(nav)
  85. annual = (1 + total) ** (252 / days) - 1 if days > 0 else 0
  86. idx_total = index_nav.iloc[-1] - 1
  87. idx_annual = (1 + idx_total) ** (252 / days) - 1 if days > 0 else 0
  88. running_max = nav.expanding().max()
  89. max_dd = ((nav - running_max) / running_max).min()
  90. vol = s_ret.std() * np.sqrt(252)
  91. sharpe = (annual - 0.03) / vol if vol > 0 else 0
  92. calmar = annual / abs(max_dd) if max_dd != 0 else 0
  93. win_rate = (s_ret > 0).mean()
  94. return {
  95. 'annual': annual, 'idx_annual': idx_annual,
  96. 'excess': annual - idx_annual, 'max_dd': max_dd,
  97. 'sharpe': sharpe, 'calmar': calmar,
  98. 'win_rate': win_rate, 'total': total, 'idx_total': idx_total,
  99. 'volatility': vol, 'days': days
  100. }
  101. def plot_results(results, title, filename):
  102. """绘制结果"""
  103. fig, axes = plt.subplots(3, 1, figsize=(14, 10))
  104. axes[0].plot(results.index, results['nav'], 'r-', lw=2, label='Strategy')
  105. axes[0].plot(results.index, results['index_nav'], 'gray', lw=1, alpha=0.7, label='CYB50 Index')
  106. axes[0].set_title(title, fontsize=14)
  107. axes[0].legend()
  108. axes[0].grid(True, alpha=0.3)
  109. axes[1].fill_between(results.index, 0, results['pos'], alpha=0.5, color='green')
  110. axes[1].set_ylim(0, 1.1)
  111. axes[1].set_ylabel('Position')
  112. axes[1].grid(True, alpha=0.3)
  113. running_max = results['nav'].expanding().max()
  114. drawdown = (results['nav'] - running_max) / running_max
  115. axes[2].fill_between(results.index, drawdown, 0, alpha=0.3, color='red')
  116. axes[2].set_ylabel('Drawdown')
  117. axes[2].set_xlabel('Date')
  118. axes[2].grid(True, alpha=0.3)
  119. plt.tight_layout()
  120. plt.savefig(filename, dpi=150)
  121. return filename
# ==================== 3. Strategy definitions ====================
# Strategy 1: trend following (from cyb50_real_backtest.py)
  124. def strategy_trend(data, current_pos):
  125. """趋势策略:MA+突破+移动止损"""
  126. close = data['close'].values
  127. high = data['high'].values
  128. low = data['low'].values
  129. if len(close) < 60:
  130. return 0, "INIT"
  131. ma10 = np.mean(close[-10:])
  132. ma30 = np.mean(close[-30:])
  133. ret10 = (close[-1] / close[-10] - 1) if len(close) >= 10 else 0
  134. high_20 = np.max(high[-20:])
  135. low_20 = np.min(low[-20:])
  136. curr = close[-1]
  137. # 买入条件
  138. buy_signal = (curr > ma10 > ma30) and (curr >= high_20 * 0.995) and (ret10 > 0.02)
  139. # 卖出条件
  140. sell_signal = (curr < ma30) or (curr <= low_20 * 1.005)
  141. if buy_signal and current_pos == 0:
  142. return 1.0, "ENTRY"
  143. elif sell_signal and current_pos > 0:
  144. return 0.0, "EXIT"
  145. else:
  146. return current_pos, "HOLD" if current_pos > 0 else "EMPTY"
# Strategy 2: dual moving average (from cyb50_simple.py)
  148. def strategy_ma_cross(data, current_pos):
  149. """双均线交叉策略"""
  150. close = data['close'].values
  151. if len(close) < 60:
  152. return 0, "INIT"
  153. ma20 = np.mean(close[-20:])
  154. ma60 = np.mean(close[-60:])
  155. curr = close[-1]
  156. if curr > ma20 > ma60:
  157. return 1.0, "BULL"
  158. elif curr < ma60:
  159. return 0.0, "BEAR"
  160. else:
  161. return current_pos, "HOLD"
# Strategy 3: momentum (from cyb50_high_perf.py)
  163. def strategy_momentum(data, current_pos):
  164. """动量策略:趋势+动量加速"""
  165. close = data['close']
  166. if len(close) < 60:
  167. return 0, "INIT"
  168. ma5 = close.rolling(5).mean().iloc[-1]
  169. ma20 = close.rolling(20).mean().iloc[-1]
  170. ma60 = close.rolling(60).mean().iloc[-1]
  171. momentum = (close.iloc[-1] / close.iloc[-10] - 1) * 100
  172. trend_strong = (close.iloc[-1] > ma5) and (ma5 > ma20) and (ma20 > ma60)
  173. trend_weak = (close.iloc[-1] < ma5) and (ma5 < ma20)
  174. if trend_strong and momentum > 2:
  175. return 1.0, "STRONG_UP"
  176. elif trend_strong and momentum > 0:
  177. return 0.8, "UP"
  178. elif trend_weak or momentum < -3:
  179. return 0.0, "DOWN"
  180. else:
  181. return 0.5, "OSCILLATE"
# Strategy 4: multi-factor (from cyb50_multifactor.py)
  183. def strategy_multifactor(data, current_pos):
  184. """多因子策略:趋势+动量+波动率+突破"""
  185. c = data['close']
  186. h = data['high']
  187. l = data['low']
  188. if len(c) < 60:
  189. return 0, "INIT"
  190. # 趋势因子
  191. ma5 = c.rolling(5).mean()
  192. ma20 = c.rolling(20).mean()
  193. ma60 = c.rolling(60).mean()
  194. trend_score = 0
  195. if c.iloc[-1] > ma5.iloc[-1]: trend_score += 1
  196. if ma5.iloc[-1] > ma20.iloc[-1]: trend_score += 1
  197. if ma20.iloc[-1] > ma60.iloc[-1]: trend_score += 1
  198. trend_score = trend_score / 3
  199. # 动量因子
  200. ret20 = (c.iloc[-1] / c.iloc[-20] - 1) if len(c) >= 20 else 0
  201. mom_score = np.clip((ret20 + 0.2) / 0.4, 0, 1)
  202. # 波动率因子
  203. atr = pd.concat([h-l, (h-c.shift(1)).abs(), (l-c.shift(1)).abs()], axis=1).max(axis=1)
  204. atr_mean = atr.rolling(20).mean().iloc[-1]
  205. vol_pct = atr_mean / c.iloc[-1]
  206. vol_score = 1 - np.clip((vol_pct - 0.015) / 0.025, 0, 1)
  207. # 突破因子
  208. high_20 = h.rolling(20).max()
  209. breakout = 1 if c.iloc[-1] >= high_20.iloc[-1] * 0.99 else 0
  210. # 综合得分
  211. total_score = trend_score * 0.35 + mom_score * 0.25 + vol_score * 0.25 + breakout * 0.15
  212. if total_score > 0.7:
  213. return 1.0, "STRONG"
  214. elif total_score > 0.5:
  215. return 0.6, "MEDIUM"
  216. elif total_score > 0.3:
  217. return 0.3, "WEAK"
  218. else:
  219. return 0.0, "EMPTY"
# Strategy 5: RSI
  221. def strategy_rsi(data, current_pos):
  222. """RSI策略"""
  223. close = data['close']
  224. if len(close) < 20:
  225. return 0, "INIT"
  226. delta = close.diff()
  227. gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
  228. loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
  229. rs = gain / loss
  230. rsi = 100 - (100 / (1 + rs))
  231. curr_rsi = rsi.iloc[-1]
  232. if pd.isna(curr_rsi):
  233. return 0, "INIT"
  234. if curr_rsi < 30:
  235. return 1.0, "OVERSOLD"
  236. elif curr_rsi > 70:
  237. return 0.0, "OVERBOUGHT"
  238. else:
  239. return current_pos, "HOLD"
# ==================== 4. Main program ====================
  241. def main():
  242. # 加载真实数据
  243. data = load_real_data()
  244. # 定义回测区间
  245. train_start, train_end = '2018-01-01', '2023-12-31'
  246. val_start, val_end = '2024-01-01', '2025-12-31'
  247. strategies = [
  248. ("趋势跟踪策略", strategy_trend),
  249. ("双均线策略", strategy_ma_cross),
  250. ("动量策略", strategy_momentum),
  251. ("多因子策略", strategy_multifactor),
  252. ("RSI策略", strategy_rsi),
  253. ]
  254. all_results = []
  255. print("\n" + "="*80)
  256. print("开始回测 - 全部使用真实数据")
  257. print("="*80)
  258. for name, strategy_func in strategies:
  259. print(f"\n【{name}】")
  260. # 训练集
  261. train_res = backtest_engine(data, strategy_func, train_start, train_end, strategy_name=name)
  262. if train_res is None:
  263. continue
  264. train_m = calc_metrics(train_res['nav'], train_res['index_nav'])
  265. # 验证集
  266. val_res = backtest_engine(data, strategy_func, val_start, val_end, strategy_name=name)
  267. val_m = calc_metrics(val_res['nav'], val_res['index_nav'])
  268. # 打印结果
  269. print(f" 训练集 (2018-2023):")
  270. print(f" 年化收益: {train_m['annual']*100:7.2f}% | 指数: {train_m['idx_annual']*100:7.2f}% | 超额: {train_m['excess']*100:7.2f}%")
  271. print(f" 最大回撤: {train_m['max_dd']*100:7.2f}% | 夏普: {train_m['sharpe']:5.2f} | 胜率: {train_m['win_rate']*100:5.1f}%")
  272. print(f" 验证集 (2024-2025):")
  273. print(f" 年化收益: {val_m['annual']*100:7.2f}% | 指数: {val_m['idx_annual']*100:7.2f}% | 超额: {val_m['excess']*100:7.2f}%")
  274. print(f" 最大回撤: {val_m['max_dd']*100:7.2f}% | 夏普: {val_m['sharpe']:5.2f}")
  275. # 过拟合检测
  276. decay = (train_m['annual'] - val_m['annual']) / train_m['annual'] * 100 if train_m['annual'] != 0 else 0
  277. status = "✅" if decay < 50 else "⚠️"
  278. print(f" 衰减率: {decay:.1f}% {status}")
  279. # 保存图表
  280. plot_results(train_res, f"{name} - Training", f"train_{name.replace(' ', '_')}.png")
  281. plot_results(val_res, f"{name} - Validation", f"val_{name.replace(' ', '_')}.png")
  282. all_results.append({
  283. 'name': name,
  284. 'train': train_m,
  285. 'val': val_m,
  286. 'decay': decay
  287. })
  288. # 汇总对比
  289. print("\n" + "="*80)
  290. print("策略对比汇总(真实数据)")
  291. print("="*80)
  292. print(f"{'策略':<12} {'训练年化':>10} {'验证年化':>10} {'训练回撤':>10} {'验证回撤':>10} {'衰减':>8} {'评价':>6}")
  293. print("-"*80)
  294. for r in all_results:
  295. t, v = r['train'], r['val']
  296. eval_status = "✅" if t['annual'] > 0.1 and v['annual'] > 0 and r['decay'] < 50 else "⚠️" if v['annual'] > 0 else "❌"
  297. print(f"{r['name']:<12} {t['annual']*100:>9.1f}% {v['annual']*100:>9.1f}% {t['max_dd']*100:>9.1f}% {v['max_dd']*100:>9.1f}% {r['decay']:>7.0f}% {eval_status:>6}")
  298. # 找出最佳策略
  299. best = max(all_results, key=lambda x: x['val']['annual'] if x['val']['annual'] > 0 else -999)
  300. print(f"\n🏆 验证集表现最佳: {best['name']}")
  301. print(f" 验证集年化: {best['val']['annual']*100:.2f}%")
  302. print(f" 超额收益: {best['val']['excess']*100:.2f}%")
  303. print("\n" + "="*80)
  304. print("✅ 所有策略已使用真实数据验证完成")
  305. print("="*80)
  306. if __name__ == "__main__":
  307. main()