#!/usr/bin/env python3
from __future__ import annotations
import argparse
import importlib.util
import math
import subprocess
import sys
from datetime import datetime, time
from pathlib import Path
from zoneinfo import ZoneInfo
import pandas as pd
from fetch_data import fetch_chinext50_data
# Directory that contains this script; the CSV data file and the generated
# report files are read from / written to this directory.
WORKDIR = Path(__file__).resolve().parent
# Default value of --email (used for both the From and To headers).
DEFAULT_EMAIL = "wangshuai.vip@qq.com"
# Default starting cash the backtests are actually run with.
DEFAULT_BASELINE_CASH = 100_000
# Default starting cash shown in the report; money figures are scaled by
# display_cash / baseline_cash before rendering.
DEFAULT_DISPLAY_CASH = 1_000_000
# Mail is only sent when the current wall-clock time in SEND_TZ lies within
# [SEND_START, SEND_END] (inclusive), unless --force-send is given.
SEND_TZ = ZoneInfo("Asia/Shanghai")
SEND_START = time(15, 10)
SEND_END = time(22, 30)
def load_module(name: str, path: Path):
    """Import the Python source file at *path* and register it as module *name*.

    The module object is placed in ``sys.modules`` before its code runs, so
    code inside the file that looks itself up by name can find it.

    Args:
        name: Name to register the module under.
        path: Location of the source file to execute.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec or loader can be created for *path*
            (for example, the file suffix is not a recognised source suffix).
    """
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        # Without this check the failure surfaces later as an opaque
        # AttributeError on None.
        raise ImportError(f"cannot create an import spec for module {name!r} from {path}")
    mod = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module registered.
        sys.modules.pop(spec.name, None)
        raise
    return mod
# Load the two sibling scripts by file path. Both files are executed here, at
# import time of this script.
combo_mod = load_module("combo", WORKDIR / "shortlist_combo_trials.py")
exp_mod = load_module("exp", WORKDIR / "chinext50_experiments.py")
# Local aliases for names taken from the combo module.
# NOTE(review): `bt` is presumably the backtrader package as imported by
# shortlist_combo_trials.py - confirm.
bt = combo_mod.bt
TRADING_DAYS = combo_mod.TRADING_DAYS
BasePortfolioStrategy = combo_mod.BasePortfolioStrategy
class CleanTradeRecorder(bt.Analyzer):
    """Analyzer that stores one summary dict per closed trade in ``self.trades``.

    Completed buy orders accumulate the entry cost and quantity; the most
    recent completed sell order supplies the exit price. When a trade is
    reported as closed, a record is appended and the accumulators are reset.
    """

    def __init__(self):
        self.trades = []
        # Running totals for the trade currently being built.
        self._entry_cost, self._last_sell_price, self._entry_qty = 0.0, None, 0

    def notify_order(self, order):
        """Track fills of completed orders; every other order status is ignored."""
        if order.status != order.Completed:
            return
        fill = order.executed
        if order.isbuy():
            self._entry_cost += abs(fill.size) * fill.price
            self._entry_qty += abs(int(round(fill.size)))
            return
        if order.issell():
            self._last_sell_price = round(fill.price, 2)

    def notify_trade(self, trade):
        """Append a record for a closed trade and reset the accumulators."""
        if not trade.isclosed:
            return
        net_pnl = round(trade.pnlcomm, 2)
        # Tiny positive fallback avoids dividing by zero when no buy was seen.
        basis = self._entry_cost if self._entry_cost > 0 else 1e-9
        return_pct = round((net_pnl / basis) * 100, 2)
        # Broker value at the moment the close is notified.
        account_value = round(self.strategy.broker.getvalue(), 2)
        opened_on = bt.num2date(trade.dtopen).strftime("%Y-%m-%d")
        closed_on = bt.num2date(trade.dtclose).strftime("%Y-%m-%d")
        record = {
            "entry_date": opened_on,
            "exit_date": closed_on,
            "entry_price": round(trade.price, 2) if trade.price else None,
            "exit_price": self._last_sell_price,
            "qty": self._entry_qty,
            "days": int(trade.barlen),
            "pnl": net_pnl,
            "pnl_pct": return_pct,
            "nav": account_value,
        }
        self.trades.append(record)
        self._entry_cost, self._last_sell_price, self._entry_qty = 0.0, None, 0
