| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- #!/usr/bin/env python3
- from __future__ import annotations
- import json
- import math
- from datetime import datetime
- from pathlib import Path
- import backtrader as bt
- import pandas as pd
# Filesystem layout: the price-history CSV is looked up next to this script.
ROOT = Path(__file__).resolve().parent
DATA_FILE = ROOT.joinpath("chinext50.csv")

# Backtest settings shared by every strategy run.
INITIAL_CASH = 100_000.0  # starting broker cash
COMMISSION = 0.001  # value handed to the broker's setcommission()
TRADING_DAYS = 252  # bars per year used to annualise daily volatility
class Chinext50Data(bt.feeds.PandasData):
    """Pandas data feed for the ChiNext 50 price history.

    Each OHLCV line is mapped to the DataFrame column of the same name.
    ``datetime`` and ``openinterest`` are set to ``None``; per the backtrader
    ``PandasData`` documentation that means "use the index" and "column not
    present" respectively.
    """

    params = (
        ("datetime", None),
        # The price/volume lines all read from identically named columns.
        *((column, column) for column in ("open", "high", "low", "close", "volume")),
        ("openinterest", None),
    )
class BasePortfolioStrategy(bt.Strategy):
    """Common order tracking, exposure statistics and sizing helpers.

    Subclasses are expected to call ``super().next()`` on every bar and then
    use ``_rebalance_to_weight`` to move the single traded asset towards a
    target portfolio weight.

    Attributes:
        order: The order returned by the latest ``buy``/``sell`` call, or
            ``None`` once that order is no longer alive.
        entry_count: Number of completed buy orders seen so far.
        bars_in_market: Number of ``next`` calls with non-zero exposure.
        exposure_sum: Running sum of per-bar exposure (position value divided
            by portfolio value).
    """

    params = (("rebalance_band", 0.05),)

    def __init__(self):
        self.order = None
        self.entry_count = 0
        self.bars_in_market = 0
        self.exposure_sum = 0.0

    def notify_order(self, order):
        """Count completed buys and release the pending-order slot.

        The slot is only released once the order can no longer execute, so a
        partially filled order keeps blocking new orders until it finishes.
        """
        if order.alive():
            # Created / Submitted / Accepted / Partial: still working.
            return
        if order.status == order.Completed and order.isbuy():
            self.entry_count += 1
        self.order = None

    def next(self):
        """Accumulate the exposure statistics for the current bar."""
        portfolio_value = self.broker.getvalue()
        if portfolio_value > 0:
            position_value = abs(self.position.size) * self.data.close[0]
            exposure = position_value / portfolio_value
            self.exposure_sum += exposure
            if exposure > 0:
                self.bars_in_market += 1

    def _target_size_for_weight(self, target_weight: float) -> int:
        """Return the whole-unit position size worth ``target_weight`` of the portfolio.

        ``target_weight`` is clamped to ``[0, 1]``. The size is priced at the
        current close and rounded down. Returns 0 when the portfolio value or
        the price is not a positive number (this includes NaN).
        """
        target_weight = max(0.0, min(1.0, target_weight))
        portfolio_value = self.broker.getvalue()
        price = self.data.close[0]
        # Positive-form comparisons are False for NaN, so a NaN price or value
        # is rejected here instead of reaching int() and raising ValueError.
        if not (portfolio_value > 0 and price > 0):
            return 0
        target_value = portfolio_value * target_weight
        return max(int(target_value / price), 0)

    def _rebalance_to_weight(self, target_weight: float):
        """Submit the buy or sell order that moves the position to ``target_weight``.

        No order is placed when the position already has the target size.
        The submitted order is stored in ``self.order``.
        """
        target_size = self._target_size_for_weight(target_weight)
        size_delta = target_size - self.position.size
        if size_delta > 0:
            self.order = self.buy(size=size_delta)
        elif size_delta < 0:
            self.order = self.sell(size=abs(size_delta))
