| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- #!/usr/bin/env python3
- """DualThrust strategies for Backtrader — highest-returning variants from Chinext50 experiments."""
- import math
- import backtrader as bt
class BaseIndexStrategy(bt.Strategy):
    """Common helpers for long-only index timing strategies.

    Keeps track of one pending order, collects simple exposure statistics
    on every bar, and offers sizing helpers that subclasses use to move
    between a fully invested and a flat position.

    Attributes set in ``__init__``:
        order: The order most recently submitted through the helpers, or
            ``None`` once a final notification for an order has arrived.
        entry_count: Number of completed buy orders.
        bars_in_market: Number of bars on which exposure was above zero.
        exposure_sum: Running sum of per-bar exposure
            (position value / portfolio value).
    """

    def __init__(self):
        self.order = None
        self.entry_count = 0
        self.bars_in_market = 0
        self.exposure_sum = 0.0

    def notify_order(self, order):
        """Update order bookkeeping when the broker reports a status change.

        Args:
            order: The order whose status changed.
        """
        # Submitted, Accepted and Partial all mean the order is still
        # working. Clearing ``self.order`` on a partial fill would let a
        # subclass submit a second order while the first is still live.
        if order.status in (order.Submitted, order.Accepted, order.Partial):
            return

        if order.status == order.Completed and order.isbuy():
            self.entry_count += 1

        # Any other status is final for this order, so stop tracking it.
        self.order = None

    def next(self):
        """Accumulate exposure statistics for the current bar."""
        portfolio_value = self.broker.getvalue()
        if portfolio_value <= 0:
            # Exposure is undefined without positive equity; skip the bar.
            return

        position_value = abs(self.position.size) * self.data.close[0]
        exposure = position_value / portfolio_value
        self.exposure_sum += exposure
        if exposure > 0:
            self.bars_in_market += 1

    def _target_size_for_weight(self, target_weight: float) -> int:
        """Return the whole number of units worth ``target_weight`` of equity.

        Args:
            target_weight: Desired fraction of portfolio value; negative
                values are treated as 0.

        Returns:
            A non-negative unit count priced at the current bar's close,
            or 0 when portfolio value or price is not a positive number.
        """
        weight = max(0.0, target_weight)
        portfolio_value = self.broker.getvalue()
        price = self.data.close[0]
        # Written as a positive check so NaN inputs also yield 0 instead
        # of reaching int() and raising ValueError.
        if not (portfolio_value > 0 and price > 0):
            return 0
        target_value = portfolio_value * weight
        # int() truncates, so the position never exceeds the target value.
        return max(int(target_value / price), 0)

    def _rebalance_to_weight(self, target_weight: float):
        """Submit the buy or sell needed to reach ``target_weight``.

        No order is submitted, and ``self.order`` is left unchanged, when
        the position already matches the target size.
        """
        target_size = self._target_size_for_weight(target_weight)
        size_delta = target_size - self.position.size
        if size_delta > 0:
            self.order = self.buy(size=size_delta)
        elif size_delta < 0:
            self.order = self.sell(size=abs(size_delta))

    def _buy_full(self):
        """Rebalance to a 100% target weight."""
        self._rebalance_to_weight(1.0)

    def _go_flat(self):
        """Close the open position, if there is one."""
        if self.position:
            self.order = self.close()
class DualThrustBasicStrategy(BaseIndexStrategy):
    """Close-only Dual Thrust proxy without regime filter.

    On each bar the previous ``range_period`` closes define a range
    (highest close minus lowest close). The breakout bands are the
    previous close plus ``k1`` times that range and the previous close
    minus ``k2`` times that range. While flat, a close above the upper
    band buys to full weight; while in a position, a close below the
    lower band closes it.

    Best config from Chinext50 experiments (2014-2026):
    - range_period=20, k1=0.3, k2=0.3
    - Total return: 639.25%, Annual: 19.22%, Sharpe: 0.501, Max DD: 37.27%

    Note: the defaults below use k1=k2=0.5, not the 0.3 quoted above;
    pass k1/k2 explicitly to run the quoted configuration.
    """

    params = (
        ("range_period", 20),
        ("k1", 0.5),
        ("k2", 0.5),
    )

    def __init__(self):
        super().__init__()

    def next(self):
        """Update base statistics, then act on the breakout bands."""
        super().next()

        lookback = self.p.range_period
        # Wait while an order is pending or before ``lookback`` prior
        # bars exist in addition to the current one.
        if self.order or len(self) <= lookback:
            return

        close = self.data.close
        # Prior closes only, most recent first; the current bar is excluded.
        window = [float(close[-ago]) for ago in range(1, lookback + 1)]
        spread = max(window) - min(window)
        anchor = window[0]  # previous bar's close
        upper_band = anchor + self.p.k1 * spread
        lower_band = anchor - self.p.k2 * spread

        price = close[0]
        if self.position:
            if price < lower_band:
                self._go_flat()
        elif price > upper_band:
            self._buy_full()
class DualThrustRegimeStrategy(BaseIndexStrategy):
    """Close-only Dual Thrust proxy with long-term SMA regime filter.

    Uses the same bands as the unfiltered variant: the previous close
    plus ``k1`` / minus ``k2`` times the range of the previous
    ``range_period`` closes. An entry additionally requires the close to
    be above the ``regime``-period SMA of closes, and an open position is
    closed when the close falls below either the lower band or that SMA.

    Best config from Chinext50 experiments:
    - range_period=20, k1=0.3, k2=0.3, regime=120
    - Total return: 391.77%, Annual: 15.02%, Sharpe: 0.405, Max DD: 34.04%

    Note: the defaults below use k1=k2=0.5 and regime=200, not the values
    quoted above; pass them explicitly to run the quoted configuration.
    """

    params = (
        ("range_period", 20),
        ("k1", 0.5),
        ("k2", 0.5),
        ("regime", 200),
    )

    def __init__(self):
        super().__init__()
        self.sma_regime = bt.indicators.SMA(self.data.close, period=self.p.regime)

    def next(self):
        """Update base statistics, then act on the filtered breakout bands."""
        super().next()

        if self.order:
            # An order is still pending; do not stack another one.
            return

        lookback = self.p.range_period
        if len(self) <= max(lookback, self.p.regime):
            # Not enough history yet for the range window or the SMA.
            return

        trend = self.sma_regime[0]
        if math.isnan(trend):
            return

        close = self.data.close
        # Prior closes only, most recent first; the current bar is excluded.
        window = [float(close[-ago]) for ago in range(1, lookback + 1)]
        spread = max(window) - min(window)
        anchor = window[0]  # previous bar's close
        upper_band = anchor + self.p.k1 * spread
        lower_band = anchor - self.p.k2 * spread

        price = close[0]
        if self.position:
            if price < lower_band or price < trend:
                self._go_flat()
        elif price > upper_band and price > trend:
            self._buy_full()
- # Additional tested configurations (same classes, different params):
- # DualThrustFastStrategy: DualThrustBasicStrategy(range_period=15, k1=0.3, k2=0.3)
- # DualThrustSlowStrategy: DualThrustBasicStrategy(range_period=30, k1=0.3, k2=0.3)
|