| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- """
- 市场状态感知策略 (Regime-Aware Strategy)
- 基于Regime Detection的动态仓位管理
- """
- import backtrader as bt
- import pandas as pd
- import numpy as np
class RegimeAwareStrategy(bt.Strategy):
    """Regime-aware, long-only strategy with regime-dependent position sizing.

    Every bar is classified into one of five regimes from two inputs:

    * trend strength: the ``fast_ma``-period average of the close's relative
      deviation from its fast SMA, compared against ``+/- trend_threshold``;
    * volatility ratio: short-window over long-window standard deviation of
      one-bar returns, compared against ``vol_ratio_threshold``.

    Regime -> target fraction of portfolio value (default params):

    * ``strong_bull``   (up-trend, high volatility ratio):   1.0
    * ``weak_bull``     (up-trend, normal volatility ratio): 0.5
    * ``consolidation`` (no trend either way):               0.2
    * ``weak_bear``     (down-trend, normal ratio):          0.0
    * ``strong_bear``   (down-trend, high ratio):            0.0

    Trading rules implemented in ``next``:

    * Entry:  flat, fast SMA crosses above slow SMA, regime target > 0.
    * Exit:   fast SMA crosses below slow SMA, or regime target is 0.
    * Reduce: regime target is below 80% of the current exposure.

    The strategy never opens short positions.
    """

    params = (
        ('fast_ma', 20),                    # fast SMA period
        ('slow_ma', 60),                    # slow SMA period
        ('vol_short', 20),                  # short volatility window
        ('vol_long', 60),                   # long volatility window
        ('trend_threshold', 0.03),          # |trend strength| above this counts as a trend
        # Not read anywhere in this class; kept so that existing callers that
        # pass it keep working.
        ('vol_percentile_threshold', 0.6),
        ('strong_bull_pct', 1.0),           # target exposure in strong_bull
        ('weak_bull_pct', 0.5),             # target exposure in weak_bull
        ('consolidation_pct', 0.2),         # target exposure in consolidation
        ('weak_bear_pct', 0.0),             # target exposure in weak_bear
        ('strong_bear_pct', 0.0),           # target exposure in strong_bear
        ('printlog', True),                 # print order / trade log lines
        # Short/long volatility ratio above which volatility counts as "high"
        # (1.2 == short-term volatility more than 20% above long-term).
        ('vol_ratio_threshold', 1.2),
    )

    def __init__(self):
        self.dataclose = self.datas[0].close
        # The order currently working at the broker, or None when idle.
        self.order = None

        # Dual moving averages and their crossover signal.
        self.sma_fast = bt.indicators.SMA(period=self.p.fast_ma)
        self.sma_slow = bt.indicators.SMA(period=self.p.slow_ma)
        self.crossover = bt.indicators.CrossOver(self.sma_fast, self.sma_slow)

        # Volatility of one-bar returns, scaled by sqrt(252).
        self.returns = bt.indicators.PctChange(self.dataclose, period=1)
        self.vol_short = bt.indicators.StdDev(self.returns, period=self.p.vol_short) * np.sqrt(252)
        self.vol_long = bt.indicators.StdDev(self.returns, period=self.p.vol_long) * np.sqrt(252)

        # Trend strength: smoothed relative deviation of close from the fast SMA.
        self.ma_deviation = (self.dataclose - self.sma_fast) / self.sma_fast
        self.trend_strength = bt.indicators.SMA(self.ma_deviation, period=self.p.fast_ma)

        # One regime label per processed bar, summarised in stop().
        self.regime_history = []

    def get_current_regime(self) -> str:
        """Classify the current bar.

        Returns:
            One of ``'strong_bull'``, ``'weak_bull'``, ``'strong_bear'``,
            ``'weak_bear'`` or ``'consolidation'``.
        """
        # A zero long-term volatility would divide by zero; treat the ratio
        # as neutral (1.0) in that case.
        vol_ratio = self.vol_short[0] / self.vol_long[0] if self.vol_long[0] != 0 else 1.0
        trend = self.trend_strength[0]

        high_vol = vol_ratio > self.p.vol_ratio_threshold
        strong_up = trend > self.p.trend_threshold
        strong_down = trend < -self.p.trend_threshold

        # The up-trend test comes first, matching the original branch order.
        if strong_up:
            return 'strong_bull' if high_vol else 'weak_bull'
        if strong_down:
            return 'strong_bear' if high_vol else 'weak_bear'
        return 'consolidation'

    def get_target_position(self, regime) -> float:
        """Return the target portfolio fraction for ``regime`` (0 if unknown)."""
        position_map = {
            'strong_bull': self.p.strong_bull_pct,
            'weak_bull': self.p.weak_bull_pct,
            'consolidation': self.p.consolidation_pct,
            'weak_bear': self.p.weak_bear_pct,
            'strong_bear': self.p.strong_bear_pct,
        }
        return position_map.get(regime, 0)

    def _target_size(self, portfolio_value, target_pct) -> int:
        """Return the share count worth ``target_pct`` of ``portfolio_value``
        at the current close, rounded down to a whole number of 100-share lots.
        """
        # NOTE(review): sizing uses the current close and ignores commission,
        # so a 100% target can exceed available cash when the order fills on
        # a later bar; notify_order logs such failures (when printlog is on)
        # so they are visible. Confirm whether a cash buffer is wanted.
        lots = int((portfolio_value * target_pct) / self.dataclose[0] / 100)
        return lots * 100

    def next(self) -> None:
        """Record the current regime and act on the entry/exit/reduce rules."""
        regime = self.get_current_regime()
        self.regime_history.append(regime)

        # Never stack a new order on top of one that is still working.
        if self.order is not None:
            return

        target_pct = self.get_target_position(regime)

        golden_cross = self.crossover[0] > 0   # fast SMA crossed above slow SMA
        death_cross = self.crossover[0] < 0    # fast SMA crossed below slow SMA

        # Current exposure as a fraction of total portfolio value.
        portfolio_value = self.broker.getvalue()
        position_size = self.position.size if self.position else 0
        if portfolio_value > 0:
            current_pct = (position_size * self.dataclose[0]) / portfolio_value
        else:
            current_pct = 0

        if target_pct > 0 and position_size == 0 and golden_cross:
            # Entry: regime allows exposure + currently flat + golden cross.
            size = self._target_size(portfolio_value, target_pct)
            if size > 0:
                self.order = self.buy(size=size)
                if self.p.printlog:
                    self.log(f'BUY [{regime}], Size: {size}, Target: {target_pct:.0%}')

        elif position_size > 0 and (death_cross or target_pct == 0):
            # Exit: holding + (death cross or regime target dropped to zero).
            self.order = self.close()
            if self.p.printlog:
                reason = 'Death Cross' if death_cross else f'Regime: {regime}'
                self.log(f'SELL [{reason}], Size: {position_size}')

        elif position_size > 0 and target_pct < current_pct * 0.8:
            # Reduce: regime worsened but target is still above zero. The 0.8
            # factor is a dead band so small drifts do not trigger trades.
            new_size = self._target_size(portfolio_value, target_pct)
            if new_size < position_size:
                self.order = self.sell(size=position_size - new_size)
                if self.p.printlog:
                    self.log(f'REDUCE [{regime}], From {current_pct:.1%} to {target_pct:.0%}')

    def notify_order(self, order) -> None:
        """Log the outcome of an order and release the pending-order slot."""
        if order.status in (order.Submitted, order.Accepted):
            # Still working at the broker; keep self.order set.
            return

        if order.status == order.Completed:
            if self.p.printlog:
                side = 'BUY' if order.isbuy() else 'SELL'
                self.log(f'{side} EXECUTED @ {order.executed.price:.2f}')
        elif order.status in (order.Canceled, order.Margin, order.Rejected):
            # Surface failed orders instead of dropping them silently.
            if self.p.printlog:
                self.log(f'ORDER {order.getstatusname()} - not filled')

        self.order = None

    def log(self, txt, dt=None) -> None:
        """Print ``txt`` prefixed with ``dt`` (default: the current bar's date)."""
        dt = dt or self.datas[0].datetime.date(0)
        print(f'{dt.isoformat()} {txt}')

    def stop(self) -> None:
        """Print the regime distribution and the total return of the run."""
        if self.regime_history:
            from collections import Counter
            regime_counts = Counter(self.regime_history)
            total_bars = len(self.regime_history)
            print('\n=== 状态分布 ===')
            for regime, count in regime_counts.most_common():
                print(f'{regime}: {count} 天 ({count/total_bars:.1%})')

        roi = (self.broker.getvalue() / self.broker.startingcash - 1) * 100
        print(f'\n=== 最终收益: {roi:.2f}% ===')
