| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- """
- 修正版Regime策略 - 使用正确的CrossOver判断
- """
- import backtrader as bt
- import pandas as pd
- import numpy as np
class RegimeStrategyV2(bt.Strategy):
    """Moving-average crossover strategy (V2) with trend-strength logging.

    A long position is opened on the bar where the fast SMA crosses above
    the slow SMA (golden cross) and closed on the bar where it crosses back
    below (death cross).  The crossing is detected by comparing the current
    bar with the previous one, so a signal fires only on the crossing bar
    itself.

    The trend strength ``(close - sma_fast) / sma_fast`` is computed and
    included in the entry/exit log messages.

    NOTE(review): ``trend_threshold`` is declared but nothing in this class
    reads it, so no trend filter is actually applied -- confirm whether one
    was intended.

    Params:
        fast: period of the fast SMA.
        slow: period of the slow SMA.
        trend_threshold: declared threshold; currently unused by this class.
        printlog: when true, ``log`` prints its messages.
    """

    params = (
        ('fast', 20),
        ('slow', 60),
        ('trend_threshold', 0.02),
        ('printlog', True),
    )

    def __init__(self):
        """Set up the price reference, the two SMAs and the trend line."""
        self.dataclose = self.datas[0].close
        # Order currently in flight, or None when a new one may be placed.
        self.order = None

        # Moving averages.
        self.sma_fast = bt.indicators.SMA(period=self.p.fast)
        self.sma_slow = bt.indicators.SMA(period=self.p.slow)

        # Trend strength: relative distance of the close from the fast SMA.
        self.trend = (self.dataclose - self.sma_fast) / self.sma_fast

        # Kept for backward compatibility: initialised here, but nothing in
        # this class reads or updates it afterwards.
        self.last_cross = 0

    def _detect_cross(self):
        """Return 1 on a golden cross, -1 on a death cross, 0 otherwise."""
        fast_now, slow_now = self.sma_fast[0], self.sma_slow[0]
        fast_prev, slow_prev = self.sma_fast[-1], self.sma_slow[-1]
        if fast_now > slow_now and fast_prev <= slow_prev:
            return 1  # fast crossed above slow
        if fast_now < slow_now and fast_prev >= slow_prev:
            return -1  # fast crossed below slow
        return 0

    def next(self):
        """Evaluate the crossover on the current bar and trade on it."""
        # Allow only one order in flight at a time.
        if self.order:
            return

        cross_now = self._detect_cross()
        if cross_now == 0:
            return

        # Treat a NaN trend value as 0 for logging purposes.
        trend_val = self.trend[0] if not np.isnan(self.trend[0]) else 0

        if cross_now == 1:
            # Golden cross: enter with all available cash, in lots of 100.
            if not self.position:
                size = int(self.broker.getcash() / self.dataclose[0] / 100) * 100
                if size > 0:
                    self.order = self.buy(size=size)
                    # log() itself honours the printlog flag.
                    self.log(f'BUY @ {self.dataclose[0]:.2f}, Trend: {trend_val:.4f}')
        elif self.position:
            # Death cross: exit the open position.
            self.order = self.close()
            self.log(f'SELL @ {self.dataclose[0]:.2f}, Trend: {trend_val:.4f}')

    def notify_order(self, order):
        """Track order state and release the in-flight guard when it ends."""
        # Still alive (including partially filled): keep waiting so that
        # next() does not issue a second order on top of it.
        if order.status in (order.Submitted, order.Accepted, order.Partial):
            return

        if order.status == order.Completed:
            if order.isbuy():
                self.log(f'BUY EXECUTED @ {order.executed.price:.2f}')
            else:
                self.log(f'SELL EXECUTED @ {order.executed.price:.2f}')
        elif order.status in (order.Canceled, order.Expired,
                              order.Margin, order.Rejected):
            # Make failed orders visible instead of dropping them silently;
            # the all-in sizing in next() can run into Margin rejections.
            self.log(f'ORDER {order.getstatusname()}')

        # The order reached a terminal state: allow new orders again.
        self.order = None

    def log(self, txt, dt=None):
        """Print ``txt`` prefixed with a date, if ``printlog`` is enabled.

        Args:
            txt: message to print.
            dt: date to prefix; defaults to the current bar's date.
        """
        if not self.p.printlog:
            return
        dt = dt or self.datas[0].datetime.date(0)
        print(f'{dt.isoformat()} {txt}')

    def stop(self):
        """Print the total return in percent relative to the starting cash."""
        roi = (self.broker.getvalue() / self.broker.startingcash - 1) * 100
        print(f'\n=== 最终收益: {roi:.2f}% ===')
def run_regime_v2(csv_file="chinext50.csv", cash=100000.0):
    """Run a backtest of ``RegimeStrategyV2`` and print a summary.

    Loads the CSV into a pandas DataFrame, feeds it to a Cerebro instance
    with a 0.1% commission, attaches Sharpe/DrawDown/Returns/TradeAnalyzer
    analyzers, runs the strategy with logging disabled and prints the
    resulting metrics.

    Args:
        csv_file: path of the CSV file to load.  It must contain a
            ``datetime`` column, which is parsed as dates and used as the
            index.  NOTE(review): the remaining columns are presumably the
            OHLCV columns ``PandasData`` looks for by default -- confirm
            against the data file.
        cash: starting cash of the broker.

    Returns:
        Tuple ``(cerebro, strat)``: the Cerebro instance and the first
        strategy instance returned by ``cerebro.run()``.
    """
    cerebro = bt.Cerebro()

    df = pd.read_csv(csv_file, parse_dates=['datetime'], index_col='datetime')
    data = bt.feeds.PandasData(dataname=df)
    cerebro.adddata(data)

    cerebro.addstrategy(RegimeStrategyV2, printlog=False)
    cerebro.broker.setcash(cash)
    cerebro.broker.setcommission(commission=0.001)

    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', riskfreerate=0.02)
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')

    print('=== Regime策略V2回测 ===')
    print(f'初始资金: {cerebro.broker.getvalue():.2f}')

    results = cerebro.run()
    strat = results[0]

    print(f'最终资金: {cerebro.broker.getvalue():.2f}')

    returns = strat.analyzers.returns.get_analysis()
    print(f"年化收益: {returns.get('rnorm100', 0):.2f}%")

    sharpe = strat.analyzers.sharpe.get_analysis()
    sharpe_val = sharpe.get('sharperatio')
    # Compare against None explicitly so that a ratio of exactly 0.0 is
    # reported as a number instead of being shown as unavailable.
    if sharpe_val is not None:
        print(f"夏普比率: {sharpe_val:.3f}")
    else:
        print("夏普比率: N/A")

    drawdown = strat.analyzers.drawdown.get_analysis()
    print(f"最大回撤: {drawdown.get('max', {}).get('drawdown', 0):.2f}%")

    trades = strat.analyzers.trades.get_analysis()
    if trades and trades.get('total'):
        totals = trades['total']
        total = totals.get('total', 0)
        closed = totals.get('closed', 0)
        won = trades['won'].get('total', 0) if trades.get('won') else 0
        print(f"总交易: {total}, 盈利: {won}")
        # Wins are only counted for closed trades, while ``total`` also
        # includes a trade that is still open at the end of the run, so the
        # win rate is taken over closed trades.
        if closed > 0:
            print(f"胜率: {won/closed:.1%}")

    return cerebro, strat
# Script entry point: run the backtest with the default CSV file and cash
# when this file is executed directly rather than imported.
if __name__ == "__main__":
    run_regime_v2()
|