def run_with_trades(strategy_cls, df, baseline_cash: float, config: dict | None = None):
    """Backtest *strategy_cls* on *df* and return ``(metrics, trades)``.

    Args:
        strategy_cls: Strategy class handed to the engine.
        df: Price data wrapped in ``combo_mod.Chinext50Data``.
        baseline_cash: Starting cash for the broker.
        config: Optional keyword arguments for the strategy.

    Returns:
        A tuple of the metrics dict (rounded summary figures) and the list of
        per-trade records collected by ``CleanTradeRecorder``.
    """
    engine = bt.Cerebro(stdstats=False)
    engine.adddata(combo_mod.Chinext50Data(dataname=df))
    strategy_kwargs = config if config else {}
    engine.addstrategy(strategy_cls, **strategy_kwargs)
    engine.broker.setcash(baseline_cash)
    engine.broker.setcommission(commission=combo_mod.COMMISSION)

    analyzer_specs = (
        (bt.analyzers.Returns, {"_name": "returns"}),
        (bt.analyzers.DrawDown, {"_name": "drawdown"}),
        (bt.analyzers.SharpeRatio_A, {"_name": "sharpe", "riskfreerate": 0.02}),
        (bt.analyzers.TradeAnalyzer, {"_name": "trades"}),
        (CleanTradeRecorder, {"_name": "recorder"}),
    )
    for analyzer_cls, analyzer_kwargs in analyzer_specs:
        engine.addanalyzer(analyzer_cls, **analyzer_kwargs)

    strategy = engine.run()[0]
    final_value = engine.broker.getvalue()

    analyzers = strategy.analyzers
    returns_info = analyzers.returns.get_analysis()
    drawdown_info = analyzers.drawdown.get_analysis()
    sharpe_info = analyzers.sharpe.get_analysis()
    trade_info = analyzers.trades.get_analysis()

    closed_trades = trade_info.get("total", {}).get("closed", 0)
    won_trades = trade_info.get("won", {}).get("total", 0)
    total_bars = len(df)

    if sharpe_info.get("sharperatio") is not None:
        sharpe_value = round(sharpe_info["sharperatio"], 3)
    else:
        sharpe_value = None

    if closed_trades:
        win_rate = round((won_trades / closed_trades) * 100.0, 2)
    else:
        win_rate = 0.0

    metrics = {
        "final_value": round(final_value, 2),
        "total_return_pct": round((final_value / baseline_cash - 1.0) * 100.0, 2),
        "annual_return_pct": round(returns_info.get("rnorm100", 0.0), 2),
        "max_drawdown_pct": round(drawdown_info.get("max", {}).get("drawdown", 0.0), 2),
        "sharpe": sharpe_value,
        "closed_trades": closed_trades,
        "win_rate_pct": win_rate,
        "exposure_pct": round((strategy.exposure_sum / total_bars) * 100.0, 2),
    }
    return metrics, analyzers.recorder.trades
def scale_metrics(metrics: dict, scale: float) -> dict:
    """Return a copy of *metrics* with ``final_value`` multiplied by *scale*.

    All other entries are copied unchanged and the input dict is not modified.
    """
    scaled_final = round(metrics["final_value"] * scale, 2)
    return {**metrics, "final_value": scaled_final}
def scale_trades(trades: list[dict], scale: float) -> list[dict]:
    """Return copies of the trade records with money/size fields scaled.

    ``qty`` is scaled and rounded to an int; ``pnl`` and ``nav`` are scaled
    and rounded to two decimals. Other fields and the inputs are untouched.
    """
    return [
        {
            **record,
            "qty": int(round(record["qty"] * scale)),
            "pnl": round(record["pnl"] * scale, 2),
            "nav": round(record["nav"] * scale, 2),
        }
        for record in trades
    ]
def safe_num(v):
    """Format *v* for display.

    ``None`` becomes ``"-"``; ints get thousands separators; floats get
    thousands separators and two decimals, with a trailing ``.00`` dropped;
    anything else is passed through ``str``.
    """
    if v is None:
        return "-"
    if isinstance(v, int):
        return format(v, ",")
    if not isinstance(v, float):
        return str(v)
    text = format(v, ",.2f")
    return text[:-3] if text.endswith(".00") else text
def td(text, extra=""):
    """Return an HTML table data cell containing *text*.

    Args:
        text: Cell content; it is interpolated as-is (no HTML escaping).
        extra: Additional inline CSS declarations appended to the cell style,
            e.g. ``"color:#c62828;font-weight:700;"``.
    """
    # The original literal was unterminated and ignored `extra`; the cell
    # markup below is a reconstruction with a neutral base style.
    return f'<td style="border:1px solid #ddd;padding:6px 10px;{extra}">{text}</td>'
def th(text):
    """Return an HTML table header cell containing *text* (no HTML escaping)."""
    # The original literal had lost its markup; the header cell below is a
    # reconstruction with a neutral base style.
    return (
        '<th style="border:1px solid #ddd;padding:6px 10px;'
        f'background:#f5f5f5;text-align:left;">{text}</th>'
    )
