cyb50_multifactor.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 创业板50指数 - 多因子杠杆策略(目标:年化30%+)
  5. 策略:趋势 + 动量 + 波动率 + 杠杆
  6. """
  7. import pandas as pd
  8. import numpy as np
  9. import matplotlib
  10. matplotlib.use('Agg')
  11. import matplotlib.pyplot as plt
  12. import warnings
  13. warnings.filterwarnings('ignore')
  14. def load_real_data():
  15. """加载创业板50指数真实数据 - cyb50_baostock.csv"""
  16. df = pd.read_csv('cyb50_baostock.csv')
  17. df['date'] = pd.to_datetime(df['date'])
  18. df = df.set_index('date').sort_index()
  19. # 转换数据类型
  20. for col in ['open', 'high', 'low', 'close', 'volume']:
  21. df[col] = pd.to_numeric(df[col], errors='coerce')
  22. print(f"真实数据加载成功: {df.index[0].date()} ~ {df.index[-1].date()}")
  23. return df
  24. class MultiFactorStrategy:
  25. """多因子策略 - 稳健高收益版(无杠杆)"""
  26. def __init__(self, leverage=1.0):
  27. self.leverage = leverage
  28. self.pos = 0
  29. self.entry = 0
  30. self.peak = 0
  31. self.max_pos = 1.0 * leverage
  32. def calculate_factors(self, data):
  33. """计算多因子得分"""
  34. c = data['close']
  35. h = data['high']
  36. l = data['low']
  37. # 1. 趋势因子(三均线得分)
  38. ma5 = c.rolling(5).mean()
  39. ma20 = c.rolling(20).mean()
  40. ma60 = c.rolling(60).mean()
  41. trend_score = 0
  42. if c.iloc[-1] > ma5.iloc[-1]: trend_score += 1
  43. if ma5.iloc[-1] > ma20.iloc[-1]: trend_score += 1
  44. if ma20.iloc[-1] > ma60.iloc[-1]: trend_score += 1
  45. trend_score = trend_score / 3
  46. # 2. 动量因子(20日涨幅)
  47. ret20 = (c.iloc[-1] / c.iloc[-20] - 1) if len(c) >= 20 else 0
  48. mom_score = np.clip((ret20 + 0.2) / 0.4, 0, 1) # 降低敏感度
  49. # 3. 波动率因子
  50. atr = self._atr(h, l, c, 20)
  51. vol_pct = atr / c.iloc[-1]
  52. vol_score = 1 - np.clip((vol_pct - 0.015) / 0.025, 0, 1)
  53. # 4. 突破因子(创20日新高)
  54. high_20 = h.rolling(20).max()
  55. breakout = 1 if c.iloc[-1] >= high_20.iloc[-1] * 0.99 else 0
  56. # 综合得分
  57. total_score = (trend_score * 0.35 + mom_score * 0.25 +
  58. vol_score * 0.25 + breakout * 0.15)
  59. return total_score, trend_score, mom_score, vol_score
  60. def _atr(self, h, l, c, n):
  61. """计算ATR"""
  62. tr1 = h - l
  63. tr2 = (h - c.shift(1)).abs()
  64. tr3 = (l - c.shift(1)).abs()
  65. tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
  66. return tr.rolling(n).mean().iloc[-1]
  67. def generate_signal(self, data):
  68. """生成交易信号"""
  69. score, trend, mom, vol = self.calculate_factors(data)
  70. curr_price = data['close'].iloc[-1]
  71. # 简化仓位决策
  72. if score > 0.7: # 强信号
  73. target_pos = self.max_pos
  74. elif score > 0.5: # 中等信号
  75. target_pos = self.max_pos * 0.6
  76. elif score > 0.3: # 弱信号
  77. target_pos = self.max_pos * 0.3
  78. else:
  79. target_pos = 0
  80. # 风险管理
  81. if self.pos > 0:
  82. if curr_price > self.peak:
  83. self.peak = curr_price
  84. drawdown = (curr_price - self.peak) / self.peak
  85. if drawdown < -0.10: # 10%移动止损
  86. target_pos = 0
  87. elif drawdown < -0.06: # 6%减仓
  88. target_pos = target_pos * 0.5
  89. if self.entry > 0:
  90. loss = (curr_price - self.entry) / self.entry
  91. if loss < -0.08: # 8%止损
  92. target_pos = 0
  93. # 状态更新
  94. if target_pos > 0 and self.pos == 0:
  95. self.entry = curr_price
  96. self.peak = curr_price
  97. state = "ENTRY"
  98. elif target_pos == 0 and self.pos > 0:
  99. self.entry = 0
  100. self.peak = 0
  101. state = "EXIT"
  102. elif target_pos == self.max_pos:
  103. state = "FULL"
  104. elif target_pos > 0:
  105. state = "PARTIAL"
  106. else:
  107. state = "EMPTY"
  108. self.pos = target_pos
  109. return target_pos, state, score
  110. def backtest(data, strategy, start, end, warmup=60):
  111. """回测引擎"""
  112. data = data[(data.index >= start) & (data.index <= end)]
  113. results = []
  114. nav = 1.0
  115. for i in range(warmup, len(data)):
  116. curr = data.iloc[:i+1]
  117. pos, state, score = strategy.generate_signal(curr)
  118. if i > warmup:
  119. ret = data['close'].iloc[i] / data['close'].iloc[i-1] - 1
  120. # 杠杆收益计算
  121. strategy_ret = ret * results[-1]['pos']
  122. nav *= (1 + strategy_ret)
  123. results.append({
  124. 'date': data.index[i],
  125. 'pos': pos,
  126. 'nav': nav,
  127. 'state': state,
  128. 'score': score,
  129. 'price': data['close'].iloc[i]
  130. })
  131. df = pd.DataFrame(results).set_index('date')
  132. df['idx_nav'] = df['price'] / df['price'].iloc[0]
  133. return df
  134. def calc_metrics(nav, idx_nav):
  135. """计算绩效指标"""
  136. s_ret = nav.pct_change().dropna()
  137. total = nav.iloc[-1] - 1
  138. days = len(nav)
  139. annual = (1 + total) ** (252/days) - 1
  140. idx_total = idx_nav.iloc[-1] - 1
  141. idx_annual = (1 + idx_total) ** (252/days) - 1
  142. # 最大回撤
  143. running_max = nav.expanding().max()
  144. max_dd = ((nav - running_max) / running_max).min()
  145. # 波动率和夏普
  146. vol = s_ret.std() * np.sqrt(252)
  147. sharpe = (annual - 0.03) / vol if vol > 0 else 0
  148. calmar = annual / abs(max_dd) if max_dd != 0 else 0
  149. return {
  150. 'annual': annual, 'idx_annual': idx_annual,
  151. 'excess': annual - idx_annual, 'max_dd': max_dd,
  152. 'sharpe': sharpe, 'calmar': calmar,
  153. 'total': total, 'idx_total': idx_total,
  154. 'volatility': vol
  155. }
  156. def plot_results(df, title, fn):
  157. """绘制结果"""
  158. fig, axes = plt.subplots(3, 1, figsize=(14, 10))
  159. # 净值
  160. axes[0].plot(df.index, df['nav'], 'r-', lw=2, label='Strategy')
  161. axes[0].plot(df.index, df['idx_nav'], 'gray', lw=1, alpha=0.7, label='Index')
  162. axes[0].set_title(title, fontsize=14)
  163. axes[0].legend()
  164. axes[0].grid(True, alpha=0.3)
  165. # 仓位
  166. axes[1].fill_between(df.index, 0, df['pos'], alpha=0.5, color='green')
  167. axes[1].axhline(y=1.0, color='red', linestyle='--', alpha=0.5, label='Full Position')
  168. axes[1].set_ylim(0, 1.2)
  169. axes[1].set_ylabel('Position')
  170. axes[1].legend()
  171. axes[1].grid(True, alpha=0.3)
  172. # 回撤
  173. running_max = df['nav'].expanding().max()
  174. drawdown = (df['nav'] - running_max) / running_max
  175. axes[2].fill_between(df.index, drawdown, 0, alpha=0.3, color='red')
  176. axes[2].set_ylabel('Drawdown')
  177. axes[2].set_xlabel('Date')
  178. axes[2].grid(True, alpha=0.3)
  179. plt.tight_layout()
  180. plt.savefig(fn, dpi=150)
  181. print(f" 图表保存: {fn}")
  182. def main():
  183. print("="*70)
  184. print("创业板50 - 多因子稳健策略(目标年化25%+)")
  185. print("="*70)
  186. # 加载真实数据
  187. print("\n[1] 加载真实数据...")
  188. data = load_real_data()
  189. print(f" {data.index[0].date()} ~ {data.index[-1].date()}")
  190. # 训练
  191. print("\n[2] 训练阶段 (2018-2023)...")
  192. s = MultiFactorStrategy(leverage=1.0) # 无杠杆
  193. train = backtest(data, s, '2018-01-01', '2023-12-31')
  194. m = calc_metrics(train['nav'], train['idx_nav'])
  195. print(f"\n ╔══════════════════════════════════════╗")
  196. print(f" ║ 训 练 集 结 果 ║")
  197. print(f" ╠══════════════════════════════════════╣")
  198. print(f" ║ 策略总收益: {m['total']*100:8.1f}% ║")
  199. print(f" ║ 指数总收益: {m['idx_total']*100:8.1f}% ║")
  200. print(f" ║ ───────────────────────────────── ║")
  201. print(f" ║ 策略年化: {m['annual']*100:8.1f}% ║")
  202. print(f" ║ 指数年化: {m['idx_annual']*100:8.1f}% ║")
  203. print(f" ║ 超额收益: {m['excess']*100:8.1f}% ║")
  204. print(f" ║ ───────────────────────────────── ║")
  205. print(f" ║ 最大回撤: {m['max_dd']*100:8.1f}% ║")
  206. print(f" ║ 年化波动: {m['volatility']*100:8.1f}% ║")
  207. print(f" ║ 夏普比率: {m['sharpe']:8.2f} ║")
  208. print(f" ║ 卡玛比率: {m['calmar']:8.2f} ║")
  209. print(f" ╚══════════════════════════════════════╝")
  210. plot_results(train, "Training Set 2018-2023", "train_stable.png")
  211. # 验证
  212. print("\n[3] 验证阶段 (2024-2025)...")
  213. s2 = MultiFactorStrategy(leverage=1.0)
  214. val = backtest(data, s2, '2024-01-01', '2025-12-31')
  215. m2 = calc_metrics(val['nav'], val['idx_nav'])
  216. print(f"\n ╔══════════════════════════════════════╗")
  217. print(f" ║ 验 证 集 结 果 ║")
  218. print(f" ╠══════════════════════════════════════╣")
  219. print(f" ║ 策略总收益: {m2['total']*100:8.1f}% ║")
  220. print(f" ║ 指数总收益: {m2['idx_total']*100:8.1f}% ║")
  221. print(f" ║ ───────────────────────────────── ║")
  222. print(f" ║ 策略年化: {m2['annual']*100:8.1f}% ║")
  223. print(f" ║ 指数年化: {m2['idx_annual']*100:8.1f}% ║")
  224. print(f" ║ 超额收益: {m2['excess']*100:8.1f}% ║")
  225. print(f" ║ ───────────────────────────────── ║")
  226. print(f" ║ 最大回撤: {m2['max_dd']*100:8.1f}% ║")
  227. print(f" ║ 夏普比率: {m2['sharpe']:8.2f} ║")
  228. print(f" ╚══════════════════════════════════════╝")
  229. plot_results(val, "Validation Set 2024-2025", "val_stable.png")
  230. # 评价
  231. print("\n[4] 策略评价:")
  232. decay = (m['annual']-m2['annual'])/m['annual']*100 if m['annual'] > 0 else 0
  233. print(f" 年化收益衰减: {decay:.0f}%")
  234. if m['annual'] >= 0.25:
  235. print(" ✅ 训练集年化≥25%")
  236. elif m['annual'] >= 0.15:
  237. print(" ⚠️ 训练集收益一般")
  238. else:
  239. print(" ❌ 训练集收益不足")
  240. if m2['annual'] >= 0.15:
  241. print(" ✅ 验证集年化≥15%")
  242. elif m2['annual'] > 0:
  243. print(" ⚠️ 验证集正收益但未达15%")
  244. else:
  245. print(" ❌ 验证集亏损")
  246. if decay < 50:
  247. print(" ✅ 策略稳健(衰减<50%)")
  248. else:
  249. print(" ⚠️ 策略有过拟合风险")
  250. print("\n" + "="*70)
  251. if m['annual'] >= 0.25 and m2['annual'] > 0.10 and decay < 60:
  252. print("✅ 策略优秀!可实盘测试")
  253. elif m['annual'] >= 0.20 and m2['annual'] > 0:
  254. print("⚠️ 策略尚可,建议继续优化")
  255. else:
  256. print("❌ 策略需重新设计")
  257. print("="*70)
  258. if __name__ == "__main__":
  259. main()