dualthrust_strategy.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. #!/usr/bin/env python3
  2. """DualThrust strategies for Backtrader — highest-returning variants from Chinext50 experiments."""
  3. import math
  4. import backtrader as bt
  5. class BaseIndexStrategy(bt.Strategy):
  6. """Common helpers for long-only index timing strategies."""
  7. def __init__(self):
  8. self.order = None
  9. self.entry_count = 0
  10. self.bars_in_market = 0
  11. self.exposure_sum = 0.0
  12. def notify_order(self, order):
  13. if order.status in [order.Submitted, order.Accepted]:
  14. return
  15. if order.status == order.Completed and order.isbuy():
  16. self.entry_count += 1
  17. self.order = None
  18. def next(self):
  19. portfolio_value = self.broker.getvalue()
  20. if portfolio_value > 0:
  21. position_value = abs(self.position.size) * self.data.close[0]
  22. exposure = position_value / portfolio_value
  23. self.exposure_sum += exposure
  24. if exposure > 0:
  25. self.bars_in_market += 1
  26. def _target_size_for_weight(self, target_weight: float) -> int:
  27. target_weight = max(0.0, target_weight)
  28. portfolio_value = self.broker.getvalue()
  29. price = self.data.close[0]
  30. if portfolio_value <= 0 or price <= 0:
  31. return 0
  32. target_value = portfolio_value * target_weight
  33. return max(int(target_value / price), 0)
  34. def _rebalance_to_weight(self, target_weight: float):
  35. target_size = self._target_size_for_weight(target_weight)
  36. current_size = self.position.size
  37. size_delta = target_size - current_size
  38. if size_delta > 0:
  39. self.order = self.buy(size=size_delta)
  40. elif size_delta < 0:
  41. self.order = self.sell(size=abs(size_delta))
  42. def _buy_full(self):
  43. self._rebalance_to_weight(1.0)
  44. def _go_flat(self):
  45. if self.position:
  46. self.order = self.close()
  47. class DualThrustBasicStrategy(BaseIndexStrategy):
  48. """Close-only Dual Thrust proxy without regime filter.
  49. Best config from Chinext50 experiments (2014-2026):
  50. - range_period=20, k1=0.3, k2=0.3
  51. - Total return: 639.25%, Annual: 19.22%, Sharpe: 0.501, Max DD: 37.27%
  52. """
  53. params = (
  54. ("range_period", 20),
  55. ("k1", 0.5),
  56. ("k2", 0.5),
  57. )
  58. def __init__(self):
  59. super().__init__()
  60. def next(self):
  61. super().next()
  62. if self.order:
  63. return
  64. if len(self) <= self.p.range_period:
  65. return
  66. closes = [float(self.data.close[-offset]) for offset in range(1, self.p.range_period + 1)]
  67. thrust_range = max(closes) - min(closes)
  68. reference_price = float(self.data.close[-1])
  69. upper = reference_price + self.p.k1 * thrust_range
  70. lower = reference_price - self.p.k2 * thrust_range
  71. entry_signal = self.data.close[0] > upper
  72. exit_signal = self.data.close[0] < lower
  73. if entry_signal and not self.position:
  74. self._buy_full()
  75. elif self.position and exit_signal:
  76. self._go_flat()
  77. class DualThrustRegimeStrategy(BaseIndexStrategy):
  78. """Close-only Dual Thrust proxy with long-term SMA regime filter.
  79. Best config from Chinext50 experiments:
  80. - range_period=20, k1=0.3, k2=0.3, regime=120
  81. - Total return: 391.77%, Annual: 15.02%, Sharpe: 0.405, Max DD: 34.04%
  82. """
  83. params = (
  84. ("range_period", 20),
  85. ("k1", 0.5),
  86. ("k2", 0.5),
  87. ("regime", 200),
  88. )
  89. def __init__(self):
  90. super().__init__()
  91. self.sma_regime = bt.indicators.SMA(self.data.close, period=self.p.regime)
  92. def next(self):
  93. super().next()
  94. if self.order:
  95. return
  96. if len(self) <= max(self.p.range_period, self.p.regime):
  97. return
  98. if math.isnan(self.sma_regime[0]):
  99. return
  100. closes = [float(self.data.close[-offset]) for offset in range(1, self.p.range_period + 1)]
  101. thrust_range = max(closes) - min(closes)
  102. reference_price = float(self.data.close[-1])
  103. upper = reference_price + self.p.k1 * thrust_range
  104. lower = reference_price - self.p.k2 * thrust_range
  105. entry_signal = self.data.close[0] > upper and self.data.close[0] > self.sma_regime[0]
  106. exit_signal = self.data.close[0] < lower or self.data.close[0] < self.sma_regime[0]
  107. if entry_signal and not self.position:
  108. self._buy_full()
  109. elif self.position and exit_signal:
  110. self._go_flat()
# Additional tested configurations. These are not separate classes; they are
# DualThrustBasicStrategy run with different params:
#   DualThrustFastStrategy: DualThrustBasicStrategy(range_period=15, k1=0.3, k2=0.3)
#   DualThrustSlowStrategy: DualThrustBasicStrategy(range_period=30, k1=0.3, k2=0.3)