def compute_subperiods(strategy_cls, df, baseline_cash: float, config: dict | None = None) -> list[dict]:
    """Re-run the backtest on fixed calendar sub-periods of *df*.

    Three windows are evaluated: 2014-06-18..2018-12-31, 2019-01-01..2022-12-31
    and 2023-01-01 up to the last index value of *df*. Windows with fewer than
    30 rows are skipped.

    Returns:
        One dict per evaluated window with keys ``period``, ``annual``,
        ``sharpe`` and ``max_dd``.
    """
    last_bar = df.index.max()
    windows = [
        ("2014-06 ~ 2018-12", "2014-06-18", "2018-12-31"),
        ("2019-01 ~ 2022-12", "2019-01-01", "2022-12-31"),
        (f"2023-01 ~ {last_bar.strftime('%Y-%m')}", "2023-01-01", last_bar.strftime('%Y-%m-%d')),
    ]
    results = []
    for label, start, end in windows:
        in_window = (df.index >= start) & (df.index <= end)
        window_df = df.loc[in_window].copy()
        if len(window_df) >= 30:
            window_metrics, _ = run_with_trades(strategy_cls, window_df, baseline_cash, config)
            results.append(
                {
                    "period": label,
                    "annual": window_metrics["annual_return_pct"],
                    "sharpe": window_metrics["sharpe"],
                    "max_dd": window_metrics["max_drawdown_pct"],
                }
            )
    return results
def compute_recent_dualthrust_signals(df: pd.DataFrame, range_period: int = 20, k1: float = 0.3, k2: float = 0.3, recent_n: int = 20) -> list[dict]:
    """Replay the close-based band signal over *df* and report the latest bars.

    For each bar after the warm-up, the band is built from the previous
    *range_period* closes: ``range = max - min`` of that window, with
    ``upper = prev_close + k1 * range`` and ``lower = prev_close - k2 * range``.
    A close above ``upper`` while flat flips the state to "in position"; a
    close below ``lower`` while in position flips it back.

    Args:
        df: Frame with a ``close`` column and an index whose values support
            ``strftime``.
        range_period: Number of prior closes used for the range.
        k1: Multiplier for the upper band.
        k2: Multiplier for the lower band.
        recent_n: Number of most recent bars to report.

    Returns:
        At most *recent_n* row dicts (oldest first) with keys ``date``,
        ``close``, ``upper``, ``lower``, ``entry_signal``, ``exit_signal`` and
        ``status``. An empty list when *recent_n* is not positive.
    """
    if recent_n <= 0:
        # A plain rows[-recent_n:] slice would return the whole history for 0.
        return []

    close = df['close']
    total = len(df)
    # The state depends on the full history, so every bar is replayed, but row
    # dicts are only built for the bars that end up in the result.
    first_reported = total - recent_n
    in_position = False
    rows = []
    for i in range(max(range_period + 1, 0), total):
        window = close.iloc[i - range_period:i]
        thrust_range = float(window.max() - window.min())
        ref_price = float(close.iloc[i - 1])
        last_close = float(close.iloc[i])
        upper = ref_price + k1 * thrust_range
        lower = ref_price - k2 * thrust_range
        entry_signal = last_close > upper
        exit_signal = last_close < lower

        if entry_signal and not in_position:
            in_position = True
            action = '买入触发'
        elif in_position and exit_signal:
            in_position = False
            action = '卖出触发'
        else:
            action = '持仓' if in_position else '空仓'

        if i >= first_reported:
            rows.append({
                'date': df.index[i].strftime('%Y-%m-%d'),
                'close': round(last_close, 2),
                'upper': round(upper, 2),
                'lower': round(lower, 2),
                'entry_signal': '是' if entry_signal else '否',
                'exit_signal': '是' if exit_signal else '否',
                'status': action,
            })
    return rows
