""" 市场状态感知策略 (Regime-Aware Strategy) 基于Regime Detection的动态仓位管理 """ import backtrader as bt import pandas as pd import numpy as np class RegimeAwareStrategy(bt.Strategy): """ 创业板50状态感知策略 核心逻辑: - 强趋势上涨: 满仓持有 - 弱趋势上涨: 半仓持有 - 震荡整理: 轻仓或空仓 - 弱趋势下跌: 空仓或轻仓做空(如允许) - 强趋势下跌: 空仓观望 入场信号: 20/60日均线金叉 出场信号: 20/60日均线死叉 或 状态恶化 """ params = ( ('fast_ma', 20), ('slow_ma', 60), ('vol_short', 20), ('vol_long', 60), ('trend_threshold', 0.03), ('vol_percentile_threshold', 0.6), ('strong_bull_pct', 1.0), # 强趋势上涨仓位 ('weak_bull_pct', 0.5), # 弱趋势上涨仓位 ('consolidation_pct', 0.2), # 震荡仓位 ('weak_bear_pct', 0.0), # 弱趋势下跌仓位 ('strong_bear_pct', 0.0), # 强趋势下跌仓位 ('printlog', True), ) def __init__(self): self.dataclose = self.datas[0].close self.order = None # 双均线 self.sma_fast = bt.indicators.SMA(period=self.p.fast_ma) self.sma_slow = bt.indicators.SMA(period=self.p.slow_ma) self.crossover = bt.indicators.CrossOver(self.sma_fast, self.sma_slow) # 波动率计算 self.returns = bt.indicators.PctChange(self.dataclose, period=1) self.vol_short = bt.indicators.StdDev(self.returns, period=self.p.vol_short) * np.sqrt(252) self.vol_long = bt.indicators.StdDev(self.returns, period=self.p.vol_long) * np.sqrt(252) # 趋势强度 self.ma_deviation = (self.dataclose - self.sma_fast) / self.sma_fast self.trend_strength = bt.indicators.SMA(self.ma_deviation, period=self.p.fast_ma) # 存储状态历史 self.regime_history = [] def get_current_regime(self): """判断当前市场状态""" # 获取当前值 vol_pct = self.vol_short[0] / self.vol_long[0] if self.vol_long[0] != 0 else 1.0 trend = self.trend_strength[0] # 高波动判断 high_vol = vol_pct > 1.2 # 短期波动率高于长期20% # 趋势判断 strong_up = trend > self.p.trend_threshold strong_down = trend < -self.p.trend_threshold if high_vol and strong_up: return 'strong_bull' elif (not high_vol) and strong_up: return 'weak_bull' elif high_vol and strong_down: return 'strong_bear' elif (not high_vol) and strong_down: return 'weak_bear' else: return 'consolidation' def get_target_position(self, regime): """根据状态确定目标仓位""" position_map = { 'strong_bull': self.p.strong_bull_pct, 'weak_bull': self.p.weak_bull_pct, 'consolidation': self.p.consolidation_pct, 'weak_bear': self.p.weak_bear_pct, 'strong_bear': self.p.strong_bear_pct, } return position_map.get(regime, 0) def next(self): # 当前状态 regime = self.get_current_regime() self.regime_history.append(regime) # 目标仓位 target_pct = self.get_target_position(regime) # 金叉信号 golden_cross = self.crossover > 0 # 死叉信号 death_cross = self.crossover < 0 # 当前仓位 current_value = self.broker.getvalue() cash = self.broker.getcash() position_size = self.position.size if self.position else 0 current_pct = (position_size * self.dataclose[0]) / current_value if current_value > 0 else 0 # 交易逻辑 if target_pct > 0 and position_size == 0 and golden_cross: # 入场: 有仓位空间 + 空仓 + 金叉 size = int((current_value * target_pct) / self.dataclose[0] / 100) * 100 if size > 0: self.order = self.buy(size=size) if self.p.printlog: self.log(f'BUY [{regime}], Size: {size}, Target: {target_pct:.0%}') elif position_size > 0 and (death_cross or target_pct == 0): # 出场: 有持仓 + (死叉 或 状态恶化到0仓位) self.order = self.close() if self.p.printlog: reason = 'Death Cross' if death_cross else f'Regime: {regime}' self.log(f'SELL [{reason}], Size: {position_size}') elif position_size > 0 and target_pct < current_pct * 0.8: # 减仓: 状态恶化但未到0 new_size = int((current_value * target_pct) / self.dataclose[0] / 100) * 100 if new_size < position_size: close_size = position_size - new_size self.order = self.sell(size=close_size) if self.p.printlog: self.log(f'REDUCE [{regime}], From {current_pct:.1%} to {target_pct:.0%}') def notify_order(self, order): if order.status in [order.Submitted, order.Accepted]: return if order.status in [order.Completed]: if order.isbuy(): self.log(f'BUY EXECUTED @ {order.executed.price:.2f}') else: self.log(f'SELL EXECUTED @ {order.executed.price:.2f}') self.order = None def log(self, txt, dt=None): dt = dt or self.datas[0].datetime.date(0) print(f'{dt.isoformat()} {txt}') def stop(self): # 统计状态分布 if self.regime_history: from collections import Counter regime_counts = Counter(self.regime_history) print('\n=== 状态分布 ===') for regime, count in regime_counts.most_common(): print(f'{regime}: {count} 天 ({count/len(self.regime_history):.1%})') roi = (self.broker.getvalue() / self.broker.startingcash - 1) * 100 print(f'\n=== 最终收益: {roi:.2f}% ===') def run_regime_backtest(csv_file="chinext50.csv", cash=100000.0): """运行状态感知策略回测""" cerebro = bt.Cerebro() # 数据 df = pd.read_csv(csv_file, parse_dates=['datetime'], index_col='datetime') data = bt.feeds.PandasData(dataname=df) cerebro.adddata(data) # 策略 cerebro.addstrategy(RegimeAwareStrategy) # 设置 cerebro.broker.setcash(cash) cerebro.broker.setcommission(commission=0.001) # 分析器 cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe', riskfreerate=0.02) cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown') cerebro.addanalyzer(bt.analyzers.Returns, _name='returns') print('=== 状态感知策略回测 ===') print(f'初始资金: {cerebro.broker.getvalue():.2f}') results = cerebro.run() strat = results[0] print(f'\n=== 回测指标 ===') returns = strat.analyzers.returns.get_analysis() print(f"年化收益: {returns.get('rnorm100', 0):.2f}%") sharpe = strat.analyzers.sharpe.get_analysis() sharpe_ratio = sharpe.get('sharperatio') sharpe_text = 'N/A' if sharpe_ratio is None else f"{sharpe_ratio:.3f}" print(f"夏普比率: {sharpe_text}") drawdown = strat.analyzers.drawdown.get_analysis() print(f"最大回撤: {drawdown.get('max', {}).get('drawdown', 0):.2f}%") return cerebro, strat if __name__ == "__main__": run_regime_backtest()