class Core3ComboStrategy(BasePortfolioStrategy):
    """Blend three long-only signals into one target weight.

    The target is the plain mean of: a 20-bar dual-thrust breakout (k = 0.3),
    a 55/30 Donchian breakout gated by the 150-bar SMA, and a momentum signal
    sized to a 0.30 annualised volatility target.
    """

    def __init__(self):
        super().__init__()
        price = self.data.close
        self.roc_short = bt.indicators.ROC(price, period=20)
        self.roc_long = bt.indicators.ROC(price, period=120)
        self.sma150 = bt.indicators.SMA(price, period=150)
        daily_change = bt.indicators.PctChange(price, period=1)
        self.volatility = bt.indicators.StdDev(daily_change, period=30)
        self.highest_high = bt.indicators.Highest(self.data.high, period=55)
        self.lowest_low = bt.indicators.Lowest(self.data.low, period=30)
        # On/off state of the two breakout sub-strategies.
        self.dual_active = False
        self.don_active = False

    def _dual_thrust_weight(self):
        """Update the dual-thrust state and return its weight (0.0 or 1.0)."""
        if len(self) <= 20:
            return 0.0
        now = self.data.close[0]
        # Range of the 20 closes preceding the current bar.
        lookback = [float(self.data.close[-ago]) for ago in range(1, 21)]
        band = 0.3 * (max(lookback) - min(lookback))
        anchor = float(self.data.close[-1])
        if self.dual_active:
            if now < anchor - band:
                self.dual_active = False
        elif now > anchor + band:
            self.dual_active = True
        return 1.0 if self.dual_active else 0.0

    def _donchian_regime_weight(self):
        """Update the Donchian/SMA regime state and return its weight (0.0 or 1.0)."""
        if len(self) <= 150:
            return 0.0
        channel_top = self.highest_high[-1]
        channel_bottom = self.lowest_low[-1]
        trend = self.sma150[0]
        if math.isnan(channel_top) or math.isnan(channel_bottom) or math.isnan(trend):
            return 0.0
        now = self.data.close[0]
        if self.don_active:
            if now < channel_bottom or now < trend:
                self.don_active = False
        elif now > channel_top and now > trend:
            self.don_active = True
        return 1.0 if self.don_active else 0.0

    def _momentum_weight(self):
        """Return the volatility-targeted momentum weight (target 0.30)."""
        fast = self.roc_short[0]
        slow = self.roc_long[0]
        trend = self.sma150[0]
        daily_vol = self.volatility[0]
        if any(math.isnan(v) for v in (fast, slow, trend, daily_vol)) or not (daily_vol > 0):
            return 0.0
        trending = fast > 0 and slow > 0 and self.data.close[0] > trend
        if not trending:
            return 0.0
        return min(1.0, 0.30 / (daily_vol * math.sqrt(TRADING_DAYS)))

    def next(self):
        super().next()
        if self.order:
            # A previous order is still pending; wait for it to resolve.
            return

        components = (
            self._dual_thrust_weight(),
            self._donchian_regime_weight(),
            self._momentum_weight(),
        )
        blended = sum(components) / len(components)

        equity = self.broker.getvalue()
        held_weight = 0.0
        if equity > 0:
            held_weight = (abs(self.position.size) * self.data.close[0]) / equity

        # Trade when flat, or when the held weight drifted past the band.
        drifted = abs(held_weight - blended) >= self.p.rebalance_band
        if drifted or not self.position:
            self._rebalance_to_weight(blended)
