""" 最终修正版Regime策略 - 正确使用backtrader指标历史访问 """ import backtrader as bt import pandas as pd import numpy as np class FinalRegimeStrategy(bt.Strategy): """ 状态感知策略 - 最终修正版 关键点: 在backtrader中,用(1)而不是[-1]获取历史值 """ params = ( ('fast', 20), ('slow', 60), ('printlog', False), ) def __init__(self): self.dataclose = self.datas[0].close self.order = None # 均线 self.sma_fast = bt.indicators.SMA(period=self.p.fast) self.sma_slow = bt.indicators.SMA(period=self.p.slow) # 交易计数 self.trade_count = 0 def next(self): if self.order: return # 必须至少有2个数据点才能判断交叉 if len(self) < 2: return # 获取当前和前一周期的均线值 # 在backtrader中: [0]是当前, [-1]是前一个周期 fast_now = self.sma_fast[0] fast_prev = self.sma_fast[-1] slow_now = self.sma_slow[0] slow_prev = self.sma_slow[-1] # 检查是否有有效值 if np.isnan(fast_now) or np.isnan(slow_now): return # 检测金叉: 前一周fast<=slow, 这一周fast>slow golden_cross = fast_prev <= slow_prev and fast_now > slow_now # 检测死叉: 前一周fast>=slow, 这一周fast= slow_prev and fast_now < slow_now # 买入逻辑: 金叉 + 空仓 if golden_cross and not self.position: size = int(self.broker.getcash() / self.dataclose[0] / 100) * 100 if size > 0: self.order = self.buy(size=size) self.trade_count += 1 if self.p.printlog: self.log(f'BUY #{self.trade_count} @ {self.dataclose[0]:.2f}') # 卖出逻辑: 死叉 + 有持仓 elif death_cross and self.position: self.order = self.close() if self.p.printlog: self.log(f'SELL @ {self.dataclose[0]:.2f}') def notify_order(self, order): if order.status in [order.Submitted, order.Accepted]: return if order.status in [order.Completed]: if order.isbuy(): self.log(f'BUY EXECUTED @ {order.executed.price:.2f}') else: self.log(f'SELL EXECUTED @ {order.executed.price:.2f}') self.order = None def log(self, txt, dt=None): if not self.p.printlog: return dt = dt or self.datas[0].datetime.date(0) print(f'{dt.isoformat()} {txt}') def stop(self): roi = (self.broker.getvalue() / self.broker.startingcash - 1) * 100 print(f'\n=== 最终收益: {roi:.2f}% ===') print(f'总交易次数: {self.trade_count}') def run_final_regime(csv_file="chinext50.csv", cash=100000.0): """运行最终修正版Regime策略""" cerebro = bt.Cerebro() df = pd.read_csv(csv_file, parse_dates=['datetime'], index_col='datetime') data = bt.feeds.PandasData(dataname=df) cerebro.adddata(data) cerebro.addstrategy(FinalRegimeStrategy, printlog=False) cerebro.broker.setcash(cash) cerebro.broker.setcommission(commission=0.001) cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', riskfreerate=0.02) cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown') cerebro.addanalyzer(bt.analyzers.Returns, _name='returns') cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades') print('=== 创业板50 Regime策略回测 ===') print(f'初始资金: {cerebro.broker.getvalue():.2f}') results = cerebro.run() strat = results[0] print(f'最终资金: {cerebro.broker.getvalue():.2f}') returns = strat.analyzers.returns.get_analysis() print(f"年化收益: {returns.get('rnorm100', 0):.2f}%") sharpe = strat.analyzers.sharpe.get_analysis() sharpe_val = sharpe.get('sharperatio', 0) if sharpe_val: print(f"夏普比率: {sharpe_val:.3f}") else: print("夏普比率: N/A") drawdown = strat.analyzers.drawdown.get_analysis() print(f"最大回撤: {drawdown.get('max', {}).get('drawdown', 0):.2f}%") trades = strat.analyzers.trades.get_analysis() if trades and trades.get('total'): total = trades['total'].get('total', 0) won = trades['won'].get('total', 0) if trades.get('won') else 0 print(f"总交易: {total}, 盈利: {won}") if total > 0: print(f"胜率: {won/total:.1%}") return cerebro, strat if __name__ == "__main__": run_final_regime()