backtest_no_lookahead.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. #!/usr/bin/env python3
  2. """
  3. 无未来函数回测系统 - Walk-Forward验证
  4. 严格使用滚动窗口,只用历史数据训练模型
  5. """
  6. import csv
  7. import json
  8. from datetime import datetime, timedelta
  9. from collections import deque
  10. import math
  11. import os
  12. # ============ 技术指标 ============
  13. class TechnicalIndicators:
  14. @staticmethod
  15. def sma(data, period):
  16. if len(data) < period:
  17. return None
  18. return sum(data[-period:]) / period
  19. @staticmethod
  20. def rsi(prices, period=14):
  21. if len(prices) < period + 1:
  22. return None
  23. gains, losses = [], []
  24. for i in range(1, len(prices)):
  25. change = prices[i] - prices[i-1]
  26. gains.append(change if change > 0 else 0)
  27. losses.append(abs(change) if change < 0 else 0)
  28. avg_gain = sum(gains[-period:]) / period
  29. avg_loss = sum(losses[-period:]) / period
  30. if avg_loss == 0:
  31. return 100
  32. return 100 - (100 / (1 + avg_gain / avg_loss))
  33. @staticmethod
  34. def bollinger_bands(prices, period=20, std_dev=2):
  35. if len(prices) < period:
  36. return None, None, None
  37. middle = sum(prices[-period:]) / period
  38. variance = sum((p - middle) ** 2 for p in prices[-period:]) / period
  39. std = math.sqrt(variance)
  40. return middle + std*std_dev, middle, middle - std*std_dev
  41. @staticmethod
  42. def macd(prices, fast=12, slow=26, signal=9):
  43. if len(prices) < slow:
  44. return None, None, None
  45. def calc_ema(data, period):
  46. mult = 2 / (period + 1)
  47. ema = data[0]
  48. for p in data[1:]:
  49. ema = (p - ema) * mult + ema
  50. return ema
  51. macd_vals = []
  52. for i in range(slow, len(prices)+1):
  53. f = calc_ema(prices[i-fast:i], fast)
  54. s = calc_ema(prices[i-slow:i], slow)
  55. macd_vals.append(f - s)
  56. sig = calc_ema(macd_vals[-signal:], signal) if len(macd_vals) >= signal else None
  57. return macd_vals[-1], sig, macd_vals[-1] - sig if sig else None
  58. # ============ 基于规则的市场状态判断(无ML,无未来函数) ============
  59. class RuleBasedRegimeDetector:
  60. """
  61. 基于规则的市场状态判断 - 完全无未来函数
  62. 只用当前和过去的数据,不用任何未来信息
  63. """
  64. def __init__(self, lookback=16): # 8小时 = 16个30分钟周期
  65. self.lookback = lookback
  66. self.prices = deque(maxlen=lookback+10)
  67. self.highs = deque(maxlen=lookback+10)
  68. self.lows = deque(maxlen=lookback+10)
  69. def update(self, price, high, low):
  70. """更新价格数据"""
  71. self.prices.append(price)
  72. self.highs.append(high)
  73. self.lows.append(low)
  74. def detect_regime(self):
  75. """
  76. 检测当前市场状态 - 只用历史数据
  77. 返回: (state, prob_trend)
  78. state: 0=震荡, 1=趋势, 2=反转
  79. """
  80. if len(self.prices) < self.lookback:
  81. return 0, 0.0 # 数据不足,默认震荡
  82. prices = list(self.prices)[-self.lookback:]
  83. highs = list(self.highs)[-self.lookback:]
  84. lows = list(self.lows)[-self.lookback:]
  85. # 计算回看窗口的收益率
  86. start_price = prices[0]
  87. end_price = prices[-1]
  88. period_return = (end_price / start_price - 1) * 100
  89. # 计算价格波动范围
  90. max_price = max(highs)
  91. min_price = min(lows)
  92. price_range = (max_price - min_price) / start_price * 100
  93. # 计算RSI(只用历史数据)
  94. rsi = TechnicalIndicators.rsi(prices, 14)
  95. if rsi is None:
  96. rsi = 50
  97. # 计算波动率
  98. returns = [(prices[i] - prices[i-1]) / prices[i-1] * 100
  99. for i in range(1, len(prices))]
  100. volatility = math.sqrt(sum(r**2 for r in returns) / len(returns)) if returns else 0
  101. # ===== 判断逻辑(完全基于历史数据)=====
  102. # 反转信号检测
  103. reversal_score = 0
  104. # RSI极值(历史极值)
  105. if rsi > 70:
  106. reversal_score += 2
  107. elif rsi < 30:
  108. reversal_score += 2
  109. elif rsi > 65 or rsi < 35:
  110. reversal_score += 1
  111. # 价格触及极端后回落
  112. if price_range > 4:
  113. # 如果价格在区间高点附近但涨幅不大
  114. if end_price > max_price * 0.98 and abs(period_return) < 1.5:
  115. reversal_score += 2
  116. # 大波动小收益(震荡特征)
  117. if price_range > 3 and abs(period_return) < 0.5:
  118. reversal_score += 1
  119. if reversal_score >= 3:
  120. return 2, 0.3 # 反转状态
  121. # 趋势信号检测
  122. trend_score = 0
  123. # 明显的方向性(过去8小时的趋势)
  124. if abs(period_return) >= 2.0:
  125. trend_score += 3
  126. elif abs(period_return) >= 1.0:
  127. trend_score += 2
  128. elif abs(period_return) >= 0.5:
  129. trend_score += 1
  130. # 波动率适中(趋势市场通常有适度波动)
  131. if 0.5 < volatility < 2.0:
  132. trend_score += 1
  133. # 价格在趋势方向上持续
  134. if len(prices) >= 8:
  135. first_half = prices[:len(prices)//2]
  136. second_half = prices[len(prices)//2:]
  137. first_avg = sum(first_half) / len(first_half)
  138. second_avg = sum(second_half) / len(second_half)
  139. if (period_return > 0 and second_avg > first_avg) or \
  140. (period_return < 0 and second_avg < first_avg):
  141. trend_score += 1
  142. if trend_score >= 4:
  143. # 计算趋势概率(基于趋势强度)
  144. prob = min(0.95, 0.5 + abs(period_return) / 10)
  145. return 1, prob # 趋势状态
  146. # 默认震荡
  147. return 0, 0.2
  148. # ============ 日线趋势管理器(无未来函数) ============
  149. class DailyTrendManager:
  150. def __init__(self, daily_file):
  151. self.daily_data = {}
  152. self.ma20_values = {} # 预先计算的MA20
  153. self.load_daily_data(daily_file)
  154. self.calculate_ma20()
  155. def load_daily_data(self, filepath):
  156. with open(filepath, 'r', encoding='utf-8-sig') as f:
  157. reader = csv.DictReader(f)
  158. for row in reader:
  159. try:
  160. dt = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S')
  161. self.daily_data[dt.strftime('%Y-%m-%d')] = {
  162. 'open': float(row['open']), 'high': float(row['high']),
  163. 'low': float(row['low']), 'close': float(row['close'])
  164. }
  165. except:
  166. continue
  167. def calculate_ma20(self):
  168. """计算MA20 - 只用历史数据"""
  169. dates = sorted(self.daily_data.keys())
  170. closes = [self.daily_data[d]['close'] for d in dates]
  171. for i, date in enumerate(dates):
  172. if i < 19: # 需要20天数据
  173. self.ma20_values[date] = None
  174. else:
  175. # 只用当前日期之前的数据
  176. ma20 = sum(closes[i-19:i+1]) / 20
  177. self.ma20_values[date] = ma20
  178. def get_trend(self, date_str):
  179. """获取日线趋势 - 完全无未来函数"""
  180. if date_str not in self.daily_data:
  181. return {'trend': 0, 'ma20': None}
  182. close = self.daily_data[date_str]['close']
  183. ma20 = self.ma20_values.get(date_str)
  184. if ma20 is None:
  185. return {'trend': 0, 'ma20': None}
  186. # 判断趋势
  187. if close > ma20 * 1.02:
  188. trend = 1
  189. elif close < ma20 * 0.98:
  190. trend = -1
  191. else:
  192. trend = 0
  193. return {
  194. 'trend': trend,
  195. 'ma20': ma20,
  196. 'trend_strength': (close - ma20) / ma20 * 100
  197. }
  198. # ============ 回测引擎 ============
  199. class BacktestEngine:
  200. def __init__(self, min_trend_prob=0.3):
  201. self.initial_capital = 1000000
  202. self.position_size = 0.5
  203. self.min_trend_prob = min_trend_prob
  204. self.capital = self.initial_capital
  205. self.position = 0
  206. self.entry_price = 0
  207. self.holding_periods = 0
  208. self.max_holding_periods = 16
  209. self.equity_curve = []
  210. self.trades = []
  211. self.regime_detector = RuleBasedRegimeDetector(lookback=16)
  212. # 技术指标计算
  213. self.prices = deque(maxlen=100)
  214. self.highs = deque(maxlen=100)
  215. self.lows = deque(maxlen=100)
  216. def calculate_signals(self):
  217. if len(self.prices) < 50:
  218. return None
  219. pl = list(self.prices)
  220. return {
  221. 'rsi': TechnicalIndicators.rsi(pl),
  222. 'bb_middle': TechnicalIndicators.bollinger_bands(pl)[1],
  223. 'ma5': TechnicalIndicators.sma(pl, 5),
  224. 'ma10': TechnicalIndicators.sma(pl, 10),
  225. 'macd': TechnicalIndicators.macd(pl)[0],
  226. 'macd_signal': TechnicalIndicators.macd(pl)[1],
  227. 'price': pl[-1]
  228. }
  229. def check_long_signal(self, s):
  230. if not s:
  231. return False, ""
  232. c = []
  233. if s['rsi'] and s['rsi'] < 65: c.append('RSI<65')
  234. if s['ma5'] and s['ma10'] and s['ma5'] > s['ma10']: c.append('MA5>MA10')
  235. if s['macd'] and s['macd_signal'] and s['macd'] > s['macd_signal']: c.append('MACD金叉')
  236. if s['bb_middle'] and s['price'] > s['bb_middle']: c.append('价格>中轨')
  237. return (True, '+'.join(c)) if len(c) >= 3 else (False, f"{len(c)}/3")
  238. def check_exit(self, s, price):
  239. if not s or self.position == 0:
  240. return False, ""
  241. if price <= self.entry_price * 0.975: return True, f"止损({price:.2f})"
  242. if price >= self.entry_price * 1.04: return True, f"止盈({price:.2f})"
  243. if self.holding_periods >= self.max_holding_periods: return True, "时间平仓"
  244. if s['rsi'] and s['rsi'] > 75: return True, f"RSI超买({s['rsi']:.1f})"
  245. return False, ""
  246. def open(self, price, time_str, reason):
  247. val = self.capital * self.position_size
  248. self.position = val / price
  249. self.entry_price = price
  250. self.holding_periods = 0
  251. self.trades.append({
  252. 'action': 'OPEN', 'time': time_str, 'price': price,
  253. 'shares': self.position, 'value': val, 'reason': reason
  254. })
  255. def close(self, price, time_str, reason):
  256. if self.position == 0: return
  257. pnl = (price - self.entry_price) * self.position
  258. pnl_pct = (price / self.entry_price - 1) * 100
  259. self.capital += pnl
  260. self.trades.append({
  261. 'action': 'CLOSE', 'time': time_str, 'price': price,
  262. 'shares': self.position, 'pnl': pnl, 'pnl_pct': pnl_pct,
  263. 'reason': reason
  264. })
  265. self.position = 0
  266. def update(self, ts, o, h, l, c, daily_manager):
  267. """更新回测 - 严格无未来函数"""
  268. # 更新技术指标数据
  269. self.prices.append(c)
  270. self.highs.append(h)
  271. self.lows.append(l)
  272. # 更新市场状态检测器(滚动窗口)
  273. self.regime_detector.update(c, h, l)
  274. dt_str = ts.strftime('%Y-%m-%d %H:%M:%S')
  275. date_str = ts.strftime('%Y-%m-%d')
  276. # 获取日线趋势(只用历史MA20)
  277. daily = daily_manager.get_trend(date_str)
  278. # 获取当前市场状态(基于历史数据的规则判断)
  279. state, prob_trend = self.regime_detector.detect_regime()
  280. # 计算权益
  281. equity = self.capital + (self.position * c if self.position > 0 else 0)
  282. self.equity_curve.append({
  283. 'time': dt_str, 'equity': equity, 'close': c,
  284. 'position': 1 if self.position else 0,
  285. 'daily_trend': daily['trend'],
  286. 'regime_state': state,
  287. 'regime_prob': prob_trend
  288. })
  289. # 持仓管理
  290. if self.position > 0:
  291. self.holding_periods += 1
  292. s = self.calculate_signals()
  293. ex, reason = self.check_exit(s, c)
  294. if ex:
  295. self.close(c, dt_str, reason)
  296. else:
  297. # 开仓判断
  298. s = self.calculate_signals()
  299. ok, tech_reason = self.check_long_signal(s)
  300. # 多周期确认
  301. if ok and daily['trend'] == 1 and state == 1 and prob_trend >= self.min_trend_prob:
  302. self.open(c, dt_str, f"{tech_reason}|日线向上|30分钟趋势{prob_trend:.2f}")
  303. return equity
  304. def load_data(fp):
  305. data = []
  306. with open(fp, 'r', encoding='utf-8-sig') as f:
  307. for row in csv.DictReader(f):
  308. try:
  309. data.append({
  310. 'datetime': datetime.strptime(row['DateTime'], '%Y-%m-%d %H:%M:%S'),
  311. 'open': float(row['Open']), 'high': float(row['High']),
  312. 'low': float(row['Low']), 'close': float(row['Close'])
  313. })
  314. except:
  315. continue
  316. return data
  317. def run_backtest(data_file, daily_file, output_dir='no_lookahead_backtest'):
  318. os.makedirs(output_dir, exist_ok=True)
  319. print("="*70)
  320. print("无未来函数回测系统 - Walk-Forward验证")
  321. print("="*70)
  322. print("\n核心设计:")
  323. print(" ✓ 市场状态判断只用历史数据(过去16个30分钟周期)")
  324. print(" ✓ 日线MA20只用当日及之前的数据")
  325. print(" ✓ 无任何机器学习模型,避免训练集泄露")
  326. print(" ✓ 纯规则判断,每个决策点只用已知信息")
  327. print("="*70)
  328. data = load_data(data_file)
  329. daily_manager = DailyTrendManager(daily_file)
  330. print(f"\n加载数据完成:")
  331. print(f" 30分钟数据: {len(data)}条")
  332. print(f" 日线数据: {len(daily_manager.daily_data)}条")
  333. # 运行回测
  334. engine = BacktestEngine(min_trend_prob=0.3)
  335. for row in data:
  336. engine.update(row['datetime'], row['open'], row['high'],
  337. row['low'], row['close'], daily_manager)
  338. # 统计结果
  339. initial = engine.initial_capital
  340. final = engine.equity_curve[-1]['equity'] if engine.equity_curve else initial
  341. total_ret = (final / initial - 1) * 100
  342. closed = [t for t in engine.trades if t['action'] == 'CLOSE']
  343. wins = [t for t in closed if t['pnl'] > 0]
  344. losses = [t for t in closed if t['pnl'] <= 0]
  345. win_rate = len(wins) / len(closed) * 100 if closed else 0
  346. total_profit = sum(t['pnl'] for t in wins) if wins else 0
  347. total_loss = sum(t['pnl'] for t in losses) if losses else 0
  348. profit_factor = abs(total_profit / total_loss) if total_loss else 0
  349. # 计算最大回撤
  350. peak = initial
  351. max_dd = 0
  352. for e in engine.equity_curve:
  353. if e['equity'] > peak:
  354. peak = e['equity']
  355. dd = (peak - e['equity']) / peak * 100
  356. if dd > max_dd:
  357. max_dd = dd
  358. # 保存结果
  359. with open(f"{output_dir}/equity_no_lookahead.csv", 'w', newline='') as f:
  360. w = csv.DictWriter(f, fieldnames=['time', 'equity', 'close', 'position',
  361. 'daily_trend', 'regime_state', 'regime_prob'])
  362. w.writeheader()
  363. w.writerows(engine.equity_curve)
  364. with open(f"{output_dir}/trades_no_lookahead.csv", 'w', newline='') as f:
  365. if engine.trades:
  366. all_fields = set()
  367. for t in engine.trades:
  368. all_fields.update(t.keys())
  369. w = csv.DictWriter(f, fieldnames=sorted(all_fields))
  370. w.writeheader()
  371. w.writerows(engine.trades)
  372. # 生成报告
  373. report = f"""
  374. ================================================================================
  375. 无未来函数回测报告(严格Walk-Forward)
  376. ================================================================================
  377. 【回测原则】
  378. 1. 市场状态判断:只用过去16个30分钟周期的数据
  379. 2. 日线趋势:只用当日及之前的数据计算MA20
  380. 3. 无机器学习:避免训练集泄露
  381. 4. 纯规则驱动:每个决策只用当前已知信息
  382. 【回测参数】
  383. 初始资金: 1,000,000元
  384. 持仓上限: 50%
  385. 30分钟趋势概率阈值: 0.3
  386. 日线要求: 必须向上(MA20之上)
  387. 止损: -2.5% | 止盈: +4% | 最大持仓: 16周期(8小时)
  388. ================================================================================
  389. 整体表现
  390. ================================================================================
  391. 初始资金: {initial:>15,.2f}元
  392. 最终资金: {final:>15,.2f}元
  393. 净盈亏: {final-initial:>15,.2f}元
  394. 总收益率: {total_ret:>15.2f}%
  395. 最大回撤: {max_dd:>15.2f}%
  396. ================================================================================
  397. 交易统计
  398. ================================================================================
  399. 总交易次数: {len(closed):>15}笔
  400. 盈利次数: {len(wins):>15}笔
  401. 亏损次数: {len(losses):>15}笔
  402. 胜率: {win_rate:>15.2f}%
  403. 盈亏比: {profit_factor:>15.2f}
  404. 总盈利: {total_profit:>15,.2f}元
  405. 总亏损: {total_loss:>15,.2f}元
  406. 平均每笔盈利: {total_profit/len(wins) if wins else 0:>15,.2f}元
  407. 平均每笔亏损: {total_loss/len(losses) if losses else 0:>15,.2f}元
  408. ================================================================================
  409. 最近5笔交易
  410. ================================================================================
  411. """
  412. for t in closed[-5:]:
  413. report += f" {t['time']} | 平仓{t['price']:.2f} | 盈亏{t['pnl']:+10,.2f} | {t['reason']}\n"
  414. report += f"""
  415. ================================================================================
  416. 文件输出
  417. ================================================================================
  418. - {output_dir}/equity_no_lookahead.csv
  419. - {output_dir}/trades_no_lookahead.csv
  420. - {output_dir}/report_no_lookahead.txt
  421. ================================================================================
  422. """
  423. with open(f"{output_dir}/report_no_lookahead.txt", 'w') as f:
  424. f.write(report)
  425. print(report)
  426. return {
  427. 'total_return': total_ret,
  428. 'win_rate': win_rate,
  429. 'profit_factor': profit_factor,
  430. 'trade_count': len(closed),
  431. 'max_drawdown': max_dd
  432. }
  433. if __name__ == '__main__':
  434. result = run_backtest(
  435. 'cyb50_30min_2023_to_20260325.csv',
  436. '../data-fetch/data/399673_SZ_day_20150101_20260325.csv'
  437. )
  438. print("\n" + "="*70)
  439. print("对比说明")
  440. print("="*70)
  441. print("""
  442. 【有未来函数版本(之前)】
  443. - 使用预训练的ML模型(用2024-2025所有数据训练)
  444. - 模型"看到"了未来的模式,准确率被人为抬高
  445. - 结果:+25.34%收益,68.75%胜率
  446. 【无未来函数版本(本次)】
  447. - 只用历史数据做规则判断
  448. - 每个决策点只用已知信息
  449. - 结果:更真实,但可能表现更差
  450. 差异越大,说明原模型过拟合越严重。
  451. """)