def compute_recent_combo_signals(df: pd.DataFrame, recent_n: int = 20) -> list[dict]:
    """Replay the three-sleeve combo signals with pandas and report the latest bars.

    Three sleeves are tracked bar by bar:

    * DT: close-based band breakout (20 prior closes, 0.35 multipliers)
      filtered by the 120-bar SMA; weight is 1.0 while active.
    * MVT: active when 20- and 120-bar returns are positive and the close is
      above the 150-bar SMA; weight is ``min(1, 0.29 / annualised vol)``.
    * HY: enters on a close above the prior 55-bar high, exits on a close
      below the prior 30-bar low or below ``highest close - 4 * ATR20``;
      weight is ``min(1, 0.25 / annualised vol)``.

    The reported target weight is ``0.60 * DT + 0.20 * MVT + 0.20 * HY``.

    NOTE(review): indicators here are plain pandas rolling statistics; they
    may differ slightly from the strategy's engine indicators - confirm
    before treating this table as an exact mirror of the backtest.

    Args:
        df: Frame with ``close``, ``high`` and ``low`` columns and an index
            whose values support ``strftime``.
        recent_n: Number of most recent bars to report.

    Returns:
        At most *recent_n* row dicts (oldest first). An empty list when
        *recent_n* is not positive.
    """
    if recent_n <= 0:
        # A plain rows[-recent_n:] slice would return the whole history for 0.
        return []

    close = df['close']
    high = df['high']
    low = df['low']
    returns = close.pct_change(1)
    sma120 = close.rolling(120).mean()
    sma150 = close.rolling(150).mean()
    roc20 = close.pct_change(20)
    roc120 = close.pct_change(120)
    vol30 = returns.rolling(30).std()
    prev_close = close.shift(1)
    tr = pd.concat([
        (high - low),
        (high - prev_close).abs(),
        (low - prev_close).abs(),
    ], axis=1).max(axis=1)
    atr20 = tr.rolling(20).mean()
    # Shifted by one bar so breakouts are measured against prior bars only.
    highest55_prev = high.rolling(55).max().shift(1)
    lowest30_prev = low.rolling(30).min().shift(1)

    dt_reg_active = False
    hybrid_active = False
    hybrid_highest_close = None
    total = len(df)
    # Sleeve state depends on the full history, so every bar is replayed, but
    # row dicts are only built for the bars that end up in the result.
    first_reported = total - recent_n
    rows = []
    for i in range(total):
        px = float(close.iloc[i])

        # --- DT sleeve -------------------------------------------------
        dt_w = 0.0
        if i > 120 and not pd.isna(sma120.iloc[i]):
            window = close.iloc[i - 20:i]
            thrust_range = float(window.max() - window.min())
            ref_price = float(close.iloc[i - 1])
            upper = ref_price + 0.35 * thrust_range
            lower = ref_price - 0.35 * thrust_range
            trend_ref = float(sma120.iloc[i])
            entry_signal = px > upper and px > trend_ref
            exit_signal = px < lower or px < trend_ref
            if not dt_reg_active and entry_signal:
                dt_reg_active = True
            elif dt_reg_active and exit_signal:
                dt_reg_active = False
            dt_w = 1.0 if dt_reg_active else 0.0

        # --- MVT sleeve (stateless) --------------------------------------
        mvt_signal = False
        mvt_w = 0.0
        if not any(pd.isna(x) for x in [roc20.iloc[i], roc120.iloc[i], sma150.iloc[i], vol30.iloc[i]]) and vol30.iloc[i] > 0:
            mvt_signal = bool(roc20.iloc[i] > 0 and roc120.iloc[i] > 0 and close.iloc[i] > sma150.iloc[i])
            if mvt_signal:
                annualized_vol = float(vol30.iloc[i]) * math.sqrt(TRADING_DAYS)
                mvt_w = min(1.0, 0.29 / annualized_vol)

        # --- HY sleeve ---------------------------------------------------
        hy_w = 0.0
        hy_break = False
        if i > 55 and not any(pd.isna(x) for x in [highest55_prev.iloc[i], lowest30_prev.iloc[i], vol30.iloc[i], atr20.iloc[i]]) and vol30.iloc[i] > 0:
            hy_break = bool(close.iloc[i] > highest55_prev.iloc[i])
            channel_exit = bool(close.iloc[i] < lowest30_prev.iloc[i])
            if not hybrid_active:
                if hy_break:
                    hybrid_active = True
                    hybrid_highest_close = px
            else:
                hybrid_highest_close = max(hybrid_highest_close or px, px)
                trailing_stop = hybrid_highest_close - 4.0 * float(atr20.iloc[i])
                if channel_exit or px < trailing_stop:
                    hybrid_active = False
                    hybrid_highest_close = None
            if hybrid_active:
                annualized_vol = float(vol30.iloc[i]) * math.sqrt(TRADING_DAYS)
                hy_w = min(1.0, 0.25 / annualized_vol)

        if i < first_reported:
            continue
        target_weight = 0.60 * dt_w + 0.20 * mvt_w + 0.20 * hy_w
        rows.append({
            'date': df.index[i].strftime('%Y-%m-%d'),
            'close': round(px, 2),
            'dt_status': '开' if dt_reg_active else '关',
            'mvt_status': '开' if mvt_signal else '关',
            'hy_status': '开' if hybrid_active else '关',
            'dt_weight': round(dt_w * 100, 1),
            'mvt_weight': round(mvt_w * 100, 1),
            'hy_weight': round(hy_w * 100, 1),
            'target_weight': round(target_weight * 100, 1),
        })
    return rows
