| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- #!/usr/bin/env python3
- """
- 无未来函数回测系统 - Walk-Forward验证
- 严格使用滚动窗口,只用历史数据训练模型
- """
- import csv
- import json
- from datetime import datetime, timedelta
- from collections import deque
- import math
- import os
- # ============ 技术指标 ============
- class TechnicalIndicators:
- @staticmethod
- def sma(data, period):
- if len(data) < period:
- return None
- return sum(data[-period:]) / period
- @staticmethod
- def rsi(prices, period=14):
- if len(prices) < period + 1:
- return None
- gains, losses = [], []
- for i in range(1, len(prices)):
- change = prices[i] - prices[i-1]
- gains.append(change if change > 0 else 0)
- losses.append(abs(change) if change < 0 else 0)
- avg_gain = sum(gains[-period:]) / period
- avg_loss = sum(losses[-period:]) / period
- if avg_loss == 0:
- return 100
- return 100 - (100 / (1 + avg_gain / avg_loss))
- @staticmethod
- def bollinger_bands(prices, period=20, std_dev=2):
- if len(prices) < period:
- return None, None, None
- middle = sum(prices[-period:]) / period
- variance = sum((p - middle) ** 2 for p in prices[-period:]) / period
- std = math.sqrt(variance)
- return middle + std*std_dev, middle, middle - std*std_dev
- @staticmethod
- def macd(prices, fast=12, slow=26, signal=9):
- if len(prices) < slow:
- return None, None, None
- def calc_ema(data, period):
- mult = 2 / (period + 1)
- ema = data[0]
- for p in data[1:]:
- ema = (p - ema) * mult + ema
- return ema
- macd_vals = []
- for i in range(slow, len(prices)+1):
- f = calc_ema(prices[i-fast:i], fast)
- s = calc_ema(prices[i-slow:i], slow)
- macd_vals.append(f - s)
- sig = calc_ema(macd_vals[-signal:], signal) if len(macd_vals) >= signal else None
- return macd_vals[-1], sig, macd_vals[-1] - sig if sig else None
- # ============ 基于规则的市场状态判断(无ML,无未来函数) ============
- class RuleBasedRegimeDetector:
- """
- 基于规则的市场状态判断 - 完全无未来函数
- 只用当前和过去的数据,不用任何未来信息
- """
-
- def __init__(self, lookback=16): # 8小时 = 16个30分钟周期
- self.lookback = lookback
- self.prices = deque(maxlen=lookback+10)
- self.highs = deque(maxlen=lookback+10)
- self.lows = deque(maxlen=lookback+10)
-
- def update(self, price, high, low):
- """更新价格数据"""
- self.prices.append(price)
- self.highs.append(high)
- self.lows.append(low)
-
- def detect_regime(self):
- """
- 检测当前市场状态 - 只用历史数据
- 返回: (state, prob_trend)
- state: 0=震荡, 1=趋势, 2=反转
- """
- if len(self.prices) < self.lookback:
- return 0, 0.0 # 数据不足,默认震荡
-
- prices = list(self.prices)[-self.lookback:]
- highs = list(self.highs)[-self.lookback:]
- lows = list(self.lows)[-self.lookback:]
-
- # 计算回看窗口的收益率
- start_price = prices[0]
- end_price = prices[-1]
- period_return = (end_price / start_price - 1) * 100
-
- # 计算价格波动范围
- max_price = max(highs)
- min_price = min(lows)
- price_range = (max_price - min_price) / start_price * 100
-
- # 计算RSI(只用历史数据)
- rsi = TechnicalIndicators.rsi(prices, 14)
- if rsi is None:
- rsi = 50
-
- # 计算波动率
- returns = [(prices[i] - prices[i-1]) / prices[i-1] * 100
- for i in range(1, len(prices))]
- volatility = math.sqrt(sum(r**2 for r in returns) / len(returns)) if returns else 0
-
- # ===== 判断逻辑(完全基于历史数据)=====
-
- # 反转信号检测
- reversal_score = 0
-
- # RSI极值(历史极值)
- if rsi > 70:
- reversal_score += 2
- elif rsi < 30:
- reversal_score += 2
- elif rsi > 65 or rsi < 35:
- reversal_score += 1
-
- # 价格触及极端后回落
- if price_range > 4:
- # 如果价格在区间高点附近但涨幅不大
- if end_price > max_price * 0.98 and abs(period_return) < 1.5:
- reversal_score += 2
-
- # 大波动小收益(震荡特征)
- if price_range > 3 and abs(period_return) < 0.5:
- reversal_score += 1
-
- if reversal_score >= 3:
- return 2, 0.3 # 反转状态
-
- # 趋势信号检测
- trend_score = 0
-
- # 明显的方向性(过去8小时的趋势)
- if abs(period_return) >= 2.0:
- trend_score += 3
- elif abs(period_return) >= 1.0:
- trend_score += 2
- elif abs(period_return) >= 0.5:
- trend_score += 1
-
- # 波动率适中(趋势市场通常有适度波动)
- if 0.5 < volatility < 2.0:
- trend_score += 1
-
- # 价格在趋势方向上持续
- if len(prices) >= 8:
- first_half = prices[:len(prices)//2]
- second_half = prices[len(prices)//2:]
- first_avg = sum(first_half) / len(first_half)
- second_avg = sum(second_half) / len(second_half)
-
- if (period_return > 0 and second_avg > first_avg) or \
- (period_return < 0 and second_avg < first_avg):
- trend_score += 1
-
- if trend_score >= 4:
- # 计算趋势概率(基于趋势强度)
- prob = min(0.95, 0.5 + abs(period_return) / 10)
- return 1, prob # 趋势状态
-
- # 默认震荡
- return 0, 0.2
- # ============ 日线趋势管理器(无未来函数) ============
- class DailyTrendManager:
- def __init__(self, daily_file):
- self.daily_data = {}
- self.ma20_values = {} # 预先计算的MA20
- self.load_daily_data(daily_file)
- self.calculate_ma20()
- def load_daily_data(self, filepath):
- with open(filepath, 'r', encoding='utf-8-sig') as f:
- reader = csv.DictReader(f)
- for row in reader:
- try:
- dt = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S')
- self.daily_data[dt.strftime('%Y-%m-%d')] = {
- 'open': float(row['open']), 'high': float(row['high']),
- 'low': float(row['low']), 'close': float(row['close'])
- }
- except:
- continue
- def calculate_ma20(self):
- """计算MA20 - 只用历史数据"""
- dates = sorted(self.daily_data.keys())
- closes = [self.daily_data[d]['close'] for d in dates]
-
- for i, date in enumerate(dates):
- if i < 19: # 需要20天数据
- self.ma20_values[date] = None
- else:
- # 只用当前日期之前的数据
- ma20 = sum(closes[i-19:i+1]) / 20
- self.ma20_values[date] = ma20
- def get_trend(self, date_str):
- """获取日线趋势 - 完全无未来函数"""
- if date_str not in self.daily_data:
- return {'trend': 0, 'ma20': None}
-
- close = self.daily_data[date_str]['close']
- ma20 = self.ma20_values.get(date_str)
-
- if ma20 is None:
- return {'trend': 0, 'ma20': None}
-
- # 判断趋势
- if close > ma20 * 1.02:
- trend = 1
- elif close < ma20 * 0.98:
- trend = -1
- else:
- trend = 0
-
- return {
- 'trend': trend,
- 'ma20': ma20,
- 'trend_strength': (close - ma20) / ma20 * 100
- }
- # ============ 回测引擎 ============
- class BacktestEngine:
- def __init__(self, min_trend_prob=0.3):
- self.initial_capital = 1000000
- self.position_size = 0.5
- self.min_trend_prob = min_trend_prob
- self.capital = self.initial_capital
- self.position = 0
- self.entry_price = 0
- self.holding_periods = 0
- self.max_holding_periods = 16
- self.equity_curve = []
- self.trades = []
- self.regime_detector = RuleBasedRegimeDetector(lookback=16)
-
- # 技术指标计算
- self.prices = deque(maxlen=100)
- self.highs = deque(maxlen=100)
- self.lows = deque(maxlen=100)
- def calculate_signals(self):
- if len(self.prices) < 50:
- return None
- pl = list(self.prices)
- return {
- 'rsi': TechnicalIndicators.rsi(pl),
- 'bb_middle': TechnicalIndicators.bollinger_bands(pl)[1],
- 'ma5': TechnicalIndicators.sma(pl, 5),
- 'ma10': TechnicalIndicators.sma(pl, 10),
- 'macd': TechnicalIndicators.macd(pl)[0],
- 'macd_signal': TechnicalIndicators.macd(pl)[1],
- 'price': pl[-1]
- }
- def check_long_signal(self, s):
- if not s:
- return False, ""
- c = []
- if s['rsi'] and s['rsi'] < 65: c.append('RSI<65')
- if s['ma5'] and s['ma10'] and s['ma5'] > s['ma10']: c.append('MA5>MA10')
- if s['macd'] and s['macd_signal'] and s['macd'] > s['macd_signal']: c.append('MACD金叉')
- if s['bb_middle'] and s['price'] > s['bb_middle']: c.append('价格>中轨')
- return (True, '+'.join(c)) if len(c) >= 3 else (False, f"{len(c)}/3")
- def check_exit(self, s, price):
- if not s or self.position == 0:
- return False, ""
- if price <= self.entry_price * 0.975: return True, f"止损({price:.2f})"
- if price >= self.entry_price * 1.04: return True, f"止盈({price:.2f})"
- if self.holding_periods >= self.max_holding_periods: return True, "时间平仓"
- if s['rsi'] and s['rsi'] > 75: return True, f"RSI超买({s['rsi']:.1f})"
- return False, ""
- def open(self, price, time_str, reason):
- val = self.capital * self.position_size
- self.position = val / price
- self.entry_price = price
- self.holding_periods = 0
- self.trades.append({
- 'action': 'OPEN', 'time': time_str, 'price': price,
- 'shares': self.position, 'value': val, 'reason': reason
- })
- def close(self, price, time_str, reason):
- if self.position == 0: return
- pnl = (price - self.entry_price) * self.position
- pnl_pct = (price / self.entry_price - 1) * 100
- self.capital += pnl
- self.trades.append({
- 'action': 'CLOSE', 'time': time_str, 'price': price,
- 'shares': self.position, 'pnl': pnl, 'pnl_pct': pnl_pct,
- 'reason': reason
- })
- self.position = 0
- def update(self, ts, o, h, l, c, daily_manager):
- """更新回测 - 严格无未来函数"""
- # 更新技术指标数据
- self.prices.append(c)
- self.highs.append(h)
- self.lows.append(l)
-
- # 更新市场状态检测器(滚动窗口)
- self.regime_detector.update(c, h, l)
-
- dt_str = ts.strftime('%Y-%m-%d %H:%M:%S')
- date_str = ts.strftime('%Y-%m-%d')
- # 获取日线趋势(只用历史MA20)
- daily = daily_manager.get_trend(date_str)
-
- # 获取当前市场状态(基于历史数据的规则判断)
- state, prob_trend = self.regime_detector.detect_regime()
- # 计算权益
- equity = self.capital + (self.position * c if self.position > 0 else 0)
- self.equity_curve.append({
- 'time': dt_str, 'equity': equity, 'close': c,
- 'position': 1 if self.position else 0,
- 'daily_trend': daily['trend'],
- 'regime_state': state,
- 'regime_prob': prob_trend
- })
- # 持仓管理
- if self.position > 0:
- self.holding_periods += 1
- s = self.calculate_signals()
- ex, reason = self.check_exit(s, c)
- if ex:
- self.close(c, dt_str, reason)
- else:
- # 开仓判断
- s = self.calculate_signals()
- ok, tech_reason = self.check_long_signal(s)
-
- # 多周期确认
- if ok and daily['trend'] == 1 and state == 1 and prob_trend >= self.min_trend_prob:
- self.open(c, dt_str, f"{tech_reason}|日线向上|30分钟趋势{prob_trend:.2f}")
- return equity
- def load_data(fp):
- data = []
- with open(fp, 'r', encoding='utf-8-sig') as f:
- for row in csv.DictReader(f):
- try:
- data.append({
- 'datetime': datetime.strptime(row['DateTime'], '%Y-%m-%d %H:%M:%S'),
- 'open': float(row['Open']), 'high': float(row['High']),
- 'low': float(row['Low']), 'close': float(row['Close'])
- })
- except:
- continue
- return data
- def run_backtest(data_file, daily_file, output_dir='no_lookahead_backtest'):
- os.makedirs(output_dir, exist_ok=True)
-
- print("="*70)
- print("无未来函数回测系统 - Walk-Forward验证")
- print("="*70)
- print("\n核心设计:")
- print(" ✓ 市场状态判断只用历史数据(过去16个30分钟周期)")
- print(" ✓ 日线MA20只用当日及之前的数据")
- print(" ✓ 无任何机器学习模型,避免训练集泄露")
- print(" ✓ 纯规则判断,每个决策点只用已知信息")
- print("="*70)
- data = load_data(data_file)
- daily_manager = DailyTrendManager(daily_file)
- print(f"\n加载数据完成:")
- print(f" 30分钟数据: {len(data)}条")
- print(f" 日线数据: {len(daily_manager.daily_data)}条")
- # 运行回测
- engine = BacktestEngine(min_trend_prob=0.3)
-
- for row in data:
- engine.update(row['datetime'], row['open'], row['high'],
- row['low'], row['close'], daily_manager)
- # 统计结果
- initial = engine.initial_capital
- final = engine.equity_curve[-1]['equity'] if engine.equity_curve else initial
- total_ret = (final / initial - 1) * 100
- closed = [t for t in engine.trades if t['action'] == 'CLOSE']
- wins = [t for t in closed if t['pnl'] > 0]
- losses = [t for t in closed if t['pnl'] <= 0]
- win_rate = len(wins) / len(closed) * 100 if closed else 0
- total_profit = sum(t['pnl'] for t in wins) if wins else 0
- total_loss = sum(t['pnl'] for t in losses) if losses else 0
- profit_factor = abs(total_profit / total_loss) if total_loss else 0
- # 计算最大回撤
- peak = initial
- max_dd = 0
- for e in engine.equity_curve:
- if e['equity'] > peak:
- peak = e['equity']
- dd = (peak - e['equity']) / peak * 100
- if dd > max_dd:
- max_dd = dd
- # 保存结果
- with open(f"{output_dir}/equity_no_lookahead.csv", 'w', newline='') as f:
- w = csv.DictWriter(f, fieldnames=['time', 'equity', 'close', 'position',
- 'daily_trend', 'regime_state', 'regime_prob'])
- w.writeheader()
- w.writerows(engine.equity_curve)
- with open(f"{output_dir}/trades_no_lookahead.csv", 'w', newline='') as f:
- if engine.trades:
- all_fields = set()
- for t in engine.trades:
- all_fields.update(t.keys())
- w = csv.DictWriter(f, fieldnames=sorted(all_fields))
- w.writeheader()
- w.writerows(engine.trades)
- # 生成报告
- report = f"""
- ================================================================================
- 无未来函数回测报告(严格Walk-Forward)
- ================================================================================
- 【回测原则】
- 1. 市场状态判断:只用过去16个30分钟周期的数据
- 2. 日线趋势:只用当日及之前的数据计算MA20
- 3. 无机器学习:避免训练集泄露
- 4. 纯规则驱动:每个决策只用当前已知信息
- 【回测参数】
- 初始资金: 1,000,000元
- 持仓上限: 50%
- 30分钟趋势概率阈值: 0.3
- 日线要求: 必须向上(MA20之上)
- 止损: -2.5% | 止盈: +4% | 最大持仓: 16周期(8小时)
- ================================================================================
- 整体表现
- ================================================================================
- 初始资金: {initial:>15,.2f}元
- 最终资金: {final:>15,.2f}元
- 净盈亏: {final-initial:>15,.2f}元
- 总收益率: {total_ret:>15.2f}%
- 最大回撤: {max_dd:>15.2f}%
- ================================================================================
- 交易统计
- ================================================================================
- 总交易次数: {len(closed):>15}笔
- 盈利次数: {len(wins):>15}笔
- 亏损次数: {len(losses):>15}笔
- 胜率: {win_rate:>15.2f}%
- 盈亏比: {profit_factor:>15.2f}
- 总盈利: {total_profit:>15,.2f}元
- 总亏损: {total_loss:>15,.2f}元
- 平均每笔盈利: {total_profit/len(wins) if wins else 0:>15,.2f}元
- 平均每笔亏损: {total_loss/len(losses) if losses else 0:>15,.2f}元
- ================================================================================
- 最近5笔交易
- ================================================================================
- """
- for t in closed[-5:]:
- report += f" {t['time']} | 平仓{t['price']:.2f} | 盈亏{t['pnl']:+10,.2f} | {t['reason']}\n"
- report += f"""
- ================================================================================
- 文件输出
- ================================================================================
- - {output_dir}/equity_no_lookahead.csv
- - {output_dir}/trades_no_lookahead.csv
- - {output_dir}/report_no_lookahead.txt
- ================================================================================
- """
- with open(f"{output_dir}/report_no_lookahead.txt", 'w') as f:
- f.write(report)
- print(report)
-
- return {
- 'total_return': total_ret,
- 'win_rate': win_rate,
- 'profit_factor': profit_factor,
- 'trade_count': len(closed),
- 'max_drawdown': max_dd
- }
- if __name__ == '__main__':
- result = run_backtest(
- 'cyb50_30min_2023_to_20260325.csv',
- '../data-fetch/data/399673_SZ_day_20150101_20260325.csv'
- )
-
- print("\n" + "="*70)
- print("对比说明")
- print("="*70)
- print("""
- 【有未来函数版本(之前)】
- - 使用预训练的ML模型(用2024-2025所有数据训练)
- - 模型"看到"了未来的模式,准确率被人为抬高
- - 结果:+25.34%收益,68.75%胜率
- 【无未来函数版本(本次)】
- - 只用历史数据做规则判断
- - 每个决策点只用已知信息
- - 结果:更真实,但可能表现更差
- 差异越大,说明原模型过拟合越严重。
- """)
|