optimized_strategy_backtest.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 创业板50 T+1 优化策略实现与回测
  5. 基于五层深挖的最优参数
  6. """
  7. import pandas as pd
  8. import numpy as np
  9. from datetime import datetime, timedelta
  10. import warnings
  11. warnings.filterwarnings('ignore')
  12. # 导入原策略组件
  13. from cyb50_30min_dual_direction import (
  14. ConfigManager,
  15. IntradayDataFetcher,
  16. DualDirectionSignalGenerator,
  17. DualDirectionExecutor
  18. )
  19. from t1_converter import simulate_t1_trades
  20. def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'):
  21. """加载本地数据"""
  22. print(f"📊 加载数据 {csv_file}...")
  23. df = pd.read_csv(csv_file)
  24. df['DateTime'] = pd.to_datetime(df['DateTime'])
  25. df.set_index('DateTime', inplace=True)
  26. df.sort_index(inplace=True)
  27. # 列名映射
  28. if 'Open' not in df.columns and 'o' in df.columns:
  29. df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True)
  30. # 类型转换
  31. for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
  32. if col in df.columns:
  33. df[col] = pd.to_numeric(df[col], errors='coerce')
  34. # 计算基础指标
  35. df['Returns'] = df['Close'].pct_change()
  36. df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
  37. df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']
  38. df.ffill(inplace=True)
  39. df.dropna(inplace=True)
  40. print(f"✅ 数据加载完成: {len(df)}条K线")
  41. print(f" 区间: {df.index[0]} ~ {df.index[-1]}")
  42. return df
  43. def calculate_enhanced_indicators(df):
  44. """计算增强版技术指标"""
  45. print("📈 计算增强指标...")
  46. # RSI (6, 14, 21)
  47. for period in [6, 14, 21]:
  48. delta = df['Close'].diff()
  49. gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
  50. loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
  51. rs = gain / loss
  52. df[f'RSI_{period}'] = 100 - (100 / (1 + rs))
  53. # 动量 (5周期)
  54. df['Momentum_5'] = (df['Close'] / df['Close'].shift(5) - 1) * 100
  55. # 均线趋势
  56. df['EMA5'] = df['Close'].ewm(span=5, adjust=False).mean()
  57. df['EMA20'] = df['Close'].ewm(span=20, adjust=False).mean()
  58. df['EMA60'] = df['Close'].ewm(span=60, adjust=False).mean()
  59. # 趋势评分
  60. df['Trend_Score'] = 0
  61. df.loc[df['Close'] > df['EMA5'], 'Trend_Score'] += 1
  62. df.loc[df['Close'] > df['EMA20'], 'Trend_Score'] += 1
  63. df.loc[df['Close'] > df['EMA60'], 'Trend_Score'] += 1
  64. # 波动率
  65. df['Volatility'] = df['Returns'].rolling(20).std() * np.sqrt(48)
  66. df['Vol_MA'] = df['Volatility'].rolling(20).mean()
  67. df['Vol_Regime'] = '正常'
  68. df.loc[df['Volatility'] > df['Vol_MA'] * 1.5, 'Vol_Regime'] = '高波动'
  69. df.loc[df['Volatility'] < df['Vol_MA'] * 0.5, 'Vol_Regime'] = '低波动'
  70. # 成交量比率
  71. df['Volume_MA20'] = df['Volume'].rolling(20).mean()
  72. df['Volume_Ratio'] = df['Volume'] / df['Volume_MA20']
  73. df.dropna(inplace=True)
  74. print("✅ 指标计算完成")
  75. return df
  76. class OptimizedStrategy:
  77. """优化版T+1交易策略"""
  78. def __init__(self, initial_capital=1000000):
  79. self.initial_capital = initial_capital
  80. self.current_capital = initial_capital
  81. self.trades = []
  82. self.daily_trade_count = {}
  83. self.consecutive_losses = 0
  84. # 策略参数 - 基于五层深挖的最优值(进一步放宽)
  85. self.params = {
  86. 'rsi_low': 30, # RSI下限 - 进一步放宽
  87. 'rsi_high': 60, # RSI上限 - 进一步放宽
  88. 'momentum_min': -2, # 最小动量 - 进一步放宽
  89. 'trend_min': 0, # 最小趋势评分
  90. 'avoid_hour': 13, # 避开的小时
  91. 'avoid_friday': False, # 不避开周五
  92. 'max_daily_trades': 3, # 每日最大交易数 - 放宽
  93. 'position_base': 0.7, # 基础仓位70% - 提高
  94. 'position_max': 1.0, # 最大仓位100%
  95. 'stop_loss': 0.008, # 止损0.8%
  96. 'take_profit': 0.025, # 止盈2.5%
  97. 'max_hold_hours': 16, # 最大持仓16小时
  98. 't1_cutoff_time': '14:30', # T+1截止时间
  99. }
  100. def calculate_position_size(self, row):
  101. """动态仓位计算"""
  102. base = self.params['position_base']
  103. # 趋势因子
  104. trend_factor = min(1.0 + row['Trend_Score'] * 0.2, 1.5)
  105. # 波动率因子
  106. if row['Vol_Regime'] == '低波动':
  107. vol_factor = 1.2
  108. elif row['Vol_Regime'] == '正常':
  109. vol_factor = 1.0
  110. else:
  111. vol_factor = 0.6
  112. # 连续亏损惩罚
  113. loss_factor = max(1.0 - self.consecutive_losses * 0.2, 0.3)
  114. position = base * trend_factor * vol_factor * loss_factor
  115. return min(position, self.params['position_max'])
  116. def should_trade(self, timestamp, row):
  117. """判断是否应该交易"""
  118. # 1. 时间过滤 - 避开13点
  119. if timestamp.hour == self.params['avoid_hour']:
  120. return False, "避开13点"
  121. # 2. 星期过滤 - 避开周五
  122. if self.params['avoid_friday'] and timestamp.dayofweek == 4:
  123. return False, "避开周五"
  124. # 3. T+1过滤 - 14:30后不开仓
  125. cutoff = datetime.strptime(self.params['t1_cutoff_time'], '%H:%M').time()
  126. if timestamp.time() > cutoff:
  127. return False, "避开T+1"
  128. # 4. 每日交易次数限制
  129. date = timestamp.date()
  130. if self.daily_trade_count.get(date, 0) >= self.params['max_daily_trades']:
  131. return False, "已达日交易上限"
  132. # 5. RSI过滤 - 40-50区间
  133. rsi = row['RSI_14']
  134. if not (self.params['rsi_low'] <= rsi <= self.params['rsi_high']):
  135. return False, f"RSI {rsi:.1f} 不在范围内"
  136. # 6. 动量过滤
  137. if row['Momentum_5'] < self.params['momentum_min']:
  138. return False, f"动量 {row['Momentum_5']:.2f} 不足"
  139. # 7. 趋势过滤
  140. if row['Trend_Score'] < self.params['trend_min']:
  141. return False, f"趋势评分 {row['Trend_Score']} 不足"
  142. return True, "通过"
  143. def simulate_exit(self, entry_price, exit_price, entry_time, exit_time, position_size):
  144. """模拟退出,应用止损止盈"""
  145. # 计算理论盈亏比例
  146. pnl_pct = (exit_price - entry_price) / entry_price
  147. # 应用止损
  148. if pnl_pct <= -self.params['stop_loss']:
  149. actual_pnl_pct = -self.params['stop_loss']
  150. exit_reason = '止损'
  151. # 应用止盈
  152. elif pnl_pct >= self.params['take_profit']:
  153. actual_pnl_pct = self.params['take_profit']
  154. exit_reason = '止盈'
  155. else:
  156. actual_pnl_pct = pnl_pct
  157. exit_reason = '正常平仓'
  158. # 计算盈亏金额
  159. position_value = self.current_capital * position_size
  160. pnl_amount = actual_pnl_pct * position_value
  161. return pnl_amount, actual_pnl_pct, exit_reason
  162. def backtest(self, data, trades_df):
  163. """执行回测"""
  164. print("\n" + "="*60)
  165. print("🚀 开始优化策略回测")
  166. print("="*60)
  167. # 将数据与交易合并
  168. t1_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
  169. t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间'])
  170. # 获取指标数据
  171. merged_trades = []
  172. for idx, trade in t1_trades.iterrows():
  173. try:
  174. # 获取开仓时的指标
  175. mask = data.index <= trade['开仓时间']
  176. if not mask.any():
  177. continue
  178. current_idx = data.index[mask][-1]
  179. current_data = data.loc[current_idx]
  180. # 判断是否满足交易条件
  181. should_trade, reason = self.should_trade(trade['开仓时间'], current_data)
  182. if should_trade:
  183. # 计算仓位
  184. position_size = self.calculate_position_size(current_data)
  185. # 模拟退出
  186. pnl_amount, pnl_pct, exit_reason = self.simulate_exit(
  187. trade['开仓价格'],
  188. trade['平仓价格'],
  189. trade['开仓时间'],
  190. trade['平仓时间'],
  191. position_size
  192. )
  193. # 记录交易
  194. merged_trades.append({
  195. '开仓时间': trade['开仓时间'],
  196. '平仓时间': trade['平仓时间'],
  197. '开仓价格': trade['开仓价格'],
  198. '平仓价格': trade['平仓价格'],
  199. '仓位': position_size,
  200. '实际盈亏': pnl_amount,
  201. '盈亏比例': pnl_pct,
  202. '退出原因': exit_reason,
  203. 'RSI': current_data['RSI_14'],
  204. '动量': current_data['Momentum_5'],
  205. '趋势': current_data['Trend_Score'],
  206. })
  207. # 更新资本
  208. self.current_capital += pnl_amount
  209. # 更新每日计数
  210. date = trade['开仓时间'].date()
  211. self.daily_trade_count[date] = self.daily_trade_count.get(date, 0) + 1
  212. # 更新连续亏损
  213. if pnl_amount < 0:
  214. self.consecutive_losses += 1
  215. else:
  216. self.consecutive_losses = 0
  217. except Exception as e:
  218. continue
  219. self.trades = pd.DataFrame(merged_trades)
  220. return self.trades
  221. def report(self):
  222. """生成回测报告"""
  223. if len(self.trades) == 0:
  224. print("⚠️ 没有交易记录")
  225. return
  226. print("\n" + "="*60)
  227. print("📊 优化策略回测报告")
  228. print("="*60)
  229. # 基础统计
  230. total_trades = len(self.trades)
  231. winning_trades = self.trades[self.trades['实际盈亏'] > 0]
  232. losing_trades = self.trades[self.trades['实际盈亏'] < 0]
  233. win_rate = len(winning_trades) / total_trades * 100
  234. total_pnl = self.trades['实际盈亏'].sum()
  235. avg_pnl = self.trades['实际盈亏'].mean()
  236. total_return = (self.current_capital - self.initial_capital) / self.initial_capital * 100
  237. print(f"\n【基础统计】")
  238. print(f" 初始资金: {self.initial_capital:,.0f}元")
  239. print(f" 最终资金: {self.current_capital:,.0f}元")
  240. print(f" 总收益率: {total_return:+.2f}%")
  241. print(f" 总交易次数: {total_trades}笔")
  242. print(f" 盈利次数: {len(winning_trades)}笔")
  243. print(f" 亏损次数: {len(losing_trades)}笔")
  244. print(f" 胜率: {win_rate:.1f}%")
  245. print(f" 总盈亏: {total_pnl:+,.0f}元")
  246. print(f" 平均盈亏: {avg_pnl:+,.0f}元")
  247. # 盈亏比
  248. if len(losing_trades) > 0:
  249. profit_factor = abs(winning_trades['实际盈亏'].sum() / losing_trades['实际盈亏'].sum())
  250. print(f" 盈亏比: {profit_factor:.2f}")
  251. # 退出原因统计
  252. print(f"\n【退出原因统计】")
  253. exit_stats = self.trades.groupby('退出原因').agg({
  254. '实际盈亏': ['count', 'sum', 'mean']
  255. }).round(2)
  256. exit_stats.columns = ['次数', '总盈亏', '平均盈亏']
  257. print(exit_stats.to_string())
  258. # 对比原策略
  259. print(f"\n【与原策略对比】")
  260. original_pnl = -123843 # 原策略亏损
  261. improvement = total_pnl - original_pnl
  262. print(f" 原策略盈亏: {original_pnl:+,.0f}元")
  263. print(f" 优化策略盈亏: {total_pnl:+,.0f}元")
  264. print(f" 改善幅度: {improvement:+,.0f}元")
  265. print(f" 提升倍数: {abs(total_pnl / original_pnl) if original_pnl != 0 else 0:.1f}x")
  266. # 最近交易
  267. print(f"\n【最近5笔交易】")
  268. recent = self.trades.tail(5)[['开仓时间', '平仓时间', '仓位', '实际盈亏', '退出原因']]
  269. print(recent.to_string(index=False))
  270. def main():
  271. """主函数"""
  272. print("="*60)
  273. print("创业板50 T+1 优化策略回测系统")
  274. print("="*60)
  275. # 加载数据
  276. raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv')
  277. # 计算增强指标
  278. data = calculate_enhanced_indicators(raw_data)
  279. # 获取原策略交易信号
  280. print("\n📡 生成原策略交易信号...")
  281. config_manager = ConfigManager('config.json')
  282. fetcher = IntradayDataFetcher(config_manager)
  283. signal_generator = DualDirectionSignalGenerator()
  284. executor = DualDirectionExecutor(initial_capital=1000000)
  285. data_with_indicators = fetcher.calculate_intraday_indicators(data)
  286. signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
  287. results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
  288. # 运行优化策略回测
  289. strategy = OptimizedStrategy(initial_capital=1000000)
  290. optimized_trades = strategy.backtest(data, trades_df)
  291. # 生成报告
  292. strategy.report()
  293. print("\n" + "="*60)
  294. print("✅ 回测完成")
  295. print("="*60)
  296. if __name__ == '__main__':
  297. main()