| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349 |
- #!/usr/bin/env python3
- """
- DualDirection 策略 + 择时过滤
- 保持原策略所有参数不变,只增加日线趋势和30分钟状态过滤
- """
- import csv
- from datetime import datetime
- import math
class DualDirectionWithTiming:
    """Long-only DualDirection backtest on intraday bars with a timing filter.

    Entry candidates come from a point score built from RSI, the lower
    Bollinger band, a moving-average spread stored as ``MACD_hist``,
    short-term price momentum and relative volume.  A candidate is only
    traded when the daily trend flag is up (if ``require_daily_uptrend`` is
    set) and the intraday regime detector reports a trend whose probability
    is at least ``min_trend_prob``.
    """

    def __init__(self, initial_capital=1000000):
        """Store the strategy parameters and initialise the run state.

        Args:
            initial_capital: Account equity at the start of a backtest.
        """
        self.initial_capital = initial_capital
        self.position_size_pct = 1.0   # fraction of capital used per entry
        self.stop_loss_pct = 0.008     # exit when the loss reaches 0.8%
        self.take_profit_pct = 0.02    # exit when the gain reaches 2%
        self.max_hold_bars = 16        # time-based exit, in bars
        self.min_trend_prob = 0.3      # minimum regime probability to enter
        self.require_daily_uptrend = True

        self.long_signal_count = 0     # entry candidates with score >= 4
        self.filtered_count = 0        # candidates rejected by the filter
        self.trades = []               # OPEN / CLOSE records, in order
        self.capital = initial_capital

    def load_daily_data(self, filepath):
        """Load daily bars from a CSV file and tag each day with a trend flag.

        The file must provide ``datetime`` (``%Y-%m-%d %H:%M:%S``), ``open``,
        ``high``, ``low`` and ``close`` columns.  Rows that cannot be parsed
        are skipped.  If a date occurs more than once the last row wins.

        Args:
            filepath: Path of the daily CSV file.

        Returns:
            dict mapping ``'YYYY-MM-DD'`` to a dict with ``open``, ``high``,
            ``low``, ``close``, ``ma20`` and ``trend``.  ``ma20`` is the mean
            of the last 20 closes including that day (``None`` for the first
            19 days).  ``trend`` is 1 when the close is more than 2% above
            ``ma20``, -1 when more than 2% below, otherwise 0.
        """
        daily_data = {}
        # newline='' is what the csv module expects from text-mode files.
        with open(filepath, 'r', encoding='utf-8-sig', newline='') as f:
            for row in csv.DictReader(f):
                try:
                    dt = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S')
                    bar = {
                        'open': float(row['open']),
                        'high': float(row['high']),
                        'low': float(row['low']),
                        'close': float(row['close']),
                    }
                except (KeyError, TypeError, ValueError):
                    # Best effort: drop rows with missing or malformed
                    # fields, but let unrelated errors propagate.
                    continue
                daily_data[dt.strftime('%Y-%m-%d')] = bar

        dates = sorted(daily_data)
        closes = [daily_data[d]['close'] for d in dates]

        for i, date in enumerate(dates):
            bar = daily_data[date]
            if i < 19:
                # Not enough history for a 20-day average yet.
                bar['ma20'] = None
                bar['trend'] = 0
                continue

            ma20 = sum(closes[i-19:i+1]) / 20
            bar['ma20'] = ma20
            close = closes[i]
            if close > ma20 * 1.02:
                bar['trend'] = 1
            elif close < ma20 * 0.98:
                bar['trend'] = -1
            else:
                bar['trend'] = 0

        return daily_data

    def detect_market_regime(self, data, current_idx):
        """Classify the 16 bars preceding ``current_idx``.

        The bar at ``current_idx`` itself is not part of the window.

        Args:
            data: List of bar dicts with ``Close``, ``High`` and ``Low``.
            current_idx: Index of the bar being evaluated.

        Returns:
            ``(state, probability)`` where the pair is one of:
            ``(0, 0.0)`` fewer than 16 bars of history,
            ``(2, 0.3)`` the reversal score reached 3,
            ``(1, p)``   the trend score reached 4, with
                         ``p = min(0.95, 0.5 + |window return %| / 10)``,
            ``(0, 0.2)`` none of the above.
        """
        if current_idx < 16:
            return 0, 0.0

        window = data[current_idx-16:current_idx]
        closes = [row['Close'] for row in window]
        highs = [row['High'] for row in window]
        lows = [row['Low'] for row in window]

        start_price = closes[0]
        end_price = closes[-1]
        period_return = (end_price / start_price - 1) * 100

        # High-low span of the window, in percent of the first close.
        price_range = (max(highs) - min(lows)) / start_price * 100

        gains = []
        losses = []
        for i in range(1, len(closes)):
            change = closes[i] - closes[i-1]
            gains.append(max(0, change))
            losses.append(max(0, -change))

        # Simple-average RSI over the last 14 changes of the window.
        avg_gain = sum(gains[-14:]) / 14 if len(gains) >= 14 else sum(gains) / len(gains)
        avg_loss = sum(losses[-14:]) / 14 if len(losses) >= 14 else sum(losses) / len(losses)

        if avg_loss == 0:
            rsi = 100
        else:
            rs = avg_gain / avg_loss
            rsi = 100 - (100 / (1 + rs))

        # Root-mean-square of the bar-to-bar percentage returns.
        returns = [(closes[i] - closes[i-1]) / closes[i-1] * 100 for i in range(1, len(closes))]
        volatility = math.sqrt(sum(r**2 for r in returns) / len(returns)) if returns else 0

        reversal_score = 0
        if rsi > 70 or rsi < 30:
            reversal_score += 2
        elif rsi > 65 or rsi < 35:
            reversal_score += 1

        # Wide range but little net progress.
        if price_range > 4 and abs(period_return) < 1.5:
            reversal_score += 1

        if reversal_score >= 3:
            return 2, 0.3

        trend_score = 0
        if abs(period_return) >= 2.0:
            trend_score += 3
        elif abs(period_return) >= 1.0:
            trend_score += 2
        elif abs(period_return) >= 0.5:
            trend_score += 1

        if 0.5 < volatility < 2.0:
            trend_score += 1

        half = len(closes) // 2
        first_half = closes[:half]
        second_half = closes[half:]
        first_avg = sum(first_half) / len(first_half)
        second_avg = sum(second_half) / len(second_half)

        # The second half of the window moved the same way as the net return.
        if (period_return > 0 and second_avg > first_avg) or (period_return < 0 and second_avg < first_avg):
            trend_score += 1

        if trend_score >= 4:
            prob = min(0.95, 0.5 + abs(period_return) / 10)
            return 1, prob

        return 0, 0.2

    def calculate_indicators(self, data):
        """Add indicator fields to every bar in place.

        The first 24 bars receive neutral placeholder values.  Every later
        bar is computed from a 24-bar window ending at that bar and gets
        ``RSI``, ``BB_lower``, ``MACD``, ``MACD_hist``, ``Volume_Ratio`` and
        ``Price_Momentum``.

        Args:
            data: List of bar dicts with ``Close`` and ``Volume``.

        Returns:
            The same ``data`` list, with its rows mutated.
        """
        for i, row in enumerate(data):
            if i < 24:
                row['RSI'] = 50
                row['MACD'] = 0  # keep the row schema uniform with later bars
                row['MACD_hist'] = 0
                row['BB_lower'] = row['Close'] * 0.98
                row['Volume_Ratio'] = 1.0
                row['Price_Momentum'] = 0
                continue

            closes = [data[j]['Close'] for j in range(i-23, i+1)]
            volumes = [data[j]['Volume'] for j in range(i-23, i+1)]

            # RSI from the simple average of the last 14 close-to-close changes.
            gains = []
            losses = []
            for j in range(1, 15):
                change = closes[-j] - closes[-j-1]
                gains.append(max(0, change))
                losses.append(max(0, -change))
            avg_gain = sum(gains) / 14
            avg_loss = sum(losses) / 14
            if avg_loss == 0:
                row['RSI'] = 100
            else:
                rs = avg_gain / avg_loss
                row['RSI'] = 100 - (100 / (1 + rs))

            # Lower Bollinger band: 20-bar mean minus two population std devs.
            bb_window = closes[-20:]
            bb_middle = sum(bb_window) / 20
            variance = sum((c - bb_middle) ** 2 for c in bb_window) / 20
            row['BB_lower'] = bb_middle - variance ** 0.5 * 2

            # NOTE(review): these are simple averages, not EMAs, and the slow
            # leg covers the whole 24-bar window rather than 26 bars because
            # the window never holds 26 closes.  Kept as is so that the
            # strategy's signals do not change; confirm whether intended.
            fast_ma = sum(closes[-12:]) / 12
            slow_ma = sum(closes) / len(closes)
            row['MACD'] = fast_ma - slow_ma
            row['MACD_hist'] = row['MACD']

            vol_ma = sum(volumes[-12:]) / 12
            row['Volume_Ratio'] = row['Volume'] / vol_ma if vol_ma > 0 else 1
            # Relative change versus the close five bars earlier.
            row['Price_Momentum'] = (row['Close'] - closes[-6]) / closes[-6] if closes[-6] > 0 else 0

        return data

    def calculate_long_score(self, row, prev_rows):
        """Score the current bar as a long-entry candidate.

        Args:
            row: Bar dict that already carries the indicator fields.
            prev_rows: Earlier bars; only the last one is read, for its
                ``MACD_hist``.  May be empty.

        Returns:
            ``(score, signals)``: the integer score and the list of labels
            of the conditions that contributed to it.
        """
        long_score = 0
        long_signals = []

        if row['RSI'] < 30:
            long_score += 2
            long_signals.append("RSI超卖")
        elif row['RSI'] < 35:
            long_score += 1
            long_signals.append("RSI偏弱")

        if row['Close'] <= row['BB_lower'] * 1.01:
            long_score += 2
            long_signals.append("触及下轨")
        elif row['Close'] <= row['BB_lower'] * 1.03:
            long_score += 1
            long_signals.append("接近下轨")

        if len(prev_rows) > 0:
            prev_macd_hist = prev_rows[-1]['MACD_hist']
            if row['MACD_hist'] > 0 and prev_macd_hist <= 0:
                long_score += 2
                long_signals.append("MACD金叉")
            elif row['MACD_hist'] > prev_macd_hist:
                long_score += 1
                long_signals.append("MACD改善")

        if row['Price_Momentum'] > 0.005:
            long_score += 1
            long_signals.append("动量向上")

        if row['Volume_Ratio'] > 1.5:
            long_score += 1
            long_signals.append("放量")

        return long_score, long_signals

    @staticmethod
    def _load_intraday_bars(data_file):
        """Read the intraday CSV into a list of bar dicts with float prices."""
        bars = []
        with open(data_file, 'r', encoding='utf-8-sig', newline='') as f:
            for row in csv.DictReader(f):
                bars.append({
                    'DateTime': row['DateTime'],
                    'Open': float(row['Open']),
                    'High': float(row['High']),
                    'Low': float(row['Low']),
                    'Close': float(row['Close']),
                    'Volume': float(row['Volume']),
                })
        return bars

    def _exit_reason(self, row, pnl_pct, holding_bars):
        """Return the label of the first exit rule that fires, or None."""
        if pnl_pct <= -self.stop_loss_pct:
            return f"止损({row['Close']:.2f})"
        if pnl_pct >= self.take_profit_pct:
            return f"止盈({row['Close']:.2f})"
        if holding_bars >= self.max_hold_bars:
            return f"时间平仓({holding_bars}周期)"
        if row['RSI'] > 75:
            return f"RSI超买({row['RSI']:.1f})"
        return None

    def run_backtest(self, data_file, daily_file):
        """Run the backtest and print a report.

        Entries and exits are evaluated on bar closes.  A position still open
        after the last bar is not closed and therefore not reported.

        Args:
            data_file: Intraday CSV with ``DateTime``, ``Open``, ``High``,
                ``Low``, ``Close`` and ``Volume`` columns.  The first 10
                characters of ``DateTime`` are used as the daily lookup key.
            daily_file: Daily CSV, see ``load_daily_data``.

        Returns:
            The dict produced by ``generate_report``, or ``None`` when no
            trade was closed.
        """
        # Start from a clean slate so that a second run on the same instance
        # does not mix its trades with those of the previous run.
        self.long_signal_count = 0
        self.filtered_count = 0
        self.trades = []
        self.capital = self.initial_capital

        print("="*70)
        print("DualDirection + 择时过滤 回测")
        print("="*70)
        print(f"\nDualDirection参数: 止损0.8% 止盈2% 最大持仓16周期")
        print(f"择时过滤: 日线向上 + 30分钟趋势概率>=0.3")

        print(f"\n[1/4] 加载30分钟数据...")
        data = self._load_intraday_bars(data_file)
        print(f" {len(data)}条")

        print(f"\n[2/4] 加载日线数据...")
        daily_data = self.load_daily_data(daily_file)
        print(f" {len(daily_data)}条")

        print(f"\n[3/4] 计算技术指标...")
        data = self.calculate_indicators(data)

        print(f"\n[4/4] 执行回测...")
        position = 0
        entry_price = 0
        entry_idx = 0

        # The first 24 bars only carry placeholder indicators.
        for i in range(24, len(data)):
            row = data[i]
            current_time = row['DateTime']
            current_price = row['Close']

            if position > 0:
                holding_bars = i - entry_idx
                pnl_pct = (current_price - entry_price) / entry_price
                exit_reason = self._exit_reason(row, pnl_pct, holding_bars)
                if exit_reason:
                    pnl = (current_price - entry_price) * position
                    self.capital += pnl
                    self.trades.append({
                        'action': 'CLOSE', 'time': current_time, 'price': current_price,
                        'pnl': pnl, 'pnl_pct': pnl_pct * 100, 'reason': exit_reason
                    })
                    position = 0
                    entry_price = 0
                # Never re-enter on the bar that managed an open position.
                continue

            # Flat: the entry score and the filters are only needed here.
            long_score, _ = self.calculate_long_score(row, data[max(0, i-5):i])
            if long_score < 4:
                continue
            self.long_signal_count += 1

            # NOTE(review): the trend flag of a date is derived from that
            # date's own daily close, which is not known yet while its
            # intraday bars are trading (look-ahead).  Left unchanged to keep
            # results comparable; consider using the previous day's flag.
            daily_trend = daily_data.get(current_time[:10], {'trend': 0})['trend']
            can_trade = not (self.require_daily_uptrend and daily_trend != 1)

            regime_state, trend_prob = self.detect_market_regime(data, i)
            if regime_state != 1 or trend_prob < self.min_trend_prob:
                can_trade = False

            if not can_trade:
                self.filtered_count += 1
                continue

            position_value = self.capital * self.position_size_pct
            position = position_value / current_price
            entry_price = current_price
            entry_idx = i
            self.trades.append({
                'action': 'OPEN', 'time': current_time, 'price': current_price,
                'value': position_value,
                'reason': f"信号{long_score}分|日线向上|趋势{trend_prob:.2f}"
            })

        closed = [t for t in self.trades if t['action']=='CLOSE']
        print(f" 信号: {self.long_signal_count} 过滤: {self.filtered_count} 交易: {len(closed)}")
        return self.generate_report()

    def generate_report(self):
        """Print summary statistics of the closed trades.

        Returns:
            ``None`` when there is no closed trade, otherwise a dict with
            ``total_return`` (percent of initial capital), ``win_rate``
            (percent) and ``profit_factor`` (gross profit / gross loss;
            ``inf`` when there is profit but no loss).
        """
        closed_trades = [t for t in self.trades if t['action'] == 'CLOSE']
        if not closed_trades:
            print("\n无交易")
            return None

        wins = [t for t in closed_trades if t['pnl'] > 0]
        losses = [t for t in closed_trades if t['pnl'] <= 0]

        total_pnl = sum(t['pnl'] for t in closed_trades)
        final_capital = self.initial_capital + total_pnl
        total_return = (final_capital / self.initial_capital - 1) * 100
        win_rate = len(wins) / len(closed_trades) * 100

        total_profit = sum(t['pnl'] for t in wins)
        total_loss = abs(sum(t['pnl'] for t in losses))
        if total_loss > 0:
            profit_factor = total_profit / total_loss
        elif total_profit > 0:
            # Profit without any loss: the ratio is unbounded, not zero.
            profit_factor = float('inf')
        else:
            profit_factor = 0.0

        print("\n" + "="*70)
        print("回测报告 - DualDirection + 择时过滤")
        print("="*70)
        print(f" 收益率: {total_return:+.2f}%")
        print(f" 信号: {self.long_signal_count} 过滤: {self.filtered_count} 交易: {len(closed_trades)}")
        print(f" 胜率: {win_rate:.2f}% 盈亏比: {profit_factor:.2f}")
        print(f"\n最近5笔:")
        for t in closed_trades[-5:]:
            print(f" {t['time']} | {t['pnl']:+10,.2f} | {t['reason']}")
        print("="*70)

        return {
            'total_return': total_return,
            'win_rate': win_rate,
            'profit_factor': profit_factor,
        }
if __name__ == '__main__':
    # Script entry point: run the backtest on the bundled data files.
    # Both paths are resolved against the current working directory.
    intraday_csv = 'cyb50_30min_2023_to_20260325.csv'
    daily_csv = '../data-fetch/data/399673_SZ_day_20150101_20260325.csv'
    DualDirectionWithTiming().run_backtest(intraday_csv, daily_csv)
|