def run_regime_backtest(csv_file="chinext50.csv", cash=100000.0):
    """Back-test ``RegimeAwareStrategy`` on a CSV price file and print a report.

    Args:
        csv_file: Path of a CSV file with a ``datetime`` column, which is
            parsed as dates and used as the index. The remaining columns are
            handed to ``bt.feeds.PandasData`` unchanged (presumably OHLCV;
            confirm against the data file).
        cash: Starting cash of the simulated broker.

    Returns:
        A ``(cerebro, strat)`` tuple: the engine and the strategy instance
        that was run.
    """
    engine = bt.Cerebro()

    # Data feed.
    prices = pd.read_csv(csv_file, parse_dates=['datetime'], index_col='datetime')
    engine.adddata(bt.feeds.PandasData(dataname=prices))

    # Strategy.
    engine.addstrategy(RegimeAwareStrategy)

    # Broker settings.
    broker = engine.broker
    broker.setcash(cash)
    broker.setcommission(commission=0.001)

    # Analyzers, registered under the names used to read them back below.
    analyzer_specs = (
        (bt.analyzers.SharpeRatio, 'sharpe', {'riskfreerate': 0.02}),
        (bt.analyzers.DrawDown, 'drawdown', {}),
        (bt.analyzers.Returns, 'returns', {}),
    )
    for analyzer_cls, alias, options in analyzer_specs:
        engine.addanalyzer(analyzer_cls, _name=alias, **options)

    print('=== 状态感知策略回测 ===')
    print(f'初始资金: {broker.getvalue():.2f}')

    strat = engine.run()[0]
    analyzers = strat.analyzers

    print('\n=== 回测指标 ===')

    return_stats = analyzers.returns.get_analysis()
    annual_return = return_stats.get('rnorm100', 0)
    print(f"年化收益: {annual_return:.2f}%")

    # The Sharpe ratio may be None; show a placeholder in that case.
    sharpe_ratio = analyzers.sharpe.get_analysis().get('sharperatio')
    if sharpe_ratio is None:
        sharpe_text = 'N/A'
    else:
        sharpe_text = f"{sharpe_ratio:.3f}"
    print(f"夏普比率: {sharpe_text}")

    drawdown_stats = analyzers.drawdown.get_analysis()
    max_drawdown = drawdown_stats.get('max', {}).get('drawdown', 0)
    print(f"最大回撤: {max_drawdown:.2f}%")

    return engine, strat
# Script entry point: run the back-test with its default CSV path and cash.
if __name__ == "__main__":
    run_regime_backtest()
|