#!/usr/bin/env python3 from __future__ import annotations import argparse import importlib.util import math import subprocess import sys from datetime import datetime, time from pathlib import Path from zoneinfo import ZoneInfo import pandas as pd from fetch_data import fetch_chinext50_data WORKDIR = Path(__file__).resolve().parent DEFAULT_EMAIL = "wangshuai.vip@qq.com" DEFAULT_BASELINE_CASH = 100_000 DEFAULT_DISPLAY_CASH = 1_000_000 SEND_TZ = ZoneInfo("Asia/Shanghai") SEND_START = time(15, 10) SEND_END = time(22, 30) def load_module(name: str, path: Path): spec = importlib.util.spec_from_file_location(name, path) mod = importlib.util.module_from_spec(spec) sys.modules[spec.name] = mod spec.loader.exec_module(mod) return mod combo_mod = load_module("combo", WORKDIR / "shortlist_combo_trials.py") exp_mod = load_module("exp", WORKDIR / "chinext50_experiments.py") bt = combo_mod.bt TRADING_DAYS = combo_mod.TRADING_DAYS BasePortfolioStrategy = combo_mod.BasePortfolioStrategy class CleanTradeRecorder(bt.Analyzer): def __init__(self): self.trades = [] self._entry_cost = 0.0 self._last_sell_price = None self._entry_qty = 0 def notify_order(self, order): if order.status != order.Completed: return if order.isbuy(): self._entry_cost += abs(order.executed.size) * order.executed.price self._entry_qty += abs(int(round(order.executed.size))) elif order.issell(): self._last_sell_price = round(order.executed.price, 2) def notify_trade(self, trade): if not trade.isclosed: return pnl = round(trade.pnlcomm, 2) cost = self._entry_cost if self._entry_cost > 0 else 1e-9 pnl_pct = round((pnl / cost) * 100, 2) exit_value = round(self.strategy.broker.getvalue(), 2) self.trades.append( { "entry_date": bt.num2date(trade.dtopen).strftime("%Y-%m-%d"), "exit_date": bt.num2date(trade.dtclose).strftime("%Y-%m-%d"), "entry_price": round(trade.price, 2) if trade.price else None, "exit_price": self._last_sell_price, "qty": self._entry_qty, "days": int(trade.barlen), "pnl": pnl, "pnl_pct": pnl_pct, "nav": exit_value, } ) self._entry_cost = 0.0 self._last_sell_price = None self._entry_qty = 0 def run_with_trades(strategy_cls, df, baseline_cash: float, config: dict | None = None): cerebro = bt.Cerebro(stdstats=False) cerebro.adddata(combo_mod.Chinext50Data(dataname=df)) cerebro.addstrategy(strategy_cls, **(config or {})) cerebro.broker.setcash(baseline_cash) cerebro.broker.setcommission(commission=combo_mod.COMMISSION) cerebro.addanalyzer(bt.analyzers.Returns, _name="returns") cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown") cerebro.addanalyzer(bt.analyzers.SharpeRatio_A, _name="sharpe", riskfreerate=0.02) cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trades") cerebro.addanalyzer(CleanTradeRecorder, _name="recorder") strategy = cerebro.run()[0] final_value = cerebro.broker.getvalue() returns = strategy.analyzers.returns.get_analysis() drawdown = strategy.analyzers.drawdown.get_analysis() sharpe = strategy.analyzers.sharpe.get_analysis() trades = strategy.analyzers.trades.get_analysis() closed_trades = trades.get("total", {}).get("closed", 0) won_trades = trades.get("won", {}).get("total", 0) total_bars = len(df) metrics = { "final_value": round(final_value, 2), "total_return_pct": round((final_value / baseline_cash - 1.0) * 100.0, 2), "annual_return_pct": round(returns.get("rnorm100", 0.0), 2), "max_drawdown_pct": round(drawdown.get("max", {}).get("drawdown", 0.0), 2), "sharpe": round(sharpe["sharperatio"], 3) if sharpe.get("sharperatio") is not None else None, "closed_trades": closed_trades, "win_rate_pct": round((won_trades / closed_trades) * 100.0, 2) if closed_trades else 0.0, "exposure_pct": round((strategy.exposure_sum / total_bars) * 100.0, 2), } return metrics, strategy.analyzers.recorder.trades def scale_metrics(metrics: dict, scale: float) -> dict: out = dict(metrics) out["final_value"] = round(out["final_value"] * scale, 2) return out def scale_trades(trades: list[dict], scale: float) -> list[dict]: out = [] for trade in trades: row = dict(trade) row["qty"] = int(round(row["qty"] * scale)) row["pnl"] = round(row["pnl"] * scale, 2) row["nav"] = round(row["nav"] * scale, 2) out.append(row) return out def safe_num(v): if v is None: return "-" if isinstance(v, int): return f"{v:,}" if isinstance(v, float): s = f"{v:,.2f}" if s.endswith(".00"): s = s[:-3] return s return str(v) def td(text, extra=""): return f'{text}' def th(text): return f'{text}' def compute_subperiods(strategy_cls, df, baseline_cash: float, config: dict | None = None) -> list[dict]: periods = [ ("2014-06 ~ 2018-12", "2014-06-18", "2018-12-31"), ("2019-01 ~ 2022-12", "2019-01-01", "2022-12-31"), (f"2023-01 ~ {df.index.max().strftime('%Y-%m')}", "2023-01-01", df.index.max().strftime('%Y-%m-%d')), ] out = [] for label, start, end in periods: sub_df = df.loc[(df.index >= start) & (df.index <= end)].copy() if len(sub_df) < 30: continue metrics, _ = run_with_trades(strategy_cls, sub_df, baseline_cash, config) out.append( { "period": label, "annual": metrics["annual_return_pct"], "sharpe": metrics["sharpe"], "max_dd": metrics["max_drawdown_pct"], } ) return out def compute_recent_dualthrust_signals(df: pd.DataFrame, range_period: int = 20, k1: float = 0.3, k2: float = 0.3, recent_n: int = 20) -> list[dict]: state = False rows = [] close = df['close'] for i in range(len(df)): if i <= range_period: continue window = close.iloc[i - range_period:i] thrust_range = float(window.max() - window.min()) ref_price = float(close.iloc[i - 1]) last_close = float(close.iloc[i]) upper = ref_price + k1 * thrust_range lower = ref_price - k2 * thrust_range entry_signal = last_close > upper exit_signal = last_close < lower action = '空仓' if entry_signal and not state: state = True action = '买入触发' elif state and exit_signal: state = False action = '卖出触发' else: action = '持仓' if state else '空仓' rows.append({ 'date': df.index[i].strftime('%Y-%m-%d'), 'close': round(last_close, 2), 'upper': round(upper, 2), 'lower': round(lower, 2), 'entry_signal': '是' if entry_signal else '否', 'exit_signal': '是' if exit_signal else '否', 'status': action, }) return rows[-recent_n:] def compute_recent_combo_signals(df: pd.DataFrame, recent_n: int = 20) -> list[dict]: close = df['close'] high = df['high'] low = df['low'] returns = close.pct_change(1) sma120 = close.rolling(120).mean() sma150 = close.rolling(150).mean() roc20 = close.pct_change(20) roc120 = close.pct_change(120) vol30 = returns.rolling(30).std() prev_close = close.shift(1) tr = pd.concat([ (high - low), (high - prev_close).abs(), (low - prev_close).abs(), ], axis=1).max(axis=1) atr20 = tr.rolling(20).mean() highest55_prev = high.rolling(55).max().shift(1) lowest30_prev = low.rolling(30).min().shift(1) dt_reg_active = False hybrid_active = False hybrid_highest_close = None rows = [] for i in range(len(df)): dt_w = 0.0 if i > 120 and not pd.isna(sma120.iloc[i]): window = close.iloc[i - 20:i] thrust_range = float(window.max() - window.min()) ref_price = float(close.iloc[i - 1]) upper = ref_price + 0.35 * thrust_range lower = ref_price - 0.35 * thrust_range entry_signal = float(close.iloc[i]) > upper and float(close.iloc[i]) > float(sma120.iloc[i]) exit_signal = float(close.iloc[i]) < lower or float(close.iloc[i]) < float(sma120.iloc[i]) if not dt_reg_active and entry_signal: dt_reg_active = True elif dt_reg_active and exit_signal: dt_reg_active = False dt_w = 1.0 if dt_reg_active else 0.0 mvt_signal = False mvt_w = 0.0 if not any(pd.isna(x) for x in [roc20.iloc[i], roc120.iloc[i], sma150.iloc[i], vol30.iloc[i]]) and vol30.iloc[i] > 0: mvt_signal = bool(roc20.iloc[i] > 0 and roc120.iloc[i] > 0 and close.iloc[i] > sma150.iloc[i]) if mvt_signal: annualized_vol = float(vol30.iloc[i]) * math.sqrt(TRADING_DAYS) mvt_w = min(1.0, 0.29 / annualized_vol) hy_w = 0.0 hy_break = False if i > 55 and not any(pd.isna(x) for x in [highest55_prev.iloc[i], lowest30_prev.iloc[i], vol30.iloc[i], atr20.iloc[i]]) and vol30.iloc[i] > 0: hy_break = bool(close.iloc[i] > highest55_prev.iloc[i]) channel_exit = bool(close.iloc[i] < lowest30_prev.iloc[i]) if not hybrid_active: if hy_break: hybrid_active = True hybrid_highest_close = float(close.iloc[i]) else: hybrid_highest_close = max(hybrid_highest_close or float(close.iloc[i]), float(close.iloc[i])) trailing_stop = hybrid_highest_close - 4.0 * float(atr20.iloc[i]) if channel_exit or float(close.iloc[i]) < trailing_stop: hybrid_active = False hybrid_highest_close = None if hybrid_active: annualized_vol = float(vol30.iloc[i]) * math.sqrt(TRADING_DAYS) hy_w = min(1.0, 0.25 / annualized_vol) target_weight = 0.60 * dt_w + 0.20 * mvt_w + 0.20 * hy_w rows.append({ 'date': df.index[i].strftime('%Y-%m-%d'), 'close': round(float(close.iloc[i]), 2), 'dt_status': '开' if dt_reg_active else '关', 'mvt_status': '开' if mvt_signal else '关', 'hy_status': '开' if hybrid_active else '关', 'dt_weight': round(dt_w * 100, 1), 'mvt_weight': round(mvt_w * 100, 1), 'hy_weight': round(hy_w * 100, 1), 'target_weight': round(target_weight * 100, 1), }) return rows[-recent_n:] def build_recent_signal_html(strategy_name: str, recent_rows: list[dict]) -> str: if not recent_rows: return '' if strategy_name == 'DualThrustBasicStrategy': header = '' + th('日期') + th('收盘') + th('上轨') + th('下轨') + th('入场触发') + th('出场触发') + th('状态') + '' body = [] for r in recent_rows: body.append('' + td(r['date']) + td(safe_num(r['close'])) + td(safe_num(r['upper'])) + td(safe_num(r['lower'])) + td(r['entry_signal']) + td(r['exit_signal']) + td(r['status']) + '') else: header = '' + th('日期') + th('收盘') + th('DT状态') + th('MVT状态') + th('HY状态') + th('DT权重') + th('MVT权重') + th('HY权重') + th('组合目标仓位') + '' body = [] for r in recent_rows: body.append('' + td(r['date']) + td(safe_num(r['close'])) + td(r['dt_status']) + td(r['mvt_status']) + td(r['hy_status']) + td(f"{r['dt_weight']:.1f}%") + td(f"{r['mvt_weight']:.1f}%") + td(f"{r['hy_weight']:.1f}%") + td(f"{r['target_weight']:.1f}%") + '') return f'''