class Balanced3ComboStrategy(BasePortfolioStrategy):
    """Blend three long-only signals into one target weight.

    The target is the plain mean of: a 20-bar dual-thrust breakout (k = 0.35)
    gated by the 120-bar SMA, a momentum signal sized to a 0.29 annualised
    volatility target, and a 55/30 Donchian breakout with a 4-ATR trailing
    stop sized to a 0.25 volatility target.
    """

    def __init__(self):
        super().__init__()
        price = self.data.close
        daily_change = bt.indicators.PctChange(price, period=1)
        self.volatility = bt.indicators.StdDev(daily_change, period=30)
        self.atr = bt.indicators.ATR(self.data, period=20)
        self.roc_short = bt.indicators.ROC(price, period=20)
        self.roc_long = bt.indicators.ROC(price, period=120)
        self.sma120 = bt.indicators.SMA(price, period=120)
        self.sma150 = bt.indicators.SMA(price, period=150)
        self.highest_high = bt.indicators.Highest(self.data.high, period=55)
        self.lowest_low = bt.indicators.Lowest(self.data.low, period=30)
        # On/off state of the breakout sub-strategies.
        self.dt_reg_active = False
        self.hybrid_active = False
        # Highest close seen since the hybrid entry; None while inactive.
        self.hybrid_highest_close = None

    def _annualized_vol(self):
        """Return the current daily volatility scaled to a yearly figure."""
        return self.volatility[0] * math.sqrt(TRADING_DAYS)

    def _dual_thrust_regime_weight(self):
        """Update the SMA-gated dual-thrust state and return its weight (0.0 or 1.0)."""
        if len(self) <= 120 or math.isnan(self.sma120[0]):
            return 0.0
        now = self.data.close[0]
        trend = self.sma120[0]
        # Range of the 20 closes preceding the current bar.
        lookback = [float(self.data.close[-ago]) for ago in range(1, 21)]
        band = 0.35 * (max(lookback) - min(lookback))
        anchor = float(self.data.close[-1])
        if self.dt_reg_active:
            if now < anchor - band or now < trend:
                self.dt_reg_active = False
        elif now > anchor + band and now > trend:
            self.dt_reg_active = True
        return 1.0 if self.dt_reg_active else 0.0

    def _momentum_weight(self):
        """Return the volatility-targeted momentum weight (target 0.29)."""
        fast = self.roc_short[0]
        slow = self.roc_long[0]
        trend = self.sma150[0]
        daily_vol = self.volatility[0]
        if any(math.isnan(v) for v in (fast, slow, trend, daily_vol)) or not (daily_vol > 0):
            return 0.0
        trending = fast > 0 and slow > 0 and self.data.close[0] > trend
        if not trending:
            return 0.0
        return min(1.0, 0.29 / self._annualized_vol())

    def _donchian_hybrid_weight(self):
        """Update the Donchian/trailing-stop state and return its sized weight."""
        if len(self) <= 55:
            return 0.0
        channel_top = self.highest_high[-1]
        channel_bottom = self.lowest_low[-1]
        daily_vol = self.volatility[0]
        atr_now = self.atr[0]
        inputs = (channel_top, channel_bottom, daily_vol, atr_now)
        if any(math.isnan(v) for v in inputs) or not (daily_vol > 0):
            return 0.0

        now = self.data.close[0]
        if self.hybrid_active:
            # Ratchet the peak close, then test both exits against it.
            peak = max(self.hybrid_highest_close or float(now), float(now))
            self.hybrid_highest_close = peak
            stop_level = peak - 4.0 * atr_now
            if now < channel_bottom or now < stop_level:
                self.hybrid_active = False
                self.hybrid_highest_close = None
        elif now > channel_top:
            self.hybrid_active = True
            self.hybrid_highest_close = float(now)

        if not self.hybrid_active:
            return 0.0
        return min(1.0, 0.25 / self._annualized_vol())

    def next(self):
        super().next()
        if self.order:
            # A previous order is still pending; wait for it to resolve.
            return

        components = (
            self._dual_thrust_regime_weight(),
            self._momentum_weight(),
            self._donchian_hybrid_weight(),
        )
        blended = sum(components) / len(components)

        equity = self.broker.getvalue()
        held_weight = 0.0
        if equity > 0:
            held_weight = (abs(self.position.size) * self.data.close[0]) / equity

        # Trade when flat, or when the held weight drifted past the band.
        drifted = abs(held_weight - blended) >= self.p.rebalance_band
        if drifted or not self.position:
            self._rebalance_to_weight(blended)
