#!/usr/bin/env python3 """ DualDirection 策略 + 择时过滤 保持原策略所有参数不变,只增加日线趋势和30分钟状态过滤 """ import csv from datetime import datetime import math class DualDirectionWithTiming: def __init__(self, initial_capital=1000000): self.initial_capital = initial_capital self.position_size_pct = 1.0 self.stop_loss_pct = 0.008 self.take_profit_pct = 0.02 self.max_hold_bars = 16 self.min_trend_prob = 0.3 self.require_daily_uptrend = True self.long_signal_count = 0 self.filtered_count = 0 self.trades = [] self.capital = initial_capital def load_daily_data(self, filepath): daily_data = {} with open(filepath, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: try: dt = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S') date_str = dt.strftime('%Y-%m-%d') daily_data[date_str] = { 'open': float(row['open']), 'high': float(row['high']), 'low': float(row['low']), 'close': float(row['close']) } except: continue dates = sorted(daily_data.keys()) closes = [daily_data[d]['close'] for d in dates] for i, date in enumerate(dates): if i < 19: daily_data[date]['ma20'] = None daily_data[date]['trend'] = 0 else: ma20 = sum(closes[i-19:i+1]) / 20 daily_data[date]['ma20'] = ma20 close = closes[i] if close > ma20 * 1.02: daily_data[date]['trend'] = 1 elif close < ma20 * 0.98: daily_data[date]['trend'] = -1 else: daily_data[date]['trend'] = 0 return daily_data def detect_market_regime(self, data, current_idx): if current_idx < 16: return 0, 0.0 window = data[current_idx-16:current_idx] closes = [row['Close'] for row in window] highs = [row['High'] for row in window] lows = [row['Low'] for row in window] start_price = closes[0] end_price = closes[-1] period_return = (end_price / start_price - 1) * 100 max_price = max(highs) min_price = min(lows) price_range = (max_price - min_price) / start_price * 100 gains = [] losses = [] for i in range(1, len(closes)): change = closes[i] - closes[i-1] gains.append(max(0, change)) losses.append(max(0, -change)) avg_gain = sum(gains[-14:]) / 14 if len(gains) >= 14 else sum(gains) / len(gains) avg_loss = sum(losses[-14:]) / 14 if len(losses) >= 14 else sum(losses) / len(losses) if avg_loss == 0: rsi = 100 else: rs = avg_gain / avg_loss rsi = 100 - (100 / (1 + rs)) returns = [(closes[i] - closes[i-1]) / closes[i-1] * 100 for i in range(1, len(closes))] volatility = math.sqrt(sum(r**2 for r in returns) / len(returns)) if returns else 0 reversal_score = 0 if rsi > 70 or rsi < 30: reversal_score += 2 elif rsi > 65 or rsi < 35: reversal_score += 1 if price_range > 4 and abs(period_return) < 1.5: reversal_score += 1 if reversal_score >= 3: return 2, 0.3 trend_score = 0 if abs(period_return) >= 2.0: trend_score += 3 elif abs(period_return) >= 1.0: trend_score += 2 elif abs(period_return) >= 0.5: trend_score += 1 if 0.5 < volatility < 2.0: trend_score += 1 first_half = closes[:len(closes)//2] second_half = closes[len(closes)//2:] first_avg = sum(first_half) / len(first_half) second_avg = sum(second_half) / len(second_half) if (period_return > 0 and second_avg > first_avg) or (period_return < 0 and second_avg < first_avg): trend_score += 1 if trend_score >= 4: prob = min(0.95, 0.5 + abs(period_return) / 10) return 1, prob return 0, 0.2 def calculate_indicators(self, data): for i, row in enumerate(data): if i < 24: row['RSI'] = 50 row['MACD_hist'] = 0 row['BB_lower'] = row['Close'] * 0.98 row['Volume_Ratio'] = 1.0 row['Price_Momentum'] = 0 continue closes = [data[j]['Close'] for j in range(i-23, i+1)] highs = [data[j]['High'] for j in range(i-23, i+1)] lows = [data[j]['Low'] for j in range(i-23, i+1)] volumes = [data[j]['Volume'] for j in range(i-23, i+1)] gains = [] losses = [] for j in range(1, 15): change = closes[-j] - closes[-j-1] gains.append(max(0, change)) losses.append(max(0, -change)) avg_gain = sum(gains) / 14 avg_loss = sum(losses) / 14 if avg_loss == 0: row['RSI'] = 100 else: rs = avg_gain / avg_loss row['RSI'] = 100 - (100 / (1 + rs)) bb_middle = sum(closes[-20:]) / 20 variance = sum((c - bb_middle) ** 2 for c in closes[-20:]) / 20 bb_std = variance ** 0.5 row['BB_lower'] = bb_middle - bb_std * 2 ema12 = sum(closes[-12:]) / 12 ema26 = sum(closes[-26:]) / 26 if len(closes) >= 26 else sum(closes) / len(closes) row['MACD'] = ema12 - ema26 row['MACD_hist'] = row['MACD'] vol_ma = sum(volumes[-12:]) / 12 row['Volume_Ratio'] = row['Volume'] / vol_ma if vol_ma > 0 else 1 row['Price_Momentum'] = (row['Close'] - closes[-6]) / closes[-6] if closes[-6] > 0 else 0 return data def calculate_long_score(self, row, prev_rows): long_score = 0 long_signals = [] if row['RSI'] < 30: long_score += 2 long_signals.append("RSI超卖") elif row['RSI'] < 35: long_score += 1 long_signals.append("RSI偏弱") if row['Close'] <= row['BB_lower'] * 1.01: long_score += 2 long_signals.append("触及下轨") elif row['Close'] <= row['BB_lower'] * 1.03: long_score += 1 long_signals.append("接近下轨") if len(prev_rows) > 0: prev_macd_hist = prev_rows[-1]['MACD_hist'] if row['MACD_hist'] > 0 and prev_macd_hist <= 0: long_score += 2 long_signals.append("MACD金叉") elif row['MACD_hist'] > prev_macd_hist: long_score += 1 long_signals.append("MACD改善") if row['Price_Momentum'] > 0.005: long_score += 1 long_signals.append("动量向上") if row['Volume_Ratio'] > 1.5: long_score += 1 long_signals.append("放量") return long_score, long_signals def run_backtest(self, data_file, daily_file): print("="*70) print("DualDirection + 择时过滤 回测") print("="*70) print(f"\nDualDirection参数: 止损0.8% 止盈2% 最大持仓16周期") print(f"择时过滤: 日线向上 + 30分钟趋势概率>=0.3") print(f"\n[1/4] 加载30分钟数据...") data = [] with open(data_file, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: data.append({ 'DateTime': row['DateTime'], 'Open': float(row['Open']), 'High': float(row['High']), 'Low': float(row['Low']), 'Close': float(row['Close']), 'Volume': float(row['Volume']) }) print(f" {len(data)}条") print(f"\n[2/4] 加载日线数据...") daily_data = self.load_daily_data(daily_file) print(f" {len(daily_data)}条") print(f"\n[3/4] 计算技术指标...") data = self.calculate_indicators(data) print(f"\n[4/4] 执行回测...") position = 0 entry_price = 0 entry_idx = 0 for i in range(24, len(data)): row = data[i] current_time = row['DateTime'] current_price = row['Close'] date_str = current_time[:10] daily_info = daily_data.get(date_str, {'trend': 0}) daily_trend = daily_info['trend'] regime_state, trend_prob = self.detect_market_regime(data, i) prev_rows = data[max(0, i-5):i] long_score, long_signals = self.calculate_long_score(row, prev_rows) if position > 0: holding_bars = i - entry_idx pnl_pct = (current_price - entry_price) / entry_price exit_reason = None if pnl_pct <= -self.stop_loss_pct: exit_reason = f"止损({current_price:.2f})" elif pnl_pct >= self.take_profit_pct: exit_reason = f"止盈({current_price:.2f})" elif holding_bars >= self.max_hold_bars: exit_reason = f"时间平仓({holding_bars}周期)" elif row['RSI'] > 75: exit_reason = f"RSI超买({row['RSI']:.1f})" if exit_reason: pnl = (current_price - entry_price) * position self.capital += pnl self.trades.append({ 'action': 'CLOSE', 'time': current_time, 'price': current_price, 'pnl': pnl, 'pnl_pct': pnl_pct * 100, 'reason': exit_reason }) position = 0 entry_price = 0 elif long_score >= 4 and position == 0: self.long_signal_count += 1 can_trade = True if self.require_daily_uptrend and daily_trend != 1: can_trade = False if regime_state != 1 or trend_prob < self.min_trend_prob: can_trade = False if can_trade: position_value = self.capital * self.position_size_pct position = position_value / current_price entry_price = current_price entry_idx = i self.trades.append({ 'action': 'OPEN', 'time': current_time, 'price': current_price, 'value': position_value, 'reason': f"信号{long_score}分|日线向上|趋势{trend_prob:.2f}" }) else: self.filtered_count += 1 closed = [t for t in self.trades if t['action']=='CLOSE'] print(f" 信号: {self.long_signal_count} 过滤: {self.filtered_count} 交易: {len(closed)}") return self.generate_report() def generate_report(self): closed_trades = [t for t in self.trades if t['action'] == 'CLOSE'] if not closed_trades: print("\n无交易") return None wins = [t for t in closed_trades if t['pnl'] > 0] losses = [t for t in closed_trades if t['pnl'] <= 0] total_pnl = sum(t['pnl'] for t in closed_trades) final_capital = self.initial_capital + total_pnl total_return = (final_capital / self.initial_capital - 1) * 100 win_rate = len(wins) / len(closed_trades) * 100 total_profit = sum(t['pnl'] for t in wins) if wins else 0 total_loss = abs(sum(t['pnl'] for t in losses)) if losses else 0 profit_factor = total_profit / total_loss if total_loss > 0 else 0 print("\n" + "="*70) print("回测报告 - DualDirection + 择时过滤") print("="*70) print(f" 收益率: {total_return:+.2f}%") print(f" 信号: {self.long_signal_count} 过滤: {self.filtered_count} 交易: {len(closed_trades)}") print(f" 胜率: {win_rate:.2f}% 盈亏比: {profit_factor:.2f}") print(f"\n最近5笔:") for t in closed_trades[-5:]: print(f" {t['time']} | {t['pnl']:+10,.2f} | {t['reason']}") print("="*70) return {'total_return': total_return, 'win_rate': win_rate} if __name__ == '__main__': backtest = DualDirectionWithTiming() backtest.run_backtest('cyb50_30min_2023_to_20260325.csv', '../data-fetch/data/399673_SZ_day_20150101_20260325.csv')