#!/usr/bin/env python3
"""
Long-only T+1 backtest based on cyb50_30min_dual_direction.py.

Uses the same strategy logic as auto_report_long_only_t1.py.
"""

import csv
import json
from datetime import datetime, timedelta
from collections import deque
import os


class DualDirectionLongOnlyBacktest:
    """Long-only backtest of the cyb50_30min_dual_direction strategy.

    The workflow is: load OHLCV bars from a CSV file, attach simplified
    technical indicators to every bar, then walk the bars once, opening a
    full-size long position when the long score reaches 4 and closing it on
    stop-loss, take-profit, maximum holding time or an overbought RSI.

    NOTE(review): the report title mentions "T+1", but nothing in this class
    restricts selling on the entry day; an exit can trigger on any bar after
    the entry bar. Confirm whether a same-day sell restriction is expected.
    """

    # Bars (current bar included) that every indicator window is drawn from.
    # Bars with a smaller index only receive neutral placeholder values and
    # are skipped by the trading loop.
    _WARMUP_BARS = 24

    def __init__(self, initial_capital=1000000):
        """Store the strategy parameters and reset the run statistics.

        Args:
            initial_capital: Starting cash used for position sizing and as
                the base of every return figure in the report.
        """
        self.initial_capital = initial_capital
        self.position_size_pct = 1.0   # fraction of capital per trade (fully invested)
        self.stop_loss_pct = 0.008     # 0.8% stop-loss
        self.take_profit_pct = 0.02    # 2% take-profit
        self.max_hold_bars = 16        # maximum holding time, in bars

        # Run statistics
        self.long_signal_count = 0
        self.trades = []
        self.capital = initial_capital

    @staticmethod
    def _fill_warmup_defaults(row):
        """Give a bar without enough history neutral indicator values."""
        close = row['Close']
        row['RSI'] = 50
        row['MACD'] = 0
        row['MACD_hist'] = 0
        row['K'] = 50
        row['D'] = 50
        row['J'] = 50
        row['BB_middle'] = close
        row['BB_upper'] = close * 1.02
        row['BB_lower'] = close * 0.98
        row['Volume_Ratio'] = 1.0
        row['Price_Momentum'] = 0
        row['Close_Open_Pct'] = 0
        row['MA6'] = close
        row['MA12'] = close

    def calculate_indicators(self, data):
        """Attach technical indicators to every bar, in place.

        Args:
            data: List of dicts with numeric 'Open', 'High', 'Low', 'Close'
                and 'Volume' entries, in chronological order.

        Returns:
            The same list; each dict gains the keys RSI, MACD, MACD_hist,
            K, D, J, BB_middle, BB_upper, BB_lower, Volume_Ratio,
            Price_Momentum, Close_Open_Pct, MA6 and MA12.
        """
        print("计算技术指标...")

        window = self._WARMUP_BARS
        for i, row in enumerate(data):
            if i < window:
                self._fill_warmup_defaults(row)
                continue

            recent = data[i - window + 1:i + 1]
            closes = [bar['Close'] for bar in recent]
            highs = [bar['High'] for bar in recent]
            lows = [bar['Low'] for bar in recent]
            volumes = [bar['Volume'] for bar in recent]

            # Simple moving averages
            row['MA6'] = sum(closes[-6:]) / 6
            row['MA12'] = sum(closes[-12:]) / 12

            # RSI over the last 14 bar-to-bar changes (plain averages,
            # newest change first)
            changes = [closes[-j] - closes[-j - 1] for j in range(1, 15)]
            avg_gain = sum(max(0, change) for change in changes) / 14
            avg_loss = sum(max(0, -change) for change in changes) / 14
            if avg_loss == 0:
                row['RSI'] = 100
            else:
                rs = avg_gain / avg_loss
                row['RSI'] = 100 - (100 / (1 + rs))

            # Bollinger bands: 20-bar mean +/- 2 population standard deviations
            band_closes = closes[-20:]
            bb_middle = sum(band_closes) / 20
            variance = sum((c - bb_middle) ** 2 for c in band_closes) / 20
            bb_std = variance ** 0.5
            row['BB_middle'] = bb_middle
            row['BB_upper'] = bb_middle + bb_std * 2
            row['BB_lower'] = bb_middle - bb_std * 2

            # Simplified MACD: 12-bar mean minus the mean of the whole
            # window. Only 24 closes are available here, so the "slow" leg
            # is a 24-bar simple average rather than a 26-period EMA.
            fast_avg = sum(closes[-12:]) / 12
            slow_avg = sum(closes) / len(closes)
            row['MACD'] = fast_avg - slow_avg
            row['MACD_hist'] = row['MACD']  # simplified: no signal line

            # Simplified KDJ: K and D are the raw 9-bar stochastic value
            low_9 = min(lows[-9:])
            high_9 = max(highs[-9:])
            if high_9 == low_9:
                rsv = 50
            else:
                rsv = (row['Close'] - low_9) / (high_9 - low_9) * 100
            row['K'] = rsv
            row['D'] = rsv
            row['J'] = 3 * rsv - 2 * rsv

            # Volume relative to its 12-bar average
            vol_ma = sum(volumes[-12:]) / 12
            row['Volume_Ratio'] = row['Volume'] / vol_ma if vol_ma > 0 else 1

            # Price momentum versus the close five bars earlier
            base_close = closes[-6]
            row['Price_Momentum'] = (
                (row['Close'] - base_close) / base_close if base_close > 0 else 0
            )
            row['Close_Open_Pct'] = (
                (row['Close'] - row['Open']) / row['Open'] if row['Open'] > 0 else 0
            )

        print(f" 指标计算完成,共{len(data)}条")
        return data

    def calculate_long_score(self, row, prev_rows):
        """Score the strength of the long signal on one bar.

        Args:
            row: Bar dict carrying the indicator keys written by
                calculate_indicators.
            prev_rows: Earlier bars, oldest first; only the last one is
                used (for the MACD comparison). May be empty.

        Returns:
            Tuple (long_score, long_signals): the integer score and the
            labels of the conditions that contributed to it.
        """
        long_score = 0
        long_signals = []

        # 1. Oversold RSI
        if row['RSI'] < 30:
            long_score += 2
            long_signals.append("RSI超卖")
        elif row['RSI'] < 35:
            long_score += 1
            long_signals.append("RSI偏弱")

        # 2. Price at or near the lower Bollinger band
        if row['Close'] <= row['BB_lower'] * 1.01:
            long_score += 2
            long_signals.append("触及下轨")
        elif row['Close'] <= row['BB_lower'] * 1.03:
            long_score += 1
            long_signals.append("接近下轨")

        # 3. MACD histogram turning positive, or at least improving
        if len(prev_rows) > 0:
            prev_macd_hist = prev_rows[-1]['MACD_hist']
            if row['MACD_hist'] > 0 and prev_macd_hist <= 0:
                long_score += 2
                long_signals.append("MACD金叉")
            elif row['MACD_hist'] > prev_macd_hist:
                long_score += 1
                long_signals.append("MACD改善")

        # 4. Upward price momentum
        if row['Price_Momentum'] > 0.005:
            long_score += 1
            long_signals.append("动量向上")

        # 5. Volume expansion
        if row['Volume_Ratio'] > 1.5:
            long_score += 1
            long_signals.append("放量")

        return long_score, long_signals

    @staticmethod
    def _load_data(data_file):
        """Read the OHLCV bars of a CSV file into a list of dicts.

        The file must have the columns DateTime, Open, High, Low, Close and
        Volume; all but DateTime are converted to float.
        """
        # newline='' is what the csv module asks for when it is handed a
        # file object; utf-8-sig drops a leading BOM if there is one.
        with open(data_file, 'r', encoding='utf-8-sig', newline='') as f:
            return [
                {
                    'DateTime': record['DateTime'],
                    'Open': float(record['Open']),
                    'High': float(record['High']),
                    'Low': float(record['Low']),
                    'Close': float(record['Close']),
                    'Volume': float(record['Volume']),
                }
                for record in csv.DictReader(f)
            ]

    def _exit_reason(self, row, pnl_pct, holding_bars):
        """Return the reason to close the open position, or None to hold.

        Rules are checked in priority order: stop-loss, take-profit,
        maximum holding time, overbought RSI.
        """
        price = row['Close']
        if pnl_pct <= -self.stop_loss_pct:
            return f"止损({price:.2f})"
        if pnl_pct >= self.take_profit_pct:
            return f"止盈({price:.2f})"
        if holding_bars >= self.max_hold_bars:
            return f"时间平仓({holding_bars}周期)"
        if row['RSI'] > 75:
            return f"RSI超买({row['RSI']:.1f})"
        return None

    def run_backtest(self, data_file):
        """Run the whole backtest on a CSV file and print the report.

        Every call starts from a clean state, so the same instance can be
        run repeatedly without trades from an earlier run leaking into the
        next report.

        Args:
            data_file: Path of a CSV file with the columns DateTime, Open,
                High, Low, Close and Volume.

        Returns:
            The summary dict produced by generate_report, or None when no
            trade was closed (which includes an empty data file).
        """
        # Reset per-run state.
        self.capital = self.initial_capital
        self.trades = []
        self.long_signal_count = 0

        print("="*70)
        print("DualDirection 策略 - 只做多T+1回测")
        print("="*70)

        print("\n参数设置:")
        print(f" 初始资金: {self.initial_capital:,.0f}元")
        print(f" 仓位比例: {self.position_size_pct*100:.0f}%")
        print(f" 止损: {self.stop_loss_pct*100:.1f}%")
        print(f" 止盈: {self.take_profit_pct*100:.1f}%")
        print(f" 最大持仓: {self.max_hold_bars}周期(8小时)")

        # 1. Load the data
        print(f"\n[1/3] 加载数据: {data_file}")
        data = self._load_data(data_file)
        print(f" 加载完成: {len(data)}条")
        if data:
            # Guarded: a header-only file has no first/last bar to show.
            print(f" 时间范围: {data[0]['DateTime']} ~ {data[-1]['DateTime']}")

        # 2. Compute the indicators
        print("\n[2/3] 计算技术指标...")
        data = self.calculate_indicators(data)

        # 3. Generate signals and simulate the trades
        print("\n[3/3] 生成信号并执行回测...")

        position = 0
        entry_price = 0
        entry_idx = 0

        for i in range(self._WARMUP_BARS, len(data)):
            row = data[i]
            current_time = row['DateTime']
            current_price = row['Close']

            long_score, long_signals = self.calculate_long_score(
                row, data[max(0, i - 5):i]
            )

            if position > 0:
                # Manage the open position; a bar that closes a trade never
                # opens a new one.
                holding_bars = i - entry_idx
                pnl_pct = (current_price - entry_price) / entry_price
                exit_reason = self._exit_reason(row, pnl_pct, holding_bars)

                if exit_reason:
                    pnl = (current_price - entry_price) * position
                    self.capital += pnl
                    self.trades.append({
                        'action': 'CLOSE',
                        'time': current_time,
                        'price': current_price,
                        'shares': position,
                        'pnl': pnl,
                        'pnl_pct': pnl_pct * 100,
                        'reason': exit_reason
                    })
                    position = 0
                    entry_price = 0

            elif long_score >= 4 and position == 0:
                # Open a long position at the bar's close with the
                # configured share of the current capital.
                position_value = self.capital * self.position_size_pct
                position = position_value / current_price
                entry_price = current_price
                entry_idx = i
                self.long_signal_count += 1

                self.trades.append({
                    'action': 'OPEN',
                    'time': current_time,
                    'price': current_price,
                    'shares': position,
                    'value': position_value,
                    'reason': f"做多信号(强度{long_score}): {'+'.join(long_signals[:3])}"
                })

        # NOTE(review): a position still open after the last bar is left
        # open and does not appear in the report.
        closed_count = sum(1 for t in self.trades if t['action'] == 'CLOSE')
        print(f" 做多信号: {self.long_signal_count}个")
        print(f" 实际交易: {closed_count}笔")

        return self.generate_report()

    def generate_report(self):
        """Print the performance report for the closed trades.

        Returns:
            Dict with 'total_return' (%), 'win_rate' (%), 'profit_factor',
            'trade_count' and 'max_drawdown' (%), or None when no trade
            has been closed.
        """
        closed_trades = [t for t in self.trades if t['action'] == 'CLOSE']

        if not closed_trades:
            print("\n无交易记录")
            return None

        # Aggregate statistics; a trade with zero PnL counts as a loss.
        wins = [t for t in closed_trades if t['pnl'] > 0]
        losses = [t for t in closed_trades if t['pnl'] <= 0]

        total_pnl = sum(t['pnl'] for t in closed_trades)
        final_capital = self.initial_capital + total_pnl
        total_return = (final_capital / self.initial_capital - 1) * 100

        win_rate = len(wins) / len(closed_trades) * 100

        total_profit = sum(t['pnl'] for t in wins)
        total_loss = abs(sum(t['pnl'] for t in losses))
        # Reported as 0 (not infinity) when there is nothing on the loss side.
        profit_factor = total_profit / total_loss if total_loss > 0 else 0

        # Maximum drawdown of the equity curve sampled at each trade close
        peak = self.initial_capital
        max_dd = 0
        equity = self.initial_capital
        for t in closed_trades:
            equity += t['pnl']
            if equity > peak:
                peak = equity
            dd = (peak - equity) / peak * 100
            if dd > max_dd:
                max_dd = dd

        print("\n" + "="*70)
        print("回测报告 - DualDirection 只做多T+1")
        print("="*70)

        print("\n【整体表现】")
        print(f" 初始资金: {self.initial_capital:,.2f}元")
        print(f" 最终资金: {final_capital:,.2f}元")
        print(f" 净盈亏: {total_pnl:+,.2f}元")
        print(f" 总收益率: {total_return:+.2f}%")
        print(f" 最大回撤: {max_dd:.2f}%")

        print("\n【交易统计】")
        print(f" 总交易: {len(closed_trades)}笔")
        print(f" 盈利: {len(wins)}笔")
        print(f" 亏损: {len(losses)}笔")
        print(f" 胜率: {win_rate:.2f}%")
        print(f" 盈亏比: {profit_factor:.2f}")
        print(f" 总盈利: {total_profit:,.2f}元")
        print(f" 总亏损: {total_loss:,.2f}元")

        print("\n【最近10笔交易】")
        for t in closed_trades[-10:]:
            print(f" {t['time']} | {t['pnl']:+10,.2f}元 ({t['pnl_pct']:+.2f}%) | {t['reason']}")

        print("="*70)

        return {
            'total_return': total_return,
            'win_rate': win_rate,
            'profit_factor': profit_factor,
            'trade_count': len(closed_trades),
            'max_drawdown': max_dd
        }


if __name__ == '__main__':
    backtest = DualDirectionLongOnlyBacktest(initial_capital=1000000)
    result = backtest.run_backtest('cyb50_30min_2023_to_20260325.csv')