#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CYB50 只做多T+1回测系统 不依赖pandas,使用纯Python实现 """ import csv import json from datetime import datetime, timedelta from collections import deque import math # ==================== 技术指标计算类 ==================== class TechnicalIndicators: """技术指标计算 - 纯Python实现""" @staticmethod def sma(data, period): """简单移动平均线""" if len(data) < period: return None return sum(data[-period:]) / period @staticmethod def ema(data, period): """指数移动平均线""" if len(data) < period: return None multiplier = 2 / (period + 1) ema = data[0] for price in data[1:]: ema = (price - ema) * multiplier + ema return ema @staticmethod def rsi(prices, period=14): """RSI计算""" if len(prices) < period + 1: return None gains = [] losses = [] for i in range(1, len(prices)): change = prices[i] - prices[i-1] if change > 0: gains.append(change) losses.append(0) else: gains.append(0) losses.append(abs(change)) if len(gains) < period: return None avg_gain = sum(gains[-period:]) / period avg_loss = sum(losses[-period:]) / period if avg_loss == 0: return 100 rs = avg_gain / avg_loss return 100 - (100 / (1 + rs)) @staticmethod def bollinger_bands(prices, period=20, std_dev=2): """布林带计算""" if len(prices) < period: return None, None, None middle = sum(prices[-period:]) / period variance = sum((p - middle) ** 2 for p in prices[-period:]) / period std = math.sqrt(variance) upper = middle + (std * std_dev) lower = middle - (std * std_dev) return upper, middle, lower @staticmethod def macd(prices, fast=12, slow=26, signal=9): """MACD计算""" if len(prices) < slow: return None, None, None # 计算EMA def calc_ema(data, period): multiplier = 2 / (period + 1) ema = data[0] for price in data[1:]: ema = (price - ema) * multiplier + ema return ema ema_fast = calc_ema(prices[-fast:], fast) if len(prices) >= fast else None ema_slow = calc_ema(prices[-slow:], slow) if len(prices) >= slow else None if ema_fast is None or ema_slow is None: return None, None, None macd_line = ema_fast - ema_slow # 简化:使用当前MACD作为信号线近似 signal_line = macd_line * 0.8 # 近似值 histogram = macd_line - signal_line return macd_line, signal_line, histogram @staticmethod def kdj(highs, lows, closes, period=9): """KDJ计算""" if len(closes) < period: return None, None, None low_n = min(lows[-period:]) high_n = max(highs[-period:]) close = closes[-1] if high_n == low_n: rsv = 50 else: rsv = (close - low_n) / (high_n - low_n) * 100 # 简化KDJ计算 k = rsv d = k j = 3 * k - 2 * d return k, d, j @staticmethod def atr(highs, lows, closes, period=14): """ATR计算""" if len(closes) < period + 1: return None tr_values = [] for i in range(1, len(closes)): high_low = highs[i] - lows[i] high_close = abs(highs[i] - closes[i-1]) low_close = abs(lows[i] - closes[i-1]) tr = max(high_low, high_close, low_close) tr_values.append(tr) if len(tr_values) < period: return None return sum(tr_values[-period:]) / period # ==================== 数据加载类 ==================== class DataLoader: """CSV数据加载器""" def __init__(self, file_path): self.file_path = file_path self.data = [] def load(self): """加载CSV数据""" print(f"正在加载数据文件: {self.file_path}") with open(self.file_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: # 解析时间 dt_str = row['DateTime'] dt = datetime.strptime(dt_str, '%Y-%m-%d %H:%M:%S') self.data.append({ 'datetime': dt, 'date': dt.date(), 'time': dt.time(), 'open': float(row['Open']), 'high': float(row['High']), 'low': float(row['Low']), 'close': float(row['Close']), 'volume': float(row['Volume']), 'a': float(row['a']) if row['a'] else 0, 'pc': float(row['pc']) if row['pc'] else 0, 'sf': float(row['sf']) if row['sf'] else 0 }) print(f"✅ 数据加载完成: {len(self.data)}条K线") print(f" 数据区间: {self.data[0]['datetime']} ~ {self.data[-1]['datetime']}") return self.data # ==================== 信号生成器 ==================== class SignalGenerator: """只做多信号生成器""" def __init__(self): self.prices = [] self.highs = [] self.lows = [] self.volumes = [] self.macd_histograms = [] def update(self, bar): """更新数据""" self.prices.append(bar['close']) self.highs.append(bar['high']) self.lows.append(bar['low']) self.volumes.append(bar['volume']) def calculate_indicators(self): """计算所有技术指标""" if len(self.prices) < 26: return None ti = TechnicalIndicators() # 移动平均线 ma6 = ti.sma(self.prices, 6) ma12 = ti.sma(self.prices, 12) ma24 = ti.sma(self.prices, 24) # RSI rsi = ti.rsi(self.prices, 14) # 布林带 bb_upper, bb_middle, bb_lower = ti.bollinger_bands(self.prices, 20) # MACD macd_line, macd_signal, macd_hist = ti.macd(self.prices, 12, 26, 9) if macd_hist is not None: self.macd_histograms.append(macd_hist) # KDJ k, d, j = ti.kdj(self.highs, self.lows, self.prices, 9) # ATR atr = ti.atr(self.highs, self.lows, self.prices, 14) atr_pct = atr / self.prices[-1] if atr else None # 成交量比率 volume_ma = ti.sma(self.volumes, 12) volume_ratio = self.volumes[-1] / volume_ma if volume_ma else 1.0 # 价格动量 price_momentum = (self.prices[-1] - self.prices[-6]) / self.prices[-6] if len(self.prices) >= 6 else 0 # 涨跌幅 returns = (self.prices[-1] - self.prices[-2]) / self.prices[-2] if len(self.prices) >= 2 else 0 close_open_pct = (self.prices[-1] - self.highs[-1]) / self.highs[-1] # 简化计算 return { 'ma6': ma6, 'ma12': ma12, 'ma24': ma24, 'rsi': rsi, 'bb_upper': bb_upper, 'bb_middle': bb_middle, 'bb_lower': bb_lower, 'macd': macd_line, 'macd_signal': macd_signal, 'macd_hist': macd_hist, 'k': k, 'd': d, 'j': j, 'atr_pct': atr_pct, 'volume_ratio': volume_ratio, 'price_momentum': price_momentum, 'returns': returns, 'close_open_pct': close_open_pct } def generate_long_signal(self, indicators, bar_idx): """生成做多信号""" if indicators is None: return 0, [] score = 0 signals = [] # 1. RSI超卖 if indicators['rsi'] < 30: score += 2 signals.append("RSI超卖") elif indicators['rsi'] < 35: score += 1 signals.append("RSI偏弱") # 2. KDJ超卖 if indicators['k'] < 20 and indicators['d'] < 20: score += 2 signals.append("KDJ超卖") elif indicators['j'] < 0: score += 1 signals.append("KDJ极端超卖") # 3. MACD金叉 if len(self.macd_histograms) >= 2: if indicators['macd_hist'] > 0 and self.macd_histograms[-2] <= 0: score += 2 signals.append("MACD金叉") elif indicators['macd_hist'] > self.macd_histograms[-2]: score += 1 signals.append("MACD改善") # 4. 价格触及布林带下轨 current_price = self.prices[-1] if indicators['bb_lower'] and current_price <= indicators['bb_lower'] * 1.005: score += 2 signals.append("触及下轨") elif indicators['bb_lower'] and current_price <= indicators['bb_lower'] * 1.01: score += 1 signals.append("接近下轨") # 5. 连续下跌后的反转 if len(self.prices) >= 7: recent_returns = [(self.prices[i] - self.prices[i-1]) / self.prices[i-1] for i in range(len(self.prices)-6, len(self.prices))] if min(recent_returns) < -0.015: consecutive_decline = sum(1 for r in recent_returns if r < 0) if consecutive_decline >= 4: score += 2 signals.append("连续下跌反转") # 6. 价格动量反转 if indicators['price_momentum'] < -0.02: score += 1 signals.append("动量超卖") # 7. 成交量配合 if indicators['volume_ratio'] > 1.2: score += 1 signals.append("放量配合") # 8. MA趋势过滤 if indicators['ma6'] and indicators['ma12'] and indicators['ma24']: if indicators['ma6'] < indicators['ma12'] < indicators['ma24']: score -= 1 signals.append("MA下降趋势惩罚") elif indicators['ma6'] > indicators['ma12']: score += 1 signals.append("MA短期上行") return score, signals # ==================== T+1交易执行器 ==================== class T1BacktestExecutor: """T+1回测执行器""" def __init__(self, initial_capital=1000000): self.initial_capital = initial_capital self.capital = initial_capital self.position = 0 self.entry_price = 0 self.entry_time = None self.entry_date = None self.entry_signals = [] self.holding_bars = 0 # 参数 self.commission_rate = 0.0001 # 万分之一 self.stop_loss_pct = 0.008 # 0.8%止损 self.take_profit_pct = 0.02 # 2%止盈 self.max_hold_bars = 16 # 最大持仓8小时 # 交易记录 self.trades = [] self.equity_curve = [] # 待平仓队列 (T+1规则:当天买入的次日才能卖出) self.pending_positions = [] # 存储不能当天卖出的持仓信息 def can_trade(self, current_date): """检查是否可以交易(T+1限制)""" # 检查是否有前一天买入的持仓可以卖出 available_to_sell = [] still_pending = [] for pos in self.pending_positions: if pos['entry_date'] < current_date: # 可以卖出了 available_to_sell.append(pos) else: # 还不能卖出 still_pending.append(pos) self.pending_positions = still_pending return available_to_sell def check_exit(self, bar, position_info): """检查是否需要平仓""" price = bar['close'] entry_price = position_info['entry_price'] holding_bars = position_info['holding_bars'] stop_loss = entry_price * (1 - self.stop_loss_pct) take_profit = entry_price * (1 + self.take_profit_pct) # 止损 if price <= stop_loss: return True, f"止损({price:.2f}<={stop_loss:.2f})", price # 止盈 if price >= take_profit: return True, f"止盈({price:.2f}>={take_profit:.2f})", price # 最大持仓时间 if holding_bars >= self.max_hold_bars: return True, f"时间平仓({holding_bars}周期)", price return False, "", price def execute_buy(self, bar, score, signals): """执行买入""" price = bar['close'] date = bar['date'] dt = bar['datetime'] # 计算仓位(全仓) position_value = self.capital position_size = int(position_value / price) if position_size <= 0: return False cost = position_size * price * (1 + self.commission_rate) if cost > self.capital: position_size = int(self.capital / (price * (1 + self.commission_rate))) cost = position_size * price * (1 + self.commission_rate) self.capital -= cost # 记录持仓信息(T+1规则下,当天不能卖出) position_info = { 'entry_price': price, 'entry_time': dt, 'entry_date': date, 'position_size': position_size, 'holding_bars': 0, 'entry_signals': signals, 'score': score, 'stop_loss': price * (1 - self.stop_loss_pct), 'take_profit': price * (1 + self.take_profit_pct) } self.pending_positions.append(position_info) print(f"\n[开仓] {dt} 价格:{price:.2f} 数量:{position_size} 信号分数:{score}") print(f" 信号: {', '.join(signals)}") return True def execute_sell(self, bar, position_info, exit_reason): """执行卖出""" price = bar['close'] dt = bar['datetime'] entry_price = position_info['entry_price'] position_size = position_info['position_size'] entry_time = position_info['entry_time'] holding_bars = position_info['holding_bars'] # 计算盈亏 gross_pnl = (price - entry_price) * position_size open_cost = position_size * entry_price * self.commission_rate close_revenue = position_size * price close_cost = close_revenue * self.commission_rate pnl = gross_pnl - open_cost - close_cost pnl_pct = (price - entry_price) / entry_price * 100 # 更新资金 self.capital += close_revenue - close_cost # 记录交易 trade = { 'entry_time': entry_time.strftime('%Y-%m-%d %H:%M:%S'), 'exit_time': dt.strftime('%Y-%m-%d %H:%M:%S'), 'entry_price': round(entry_price, 2), 'exit_price': round(price, 2), 'position': position_size, 'pnl': round(pnl, 2), 'pnl_pct': round(pnl_pct, 2), 'exit_reason': exit_reason, 'holding_bars': holding_bars, 'holding_hours': round(holding_bars * 0.5, 1), 'entry_signals': '|'.join(position_info['entry_signals']), 'capital': round(self.capital, 2), 'position_value': round(position_size * entry_price, 2) } self.trades.append(trade) status = "盈利" if pnl > 0 else "亏损" print(f"[平仓] {dt} 价格:{price:.2f} 盈亏:{pnl:+.2f}({pnl_pct:+.2f}%) [{status}] 原因:{exit_reason}") return trade def update_equity(self, bar, active_position=None): """更新权益曲线""" price = bar['close'] dt = bar['datetime'] total_value = self.capital if active_position: total_value += active_position['position_size'] * price self.equity_curve.append({ 'datetime': dt.strftime('%Y-%m-%d %H:%M:%S'), 'price': round(price, 2), 'capital': round(self.capital, 2), 'total_value': round(total_value, 2), 'return_pct': round((total_value / self.initial_capital - 1) * 100, 2) }) def run_backtest(self, data): """运行回测""" print("\n" + "="*80) print("开始T+1回测") print("="*80) signal_gen = SignalGenerator() active_position = None # 当前活跃持仓(可以卖出的) for i, bar in enumerate(data): current_date = bar['date'] # 更新信号生成器 signal_gen.update(bar) # 检查T+1限制,获取可以卖出的持仓 available_positions = self.can_trade(current_date) # 如果有可卖出的持仓,选择第一个作为活跃持仓 if available_positions and active_position is None: active_position = available_positions[0] for pos in available_positions[1:]: self.pending_positions.append(pos) # 更新活跃持仓的持仓时间 if active_position: active_position['holding_bars'] += 1 # 检查是否需要平仓(只有活跃持仓可以平仓) if active_position: should_exit, exit_reason, exit_price = self.check_exit(bar, active_position) if should_exit: self.execute_sell(bar, active_position, exit_reason) active_position = None # 检查是否开新仓(无持仓时) if active_position is None and len(self.pending_positions) == 0 and i >= 26: indicators = signal_gen.calculate_indicators() score, signals = signal_gen.generate_long_signal(indicators, i) # 信号分数>=4且开仓 if score >= 4: self.execute_buy(bar, score, signals) # 更新权益曲线 self.update_equity(bar, active_position) # 回测结束,强制平仓所有持仓 print("\n" + "="*80) print("回测结束,强制平仓") print("="*80) if active_position: self.execute_sell(data[-1], active_position, "回测结束") # 处理pending中的持仓(如果数据结束但还有持仓) for pos in self.pending_positions: pos['holding_bars'] = self.max_hold_bars # 强制达到平仓条件 self.execute_sell(data[-1], pos, "回测结束(T+1)") return self.trades, self.equity_curve # ==================== 回测报告生成器 ==================== class BacktestReport: """生成回测报告""" def __init__(self, trades, equity_curve, initial_capital=1000000): self.trades = trades self.equity_curve = equity_curve self.initial_capital = initial_capital def calculate_metrics(self): """计算回测指标""" if not self.trades: return { 'total_trades': 0, 'win_rate': 0, 'profit_factor': 0, 'total_return': 0, 'max_drawdown': 0, 'sharpe_ratio': 0 } total_trades = len(self.trades) winning_trades = [t for t in self.trades if t['pnl'] > 0] losing_trades = [t for t in self.trades if t['pnl'] <= 0] win_count = len(winning_trades) loss_count = len(losing_trades) win_rate = (win_count / total_trades * 100) if total_trades > 0 else 0 total_profit = sum(t['pnl'] for t in winning_trades) total_loss = abs(sum(t['pnl'] for t in losing_trades)) profit_factor = total_profit / total_loss if total_loss > 0 else 0 # 总收益 final_capital = self.trades[-1]['capital'] if self.trades else self.initial_capital total_return = (final_capital - self.initial_capital) / self.initial_capital * 100 # 最大回撤 max_drawdown = self._calculate_max_drawdown() # 夏普比率(简化计算) sharpe_ratio = self._calculate_sharpe() return { 'total_trades': total_trades, 'win_count': win_count, 'loss_count': loss_count, 'win_rate': round(win_rate, 2), 'profit_factor': round(profit_factor, 2), 'total_profit': round(total_profit, 2), 'total_loss': round(total_loss, 2), 'total_return': round(total_return, 2), 'max_drawdown': round(max_drawdown, 2), 'sharpe_ratio': round(sharpe_ratio, 2), 'initial_capital': self.initial_capital, 'final_capital': round(final_capital, 2), 'net_profit': round(final_capital - self.initial_capital, 2) } def _calculate_max_drawdown(self): """计算最大回撤""" if not self.equity_curve: return 0 max_dd = 0 peak = self.equity_curve[0]['total_value'] for point in self.equity_curve: value = point['total_value'] if value > peak: peak = value dd = (peak - value) / peak * 100 if dd > max_dd: max_dd = dd return max_dd def _calculate_sharpe(self): """计算夏普比率(简化版)""" if len(self.equity_curve) < 2: return 0 # 计算收益率序列 returns = [] for i in range(1, len(self.equity_curve)): prev = self.equity_curve[i-1]['total_value'] curr = self.equity_curve[i]['total_value'] if prev > 0: returns.append((curr - prev) / prev) if not returns: return 0 avg_return = sum(returns) / len(returns) # 计算标准差 variance = sum((r - avg_return) ** 2 for r in returns) / len(returns) std = math.sqrt(variance) if variance > 0 else 0 # 年化夏普(简化:假设每个bar代表30分钟) if std > 0: sharpe = (avg_return * 48 * 252) / (std * math.sqrt(48)) # 48个30分钟/天,252交易日/年 return sharpe return 0 def generate_report(self): """生成文字报告""" metrics = self.calculate_metrics() report = [] report.append("="*80) report.append("CYB50 只做多T+1策略回测报告") report.append("="*80) report.append("") report.append("【回测参数】") report.append(f" 初始资金: {metrics['initial_capital']:,.0f} 元") report.append(f" 最终资金: {metrics['final_capital']:,.2f} 元") report.append(f" 净盈亏: {metrics['net_profit']:+,.2f} 元") report.append(f" 总收益率: {metrics['total_return']:+.2f}%") report.append("") report.append("【交易统计】") report.append(f" 总交易次数: {metrics['total_trades']} 笔") report.append(f" 盈利次数: {metrics['win_count']} 笔") report.append(f" 亏损次数: {metrics['loss_count']} 笔") report.append(f" 胜率: {metrics['win_rate']}%") report.append(f" 盈亏比: {metrics['profit_factor']}") report.append(f" 总盈利: {metrics['total_profit']:,.2f} 元") report.append(f" 总亏损: {metrics['total_loss']:,.2f} 元") report.append("") report.append("【风险指标】") report.append(f" 最大回撤: {metrics['max_drawdown']}%") report.append(f" 夏普比率: {metrics['sharpe_ratio']}") report.append("") if self.trades: report.append("【最近20笔交易明细】") report.append("-"*120) report.append(f"{'开仓时间':<20} {'平仓时间':<20} {'开仓价':>10} {'平仓价':>10} {'盈亏':>12} {'盈亏%':>8} {'持仓h':>6} {'原因':<20}") report.append("-"*120) for t in self.trades[-20:]: report.append(f"{t['entry_time']:<20} {t['exit_time']:<20} {t['entry_price']:>10.2f} {t['exit_price']:>10.2f} " f"{t['pnl']:>+12.2f} {t['pnl_pct']:>+7.2f}% {t['holding_hours']:>6.1f} {t['exit_reason']:<20}") report.append("-"*120) report.append("") report.append("="*80) return "\n".join(report), metrics def save_results(self, output_dir="."): """保存结果到文件""" import os os.makedirs(output_dir, exist_ok=True) timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') # 1. 保存交易明细 trades_file = os.path.join(output_dir, f"trades_{timestamp}.csv") if self.trades: with open(trades_file, 'w', newline='', encoding='utf-8-sig') as f: writer = csv.DictWriter(f, fieldnames=self.trades[0].keys()) writer.writeheader() writer.writerows(self.trades) print(f"✅ 交易明细已保存: {trades_file}") # 2. 保存权益曲线 equity_file = os.path.join(output_dir, f"equity_{timestamp}.csv") if self.equity_curve: with open(equity_file, 'w', newline='', encoding='utf-8-sig') as f: writer = csv.DictWriter(f, fieldnames=self.equity_curve[0].keys()) writer.writeheader() writer.writerows(self.equity_curve) print(f"✅ 权益曲线已保存: {equity_file}") # 3. 保存报告 report_text, metrics = self.generate_report() report_file = os.path.join(output_dir, f"report_{timestamp}.txt") with open(report_file, 'w', encoding='utf-8') as f: f.write(report_text) print(f"✅ 回测报告已保存: {report_file}") # 4. 保存指标JSON json_file = os.path.join(output_dir, f"metrics_{timestamp}.json") with open(json_file, 'w', encoding='utf-8') as f: json.dump(metrics, f, indent=2, ensure_ascii=False) print(f"✅ 指标数据已保存: {json_file}") return trades_file, equity_file, report_file, json_file # ==================== 主函数 ==================== def main(): """主程序""" print("="*80) print("CYB50 只做多T+1回测系统") print("="*80) # 数据文件路径 data_file = "/home/erwin/.openclaw/workspace/cyb50-quant/cat-fly/t1/cyb50_30min_2023_to_20260325.csv" # 1. 加载数据 loader = DataLoader(data_file) data = loader.load() # 2. 运行回测 executor = T1BacktestExecutor(initial_capital=1000000) trades, equity_curve = executor.run_backtest(data) # 3. 生成报告 report = BacktestReport(trades, equity_curve, initial_capital=1000000) report_text, metrics = report.generate_report() # 4. 打印报告 print("\n" + report_text) # 5. 保存结果 output_dir = "/home/erwin/.openclaw/workspace/cyb50-quant/cat-fly/t1/backtest_results" report.save_results(output_dir) print(f"\n✅ 回测完成!") print(f" 总收益率: {metrics['total_return']:+.2f}%") print(f" 交易次数: {metrics['total_trades']} 笔") print(f" 胜率: {metrics['win_rate']}%") if __name__ == "__main__": main()