class Top5BlendStrategy(BasePortfolioStrategy):
    """Blend five long-only signals into one target weight.

    The target is the plain mean of: a 20-bar dual-thrust breakout (k = 0.3),
    a 55/30 Donchian breakout gated by the 150-bar SMA, two momentum signals
    sized to 0.30 and 0.29 annualised volatility targets, and a 55/30
    Donchian breakout with a 4-ATR trailing stop sized to a 0.25 target.
    """

    def __init__(self):
        super().__init__()
        price = self.data.close
        daily_change = bt.indicators.PctChange(price, period=1)
        self.volatility = bt.indicators.StdDev(daily_change, period=30)
        self.atr = bt.indicators.ATR(self.data, period=20)
        self.roc_short = bt.indicators.ROC(price, period=20)
        self.roc_long = bt.indicators.ROC(price, period=120)
        self.sma150 = bt.indicators.SMA(price, period=150)
        self.highest_high = bt.indicators.Highest(self.data.high, period=55)
        self.lowest_low = bt.indicators.Lowest(self.data.low, period=30)
        # On/off state of the breakout sub-strategies.
        self.dt_active = False
        self.don_active = False
        self.hybrid_active = False
        # Highest close seen since the hybrid entry; None while inactive.
        self.hybrid_highest_close = None

    def _annualized_vol(self):
        """Return the current daily volatility scaled to a yearly figure."""
        return self.volatility[0] * math.sqrt(TRADING_DAYS)

    def _dual_thrust_weight(self):
        """Update the dual-thrust state and return its weight (0.0 or 1.0)."""
        if len(self) <= 20:
            return 0.0
        now = self.data.close[0]
        # Range of the 20 closes preceding the current bar.
        lookback = [float(self.data.close[-ago]) for ago in range(1, 21)]
        band = 0.3 * (max(lookback) - min(lookback))
        anchor = float(self.data.close[-1])
        if self.dt_active:
            if now < anchor - band:
                self.dt_active = False
        elif now > anchor + band:
            self.dt_active = True
        return 1.0 if self.dt_active else 0.0

    def _donchian_regime_weight(self):
        """Update the Donchian/SMA regime state and return its weight (0.0 or 1.0)."""
        if len(self) <= 150:
            return 0.0
        channel_top = self.highest_high[-1]
        channel_bottom = self.lowest_low[-1]
        trend = self.sma150[0]
        if math.isnan(channel_top) or math.isnan(channel_bottom) or math.isnan(trend):
            return 0.0
        now = self.data.close[0]
        if self.don_active:
            if now < channel_bottom or now < trend:
                self.don_active = False
        elif now > channel_top and now > trend:
            self.don_active = True
        return 1.0 if self.don_active else 0.0

    def _momentum_weight(self, target_vol):
        """Return the momentum weight sized to ``target_vol`` annualised volatility."""
        fast = self.roc_short[0]
        slow = self.roc_long[0]
        trend = self.sma150[0]
        daily_vol = self.volatility[0]
        if any(math.isnan(v) for v in (fast, slow, trend, daily_vol)) or not (daily_vol > 0):
            return 0.0
        trending = fast > 0 and slow > 0 and self.data.close[0] > trend
        if not trending:
            return 0.0
        return min(1.0, target_vol / self._annualized_vol())

    def _donchian_hybrid_weight(self):
        """Update the Donchian/trailing-stop state and return its sized weight."""
        if len(self) <= 55:
            return 0.0
        channel_top = self.highest_high[-1]
        channel_bottom = self.lowest_low[-1]
        daily_vol = self.volatility[0]
        atr_now = self.atr[0]
        inputs = (channel_top, channel_bottom, daily_vol, atr_now)
        if any(math.isnan(v) for v in inputs) or not (daily_vol > 0):
            return 0.0

        now = self.data.close[0]
        if self.hybrid_active:
            # Ratchet the peak close, then test both exits against it.
            peak = max(self.hybrid_highest_close or float(now), float(now))
            self.hybrid_highest_close = peak
            stop_level = peak - 4.0 * atr_now
            if now < channel_bottom or now < stop_level:
                self.hybrid_active = False
                self.hybrid_highest_close = None
        elif now > channel_top:
            self.hybrid_active = True
            self.hybrid_highest_close = float(now)

        if not self.hybrid_active:
            return 0.0
        return min(1.0, 0.25 / self._annualized_vol())

    def next(self):
        super().next()
        if self.order:
            # A previous order is still pending; wait for it to resolve.
            return

        # Order matters only for float summation; keep it fixed.
        components = (
            self._dual_thrust_weight(),
            self._donchian_regime_weight(),
            self._momentum_weight(0.30),
            self._momentum_weight(0.29),
            self._donchian_hybrid_weight(),
        )
        blended = sum(components) / len(components)

        equity = self.broker.getvalue()
        held_weight = 0.0
        if equity > 0:
            held_weight = (abs(self.position.size) * self.data.close[0]) / equity

        # Trade when flat, or when the held weight drifted past the band.
        drifted = abs(held_weight - blended) >= self.p.rebalance_band
        if drifted or not self.position:
            self._rebalance_to_weight(blended)
def load_dataframe() -> pd.DataFrame:
    """Read ``DATA_FILE`` and return its OHLCV columns ordered by date.

    The ``datetime`` column is parsed as dates and used as the index; the
    returned frame is an independent copy holding only the five price/volume
    columns.
    """
    ohlcv_columns = ['open', 'high', 'low', 'close', 'volume']
    raw = pd.read_csv(DATA_FILE, parse_dates=['datetime'], index_col='datetime')
    chronological = raw.sort_index()
    return chronological[ohlcv_columns].copy()
