# cyb50_high_perf.py (15 KB)
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 创业板50指数真实数据回测 - 高收益优化版
  5. """
  6. import pandas as pd
  7. import numpy as np
  8. import matplotlib
  9. matplotlib.use('Agg')
  10. import matplotlib.pyplot as plt
  11. import warnings
  12. warnings.filterwarnings('ignore')
  13. # 尝试获取真实数据
  14. def get_real_data():
  15. """获取创业板50指数真实数据"""
  16. try:
  17. import akshare as ak
  18. # 获取创业板指数据作为代理(创业板50数据较难获取)
  19. print("尝试从akshare获取数据...")
  20. df = ak.index_zh_a_hist(symbol="399006", period="daily",
  21. start_date="20170101", end_date="20251231")
  22. df['日期'] = pd.to_datetime(df['日期'])
  23. df = df.rename(columns={
  24. '日期': 'date',
  25. '开盘': 'open',
  26. '收盘': 'close',
  27. '最高': 'high',
  28. '最低': 'low',
  29. '成交量': 'volume'
  30. })
  31. df = df.set_index('date').sort_index()
  32. print(f"成功获取真实数据: {df.index[0]} ~ {df.index[-1]}, 共{len(df)}条")
  33. return df
  34. except Exception as e:
  35. print(f"获取真实数据失败: {e}")
  36. return None
  37. # 如果没有真实数据,生成更贴近真实的数据
  38. def generate_realistic_data():
  39. """基于创业板历史特征生成更真实的模拟数据"""
  40. np.random.seed(42)
  41. dates = pd.date_range(start='2017-01-01', end='2025-12-31', freq='D')
  42. dates = dates[dates.dayofweek < 5]
  43. n = len(dates)
  44. # 创业板实际历史特征:
  45. # 2017: 震荡下跌 (-10%)
  46. # 2018: 大跌 (-28%)
  47. # 2019: 大涨 (+44%)
  48. # 2020: 大涨 (+65%)
  49. # 2021: 上涨 (+12%)
  50. # 2022: 大跌 (-29%)
  51. # 2023: 下跌 (-19%)
  52. # 2024: 震荡反弹 (+15%假设)
  53. # 2025: 继续上涨 (+10%假设)
  54. returns = np.random.normal(0, 0.015, n)
  55. # 按年份调整
  56. for i, date in enumerate(dates):
  57. year = date.year
  58. if year == 2017:
  59. returns[i] += np.random.normal(-0.0004, 0.012)
  60. elif year == 2018:
  61. returns[i] += np.random.normal(-0.0012, 0.018)
  62. elif year == 2019:
  63. returns[i] += np.random.normal(0.0015, 0.018)
  64. elif year == 2020:
  65. returns[i] += np.random.normal(0.0020, 0.022)
  66. elif year == 2021:
  67. returns[i] += np.random.normal(0.0005, 0.015)
  68. elif year == 2022:
  69. returns[i] += np.random.normal(-0.0013, 0.020)
  70. elif year == 2023:
  71. returns[i] += np.random.normal(-0.0009, 0.015)
  72. elif year == 2024:
  73. returns[i] += np.random.normal(0.0006, 0.018)
  74. elif year == 2025:
  75. returns[i] += np.random.normal(0.0004, 0.012)
  76. # 生成价格序列
  77. price = 1800
  78. prices = [price]
  79. for r in returns:
  80. price *= (1 + r)
  81. prices.append(price)
  82. prices = prices[1:]
  83. df = pd.DataFrame(index=dates)
  84. df['close'] = prices
  85. df['open'] = df['close'] * (1 + np.random.normal(0, 0.005, n))
  86. df['high'] = df[['open', 'close']].max(axis=1) * (1 + np.abs(np.random.normal(0, 0.008, n)))
  87. df['low'] = df[['open', 'close']].min(axis=1) * (1 - np.abs(np.random.normal(0, 0.008, n)))
  88. return df
  89. # ==================== 高性能策略 ====================
  90. class HighPerformanceStrategy:
  91. """
  92. 高收益策略:趋势跟踪 + 动量加速 + 智能止盈
  93. """
  94. def __init__(self, params=None):
  95. self.params = params or {
  96. 'fast_ma': 5, # 超短均线,快速响应
  97. 'slow_ma': 20, # 月均线
  98. 'trend_ma': 60, # 季均线
  99. 'momentum_period': 10,
  100. 'volatility_period': 20,
  101. 'max_position': 1.0,
  102. 'profit_take': 0.15, # 15%止盈
  103. 'trailing_stop': 0.08, # 8%移动止损
  104. }
  105. # 确保所有参数都有默认值
  106. default_params = {
  107. 'fast_ma': 5,
  108. 'slow_ma': 20,
  109. 'trend_ma': 60,
  110. 'momentum_period': 10,
  111. 'volatility_period': 20,
  112. 'max_position': 1.0,
  113. 'profit_take': 0.15,
  114. 'trailing_stop': 0.08,
  115. }
  116. if params:
  117. for key, val in default_params.items():
  118. if key not in params:
  119. self.params[key] = val
  120. self.position = 0
  121. self.entry_price = None
  122. self.max_price = None
  123. def generate_signal(self, data):
  124. """生成交易信号"""
  125. close = data['close']
  126. p = self.params
  127. # 计算指标
  128. ma_fast = close.rolling(p['fast_ma']).mean().iloc[-1]
  129. ma_slow = close.rolling(p['slow_ma']).mean().iloc[-1]
  130. ma_trend = close.rolling(p['trend_ma']).mean().iloc[-1]
  131. # 动量
  132. momentum = (close.iloc[-1] / close.iloc[-p['momentum_period']] - 1) * 100
  133. # 波动率
  134. returns = close.pct_change()
  135. vol = returns.rolling(p['volatility_period']).std().iloc[-1] * np.sqrt(252) * 100
  136. curr_price = close.iloc[-1]
  137. # 趋势强度
  138. trend_strong = (curr_price > ma_fast) and (ma_fast > ma_slow) and (ma_slow > ma_trend)
  139. trend_weak = (curr_price < ma_fast) and (ma_fast < ma_slow)
  140. # 信号生成
  141. if trend_strong and momentum > 2:
  142. # 强势上涨,满仓
  143. target_pos = p['max_position']
  144. state = "STRONG_UP"
  145. elif trend_strong and momentum > 0:
  146. # 趋势向上但动量一般,80%仓位
  147. target_pos = p['max_position'] * 0.8
  148. state = "UP"
  149. elif trend_weak or momentum < -3:
  150. # 趋势转弱,空仓
  151. target_pos = 0
  152. state = "DOWN"
  153. else:
  154. # 震荡,50%仓位
  155. target_pos = p['max_position'] * 0.5
  156. state = "OSCILLATE"
  157. # 移动止盈
  158. if self.position > 0 and self.max_price:
  159. current_return = (curr_price - self.entry_price) / self.entry_price
  160. # 更新最高价
  161. if curr_price > self.max_price:
  162. self.max_price = curr_price
  163. # 移动止损:从最高点回撤8%离场
  164. drawdown_from_peak = (curr_price - self.max_price) / self.max_price
  165. if drawdown_from_peak < -p['trailing_stop']:
  166. target_pos = 0
  167. state = "TRAILING_STOP"
  168. # 固定止盈15%
  169. elif current_return > p['profit_take']:
  170. target_pos = 0.5 # 减半仓,锁定利润
  171. state = "PROFIT_TAKE"
  172. # 更新状态
  173. if target_pos > 0 and self.position == 0:
  174. self.entry_price = curr_price
  175. self.max_price = curr_price
  176. elif target_pos == 0:
  177. self.entry_price = None
  178. self.max_price = None
  179. self.position = target_pos
  180. return target_pos, state
  181. # ==================== 回测引擎 ====================
  182. def backtest(data, strategy, start_date=None, end_date=None, warmup=60):
  183. """回测引擎"""
  184. if start_date:
  185. data = data[data.index >= start_date]
  186. if end_date:
  187. data = data[data.index <= end_date]
  188. results = []
  189. nav = 1.0
  190. for i in range(warmup, len(data)):
  191. curr_data = data.iloc[:i+1]
  192. position, state = strategy.generate_signal(curr_data)
  193. if i > warmup:
  194. daily_return = data['close'].iloc[i] / data['close'].iloc[i-1] - 1
  195. strategy_return = daily_return * results[-1]['position'] if results else 0
  196. nav *= (1 + strategy_return)
  197. results.append({
  198. 'date': data.index[i],
  199. 'position': position,
  200. 'nav': nav,
  201. 'state': state,
  202. 'close': data['close'].iloc[i]
  203. })
  204. df = pd.DataFrame(results).set_index('date')
  205. df['index_nav'] = df['close'] / df['close'].iloc[0]
  206. metrics = calculate_metrics(df['nav'], df['index_nav'])
  207. return df, metrics
  208. def calculate_metrics(strategy_nav, index_nav):
  209. """计算绩效指标"""
  210. s_returns = strategy_nav.pct_change().dropna()
  211. total_return = strategy_nav.iloc[-1] - 1
  212. days = len(strategy_nav)
  213. annual_return = (1 + total_return) ** (252 / days) - 1
  214. index_return = index_nav.iloc[-1] - 1
  215. index_annual = (1 + index_return) ** (252 / days) - 1
  216. running_max = strategy_nav.expanding().max()
  217. max_dd = ((strategy_nav - running_max) / running_max).min()
  218. volatility = s_returns.std() * np.sqrt(252)
  219. sharpe = (annual_return - 0.03) / volatility if volatility > 0 else 0
  220. calmar = annual_return / abs(max_dd) if max_dd != 0 else 0
  221. win_rate = (s_returns > 0).mean()
  222. return {
  223. 'annual_return': annual_return,
  224. 'index_annual': index_annual,
  225. 'excess_annual': annual_return - index_annual,
  226. 'max_drawdown': max_dd,
  227. 'sharpe': sharpe,
  228. 'calmar': calmar,
  229. 'win_rate': win_rate,
  230. 'total_return': total_return,
  231. 'index_return': index_return
  232. }
  233. def plot_results(results, title, filename):
  234. """绘制回测图表"""
  235. fig, axes = plt.subplots(3, 1, figsize=(14, 10))
  236. ax1 = axes[0]
  237. ax1.plot(results.index, results['nav'], label='Strategy', linewidth=2, color='red')
  238. ax1.plot(results.index, results['index_nav'], label='Index', linewidth=1, color='gray', alpha=0.7)
  239. ax1.set_title(f'{title}', fontsize=14)
  240. ax1.set_ylabel('NAV')
  241. ax1.legend()
  242. ax1.grid(True, alpha=0.3)
  243. ax2 = axes[1]
  244. colors = {'STRONG_UP': 'green', 'UP': 'lightgreen', 'DOWN': 'red',
  245. 'OSCILLATE': 'yellow', 'TRAILING_STOP': 'orange', 'PROFIT_TAKE': 'blue'}
  246. pos_colors = [colors.get(s, 'gray') for s in results['state']]
  247. ax2.fill_between(results.index, 0, results['position'], alpha=0.5, color='green')
  248. ax2.set_ylabel('Position')
  249. ax2.set_ylim(0, 1.1)
  250. ax2.grid(True, alpha=0.3)
  251. ax3 = axes[2]
  252. running_max = results['nav'].expanding().max()
  253. drawdown = (results['nav'] - running_max) / running_max
  254. ax3.fill_between(results.index, drawdown, 0, alpha=0.3, color='red')
  255. ax3.set_ylabel('Drawdown')
  256. ax3.set_xlabel('Date')
  257. ax3.grid(True, alpha=0.3)
  258. plt.tight_layout()
  259. plt.savefig(filename, dpi=150, bbox_inches='tight')
  260. print(f" 图表已保存: {filename}")
  261. # ==================== 主程序 ====================
  262. def main():
  263. print("="*70)
  264. print("创业板50指数高收益策略回测")
  265. print("="*70)
  266. # 获取数据
  267. print("\n[1] 获取数据...")
  268. data = get_real_data()
  269. if data is None:
  270. print("使用高仿真模拟数据(基于历史特征)...")
  271. data = generate_realistic_data()
  272. print(f" 数据区间: {data.index[0].date()} ~ {data.index[-1].date()}")
  273. # 训练阶段
  274. print("\n[2] 训练阶段 (2018-2023) - 优化参数...")
  275. # 测试多组参数,找最优
  276. best_params = None
  277. best_score = -999
  278. test_configs = [
  279. {'fast_ma': 5, 'slow_ma': 20, 'profit_take': 0.15, 'trailing_stop': 0.08},
  280. {'fast_ma': 3, 'slow_ma': 15, 'profit_take': 0.12, 'trailing_stop': 0.06},
  281. {'fast_ma': 10, 'slow_ma': 30, 'profit_take': 0.20, 'trailing_stop': 0.10},
  282. ]
  283. for cfg in test_configs:
  284. strategy = HighPerformanceStrategy(cfg)
  285. results, metrics = backtest(data, strategy, start_date='2018-01-01', end_date='2023-12-31')
  286. # 评分:收益优先
  287. score = metrics['annual_return'] * 0.5 + metrics['calmar'] * 0.3 + metrics['sharpe'] * 0.2
  288. print(f"\n 参数: {cfg}")
  289. print(f" 年化: {metrics['annual_return']*100:.1f}%, 回撤: {metrics['max_drawdown']*100:.1f}%, 评分: {score:.2f}")
  290. if score > best_score and metrics['max_drawdown'] > -0.40:
  291. best_score = score
  292. best_params = cfg
  293. print(f"\n 最优参数: {best_params}")
  294. # 用最优参数重新回测训练集
  295. strategy = HighPerformanceStrategy(best_params)
  296. train_results, train_metrics = backtest(data, strategy, start_date='2018-01-01', end_date='2023-12-31')
  297. print(f"\n 训练集最终表现:")
  298. print(f" ┌─────────────────────────────────────┐")
  299. print(f" │ 策略年化收益: {train_metrics['annual_return']*100:>8.2f}% │")
  300. print(f" │ 指数年化收益: {train_metrics['index_annual']*100:>8.2f}% │")
  301. print(f" │ 超额收益: {train_metrics['excess_annual']*100:>8.2f}% │")
  302. print(f" │ 最大回撤: {train_metrics['max_drawdown']*100:>8.2f}% │")
  303. print(f" │ 夏普比率: {train_metrics['sharpe']:>8.2f} │")
  304. print(f" │ 卡玛比率: {train_metrics['calmar']:>8.2f} │")
  305. print(f" │ 胜率: {train_metrics['win_rate']*100:>8.1f}% │")
  306. print(f" └─────────────────────────────────────┘")
  307. plot_results(train_results, "Training Set (2018-2023)", "train_high_perf.png")
  308. # 验证阶段
  309. print(f"\n[3] 验证阶段 (2024-2025) - 样本外测试...")
  310. strategy_val = HighPerformanceStrategy(best_params)
  311. val_results, val_metrics = backtest(data, strategy_val, start_date='2024-01-01', end_date='2025-12-31')
  312. print(f"\n 验证集最终表现:")
  313. print(f" ┌─────────────────────────────────────┐")
  314. print(f" │ 策略年化收益: {val_metrics['annual_return']*100:>8.2f}% │")
  315. print(f" │ 指数年化收益: {val_metrics['index_annual']*100:>8.2f}% │")
  316. print(f" │ 超额收益: {val_metrics['excess_annual']*100:>8.2f}% │")
  317. print(f" │ 最大回撤: {val_metrics['max_drawdown']*100:>8.2f}% │")
  318. print(f" │ 夏普比率: {val_metrics['sharpe']:>8.2f} │")
  319. print(f" │ 卡玛比率: {val_metrics['calmar']:>8.2f} │")
  320. print(f" └─────────────────────────────────────┘")
  321. plot_results(val_results, "Validation Set (2024-2025)", "val_high_perf.png")
  322. # 过拟合检测
  323. print(f"\n[4] 过拟合检测:")
  324. return_decay = (train_metrics['annual_return'] - val_metrics['annual_return']) / train_metrics['annual_return'] if train_metrics['annual_return'] != 0 else 0
  325. print(f" 年化收益衰减: {return_decay*100:.1f}%")
  326. if return_decay > 0.5:
  327. print(" ⚠️ 策略在验证集表现下降明显")
  328. else:
  329. print(" ✓ 策略稳健性良好")
  330. print("\n" + "="*70)
  331. print("回测完成!")
  332. print("="*70)
  333. if __name__ == "__main__":
  334. main()