近20交易日指标触发情况

{header}{''.join(body)}
''' def build_html(title: str, strategy_name: str, config_desc: str, metrics: dict, trades: list[dict], subperiods: list[dict], recent_signal_rows: list[dict], df, display_cash: float, scaled_note: str): total_pnl = sum(t["pnl"] for t in trades) trade_rows = [] for i, t in enumerate(trades, 1): pnl_color = "#0b8f3d" if t["pnl_pct"] >= 0 else "#c62828" pnl_sign = "+" if t["pnl_pct"] > 0 else "" trade_rows.append( "" + td(i) + td(t["entry_date"]) + td(safe_num(t["entry_price"])) + td(t["exit_date"]) + td(safe_num(t["exit_price"])) + td(safe_num(t["qty"])) + td(t["days"]) + td(safe_num(t["pnl"]), f"color:{pnl_color};font-weight:700;") + td(f"{pnl_sign}{t['pnl_pct']:.2f}%", f"color:{pnl_color};font-weight:700;") + td(safe_num(t["nav"])) + "" ) trade_rows = "\n".join(trade_rows) sub_rows = [] for sp in subperiods: sub_rows.append( "" + td(sp["period"]) + td(f"{sp['annual']:.2f}%") + td(sp["sharpe"]) + td(f"{sp['max_dd']:.2f}%") + "" ) sub_rows = "\n".join(sub_rows) data_start = df.index.min().date() data_end = df.index.max().date() bars = len(df) recent_signal_html = build_recent_signal_html(strategy_name, recent_signal_rows) return f""" {title}

{title}

初始资金:{safe_num(display_cash)}
策略名称:{strategy_name}
配置参数:{config_desc}
数据来源:chinext50.csv({data_start} 至 {data_end},{bars} 根 K 线)
数据处理:{scaled_note}

核心指标

{th('指标')}{th('数值')}{th('指标')}{th('数值')}{td('总收益')}{td(f"{metrics['total_return_pct']:.2f}%")}{td('年化收益')}{td(f"{metrics['annual_return_pct']:.2f}%")}{td('Sharpe')}{td(metrics['sharpe'])}{td('最大回撤')}{td(f"{metrics['max_drawdown_pct']:.2f}%")}{td('交易次数')}{td(metrics['closed_trades'])}{td('胜率')}{td(f"{metrics['win_rate_pct']:.2f}%")}{td('最终净值')}{td(safe_num(metrics['final_value']))}{td('平均暴露')}{td(f"{metrics['exposure_pct']:.2f}%")}

子区间复核

{th('区间')}{th('年化收益')}{th('Sharpe')}{th('最大回撤')} {sub_rows}
{recent_signal_html}

历史交易记录

累计盈亏:{safe_num(total_pnl)}
{th('#')}{th('买入时间')}{th('买入价')}{th('卖出时间')}{th('卖出价')}{th('数量')}{th('天数')}{th('盈亏额')}{th('盈亏%')}{th('账户净值')} {trade_rows}
本报告每次运行都会先拉取远程最新数据,刷新本地 chinext50.csv,再重算后发出。
""" class Balanced3_DT60_MVT20_HY20(BasePortfolioStrategy): params = (("w_dt", 0.60), ("w_mvt", 0.20), ("w_hy", 0.20), ("rebalance_band", 0.05)) def __init__(self): super().__init__() close = self.data.close returns = bt.indicators.PctChange(close, period=1) self.volatility = bt.indicators.StdDev(returns, period=30) self.atr = bt.indicators.ATR(self.data, period=20) self.roc_short = bt.indicators.ROC(close, period=20) self.roc_long = bt.indicators.ROC(close, period=120) self.sma120 = bt.indicators.SMA(close, period=120) self.sma150 = bt.indicators.SMA(close, period=150) self.highest_high = bt.indicators.Highest(self.data.high, period=55) self.lowest_low = bt.indicators.Lowest(self.data.low, period=30) self.dt_reg_active = False self.hybrid_active = False self.hybrid_highest_close = None def next(self): super().next() if self.order: return dt_w = 0.0 if len(self) > 120 and not math.isnan(self.sma120[0]): closes = [float(self.data.close[-offset]) for offset in range(1, 21)] thrust_range = max(closes) - min(closes) reference_price = float(self.data.close[-1]) upper = reference_price + 0.35 * thrust_range lower = reference_price - 0.35 * thrust_range entry_signal = self.data.close[0] > upper and self.data.close[0] > self.sma120[0] exit_signal = self.data.close[0] < lower or self.data.close[0] < self.sma120[0] if not self.dt_reg_active and entry_signal: self.dt_reg_active = True elif self.dt_reg_active and exit_signal: self.dt_reg_active = False dt_w = 1.0 if self.dt_reg_active else 0.0 mvt_w = 0.0 if not any(math.isnan(x) for x in [self.roc_short[0], self.roc_long[0], self.sma150[0], self.volatility[0]]) and self.volatility[0] > 0: signal = self.roc_short[0] > 0 and self.roc_long[0] > 0 and self.data.close[0] > self.sma150[0] if signal: annualized_vol = self.volatility[0] * math.sqrt(TRADING_DAYS) mvt_w = min(1.0, 0.29 / annualized_vol) hy_w = 0.0 if len(self) > 55 and not any(math.isnan(x) for x in [self.highest_high[-1], self.lowest_low[-1], self.volatility[0], self.atr[0]]) and self.volatility[0] > 0: breakout_signal = self.data.close[0] > self.highest_high[-1] channel_exit = self.data.close[0] < self.lowest_low[-1] if not self.hybrid_active: if breakout_signal: self.hybrid_active = True self.hybrid_highest_close = float(self.data.close[0]) else: self.hybrid_highest_close = max(self.hybrid_highest_close or float(self.data.close[0]), float(self.data.close[0])) trailing_stop = self.hybrid_highest_close - 4.0 * self.atr[0] if channel_exit or self.data.close[0] < trailing_stop: self.hybrid_active = False self.hybrid_highest_close = None if self.hybrid_active: annualized_vol = self.volatility[0] * math.sqrt(TRADING_DAYS) hy_w = min(1.0, 0.25 / annualized_vol) target_weight = self.p.w_dt * dt_w + self.p.w_mvt * mvt_w + self.p.w_hy * hy_w pv = self.broker.getvalue() cw = (abs(self.position.size) * self.data.close[0]) / pv if pv > 0 else 0.0 if not self.position or abs(cw - target_weight) >= self.p.rebalance_band: self._rebalance_to_weight(target_weight) def write_mail(subject: str, html: str, output_path: Path, to_email: str): mail = "\n".join( [ f"Subject: {subject}", f"From: {to_email}", f"To: {to_email}", "MIME-Version: 1.0", "Content-Type: text/html; charset=utf-8", "Content-Transfer-Encoding: 8bit", "", html, ] ) output_path.write_text(mail, encoding="utf-8") def send_mail(file_path: Path, to_email: str): content = file_path.read_bytes() subprocess.run(["/usr/sbin/sendmail", to_email], input=content, check=True) def get_send_window_status() -> tuple[bool, str]: now = datetime.now(SEND_TZ) now_t = now.time() allowed = SEND_START <= now_t <= SEND_END msg = ( f"send_window tz=Asia/Shanghai now={now.strftime('%Y-%m-%d %H:%M:%S')} " f"window={SEND_START.strftime('%H:%M')}-{SEND_END.strftime('%H:%M')} allowed={allowed}" ) return allowed, msg def main(): parser = argparse.ArgumentParser() parser.add_argument("--email", default=DEFAULT_EMAIL) parser.add_argument("--baseline-cash", type=float, default=DEFAULT_BASELINE_CASH) parser.add_argument("--display-cash", type=float, default=DEFAULT_DISPLAY_CASH) parser.add_argument("--skip-refresh", action="store_true") parser.add_argument("--skip-send", action="store_true") parser.add_argument("--force-send", action="store_true", help="Ignore send window and send immediately.") args = parser.parse_args() if not args.skip_refresh: try: refreshed = fetch_chinext50_data(save_path=WORKDIR / "chinext50.csv") except Exception as e: print(f"REFRESH_FAILED: {e}", file=sys.stderr) print("ABORT_SEND: remote refresh failed, report email not sent", file=sys.stderr) raise SystemExit(2) print(f"refreshed rows={len(refreshed)} range={refreshed['datetime'].min().date()}~{refreshed['datetime'].max().date()}") df = exp_mod.load_dataframe() scale = args.display_cash / args.baseline_cash scaled_note = ( f"先刷新远程数据后回测;展示口径按 {safe_num(args.baseline_cash)} 基准回测等比例放大到 {safe_num(args.display_cash)}。" if abs(scale - 1.0) > 1e-9 else f"先刷新远程数据后,按 {safe_num(args.display_cash)} 初始资金直接回测。" ) m1, t1 = run_with_trades(exp_mod.DualThrustBasicStrategy, df, args.baseline_cash, {"range_period": 20, "k1": 0.3, "k2": 0.3}) m2, t2 = run_with_trades(Balanced3_DT60_MVT20_HY20, df, args.baseline_cash) if abs(scale - 1.0) > 1e-9: m1, t1 = scale_metrics(m1, scale), scale_trades(t1, scale) m2, t2 = scale_metrics(m2, scale), scale_trades(t2, scale) sub1 = compute_subperiods( exp_mod.DualThrustBasicStrategy, df, args.baseline_cash, {"range_period": 20, "k1": 0.3, "k2": 0.3}, ) sub2 = compute_subperiods( Balanced3_DT60_MVT20_HY20, df, args.baseline_cash, None, ) recent1 = compute_recent_dualthrust_signals(df, range_period=20, k1=0.3, k2=0.3, recent_n=20) recent2 = compute_recent_combo_signals(df, recent_n=20) html1 = build_html( "DualThrustBasicStrategy — 单策略统计信息(自动刷新版)", "DualThrustBasicStrategy", "range_period=20, k1=0.3, k2=0.3", m1, t1, sub1, recent1, df, args.display_cash, scaled_note, ) html2 = build_html( "Balanced3_DT60_MVT20_HY20 — 组合策略统计信息(自动刷新版)", "Balanced3_DT60_MVT20_HY20", "DualThrustRegime(60%) + MVT_reg150_tv029(20%) + DonchianHybrid_b55_e30_tv025_atr4(20%)", m2, t2, sub2, recent2, df, args.display_cash, scaled_note, ) p1 = WORKDIR / "auto_refresh_email_strategy_1.html" p2 = WORKDIR / "auto_refresh_email_strategy_2.html" write_mail("【策略统计-自动刷新版】DualThrustBasicStrategy 单策略统计信息", html1, p1, args.email) write_mail("【策略统计-自动刷新版】Balanced3_DT60_MVT20_HY20 组合策略统计信息", html2, p2, args.email) print(f"strategy1 total={m1['total_return_pct']}% final={m1['final_value']}") print(f"strategy2 total={m2['total_return_pct']}% final={m2['final_value']}") print(f"wrote {p1} and {p2}") if not args.skip_send: allowed, window_msg = get_send_window_status() print(window_msg) if allowed or args.force_send: send_mail(p1, args.email) send_mail(p2, args.email) print("sent both emails") else: print("SKIP_SEND: outside Asia/Shanghai send window; reports generated but not mailed") if __name__ == "__main__": main()