| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594 |
- #!/usr/bin/env python3
- from __future__ import annotations
- import argparse
- import importlib.util
- import math
- import subprocess
- import sys
- from datetime import datetime, time
- from pathlib import Path
- from zoneinfo import ZoneInfo
- import pandas as pd
- from fetch_data import fetch_chinext50_data
# Directory that contains this script; the sibling strategy modules and the
# refreshed chinext50.csv are addressed relative to it.
WORKDIR = Path(__file__).resolve().parent

# Default recipient of the generated reports (write_mail also uses it as sender).
DEFAULT_EMAIL = "wangshuai.vip@qq.com"

# The backtest runs with the baseline cash; money figures in the report are
# scaled by display / baseline when the two differ.
DEFAULT_BASELINE_CASH = 100_000
DEFAULT_DISPLAY_CASH = 1_000_000

# Mail is only sent while the Asia/Shanghai wall clock is inside this
# inclusive window.
SEND_TZ = ZoneInfo("Asia/Shanghai")
SEND_START = time(hour=15, minute=10)
SEND_END = time(hour=22, minute=30)
def load_module(name: str, path: Path):
    """Import the Python source file at *path* and register it as *name*.

    Args:
        name: Key under which the module is stored in ``sys.modules``.
        path: Location of the source file to execute.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be built for *path*
            (for example, an unsupported file suffix).
        Exception: Whatever the module body raises. The half-initialised
            module is removed from ``sys.modules`` before re-raising.
    """
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot build an import spec for {path}", name=name)

    mod = importlib.util.module_from_spec(spec)
    previous = sys.modules.get(spec.name)
    # Register before executing so the module can be found while its own
    # body is running (e.g. by code that looks itself up in sys.modules).
    sys.modules[spec.name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a broken module behind; restore whatever was there.
        if previous is None:
            sys.modules.pop(spec.name, None)
        else:
            sys.modules[spec.name] = previous
        raise
    return mod
# Load the sibling strategy modules by file path, relative to this script's
# own directory.
combo_mod = load_module("combo", WORKDIR / "shortlist_combo_trials.py")
exp_mod = load_module("exp", WORKDIR / "chinext50_experiments.py")

# Short aliases for the names from the combo module that this file uses.
# `bt` is presumably the backtrader package re-exported there - TODO confirm.
BasePortfolioStrategy = combo_mod.BasePortfolioStrategy
TRADING_DAYS = combo_mod.TRADING_DAYS
bt = combo_mod.bt
class CleanTradeRecorder(bt.Analyzer):
    """Analyzer that stores one summary dict per closed trade in ``trades``.

    Buy fills are accumulated into a running cost and quantity, the price of
    the most recent sell fill is remembered, and when a trade is reported as
    closed a row is appended and the running state is cleared.
    """

    def __init__(self):
        self.trades = []
        self._clear_open_trade_state()

    def _clear_open_trade_state(self):
        """Reset the per-trade accumulators to their empty values."""
        self._entry_cost = 0.0
        self._last_sell_price = None
        self._entry_qty = 0

    def notify_order(self, order):
        """Track completed fills: buys add to cost/qty, sells set the exit price."""
        if order.status != order.Completed:
            return
        fill = order.executed
        if order.isbuy():
            self._entry_cost += abs(fill.size) * fill.price
            self._entry_qty += abs(int(round(fill.size)))
            return
        if order.issell():
            self._last_sell_price = round(fill.price, 2)

    def notify_trade(self, trade):
        """Append a summary row once *trade* is closed, then reset the state."""
        if not trade.isclosed:
            return

        net_pnl = round(trade.pnlcomm, 2)
        # A tiny positive stand-in avoids dividing by zero when no buy cost
        # was accumulated for this trade.
        cost_basis = self._entry_cost if self._entry_cost > 0 else 1e-9
        net_pnl_pct = round((net_pnl / cost_basis) * 100, 2)
        account_value = round(self.strategy.broker.getvalue(), 2)
        opened = bt.num2date(trade.dtopen).strftime("%Y-%m-%d")
        closed = bt.num2date(trade.dtclose).strftime("%Y-%m-%d")
        avg_entry = round(trade.price, 2) if trade.price else None

        self.trades.append(
            {
                "entry_date": opened,
                "exit_date": closed,
                "entry_price": avg_entry,
                "exit_price": self._last_sell_price,
                "qty": self._entry_qty,
                "days": int(trade.barlen),
                "pnl": net_pnl,
                "pnl_pct": net_pnl_pct,
                "nav": account_value,
            }
        )
        self._clear_open_trade_state()
def run_with_trades(strategy_cls, df, baseline_cash: float, config: dict | None = None):
    """Backtest *strategy_cls* on *df* and return ``(metrics, trades)``.

    Args:
        strategy_cls: Strategy class handed to the Cerebro engine.
        df: Price frame wrapped in ``combo_mod.Chinext50Data``.
        baseline_cash: Starting broker cash; also the base for total return.
        config: Optional keyword parameters for the strategy.

    Returns:
        A tuple of the metrics dict (final value, total/annual return,
        max drawdown, Sharpe or ``None``, closed trades, win rate, exposure)
        and the list of rows collected by ``CleanTradeRecorder``.
    """
    engine = bt.Cerebro(stdstats=False)
    engine.adddata(combo_mod.Chinext50Data(dataname=df))
    strategy_kwargs = config or {}
    engine.addstrategy(strategy_cls, **strategy_kwargs)
    engine.broker.setcash(baseline_cash)
    engine.broker.setcommission(commission=combo_mod.COMMISSION)

    analyzer_specs = (
        (bt.analyzers.Returns, {"_name": "returns"}),
        (bt.analyzers.DrawDown, {"_name": "drawdown"}),
        (bt.analyzers.SharpeRatio_A, {"_name": "sharpe", "riskfreerate": 0.02}),
        (bt.analyzers.TradeAnalyzer, {"_name": "trades"}),
        (CleanTradeRecorder, {"_name": "recorder"}),
    )
    for analyzer_cls, analyzer_kwargs in analyzer_specs:
        engine.addanalyzer(analyzer_cls, **analyzer_kwargs)

    strat = engine.run()[0]
    ending_value = engine.broker.getvalue()

    return_stats = strat.analyzers.returns.get_analysis()
    drawdown_stats = strat.analyzers.drawdown.get_analysis()
    sharpe_stats = strat.analyzers.sharpe.get_analysis()
    trade_stats = strat.analyzers.trades.get_analysis()

    n_closed = trade_stats.get("total", {}).get("closed", 0)
    n_won = trade_stats.get("won", {}).get("total", 0)
    n_bars = len(df)

    sharpe_raw = sharpe_stats.get("sharperatio")
    sharpe_value = None if sharpe_raw is None else round(sharpe_raw, 3)
    if n_closed:
        win_rate = round((n_won / n_closed) * 100.0, 2)
    else:
        win_rate = 0.0

    metrics = {
        "final_value": round(ending_value, 2),
        "total_return_pct": round((ending_value / baseline_cash - 1.0) * 100.0, 2),
        "annual_return_pct": round(return_stats.get("rnorm100", 0.0), 2),
        "max_drawdown_pct": round(drawdown_stats.get("max", {}).get("drawdown", 0.0), 2),
        "sharpe": sharpe_value,
        "closed_trades": n_closed,
        "win_rate_pct": win_rate,
        # exposure_sum is read from the strategy instance; it is presumably
        # maintained by BasePortfolioStrategy - TODO confirm.
        "exposure_pct": round((strat.exposure_sum / n_bars) * 100.0, 2),
    }
    return metrics, strat.analyzers.recorder.trades
def scale_metrics(metrics: dict, scale: float) -> dict:
    """Return a copy of *metrics* whose ``final_value`` is multiplied by *scale*.

    Only the money amount is rescaled; every other entry is copied unchanged
    and the input dict is not modified.
    """
    scaled = dict(metrics)
    scaled.update(final_value=round(scaled["final_value"] * scale, 2))
    return scaled
def scale_trades(trades: list[dict], scale: float) -> list[dict]:
    """Return copies of the trade rows with qty, pnl and nav multiplied by *scale*.

    ``qty`` is rounded to an int, ``pnl`` and ``nav`` to two decimals; all
    other fields are copied as-is and the input rows are left untouched.
    """
    return [
        {
            **trade,
            "qty": int(round(trade["qty"] * scale)),
            "pnl": round(trade["pnl"] * scale, 2),
            "nav": round(trade["nav"] * scale, 2),
        }
        for trade in trades
    ]
def safe_num(v):
    """Format a value for display in the report.

    * ``None`` becomes ``"-"``.
    * ints get thousands separators.
    * floats get thousands separators and two decimals, with a trailing
      ``.00`` dropped; values that round to zero are shown as ``"0"``
      rather than ``"-0"``.
    * anything else is passed through ``str``.
    """
    if v is None:
        return "-"
    if isinstance(v, int):
        return f"{v:,}"
    if isinstance(v, float):
        text = f"{v:,.2f}".removesuffix(".00")
        # Small negatives such as -0.001 format as "-0.00"; do not show "-0".
        return "0" if text == "-0" else text
    return str(v)
def td(text, extra=""):
    """Return an inline-styled ``<td>`` cell; *extra* is appended to its CSS."""
    template = '<td style="border:1px solid #d9dee7;padding:8px 10px;{}">{}</td>'
    return template.format(extra, text)
def th(text):
    """Return an inline-styled, left-aligned ``<th>`` header cell."""
    template = '<th style="border:1px solid #d9dee7;background:#f3f6fa;padding:8px 10px;text-align:left;">{}</th>'
    return template.format(text)
def compute_subperiods(strategy_cls, df, baseline_cash: float, config: dict | None = None) -> list[dict]:
    """Re-run the backtest on three fixed date windows and summarise each.

    The windows are 2014-06-18..2018-12-31, 2019-01-01..2022-12-31 and
    2023-01-01..last bar of *df*. Windows with fewer than 30 rows are skipped.

    Returns:
        One dict per evaluated window with ``period``, ``annual``,
        ``sharpe`` and ``max_dd`` taken from ``run_with_trades`` metrics.
    """
    last_bar = df.index.max()
    windows = (
        ("2014-06 ~ 2018-12", "2014-06-18", "2018-12-31"),
        ("2019-01 ~ 2022-12", "2019-01-01", "2022-12-31"),
        (f"2023-01 ~ {last_bar.strftime('%Y-%m')}", "2023-01-01", last_bar.strftime('%Y-%m-%d')),
    )

    results = []
    for label, first_day, final_day in windows:
        in_window = (df.index >= first_day) & (df.index <= final_day)
        window_df = df.loc[in_window].copy()
        if len(window_df) >= 30:
            window_metrics, _trades = run_with_trades(strategy_cls, window_df, baseline_cash, config)
            results.append(
                {
                    "period": label,
                    "annual": window_metrics["annual_return_pct"],
                    "sharpe": window_metrics["sharpe"],
                    "max_dd": window_metrics["max_drawdown_pct"],
                }
            )
    return results
def compute_recent_dualthrust_signals(df: pd.DataFrame, range_period: int = 20, k1: float = 0.3, k2: float = 0.3, recent_n: int = 20) -> list[dict]:
    """Replay a close-only Dual Thrust rule and return the latest signal rows.

    For every bar after the first ``range_period + 1`` bars, the band is
    built around the previous close using the max-min range of the preceding
    ``range_period`` closes. A close above the upper band opens the position
    when flat; a close below the lower band closes it when held.

    Args:
        df: Frame with a ``close`` column and a datetime-like index.
        range_period: Number of prior closes used for the range.
        k1: Multiplier of the range for the upper band.
        k2: Multiplier of the range for the lower band.
        recent_n: How many of the most recent rows to return.

    Returns:
        Up to *recent_n* dicts (oldest first) with date, close, bands,
        entry/exit flags and the resulting status. Empty when
        ``recent_n <= 0``.
    """
    if recent_n <= 0:
        # rows[-0:] would hand back the whole history instead of nothing.
        return []

    close = df["close"]
    dates = df.index
    holding = False
    rows = []
    # NOTE(review): bar index == range_period already has a full window but
    # is skipped here; confirm this matches DualThrustBasicStrategy's warm-up.
    first_bar = max(range_period + 1, 0)
    for i in range(first_bar, len(df)):
        window = close.iloc[i - range_period:i]
        thrust_range = float(window.max() - window.min())
        ref_price = float(close.iloc[i - 1])
        last_close = float(close.iloc[i])
        upper = ref_price + k1 * thrust_range
        lower = ref_price - k2 * thrust_range
        entry_signal = last_close > upper
        exit_signal = last_close < lower

        if entry_signal and not holding:
            holding = True
            status = "买入触发"
        elif holding and exit_signal:
            holding = False
            status = "卖出触发"
        else:
            status = "持仓" if holding else "空仓"

        rows.append(
            {
                "date": dates[i].strftime("%Y-%m-%d"),
                "close": round(last_close, 2),
                "upper": round(upper, 2),
                "lower": round(lower, 2),
                "entry_signal": "是" if entry_signal else "否",
                "exit_signal": "是" if exit_signal else "否",
                "status": status,
            }
        )
    return rows[-recent_n:]
def compute_recent_combo_signals(df: pd.DataFrame, recent_n: int = 20) -> list[dict]:
    """Replay the three-sleeve combo rules in pandas and return recent rows.

    Sleeves (weights 60% / 20% / 20% of the target position):

    * DT: Dual Thrust on closes (20-bar range, 0.35 bands) gated by the
      120-bar moving average.
    * MVT: 20- and 120-bar momentum positive and close above the 150-bar
      average, sized to a 0.29 annualised volatility target.
    * HY: breakout above the prior 55-bar high, exited on the prior 30-bar
      low or a 4x ATR trailing stop from the highest close, sized to a
      0.25 annualised volatility target.

    Args:
        df: Frame with ``close``, ``high`` and ``low`` columns and a
            datetime-like index.
        recent_n: How many of the most recent rows to return.

    Returns:
        Up to *recent_n* dicts (oldest first) with per-sleeve status and
        weights (in percent) plus the combined target weight. Empty when
        ``recent_n <= 0``.
    """
    if recent_n <= 0:
        # rows[-0:] would hand back the whole history instead of nothing.
        return []

    close, high, low = df["close"], df["high"], df["low"]
    daily_ret = close.pct_change(1)
    sma120 = close.rolling(120).mean()
    sma150 = close.rolling(150).mean()
    roc20 = close.pct_change(20)
    roc120 = close.pct_change(120)
    # NOTE(review): backtrader's StdDev appears to be a population deviation
    # and its ATR defaults to a smoothed (Wilder) average, while these pandas
    # versions use the sample std (ddof=1) and a simple rolling mean. The
    # weights/stops shown here may therefore differ slightly from the
    # Balanced3 strategy - TODO confirm and align if exact parity is wanted.
    vol30 = daily_ret.rolling(30).std()
    prev_close = close.shift(1)
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    atr20 = true_range.rolling(20).mean()
    highest55_prev = high.rolling(55).max().shift(1)
    lowest30_prev = low.rolling(30).min().shift(1)

    def vol_target_weight(target_vol, daily_vol):
        """Scale exposure so annualised volatility hits *target_vol* (max 1)."""
        annualized_vol = float(daily_vol) * math.sqrt(TRADING_DAYS)
        return min(1.0, target_vol / annualized_vol)

    dt_active = False
    hy_active = False
    hy_peak_close = None
    rows = []
    for i in range(len(df)):
        px = float(close.iloc[i])
        vol = vol30.iloc[i]
        vol_ok = (not pd.isna(vol)) and vol > 0

        # --- Dual Thrust with 120-bar trend filter -------------------------
        dt_w = 0.0
        trend = sma120.iloc[i]
        if i > 120 and not pd.isna(trend):
            window = close.iloc[i - 20:i]
            thrust_range = float(window.max() - window.min())
            ref_price = float(close.iloc[i - 1])
            upper = ref_price + 0.35 * thrust_range
            lower = ref_price - 0.35 * thrust_range
            trend = float(trend)
            entry_signal = px > upper and px > trend
            exit_signal = px < lower or px < trend
            if dt_active:
                if exit_signal:
                    dt_active = False
            elif entry_signal:
                dt_active = True
            dt_w = 1.0 if dt_active else 0.0

        # --- Momentum + volatility targeting -------------------------------
        mvt_signal = False
        mvt_w = 0.0
        short_roc, long_roc, slow_ma = roc20.iloc[i], roc120.iloc[i], sma150.iloc[i]
        if vol_ok and not any(pd.isna(x) for x in (short_roc, long_roc, slow_ma)):
            mvt_signal = bool(short_roc > 0 and long_roc > 0 and px > slow_ma)
            if mvt_signal:
                mvt_w = vol_target_weight(0.29, vol)

        # --- Donchian breakout with ATR trailing stop ----------------------
        hy_w = 0.0
        channel_high, channel_low, atr_now = highest55_prev.iloc[i], lowest30_prev.iloc[i], atr20.iloc[i]
        if i > 55 and vol_ok and not any(pd.isna(x) for x in (channel_high, channel_low, atr_now)):
            breakout = bool(px > channel_high)
            channel_exit = bool(px < channel_low)
            if hy_active:
                hy_peak_close = max(hy_peak_close or px, px)
                trailing_stop = hy_peak_close - 4.0 * float(atr_now)
                if channel_exit or px < trailing_stop:
                    hy_active = False
                    hy_peak_close = None
            elif breakout:
                hy_active = True
                hy_peak_close = px
            if hy_active:
                hy_w = vol_target_weight(0.25, vol)

        target_weight = 0.60 * dt_w + 0.20 * mvt_w + 0.20 * hy_w
        rows.append(
            {
                "date": df.index[i].strftime("%Y-%m-%d"),
                "close": round(px, 2),
                "dt_status": "开" if dt_active else "关",
                "mvt_status": "开" if mvt_signal else "关",
                "hy_status": "开" if hy_active else "关",
                "dt_weight": round(dt_w * 100, 1),
                "mvt_weight": round(mvt_w * 100, 1),
                "hy_weight": round(hy_w * 100, 1),
                "target_weight": round(target_weight * 100, 1),
            }
        )
    return rows[-recent_n:]
def build_recent_signal_html(strategy_name: str, recent_rows: list[dict]) -> str:
    """Render the recent-signal rows as a titled HTML table.

    ``DualThrustBasicStrategy`` rows use the band/trigger layout; any other
    strategy name uses the three-sleeve status/weight layout. Returns an
    empty string when there are no rows.
    """
    if not recent_rows:
        return ''

    def table_row(cells):
        return '<tr>' + ''.join(cells) + '</tr>'

    def pct(value):
        return td(f"{value:.1f}%")

    if strategy_name == 'DualThrustBasicStrategy':
        titles = ('日期', '收盘', '上轨', '下轨', '入场触发', '出场触发', '状态')
        body = [
            table_row(
                [
                    td(r['date']),
                    td(safe_num(r['close'])),
                    td(safe_num(r['upper'])),
                    td(safe_num(r['lower'])),
                    td(r['entry_signal']),
                    td(r['exit_signal']),
                    td(r['status']),
                ]
            )
            for r in recent_rows
        ]
    else:
        titles = ('日期', '收盘', 'DT状态', 'MVT状态', 'HY状态', 'DT权重', 'MVT权重', 'HY权重', '组合目标仓位')
        body = [
            table_row(
                [
                    td(r['date']),
                    td(safe_num(r['close'])),
                    td(r['dt_status']),
                    td(r['mvt_status']),
                    td(r['hy_status']),
                    pct(r['dt_weight']),
                    pct(r['mvt_weight']),
                    pct(r['hy_weight']),
                    pct(r['target_weight']),
                ]
            )
            for r in recent_rows
        ]

    header = table_row(th(title) for title in titles)
    rows_html = ''.join(body)
    return f'''<h2 style="margin:20px 0 12px 0;font-size:18px;color:#1f2d3d;">近20交易日指标触发情况</h2>
<table style="width:100%;border-collapse:collapse;font-size:13px;margin-bottom:22px;">{header}{rows_html}</table>'''
def build_html(title: str, strategy_name: str, config_desc: str, metrics: dict, trades: list[dict], subperiods: list[dict], recent_signal_rows: list[dict], df, display_cash: float, scaled_note: str):
    """Render the full HTML report for one strategy.

    Args:
        title: Page title and main heading.
        strategy_name: Shown in the summary; also selects the layout of the
            recent-signal table.
        config_desc: Human-readable parameter description.
        metrics: Metrics dict as produced by ``run_with_trades``.
        trades: Trade rows (entry/exit date and price, qty, days, pnl,
            pnl_pct, nav).
        subperiods: Rows with ``period``, ``annual``, ``sharpe``, ``max_dd``.
        recent_signal_rows: Rows for the recent-signal table.
        df: Price frame; only its index range and length are displayed.
        display_cash: Initial cash figure shown in the summary.
        scaled_note: Text describing how figures were scaled.

    Returns:
        The complete HTML document as a string.
    """

    def pct(value):
        return f"{value:.2f}%"

    def sharpe_text(value):
        # The Sharpe ratio is None when it could not be computed; show a
        # dash instead of the literal text "None".
        return "-" if value is None else value

    total_pnl = sum(t["pnl"] for t in trades)

    trade_lines = []
    for number, t in enumerate(trades, 1):
        pnl_color = "#0b8f3d" if t["pnl_pct"] >= 0 else "#c62828"
        pnl_sign = "+" if t["pnl_pct"] > 0 else ""
        pnl_style = f"color:{pnl_color};font-weight:700;"
        trade_lines.append(
            "<tr>"
            + td(number)
            + td(t["entry_date"])
            + td(safe_num(t["entry_price"]))
            + td(t["exit_date"])
            + td(safe_num(t["exit_price"]))
            + td(safe_num(t["qty"]))
            + td(t["days"])
            + td(safe_num(t["pnl"]), pnl_style)
            + td(f"{pnl_sign}{t['pnl_pct']:.2f}%", pnl_style)
            + td(safe_num(t["nav"]))
            + "</tr>"
        )
    trade_rows = "\n".join(trade_lines)

    sub_rows = "\n".join(
        "<tr>"
        + td(sp["period"])
        + td(pct(sp["annual"]))
        + td(sharpe_text(sp["sharpe"]))
        + td(pct(sp["max_dd"]))
        + "</tr>"
        for sp in subperiods
    )

    data_start = df.index.min().date()
    data_end = df.index.max().date()
    bars = len(df)
    recent_signal_html = build_recent_signal_html(strategy_name, recent_signal_rows)

    return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{title}</title>
</head>
<body style="margin:0;padding:24px;background:#f6f8fb;font-family:Arial,'PingFang SC','Microsoft YaHei',sans-serif;color:#243447;">
<div style="max-width:1120px;margin:0 auto;background:#ffffff;border:1px solid #e6ebf2;border-radius:10px;padding:28px;">
<h1 style="margin:0 0 16px 0;font-size:26px;line-height:1.3;color:#1f2d3d;">{title}</h1>
<div style="font-size:14px;line-height:1.8;color:#4a5568;margin-bottom:20px;">
<div><strong>初始资金:</strong>{safe_num(display_cash)}</div>
<div><strong>策略名称:</strong>{strategy_name}</div>
<div><strong>配置参数:</strong>{config_desc}</div>
<div><strong>数据来源:</strong>chinext50.csv({data_start} 至 {data_end},{bars} 根 K 线)</div>
<div><strong>数据处理:</strong>{scaled_note}</div>
</div>
<h2 style="margin:20px 0 12px 0;font-size:18px;color:#1f2d3d;">核心指标</h2>
<table style="width:100%;border-collapse:collapse;font-size:14px;margin-bottom:22px;">
<tr>{th('指标')}{th('数值')}{th('指标')}{th('数值')}</tr>
<tr>{td('总收益')}{td(pct(metrics['total_return_pct']))}{td('年化收益')}{td(pct(metrics['annual_return_pct']))}</tr>
<tr>{td('Sharpe')}{td(sharpe_text(metrics['sharpe']))}{td('最大回撤')}{td(pct(metrics['max_drawdown_pct']))}</tr>
<tr>{td('交易次数')}{td(metrics['closed_trades'])}{td('胜率')}{td(pct(metrics['win_rate_pct']))}</tr>
<tr>{td('最终净值')}{td(safe_num(metrics['final_value']))}{td('平均暴露')}{td(pct(metrics['exposure_pct']))}</tr>
</table>
<h2 style="margin:20px 0 12px 0;font-size:18px;color:#1f2d3d;">子区间复核</h2>
<table style="width:100%;border-collapse:collapse;font-size:14px;margin-bottom:22px;">
<tr>{th('区间')}{th('年化收益')}{th('Sharpe')}{th('最大回撤')}</tr>
{sub_rows}
</table>
{recent_signal_html}
<h2 style="margin:20px 0 12px 0;font-size:18px;color:#1f2d3d;">历史交易记录</h2>
<div style="font-size:13px;color:#667085;margin-bottom:10px;">累计盈亏:{safe_num(total_pnl)}</div>
<table style="width:100%;border-collapse:collapse;font-size:13px;">
<tr>{th('#')}{th('买入时间')}{th('买入价')}{th('卖出时间')}{th('卖出价')}{th('数量')}{th('天数')}{th('盈亏额')}{th('盈亏%')}{th('账户净值')}</tr>
{trade_rows}
</table>
<div style="margin-top:18px;background:#fff7e6;border-left:4px solid #f0b429;padding:10px 12px;font-size:13px;color:#7a5c00;">本报告每次运行都会先拉取远程最新数据,刷新本地 chinext50.csv,再重算后发出。</div>
</div>
</body>
</html>"""
class Balanced3_DT60_MVT20_HY20(BasePortfolioStrategy):
    """Blend of three sleeves rebalanced to a combined target weight.

    * Dual Thrust gated by the 120-bar SMA (weight ``w_dt``).
    * Momentum/trend filter sized to a 0.29 annualised vol target
      (weight ``w_mvt``).
    * 55-bar breakout with 30-bar channel exit and 4x ATR trailing stop,
      sized to a 0.25 annualised vol target (weight ``w_hy``).

    The position is rebalanced when flat or when the current weight drifts
    from the target by at least ``rebalance_band``.
    """

    params = (("w_dt", 0.60), ("w_mvt", 0.20), ("w_hy", 0.20), ("rebalance_band", 0.05))

    def __init__(self):
        super().__init__()
        ind = bt.indicators
        price_line = self.data.close
        daily_returns = ind.PctChange(price_line, period=1)
        self.volatility = ind.StdDev(daily_returns, period=30)
        self.atr = ind.ATR(self.data, period=20)
        self.roc_short = ind.ROC(price_line, period=20)
        self.roc_long = ind.ROC(price_line, period=120)
        self.sma120 = ind.SMA(price_line, period=120)
        self.sma150 = ind.SMA(price_line, period=150)
        self.highest_high = ind.Highest(self.data.high, period=55)
        self.lowest_low = ind.Lowest(self.data.low, period=30)
        # Sleeve state carried from bar to bar.
        self.dt_reg_active = False
        self.hybrid_active = False
        self.hybrid_highest_close = None

    def next(self):
        super().next()
        # While an order is pending, neither sleeve state nor target changes.
        if self.order:
            return

        price = self.data.close[0]

        # --- Dual Thrust with 120-bar trend filter -------------------------
        dt_w = 0.0
        if len(self) > 120 and not math.isnan(self.sma120[0]):
            trend = self.sma120[0]
            lookback = [float(self.data.close[-back]) for back in range(1, 21)]
            thrust_range = max(lookback) - min(lookback)
            reference_price = lookback[0]  # previous bar's close
            upper = reference_price + 0.35 * thrust_range
            lower = reference_price - 0.35 * thrust_range
            entry_signal = price > upper and price > trend
            exit_signal = price < lower or price < trend
            if self.dt_reg_active:
                if exit_signal:
                    self.dt_reg_active = False
            elif entry_signal:
                self.dt_reg_active = True
            dt_w = 1.0 if self.dt_reg_active else 0.0

        # --- Momentum + volatility targeting -------------------------------
        mvt_w = 0.0
        vol = self.volatility[0]
        mvt_inputs = (self.roc_short[0], self.roc_long[0], self.sma150[0], vol)
        if all(not math.isnan(x) for x in mvt_inputs) and vol > 0:
            if self.roc_short[0] > 0 and self.roc_long[0] > 0 and price > self.sma150[0]:
                annualized_vol = vol * math.sqrt(TRADING_DAYS)
                mvt_w = min(1.0, 0.29 / annualized_vol)

        # --- Donchian breakout with ATR trailing stop ----------------------
        hy_w = 0.0
        if len(self) > 55:
            channel_high = self.highest_high[-1]
            channel_low = self.lowest_low[-1]
            atr_now = self.atr[0]
            hy_inputs = (channel_high, channel_low, vol, atr_now)
            if all(not math.isnan(x) for x in hy_inputs) and vol > 0:
                breakout_signal = price > channel_high
                channel_exit = price < channel_low
                if self.hybrid_active:
                    self.hybrid_highest_close = max(self.hybrid_highest_close or float(price), float(price))
                    trailing_stop = self.hybrid_highest_close - 4.0 * atr_now
                    if channel_exit or price < trailing_stop:
                        self.hybrid_active = False
                        self.hybrid_highest_close = None
                elif breakout_signal:
                    self.hybrid_active = True
                    self.hybrid_highest_close = float(price)
                if self.hybrid_active:
                    annualized_vol = vol * math.sqrt(TRADING_DAYS)
                    hy_w = min(1.0, 0.25 / annualized_vol)

        target_weight = self.p.w_dt * dt_w + self.p.w_mvt * mvt_w + self.p.w_hy * hy_w
        portfolio_value = self.broker.getvalue()
        if portfolio_value > 0:
            current_weight = (abs(self.position.size) * price) / portfolio_value
        else:
            current_weight = 0.0
        drifted = abs(current_weight - target_weight) >= self.p.rebalance_band
        if not self.position or drifted:
            self._rebalance_to_weight(target_weight)
def write_mail(subject: str, html: str, output_path: Path, to_email: str):
    """Write a ready-to-send HTML e-mail message to *output_path*.

    The message is built with the standard ``email`` package so that the
    non-ASCII subject is RFC 2047 encoded and the HTML body gets a
    transfer encoding (quoted-printable or base64) that keeps every line
    within SMTP line-length limits, regardless of how long the HTML lines
    are.

    Args:
        subject: Subject line; may contain non-ASCII text.
        html: Full HTML document used as the message body.
        output_path: File the serialized message is written to.
        to_email: Address used for both the ``From`` and ``To`` headers.
    """
    from email.message import EmailMessage

    message = EmailMessage()
    message["Subject"] = subject
    message["From"] = to_email
    message["To"] = to_email
    # No explicit cte: the library picks an ASCII-safe encoding and wraps
    # long lines, which a raw "8bit" body cannot guarantee.
    message.set_content(html, subtype="html", charset="utf-8")
    output_path.write_bytes(message.as_bytes())
def send_mail(file_path: Path, to_email: str):
    """Pipe the message stored in *file_path* to the local sendmail binary.

    Args:
        file_path: File containing a complete message (headers and body).
        to_email: Envelope recipient passed on the command line.

    Raises:
        subprocess.CalledProcessError: If sendmail exits with a non-zero
            status.
    """
    content = file_path.read_bytes()
    # -i: a body line consisting of a single "." must not be treated as the
    # end of the message.
    subprocess.run(["/usr/sbin/sendmail", "-i", to_email], input=content, check=True)
def get_send_window_status() -> tuple[bool, str]:
    """Report whether the current SEND_TZ time is inside the send window.

    Returns:
        ``(allowed, message)`` where *allowed* is True when the current
        wall-clock time lies between SEND_START and SEND_END (both
        inclusive) and *message* is a one-line log string describing the
        check.
    """
    current = datetime.now(SEND_TZ)
    inside_window = SEND_START <= current.time() <= SEND_END
    stamp = current.strftime('%Y-%m-%d %H:%M:%S')
    window = f"{SEND_START.strftime('%H:%M')}-{SEND_END.strftime('%H:%M')}"
    report = f"send_window tz=Asia/Shanghai now={stamp} window={window} allowed={inside_window}"
    return inside_window, report
def main():
    """Refresh data, backtest both strategies, write the reports and mail them.

    Steps:
        1. Parse the command line and reject non-positive cash amounts.
        2. Unless ``--skip-refresh`` is given, refresh ``chinext50.csv``;
           abort with exit status 2 if the refresh fails.
        3. Backtest the basic Dual Thrust strategy and the Balanced3 combo,
           scaling money figures when display and baseline cash differ.
        4. Write both report messages next to this script.
        5. Unless ``--skip-send`` is given, mail them when inside the send
           window or when ``--force-send`` is set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--email", default=DEFAULT_EMAIL)
    parser.add_argument("--baseline-cash", type=float, default=DEFAULT_BASELINE_CASH)
    parser.add_argument("--display-cash", type=float, default=DEFAULT_DISPLAY_CASH)
    parser.add_argument("--skip-refresh", action="store_true")
    parser.add_argument("--skip-send", action="store_true")
    parser.add_argument("--force-send", action="store_true", help="Ignore send window and send immediately.")
    args = parser.parse_args()

    # The baseline is a divisor for both the scale factor and total return.
    if args.baseline_cash <= 0 or args.display_cash <= 0:
        parser.error("--baseline-cash and --display-cash must be positive")

    if not args.skip_refresh:
        try:
            refreshed = fetch_chinext50_data(save_path=WORKDIR / "chinext50.csv")
        except Exception as e:
            # Deliberately broad: any refresh failure must stop the run so a
            # report based on stale data is never mailed.
            print(f"REFRESH_FAILED: {e}", file=sys.stderr)
            print("ABORT_SEND: remote refresh failed, report email not sent", file=sys.stderr)
            raise SystemExit(2)
        print(f"refreshed rows={len(refreshed)} range={refreshed['datetime'].min().date()}~{refreshed['datetime'].max().date()}")

    df = exp_mod.load_dataframe()
    scale = args.display_cash / args.baseline_cash
    needs_scaling = abs(scale - 1.0) > 1e-9
    scaled_note = (
        f"先刷新远程数据后回测;展示口径按 {safe_num(args.baseline_cash)} 基准回测等比例放大到 {safe_num(args.display_cash)}。"
        if needs_scaling
        else f"先刷新远程数据后,按 {safe_num(args.display_cash)} 初始资金直接回测。"
    )

    # Single source of truth for the Dual Thrust parameters used below.
    dt_config = {"range_period": 20, "k1": 0.3, "k2": 0.3}

    m1, t1 = run_with_trades(exp_mod.DualThrustBasicStrategy, df, args.baseline_cash, dict(dt_config))
    m2, t2 = run_with_trades(Balanced3_DT60_MVT20_HY20, df, args.baseline_cash)
    if needs_scaling:
        m1, t1 = scale_metrics(m1, scale), scale_trades(t1, scale)
        m2, t2 = scale_metrics(m2, scale), scale_trades(t2, scale)

    sub1 = compute_subperiods(
        exp_mod.DualThrustBasicStrategy,
        df,
        args.baseline_cash,
        dict(dt_config),
    )
    sub2 = compute_subperiods(
        Balanced3_DT60_MVT20_HY20,
        df,
        args.baseline_cash,
        None,
    )
    recent1 = compute_recent_dualthrust_signals(df, recent_n=20, **dt_config)
    recent2 = compute_recent_combo_signals(df, recent_n=20)

    html1 = build_html(
        "DualThrustBasicStrategy — 单策略统计信息(自动刷新版)",
        "DualThrustBasicStrategy",
        "range_period=20, k1=0.3, k2=0.3",
        m1,
        t1,
        sub1,
        recent1,
        df,
        args.display_cash,
        scaled_note,
    )
    html2 = build_html(
        "Balanced3_DT60_MVT20_HY20 — 组合策略统计信息(自动刷新版)",
        "Balanced3_DT60_MVT20_HY20",
        "DualThrustRegime(60%) + MVT_reg150_tv029(20%) + DonchianHybrid_b55_e30_tv025_atr4(20%)",
        m2,
        t2,
        sub2,
        recent2,
        df,
        args.display_cash,
        scaled_note,
    )

    p1 = WORKDIR / "auto_refresh_email_strategy_1.html"
    p2 = WORKDIR / "auto_refresh_email_strategy_2.html"
    write_mail("【策略统计-自动刷新版】DualThrustBasicStrategy 单策略统计信息", html1, p1, args.email)
    write_mail("【策略统计-自动刷新版】Balanced3_DT60_MVT20_HY20 组合策略统计信息", html2, p2, args.email)
    print(f"strategy1 total={m1['total_return_pct']}% final={m1['final_value']}")
    print(f"strategy2 total={m2['total_return_pct']}% final={m2['final_value']}")
    print(f"wrote {p1} and {p2}")

    if not args.skip_send:
        allowed, window_msg = get_send_window_status()
        print(window_msg)
        if allowed or args.force_send:
            send_mail(p1, args.email)
            send_mail(p2, args.email)
            print("sent both emails")
        else:
            print("SKIP_SEND: outside Asia/Shanghai send window; reports generated but not mailed")


if __name__ == "__main__":
    main()
|