def run_strategy(strategy_cls, df: pd.DataFrame) -> dict:
    """Backtest ``strategy_cls`` on ``df`` and return its summary metrics.

    A fresh Cerebro engine is configured with ``INITIAL_CASH`` and
    ``COMMISSION``, the strategy is run once, and the analyzer output is
    condensed into a flat dict of rounded figures. ``sharpe`` is ``None``
    when the Sharpe analyzer reports no ratio.
    """
    engine = bt.Cerebro(stdstats=False)
    engine.adddata(Chinext50Data(dataname=df))
    engine.addstrategy(strategy_cls)
    engine.broker.setcash(INITIAL_CASH)
    engine.broker.setcommission(commission=COMMISSION)

    # (analyzer class, attribute name, extra keyword arguments)
    analyzer_specs = (
        (bt.analyzers.Returns, 'returns', {}),
        (bt.analyzers.DrawDown, 'drawdown', {}),
        (bt.analyzers.SharpeRatio_A, 'sharpe', {'riskfreerate': 0.02}),
        (bt.analyzers.TradeAnalyzer, 'trades', {}),
    )
    for analyzer_cls, label, extra in analyzer_specs:
        engine.addanalyzer(analyzer_cls, _name=label, **extra)

    ran = engine.run()[0]
    ending_value = engine.broker.getvalue()

    return_stats = ran.analyzers.returns.get_analysis()
    drawdown_stats = ran.analyzers.drawdown.get_analysis()
    sharpe_stats = ran.analyzers.sharpe.get_analysis()
    trade_stats = ran.analyzers.trades.get_analysis()

    closed = trade_stats.get('total', {}).get('closed', 0)
    won = trade_stats.get('won', {}).get('total', 0)
    bar_count = len(df)

    sharpe_ratio = sharpe_stats.get('sharperatio')
    if sharpe_ratio is not None:
        sharpe_ratio = round(sharpe_ratio, 3)

    if closed:
        win_rate = round((won / closed) * 100.0, 2)
    else:
        win_rate = 0.0

    summary = {}
    summary['final_value'] = round(ending_value, 2)
    summary['total_return_pct'] = round((ending_value / INITIAL_CASH - 1.0) * 100.0, 2)
    summary['annual_return_pct'] = round(return_stats.get('rnorm100', 0.0), 2)
    summary['max_drawdown_pct'] = round(drawdown_stats.get('max', {}).get('drawdown', 0.0), 2)
    summary['sharpe'] = sharpe_ratio
    summary['entries'] = ran.entry_count
    summary['closed_trades'] = closed
    summary['win_rate_pct'] = win_rate
    # Exposure is averaged over every bar of the input frame.
    summary['exposure_pct'] = round((ran.exposure_sum / bar_count) * 100.0, 2)
    return summary
def main():
    """Backtest every combo strategy and write JSON and Markdown reports.

    Each strategy's metrics are printed as one summary line. The collected
    results are then written next to this script as timestamped
    ``shortlist_combo_trials_*.json`` and ``.md`` files, whose paths are
    printed last.
    """
    df = load_dataframe()
    combos = [
        ('Core3ComboStrategy', Core3ComboStrategy),
        ('Balanced3ComboStrategy', Balanced3ComboStrategy),
        ('Top5BlendStrategy', Top5BlendStrategy),
    ]
    results = []
    for name, cls in combos:
        metrics = run_strategy(cls, df)
        results.append({'name': name, 'metrics': metrics})
        print(
            f"{name}: final={metrics['final_value']}, "
            f"total_return={metrics['total_return_pct']}%, "
            f"annual_return={metrics['annual_return_pct']}%, "
            f"sharpe={metrics['sharpe']}, "
            f"max_dd={metrics['max_drawdown_pct']}%, "
            f"closed_trades={metrics['closed_trades']}, "
            f"win_rate={metrics['win_rate_pct']}%, "
            f"avg_exposure={metrics['exposure_pct']}%"
        )

    stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    json_path = ROOT / f'shortlist_combo_trials_{stamp}.json'
    md_path = ROOT / f'shortlist_combo_trials_{stamp}.md'

    # ensure_ascii=False can emit non-ASCII text, so write with an explicit
    # encoding instead of the platform default.
    json_path.write_text(
        json.dumps({'results': results}, ensure_ascii=False, indent=2),
        encoding='utf-8',
    )

    lines = [
        '# Shortlist Combo Trials',
        '',
        '| Combo | Annual Return | Sharpe | Max DD | Closed Trades | Win Rate | Avg Exposure |',
        '| --- | ---: | ---: | ---: | ---: | ---: | ---: |',
    ]
    for item in results:
        m = item['metrics']
        lines.append(
            f"| {item['name']} | {m['annual_return_pct']:.2f}% | {m['sharpe']} | "
            f"{m['max_drawdown_pct']:.2f}% | {m['closed_trades']} | "
            f"{m['win_rate_pct']:.2f}% | {m['exposure_pct']:.2f}% |"
        )
    md_path.write_text('\n'.join(lines), encoding='utf-8')

    print(f'JSON_RESULT={json_path}')
    print(f'MD_RESULT={md_path}')


if __name__ == '__main__':
    main()
|