| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- #!/usr/bin/env python3
- """
- 基于 cyb50_30min_dual_direction.py 的只做多T+1回测
- 使用与 auto_report_long_only_t1.py 完全相同的策略逻辑
- """
- import csv
- import json
- from datetime import datetime, timedelta
- from collections import deque
- import os
class DualDirectionLongOnlyBacktest:
    """Long-only backtest built on the cyb50_30min_dual_direction scoring rules.

    Workflow: load OHLCV bars from a CSV file, attach simplified technical
    indicators to every bar, score each bar for long-entry strength, and
    simulate one long position at a time sized from the current capital.
    Entries and exits are both filled at the bar's close price.

    NOTE(review): the strategy name mentions T+1, but nothing in this class
    prevents an exit on the same calendar day as the entry; an exit can
    trigger on any bar after the entry bar. Confirm whether that is intended.
    """

    # Bars of history needed before indicators and signals are evaluated.
    # The indicator windows below (20-bar Bollinger, 14-change RSI, 12-bar
    # means) assume at least this many bars, so do not lower it below 20.
    WARMUP_BARS = 24
    # Minimum long score required to open a position.
    ENTRY_SCORE_THRESHOLD = 4
    # An open position is closed once RSI rises above this level.
    RSI_EXIT_LEVEL = 75

    def __init__(self, initial_capital=1000000):
        """Set the strategy parameters and the mutable run state.

        Args:
            initial_capital: Starting cash for the simulation.
        """
        self.initial_capital = initial_capital
        self.position_size_pct = 1.0   # fraction of capital used per trade
        self.stop_loss_pct = 0.008     # 0.8% stop loss
        self.take_profit_pct = 0.02    # 2% take profit
        self.max_hold_bars = 16        # maximum holding time, in bars

        # Run state; reset again at the start of every run_backtest() call.
        self.long_signal_count = 0
        self.trades = []
        self.capital = initial_capital

    def calculate_indicators(self, data):
        """Attach indicator fields to every bar dict in *data* (in place).

        The first ``WARMUP_BARS`` bars receive neutral placeholder values.
        Later bars are computed from a trailing window of ``WARMUP_BARS``
        bars that includes the current bar.

        Args:
            data: List of bar dicts with 'Open', 'High', 'Low', 'Close' and
                'Volume' numeric fields.

        Returns:
            The same list object, with indicator keys added to each dict.
        """
        print("计算技术指标...")

        window = self.WARMUP_BARS
        for i, row in enumerate(data):
            if i < window:
                # Not enough history yet: fill with neutral placeholders.
                close = row['Close']
                row.update({
                    'RSI': 50,
                    'MACD': 0,
                    'MACD_hist': 0,
                    'K': 50,
                    'D': 50,
                    'J': 50,
                    'BB_middle': close,
                    'BB_upper': close * 1.02,
                    'BB_lower': close * 0.98,
                    'Volume_Ratio': 1.0,
                    'Price_Momentum': 0,
                    'Close_Open_Pct': 0,
                    'MA6': close,
                    'MA12': close,
                })
                continue

            recent = data[i - window + 1:i + 1]
            closes = [bar['Close'] for bar in recent]
            highs = [bar['High'] for bar in recent]
            lows = [bar['Low'] for bar in recent]
            volumes = [bar['Volume'] for bar in recent]

            # Simple moving averages.
            row['MA6'] = sum(closes[-6:]) / 6
            row['MA12'] = sum(closes[-12:]) / 12

            # RSI from plain averages of the last 14 close-to-close changes.
            changes = [closes[-j] - closes[-j - 1] for j in range(1, 15)]
            avg_gain = sum(max(0, change) for change in changes) / 14
            avg_loss = sum(max(0, -change) for change in changes) / 14
            if avg_loss == 0:
                row['RSI'] = 100
            else:
                rs = avg_gain / avg_loss
                row['RSI'] = 100 - (100 / (1 + rs))

            # Bollinger bands: 20-bar mean +/- 2 population std deviations.
            bb_window = closes[-20:]
            bb_middle = sum(bb_window) / 20
            variance = sum((c - bb_middle) ** 2 for c in bb_window) / 20
            bb_std = variance ** 0.5
            row['BB_middle'] = bb_middle
            row['BB_upper'] = bb_middle + bb_std * 2
            row['BB_lower'] = bb_middle - bb_std * 2

            # Simplified MACD: difference of two simple means, not EMAs.
            # The window holds only WARMUP_BARS closes, so the slow leg is
            # the mean of the whole window; the former 26-bar branch could
            # never execute and was removed without changing the output.
            fast_mean = sum(closes[-12:]) / 12
            slow_mean = sum(closes) / len(closes)
            row['MACD'] = fast_mean - slow_mean
            row['MACD_hist'] = row['MACD']  # simplified: no signal line

            # Simplified KDJ: K and D are the raw 9-bar stochastic value.
            low_9 = min(lows[-9:])
            high_9 = max(highs[-9:])
            if high_9 == low_9:
                rsv = 50
            else:
                rsv = (row['Close'] - low_9) / (high_9 - low_9) * 100
            row['K'] = rsv
            row['D'] = rsv
            row['J'] = 3 * rsv - 2 * rsv

            # Volume relative to its 12-bar mean.
            vol_ma = sum(volumes[-12:]) / 12
            row['Volume_Ratio'] = row['Volume'] / vol_ma if vol_ma > 0 else 1

            # Momentum versus the close five bars back, and the bar's body.
            base_close = closes[-6]
            row['Price_Momentum'] = (
                (row['Close'] - base_close) / base_close if base_close > 0 else 0
            )
            row['Close_Open_Pct'] = (
                (row['Close'] - row['Open']) / row['Open'] if row['Open'] > 0 else 0
            )

        print(f" 指标计算完成,共{len(data)}条")
        return data

    def calculate_long_score(self, row, prev_rows):
        """Score the long-entry strength of a bar.

        Args:
            row: Bar dict carrying the indicator fields 'RSI', 'Close',
                'BB_lower', 'MACD_hist', 'Price_Momentum' and 'Volume_Ratio'.
            prev_rows: Preceding bars; only the last one is consulted, and
                only for its 'MACD_hist'. May be empty.

        Returns:
            A ``(score, labels)`` tuple: the integer score and the list of
            labels of the rules that contributed to it.
        """
        long_score = 0
        long_signals = []

        # 1. Oversold RSI.
        if row['RSI'] < 30:
            long_score += 2
            long_signals.append("RSI超卖")
        elif row['RSI'] < 35:
            long_score += 1
            long_signals.append("RSI偏弱")

        # 2. Close at or near the lower Bollinger band.
        if row['Close'] <= row['BB_lower'] * 1.01:
            long_score += 2
            long_signals.append("触及下轨")
        elif row['Close'] <= row['BB_lower'] * 1.03:
            long_score += 1
            long_signals.append("接近下轨")

        # 3. MACD histogram turning positive, or at least improving.
        if prev_rows:
            prev_macd_hist = prev_rows[-1]['MACD_hist']
            if row['MACD_hist'] > 0 and prev_macd_hist <= 0:
                long_score += 2
                long_signals.append("MACD金叉")
            elif row['MACD_hist'] > prev_macd_hist:
                long_score += 1
                long_signals.append("MACD改善")

        # 4. Upward price momentum.
        if row['Price_Momentum'] > 0.005:
            long_score += 1
            long_signals.append("动量向上")

        # 5. Volume expansion.
        if row['Volume_Ratio'] > 1.5:
            long_score += 1
            long_signals.append("放量")

        return long_score, long_signals

    @staticmethod
    def _load_bars(data_file):
        """Read OHLCV bars from a CSV file into a list of dicts."""
        bars = []
        # utf-8-sig tolerates a leading byte-order mark in the file.
        with open(data_file, 'r', encoding='utf-8-sig') as f:
            for record in csv.DictReader(f):
                bars.append({
                    'DateTime': record['DateTime'],
                    'Open': float(record['Open']),
                    'High': float(record['High']),
                    'Low': float(record['Low']),
                    'Close': float(record['Close']),
                    'Volume': float(record['Volume']),
                })
        return bars

    def _exit_reason(self, row, pnl_pct, holding_bars):
        """Return the exit label for an open position, or None to keep holding."""
        price = row['Close']
        if pnl_pct <= -self.stop_loss_pct:
            return f"止损({price:.2f})"
        if pnl_pct >= self.take_profit_pct:
            return f"止盈({price:.2f})"
        if holding_bars >= self.max_hold_bars:
            return f"时间平仓({holding_bars}周期)"
        if row['RSI'] > self.RSI_EXIT_LEVEL:
            return f"RSI超买({row['RSI']:.1f})"
        return None

    def run_backtest(self, data_file):
        """Run the backtest on a CSV file and print and return the report.

        Args:
            data_file: Path to a CSV file with 'DateTime', 'Open', 'High',
                'Low', 'Close' and 'Volume' columns.

        Returns:
            The summary dict produced by ``generate_report()``, or ``None``
            when the file holds no rows or no trade was closed.

        Raises:
            OSError: If the file cannot be opened.
            KeyError: If a required column is missing.
            ValueError: If a price or volume field is not numeric.
        """
        # Start from a clean slate so that reusing an instance does not mix
        # trades and capital from an earlier run into this one.
        self.long_signal_count = 0
        self.trades = []
        self.capital = self.initial_capital

        print("="*70)
        print("DualDirection 策略 - 只做多T+1回测")
        print("="*70)
        print(f"\n参数设置:")
        print(f" 初始资金: {self.initial_capital:,.0f}元")
        print(f" 仓位比例: {self.position_size_pct*100:.0f}%")
        print(f" 止损: {self.stop_loss_pct*100:.1f}%")
        print(f" 止盈: {self.take_profit_pct*100:.1f}%")
        print(f" 最大持仓: {self.max_hold_bars}周期(8小时)")

        # 1. Load the data.
        print(f"\n[1/3] 加载数据: {data_file}")
        data = self._load_bars(data_file)
        print(f" 加载完成: {len(data)}条")
        if not data:
            # Nothing to index into; bail out instead of raising IndexError.
            print(" 数据为空,无法回测")
            return None
        print(f" 时间范围: {data[0]['DateTime']} ~ {data[-1]['DateTime']}")

        # 2. Compute the indicators.
        print("\n[2/3] 计算技术指标...")
        data = self.calculate_indicators(data)

        # 3. Generate signals and simulate the trades.
        print("\n[3/3] 生成信号并执行回测...")

        position = 0      # shares currently held (0 when flat)
        entry_price = 0
        entry_idx = 0

        for i in range(self.WARMUP_BARS, len(data)):
            row = data[i]
            current_time = row['DateTime']
            current_price = row['Close']

            prev_rows = data[max(0, i-5):i]
            long_score, long_signals = self.calculate_long_score(row, prev_rows)

            if position > 0:
                # Manage the open position.
                holding_bars = i - entry_idx
                pnl_pct = (current_price - entry_price) / entry_price
                exit_reason = self._exit_reason(row, pnl_pct, holding_bars)

                if exit_reason:
                    pnl = (current_price - entry_price) * position
                    self.capital += pnl

                    self.trades.append({
                        'action': 'CLOSE',
                        'time': current_time,
                        'price': current_price,
                        'shares': position,
                        'pnl': pnl,
                        'pnl_pct': pnl_pct * 100,
                        'reason': exit_reason
                    })

                    position = 0
                    entry_price = 0

            elif long_score >= self.ENTRY_SCORE_THRESHOLD and position == 0:
                # Flat and the signal is strong enough: open a long.
                position_value = self.capital * self.position_size_pct
                position = position_value / current_price
                entry_price = current_price
                entry_idx = i
                self.long_signal_count += 1

                self.trades.append({
                    'action': 'OPEN',
                    'time': current_time,
                    'price': current_price,
                    'shares': position,
                    'value': position_value,
                    'reason': f"做多信号(强度{long_score}): {'+'.join(long_signals[:3])}"
                })

        # NOTE: a position still open when the data ends is never closed,
        # so its unrealised P&L is not part of the report.
        print(f" 做多信号: {self.long_signal_count}个")
        print(f" 实际交易: {len([t for t in self.trades if t['action']=='CLOSE'])}笔")

        return self.generate_report()

    def generate_report(self):
        """Print the performance report and return its key figures.

        Only 'CLOSE' records in ``self.trades`` are counted. Drawdown is
        measured on the equity curve sampled after each closed trade.

        Returns:
            ``None`` when there are no closed trades; otherwise a dict with
            'total_return' (%), 'win_rate' (%), 'profit_factor',
            'trade_count' and 'max_drawdown' (%). 'profit_factor' is 0 when
            there are no losing trades.
        """
        closed_trades = [t for t in self.trades if t['action'] == 'CLOSE']

        if not closed_trades:
            print("\n无交易记录")
            return None

        # Aggregate statistics; break-even trades count as losses.
        wins = [t for t in closed_trades if t['pnl'] > 0]
        losses = [t for t in closed_trades if t['pnl'] <= 0]

        total_pnl = sum(t['pnl'] for t in closed_trades)
        final_capital = self.initial_capital + total_pnl
        total_return = (final_capital / self.initial_capital - 1) * 100

        win_rate = len(wins) / len(closed_trades) * 100

        total_profit = sum(t['pnl'] for t in wins) if wins else 0
        total_loss = abs(sum(t['pnl'] for t in losses)) if losses else 0
        profit_factor = total_profit / total_loss if total_loss > 0 else 0

        # Maximum drawdown over the per-trade equity curve.
        peak = self.initial_capital
        max_dd = 0
        current = self.initial_capital

        for t in closed_trades:
            current += t['pnl']
            if current > peak:
                peak = current
            dd = (peak - current) / peak * 100
            if dd > max_dd:
                max_dd = dd

        print("\n" + "="*70)
        print("回测报告 - DualDirection 只做多T+1")
        print("="*70)
        print(f"\n【整体表现】")
        print(f" 初始资金: {self.initial_capital:,.2f}元")
        print(f" 最终资金: {final_capital:,.2f}元")
        print(f" 净盈亏: {total_pnl:+,.2f}元")
        print(f" 总收益率: {total_return:+.2f}%")
        print(f" 最大回撤: {max_dd:.2f}%")

        print(f"\n【交易统计】")
        print(f" 总交易: {len(closed_trades)}笔")
        print(f" 盈利: {len(wins)}笔")
        print(f" 亏损: {len(losses)}笔")
        print(f" 胜率: {win_rate:.2f}%")
        print(f" 盈亏比: {profit_factor:.2f}")
        print(f" 总盈利: {total_profit:,.2f}元")
        print(f" 总亏损: {total_loss:,.2f}元")

        print(f"\n【最近10笔交易】")
        for t in closed_trades[-10:]:
            print(f" {t['time']} | {t['pnl']:+10,.2f}元 ({t['pnl_pct']:+.2f}%) | {t['reason']}")

        print("="*70)

        return {
            'total_return': total_return,
            'win_rate': win_rate,
            'profit_factor': profit_factor,
            'trade_count': len(closed_trades),
            'max_drawdown': max_dd
        }
if __name__ == '__main__':
    # Script entry point: run the long-only backtest with 1,000,000 starting
    # capital on the CSV file named below (a path relative to the current
    # working directory).
    backtest = DualDirectionLongOnlyBacktest(
        initial_capital=1000000,
    )
    result = backtest.run_backtest(
        'cyb50_30min_2023_to_20260325.csv',
    )
|