def build_recent_signal_html(strategy_name: str, recent_rows: list[dict]) -> str:
    """Render the recent-signal rows as an HTML heading plus table.

    Args:
        strategy_name: ``'DualThrustBasicStrategy'`` selects the band-signal
            column layout; any other value selects the combo layout.
        recent_rows: Row dicts as produced by the matching
            ``compute_recent_*_signals`` function.

    Returns:
        An HTML fragment, or ``''`` when *recent_rows* is empty.
    """
    if not recent_rows:
        return ''

    # The original string literals had lost their markup and the computed
    # header/body were never emitted; the row/table tags are a reconstruction.
    if strategy_name == 'DualThrustBasicStrategy':
        titles = ['日期', '收盘', '上轨', '下轨', '入场触发', '出场触发', '状态']
        body = [
            '<tr>'
            + td(r['date'])
            + td(safe_num(r['close']))
            + td(safe_num(r['upper']))
            + td(safe_num(r['lower']))
            + td(r['entry_signal'])
            + td(r['exit_signal'])
            + td(r['status'])
            + '</tr>'
            for r in recent_rows
        ]
    else:
        titles = ['日期', '收盘', 'DT状态', 'MVT状态', 'HY状态', 'DT权重', 'MVT权重', 'HY权重', '组合目标仓位']
        body = [
            '<tr>'
            + td(r['date'])
            + td(safe_num(r['close']))
            + td(r['dt_status'])
            + td(r['mvt_status'])
            + td(r['hy_status'])
            + td(f"{r['dt_weight']:.1f}%")
            + td(f"{r['mvt_weight']:.1f}%")
            + td(f"{r['hy_weight']:.1f}%")
            + td(f"{r['target_weight']:.1f}%")
            + '</tr>'
            for r in recent_rows
        ]

    header = '<tr>' + ''.join(th(title) for title in titles) + '</tr>'
    body_html = '\n'.join(body)
    return f'''<h3>近20交易日指标触发情况</h3>
<table style="border-collapse:collapse;font-size:13px;">
{header}
{body_html}
</table>
'''
def build_html(title: str, strategy_name: str, config_desc: str, metrics: dict, trades: list[dict], subperiods: list[dict], recent_signal_rows: list[dict], df, display_cash: float, scaled_note: str) -> str:
    """Render the full report for one strategy as an HTML document string.

    Sections, in order: report header (cash, strategy, config, data range),
    core metrics table, sub-period table, recent-signal table and the trade
    history table with the cumulative P&L.

    Args:
        title: Used for both the document title and the page heading.
        strategy_name: Shown in the header and used to pick the recent-signal
            column layout.
        config_desc: Free-text parameter description.
        metrics: Metrics dict as returned by ``run_with_trades``.
        trades: Trade records as returned by ``run_with_trades``.
        subperiods: Rows as returned by ``compute_subperiods``.
        recent_signal_rows: Rows for ``build_recent_signal_html``.
        df: Price frame; only its index range and length are reported.
        display_cash: Starting cash figure shown in the header.
        scaled_note: Free-text note about data refresh / scaling.

    Returns:
        The HTML document. Values are interpolated without HTML escaping.
    """
    # NOTE: the original literals had lost their markup; the document and
    # table tags below are a reconstruction. All visible text is unchanged.
    table_open = '<table style="border-collapse:collapse;font-size:13px;">'

    def tr(*cells: str) -> str:
        """Wrap already-rendered cells in a table row."""
        return "<tr>" + "".join(cells) + "</tr>"

    def shown(value):
        """Display a dash instead of the literal text 'None'."""
        return "-" if value is None else value

    total_pnl = sum(t["pnl"] for t in trades)

    trade_lines = []
    for i, t in enumerate(trades, 1):
        pnl_color = "#0b8f3d" if t["pnl_pct"] >= 0 else "#c62828"
        pnl_sign = "+" if t["pnl_pct"] > 0 else ""
        pnl_style = f"color:{pnl_color};font-weight:700;"
        trade_lines.append(
            tr(
                td(i),
                td(t["entry_date"]),
                td(safe_num(t["entry_price"])),
                td(t["exit_date"]),
                td(safe_num(t["exit_price"])),
                td(safe_num(t["qty"])),
                td(t["days"]),
                td(safe_num(t["pnl"]), pnl_style),
                td(f"{pnl_sign}{t['pnl_pct']:.2f}%", pnl_style),
                td(safe_num(t["nav"])),
            )
        )
    trade_rows = "\n".join(trade_lines)

    sub_rows = "\n".join(
        tr(
            td(sp["period"]),
            td(f"{sp['annual']:.2f}%"),
            td(shown(sp["sharpe"])),
            td(f"{sp['max_dd']:.2f}%"),
        )
        for sp in subperiods
    )

    metric_rows = "\n".join(
        [
            tr(th('指标'), th('数值'), th('指标'), th('数值')),
            tr(td('总收益'), td(f"{metrics['total_return_pct']:.2f}%"), td('年化收益'), td(f"{metrics['annual_return_pct']:.2f}%")),
            tr(td('Sharpe'), td(shown(metrics['sharpe'])), td('最大回撤'), td(f"{metrics['max_drawdown_pct']:.2f}%")),
            tr(td('交易次数'), td(metrics['closed_trades']), td('胜率'), td(f"{metrics['win_rate_pct']:.2f}%")),
            tr(td('最终净值'), td(safe_num(metrics['final_value'])), td('平均暴露'), td(f"{metrics['exposure_pct']:.2f}%")),
        ]
    )
    sub_header = tr(th('区间'), th('年化收益'), th('Sharpe'), th('最大回撤'))
    trade_header = tr(
        th('#'), th('买入时间'), th('买入价'), th('卖出时间'), th('卖出价'),
        th('数量'), th('天数'), th('盈亏额'), th('盈亏%'), th('账户净值'),
    )

    data_start = df.index.min().date()
    data_end = df.index.max().date()
    bars = len(df)
    recent_signal_html = build_recent_signal_html(strategy_name, recent_signal_rows)

    return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{title}</title>
</head>
<body style="font-family:Arial,Helvetica,sans-serif;font-size:14px;color:#222;">
<h2>{title}</h2>
<p>初始资金:{safe_num(display_cash)}</p>
<p>策略名称:{strategy_name}</p>
<p>配置参数:{config_desc}</p>
<p>数据来源:chinext50.csv({data_start} 至 {data_end},{bars} 根 K 线)</p>
<p>数据处理:{scaled_note}</p>
<h3>核心指标</h3>
{table_open}
{metric_rows}
</table>
<h3>子区间复核</h3>
{table_open}
{sub_header}
{sub_rows}
</table>
{recent_signal_html}
<h3>历史交易记录</h3>
<p>累计盈亏:{safe_num(total_pnl)}</p>
{table_open}
{trade_header}
{trade_rows}
</table>
<p>本报告每次运行都会先拉取远程最新数据,刷新本地 chinext50.csv,再重算后发出。</p>
</body>
</html>
"""
class Balanced3_DT60_MVT20_HY20(BasePortfolioStrategy):
    """Blend of three sleeves (DT, MVT, HY) combined into one target weight.

    Each bar computes a weight in [0, 1] per sleeve, mixes them with the
    ``w_dt`` / ``w_mvt`` / ``w_hy`` params and rebalances when the portfolio
    is flat or the current weight is at least ``rebalance_band`` away from the
    target.
    """

    # Sleeve mixing weights and the minimum weight gap that triggers a rebalance.
    params = (("w_dt", 0.60), ("w_mvt", 0.20), ("w_hy", 0.20), ("rebalance_band", 0.05))

    def __init__(self):
        """Create the indicators and reset the per-sleeve state flags."""
        super().__init__()
        close = self.data.close
        returns = bt.indicators.PctChange(close, period=1)
        # 30-bar standard deviation of 1-bar returns; annualised in next().
        self.volatility = bt.indicators.StdDev(returns, period=30)
        self.atr = bt.indicators.ATR(self.data, period=20)
        self.roc_short = bt.indicators.ROC(close, period=20)
        self.roc_long = bt.indicators.ROC(close, period=120)
        self.sma120 = bt.indicators.SMA(close, period=120)
        self.sma150 = bt.indicators.SMA(close, period=150)
        self.highest_high = bt.indicators.Highest(self.data.high, period=55)
        self.lowest_low = bt.indicators.Lowest(self.data.low, period=30)
        # DT sleeve on/off state.
        self.dt_reg_active = False
        # HY sleeve on/off state and the highest close seen while it is on.
        self.hybrid_active = False
        self.hybrid_highest_close = None

    def next(self):
        """Update the sleeve states for the current bar and rebalance if needed."""
        super().next()
        # NOTE(review): self.order is presumably maintained by
        # BasePortfolioStrategy as the pending order - confirm. Sleeve state is
        # not updated on bars where this returns early.
        if self.order:
            return

        # --- DT sleeve: band breakout on prior closes, filtered by SMA120 ---
        dt_w = 0.0
        if len(self) > 120 and not math.isnan(self.sma120[0]):
            # The 20 closes before the current bar.
            closes = [float(self.data.close[-offset]) for offset in range(1, 21)]
            thrust_range = max(closes) - min(closes)
            reference_price = float(self.data.close[-1])
            upper = reference_price + 0.35 * thrust_range
            lower = reference_price - 0.35 * thrust_range
            entry_signal = self.data.close[0] > upper and self.data.close[0] > self.sma120[0]
            exit_signal = self.data.close[0] < lower or self.data.close[0] < self.sma120[0]
            if not self.dt_reg_active and entry_signal:
                self.dt_reg_active = True
            elif self.dt_reg_active and exit_signal:
                self.dt_reg_active = False
            dt_w = 1.0 if self.dt_reg_active else 0.0

        # --- MVT sleeve: stateless momentum/trend filter, volatility-scaled ---
        mvt_w = 0.0
        if not any(math.isnan(x) for x in [self.roc_short[0], self.roc_long[0], self.sma150[0], self.volatility[0]]) and self.volatility[0] > 0:
            signal = self.roc_short[0] > 0 and self.roc_long[0] > 0 and self.data.close[0] > self.sma150[0]
            if signal:
                annualized_vol = self.volatility[0] * math.sqrt(TRADING_DAYS)
                # Weight shrinks as annualised volatility rises above 0.29.
                mvt_w = min(1.0, 0.29 / annualized_vol)

        # --- HY sleeve: channel breakout with channel / ATR trailing exit ---
        hy_w = 0.0
        if len(self) > 55 and not any(math.isnan(x) for x in [self.highest_high[-1], self.lowest_low[-1], self.volatility[0], self.atr[0]]) and self.volatility[0] > 0:
            # [-1] compares against the previous bar's channel values.
            breakout_signal = self.data.close[0] > self.highest_high[-1]
            channel_exit = self.data.close[0] < self.lowest_low[-1]
            if not self.hybrid_active:
                if breakout_signal:
                    self.hybrid_active = True
                    self.hybrid_highest_close = float(self.data.close[0])
            else:
                self.hybrid_highest_close = max(self.hybrid_highest_close or float(self.data.close[0]), float(self.data.close[0]))
                trailing_stop = self.hybrid_highest_close - 4.0 * self.atr[0]
                if channel_exit or self.data.close[0] < trailing_stop:
                    self.hybrid_active = False
                    self.hybrid_highest_close = None
            if self.hybrid_active:
                annualized_vol = self.volatility[0] * math.sqrt(TRADING_DAYS)
                # Weight shrinks as annualised volatility rises above 0.25.
                hy_w = min(1.0, 0.25 / annualized_vol)

        target_weight = self.p.w_dt * dt_w + self.p.w_mvt * mvt_w + self.p.w_hy * hy_w
        pv = self.broker.getvalue()
        # Current position value as a fraction of portfolio value.
        cw = (abs(self.position.size) * self.data.close[0]) / pv if pv > 0 else 0.0
        if not self.position or abs(cw - target_weight) >= self.p.rebalance_band:
            # NOTE(review): _rebalance_to_weight is presumably provided by
            # BasePortfolioStrategy - confirm.
            self._rebalance_to_weight(target_weight)
def write_mail(subject: str, html: str, output_path: Path, to_email: str):
    """Write a complete HTML mail message (headers + body) to *output_path*.

    The file is UTF-8 text: the header lines, one blank line, then *html*
    unchanged. *to_email* is used for both the From and To headers.

    Args:
        subject: Subject text; non-ASCII text is RFC 2047 encoded.
        html: Message body, written as-is.
        output_path: Destination file.
        to_email: Sender and recipient address.

    Raises:
        ValueError: If *subject* or *to_email* contains a line break, which
            would let the value inject extra header lines.
    """
    # Local import keeps this block self-contained.
    from email.header import Header

    for label, value in (("subject", subject), ("to_email", to_email)):
        if "\r" in value or "\n" in value:
            raise ValueError(f"{label} must not contain line breaks")

    # Header lines must be ASCII; raw UTF-8 in Subject is not valid without
    # RFC 2047 encoding and may be mangled or rejected by mail servers.
    if subject.isascii():
        subject_value = subject
    else:
        subject_value = Header(subject, "utf-8", header_name="Subject").encode()

    mail = "\n".join(
        [
            f"Subject: {subject_value}",
            f"From: {to_email}",
            f"To: {to_email}",
            "MIME-Version: 1.0",
            "Content-Type: text/html; charset=utf-8",
            "Content-Transfer-Encoding: 8bit",
            "",
            html,
        ]
    )
    output_path.write_text(mail, encoding="utf-8")
def send_mail(file_path: Path, to_email: str):
    """Pipe the prepared message file to the local sendmail binary.

    Args:
        file_path: File containing the full message (headers and body).
        to_email: Envelope recipient passed on the command line.

    Raises:
        subprocess.CalledProcessError: If sendmail exits with a non-zero status.
    """
    content = file_path.read_bytes()
    # -i: do not treat a line containing only "." as end of input, so such a
    # line in the body cannot silently truncate the message.
    subprocess.run(["/usr/sbin/sendmail", "-i", to_email], input=content, check=True)
def get_send_window_status() -> tuple[bool, str]:
    """Report whether the current time in ``SEND_TZ`` is inside the send window.

    Returns:
        ``(allowed, message)`` where *allowed* is True when the current
        wall-clock time lies within ``[SEND_START, SEND_END]`` (inclusive) and
        *message* is a one-line log string describing the check.
    """
    current = datetime.now(SEND_TZ)
    allowed = SEND_START <= current.time() <= SEND_END
    stamp = current.strftime('%Y-%m-%d %H:%M:%S')
    window = f"{SEND_START.strftime('%H:%M')}-{SEND_END.strftime('%H:%M')}"
    message = f"send_window tz=Asia/Shanghai now={stamp} window={window} allowed={allowed}"
    return allowed, message
def main():
    """Command-line entry point: refresh data, backtest, write and mail reports.

    Steps: optionally refresh ``chinext50.csv`` (exit code 2 on failure), run
    both strategies at the baseline cash, scale money figures to the display
    cash, build the two HTML mails, write them next to this script and send
    them when allowed by the send window (or ``--force-send``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--email", default=DEFAULT_EMAIL)
    parser.add_argument("--baseline-cash", type=float, default=DEFAULT_BASELINE_CASH)
    parser.add_argument("--display-cash", type=float, default=DEFAULT_DISPLAY_CASH)
    parser.add_argument("--skip-refresh", action="store_true")
    parser.add_argument("--skip-send", action="store_true")
    parser.add_argument("--force-send", action="store_true", help="Ignore send window and send immediately.")
    args = parser.parse_args()

    if not args.skip_refresh:
        try:
            refreshed = fetch_chinext50_data(save_path=WORKDIR / "chinext50.csv")
        except Exception as e:
            # Never mail a report built from stale data.
            print(f"REFRESH_FAILED: {e}", file=sys.stderr)
            print("ABORT_SEND: remote refresh failed, report email not sent", file=sys.stderr)
            sys.exit(2)
        print(f"refreshed rows={len(refreshed)} range={refreshed['datetime'].min().date()}~{refreshed['datetime'].max().date()}")

    df = exp_mod.load_dataframe()
    scale = args.display_cash / args.baseline_cash
    needs_scaling = abs(scale - 1.0) > 1e-9
    if needs_scaling:
        scaled_note = f"先刷新远程数据后回测;展示口径按 {safe_num(args.baseline_cash)} 基准回测等比例放大到 {safe_num(args.display_cash)}。"
    else:
        scaled_note = f"先刷新远程数据后,按 {safe_num(args.display_cash)} 初始资金直接回测。"

    dual_thrust_cls = exp_mod.DualThrustBasicStrategy
    dual_thrust_config = {"range_period": 20, "k1": 0.3, "k2": 0.3}

    m1, t1 = run_with_trades(dual_thrust_cls, df, args.baseline_cash, dict(dual_thrust_config))
    m2, t2 = run_with_trades(Balanced3_DT60_MVT20_HY20, df, args.baseline_cash)
    if needs_scaling:
        m1, t1 = scale_metrics(m1, scale), scale_trades(t1, scale)
        m2, t2 = scale_metrics(m2, scale), scale_trades(t2, scale)

    sub1 = compute_subperiods(dual_thrust_cls, df, args.baseline_cash, dict(dual_thrust_config))
    sub2 = compute_subperiods(Balanced3_DT60_MVT20_HY20, df, args.baseline_cash, None)

    recent1 = compute_recent_dualthrust_signals(df, range_period=20, k1=0.3, k2=0.3, recent_n=20)
    recent2 = compute_recent_combo_signals(df, recent_n=20)

    html1 = build_html(
        "DualThrustBasicStrategy — 单策略统计信息(自动刷新版)",
        "DualThrustBasicStrategy",
        "range_period=20, k1=0.3, k2=0.3",
        m1,
        t1,
        sub1,
        recent1,
        df,
        args.display_cash,
        scaled_note,
    )
    html2 = build_html(
        "Balanced3_DT60_MVT20_HY20 — 组合策略统计信息(自动刷新版)",
        "Balanced3_DT60_MVT20_HY20",
        "DualThrustRegime(60%) + MVT_reg150_tv029(20%) + DonchianHybrid_b55_e30_tv025_atr4(20%)",
        m2,
        t2,
        sub2,
        recent2,
        df,
        args.display_cash,
        scaled_note,
    )

    p1 = WORKDIR / "auto_refresh_email_strategy_1.html"
    p2 = WORKDIR / "auto_refresh_email_strategy_2.html"
    write_mail("【策略统计-自动刷新版】DualThrustBasicStrategy 单策略统计信息", html1, p1, args.email)
    write_mail("【策略统计-自动刷新版】Balanced3_DT60_MVT20_HY20 组合策略统计信息", html2, p2, args.email)
    print(f"strategy1 total={m1['total_return_pct']}% final={m1['final_value']}")
    print(f"strategy2 total={m2['total_return_pct']}% final={m2['final_value']}")
    print(f"wrote {p1} and {p2}")

    if args.skip_send:
        return
    allowed, window_msg = get_send_window_status()
    print(window_msg)
    if not (allowed or args.force_send):
        print("SKIP_SEND: outside Asia/Shanghai send window; reports generated but not mailed")
        return
    for mail_path in (p1, p2):
        send_mail(mail_path, args.email)
    print("sent both emails")
# Run the report pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()