| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464 |
- from __future__ import annotations
- from dataclasses import asdict
- from datetime import date
- import json
- from pathlib import Path
- import pandas as pd
- from dragon_branch_configs import (
- alpha_first_glued_followthrough_probe_config,
- alpha_first_glued_followthrough_mid_exit_probe_config,
- alpha_first_glued_refined_hot_cap_config,
- alpha_first_selective_veto_config,
- workbook_preserving_config,
- )
- from dragon_execution_common import apply_execution_model as _apply_execution_model, risk_cluster as _risk_cluster, summary as _summary
- from dragon_indicators import DragonIndicatorConfig, DragonIndicatorEngine
- from dragon_shared import START_DATE, format_num as _format_num, format_pct as _format_pct
- from dragon_strategy import DragonRuleEngine
# Ordered (branch name, config factory) pairs. main() calls each factory with
# no arguments and runs one rule-engine pass per entry, in this order.
BRANCH_CONFIGS = [
    (
        "workbook_preserving",
        workbook_preserving_config,
    ),
    (
        "alpha_first_selective_veto",
        alpha_first_selective_veto_config,
    ),
    (
        "alpha_first_glued_refined_hot_cap",
        alpha_first_glued_refined_hot_cap_config,
    ),
    (
        "alpha_first_glued_followthrough_probe",
        alpha_first_glued_followthrough_probe_config,
    ),
    (
        "alpha_first_glued_followthrough_mid_exit_probe",
        alpha_first_glued_followthrough_mid_exit_probe_config,
    ),
]
- def _entry_family(reason: str) -> str:
- return str(reason).split(":", 1)[0]
- def _load_monitor_template(base_dir: Path) -> pd.DataFrame:
- return pd.read_csv(base_dir / "dragon_strategy_monitoring_template.csv", encoding="utf-8-sig")
- def _load_removed_trade_over_removal_count(base_dir: Path) -> float:
- path = base_dir / "dragon_glued_refined_removed_trade_attribution.csv"
- if not path.exists():
- return float("nan")
- df = pd.read_csv(path, encoding="utf-8-sig")
- if "recommendation" not in df.columns:
- return float("nan")
- return float((df["recommendation"].astype(str) == "OVER_REMOVAL").sum())
- def _load_local_sensitivity_robust_case_count(base_dir: Path) -> float:
- path = base_dir / "dragon_glued_refined_sensitivity.csv"
- if not path.exists():
- return float("nan")
- df = pd.read_csv(path, encoding="utf-8-sig")
- if df.empty or "label" not in df.columns:
- return float("nan")
- candidate = df[df["label"] == "refined_candidate_baseline"].copy()
- if candidate.empty:
- return float("nan")
- candidate_row = candidate.iloc[0]
- neighborhood = df[~df["label"].isin(["current_alpha_control", "refined_candidate_baseline"])].copy()
- if neighborhood.empty:
- return float("nan")
- robust = neighborhood[
- (neighborhood["avg_return"] >= float(candidate_row["avg_return"]) - 0.0015)
- & (neighborhood["profit_factor"] >= float(candidate_row["profit_factor"]) - 0.20)
- & (neighborhood["real_buy_overlap"] >= int(candidate_row["real_buy_overlap"]) - 1)
- & (neighborhood["real_sell_overlap"] >= int(candidate_row["real_sell_overlap"]) - 1)
- ]
- return float(len(robust))
- def _infer_initial_capital(base_dir: Path) -> float:
- workbook_trades = pd.read_csv(base_dir / "true_trades.csv", encoding="utf-8-sig")
- if workbook_trades.empty:
- return 55450.0
- first = workbook_trades.iloc[0]
- return float(first["ending_capital"]) / (1.0 + float(first["return_pct"]))
def _build_branch_state(branch: str, config, indicators: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, object]]:
    """Run one branch's rule engine and summarise its state on the latest bar.

    A ``DragonRuleEngine`` is built from ``config`` and run over
    ``indicators`` indexed by its ``date`` column (the column is kept).

    Args:
        branch: Name stored under the ``branch`` key of the returned state.
        config: Configuration object handed to ``DragonRuleEngine``.
        indicators: Indicator frame. The code reads the ``date``, ``close``,
            ``a1``, ``b1``, ``c1``, ``kdj_buy``, ``kdj_sell``, ``ql_buy`` and
            ``ql_sell`` columns of its last row.

    Returns:
        ``(events, trades, state)``. ``events`` and ``trades`` are returned
        exactly as produced by ``engine.run``. ``state`` is a flat dict with
        the latest indicator values, the last ``real_trade`` event, the
        events dated on the latest bar, and the open-position fields. Open
        position fields are ``""`` for text and NaN for numbers when no open
        trade is reported.
    """
    nan = float("nan")

    engine = DragonRuleEngine(config=config)
    events, trades = engine.run(indicators.set_index("date", drop=False))

    # The as-of date is the maximum date; the indicator values come from the
    # last row. These coincide when the frame is sorted by date.
    latest_date = indicators["date"].max().date().isoformat()
    latest_row = indicators.iloc[-1]
    latest_timestamp = pd.Timestamp(latest_row["date"]).isoformat(timespec="seconds")
    latest_close = float(latest_row["close"])

    real_events = events[events["layer"] == "real_trade"]
    # Event dates are compared against the ISO date string of the latest bar.
    latest_events = events[events["date"] == latest_date]
    last_real = None if real_events.empty else real_events.iloc[-1]

    context = engine.context
    in_position = bool(context.in_position)

    open_entry_date = ""
    open_entry_reason = ""
    open_entry_price = nan
    open_holding_days: object = nan
    open_return_pct = nan
    if in_position and context.entry_date is not None:
        open_entry_date = str(context.entry_date.isoformat())
        open_entry_reason = str(context.entry_reason)
        open_holding_days = int((latest_row["date"].date() - context.entry_date).days)
        # A missing entry price used to reach float(None) and raise TypeError;
        # it is now reported as NaN like every other unavailable number.
        if context.entry_price is not None:
            open_entry_price = float(context.entry_price)
        # A zero or missing entry price has no defined return.
        if context.entry_price:
            open_return_pct = latest_close / float(context.entry_price) - 1.0

    state = {
        "branch": branch,
        "as_of_date": latest_date,
        "as_of_timestamp": latest_timestamp,
        "latest_close": latest_close,
        "latest_a1": float(latest_row["a1"]),
        "latest_b1": float(latest_row["b1"]),
        "latest_c1": float(latest_row["c1"]),
        "latest_kdj_buy": bool(latest_row["kdj_buy"]),
        "latest_kdj_sell": bool(latest_row["kdj_sell"]),
        "latest_ql_buy": bool(latest_row["ql_buy"]),
        "latest_ql_sell": bool(latest_row["ql_sell"]),
        "latest_real_event_date": "" if last_real is None else str(last_real["date"]),
        "latest_real_event_side": "" if last_real is None else str(last_real["side"]),
        "latest_real_event_reason": "" if last_real is None else str(last_real["reason"]),
        "events_today_count": int(len(latest_events)),
        "events_today": " | ".join(
            f"{row['side']}:{row['layer']}:{row['reason']}" for _, row in latest_events.iterrows()
        ),
        "in_position": in_position,
        "open_entry_date": open_entry_date,
        "open_entry_reason": open_entry_reason,
        "open_entry_price": open_entry_price,
        "open_holding_days": open_holding_days,
        "open_return_pct": open_return_pct,
    }
    return events, trades, state
def _build_historical_trade_details(branch: str, trades: pd.DataFrame, initial_capital: float) -> pd.DataFrame:
    """Build a per-trade ledger with compounding capital for one branch.

    Only trades whose ``buy_date`` is on or after ``START_DATE`` are kept.
    Starting from ``initial_capital``, each trade's ``return_pct`` is applied
    to the running capital, recording the capital before the trade, the
    profit or loss, and the capital after it.

    Returns a frame with the columns ``branch``, ``trade_no`` (1-based),
    ``buy_date``, ``buy_price``, ``buy_reason``, ``sell_date``,
    ``sell_price``, ``sell_reason``, ``holding_days``, ``return_pct``,
    ``capital_before``, ``pnl_amount`` and ``capital_after``. When no trade
    survives the date filter an empty frame with those columns is returned.
    """
    detail_columns = [
        "branch",
        "trade_no",
        "buy_date",
        "buy_price",
        "buy_reason",
        "sell_date",
        "sell_price",
        "sell_reason",
        "holding_days",
        "return_pct",
        "capital_before",
        "pnl_amount",
        "capital_after",
    ]

    selected = trades[trades["buy_date"] >= START_DATE].copy()
    if selected.empty:
        return pd.DataFrame(columns=detail_columns)

    # Walk the trades in frame order, compounding the balance trade by trade.
    balance = float(initial_capital)
    ledger: dict[str, list[float]] = {
        "capital_before": [],
        "pnl_amount": [],
        "capital_after": [],
    }
    for trade_return in selected["return_pct"]:
        gain = balance * float(trade_return)
        ledger["capital_before"].append(balance)
        ledger["pnl_amount"].append(gain)
        balance = balance + gain
        ledger["capital_after"].append(balance)

    selected = selected.reset_index(drop=True)
    selected.insert(0, "trade_no", selected.index + 1)
    selected.insert(0, "branch", branch)
    for column, values in ledger.items():
        selected[column] = values
    return selected[detail_columns].copy()
- def _add_execution_prices(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame:
- trades = trades.copy()
- indicators = indicators.sort_values("date").reset_index(drop=True)
- lookup = indicators.set_index(indicators["date"].dt.date)
- next_by_date = {
- indicators.iloc[idx]["date"].date().isoformat(): indicators.iloc[idx + 1]
- for idx in range(len(indicators) - 1)
- }
- same_entry: list[float] = []
- same_exit: list[float] = []
- next_open_entry: list[float] = []
- next_open_exit: list[float] = []
- for _, trade in trades.iterrows():
- buy_row = lookup.loc[pd.Timestamp(trade["buy_date"]).date()]
- sell_row = lookup.loc[pd.Timestamp(trade["sell_date"]).date()]
- buy_next = next_by_date.get(trade["buy_date"])
- sell_next = next_by_date.get(trade["sell_date"])
- same_entry.append(float(buy_row["close"]))
- same_exit.append(float(sell_row["close"]))
- next_open_entry.append(float("nan") if buy_next is None else float(buy_next["open"]))
- next_open_exit.append(float("nan") if sell_next is None else float(sell_next["open"]))
- trades["exec_same_close_entry"] = same_entry
- trades["exec_same_close_exit"] = same_exit
- trades["exec_next_open_entry"] = next_open_entry
- trades["exec_next_open_exit"] = next_open_exit
- return trades
def _metric_actuals(indicators: pd.DataFrame, control_trades: pd.DataFrame, refined_trades: pd.DataFrame) -> dict[str, object]:
    """Compute the monitored metric values for the refined branch vs control.

    Both trade sets are priced with ``_add_execution_prices`` and evaluated
    with ``_apply_execution_model`` using the ``"next_open"`` model, once with
    a third argument of ``0.0`` and once with ``20.0`` (presumably a cost in
    basis points, going by the ``20bps`` result keys; confirm against
    ``dragon_execution_common``). ``_summary`` and ``_risk_cluster`` results
    feed the returned values.

    Args:
        indicators: Bar frame passed through to ``_add_execution_prices``.
        control_trades: Control-branch trades; ``return_pct`` is read here.
        refined_trades: Refined-branch trades; ``return_pct`` is read here.

    Returns:
        Dict keyed by metric name. The two file-based counts come from CSVs
        next to this module and are NaN when unavailable.

    Note:
        The headline profit factor is gross profit over gross loss of the raw
        ``return_pct`` column. With no losing trades it is ``+inf`` when there
        is any profit and NaN otherwise. The earlier expression divided by
        ``-0.0`` and produced ``-inf`` in that case.
    """

    def _headline_profit_factor(returns: pd.Series) -> float:
        # Gross profit divided by the magnitude of gross loss.
        gross_profit = float(returns[returns > 0].sum())
        gross_loss = -float(returns[returns < 0].sum())
        if gross_loss > 0:
            return gross_profit / gross_loss
        return float("inf") if gross_profit > 0 else float("nan")

    base_dir = Path(__file__).resolve().parent

    # Price each trade set once; hand every model run its own copy in case
    # the execution model modifies the frame it receives.
    control_priced = _add_execution_prices(control_trades, indicators)
    refined_priced = _add_execution_prices(refined_trades, indicators)

    control_eval = _apply_execution_model(control_priced.copy(), "next_open", 0.0)
    refined_eval = _apply_execution_model(refined_priced.copy(), "next_open", 0.0)
    control_stress = _apply_execution_model(control_priced.copy(), "next_open", 20.0)
    refined_stress = _apply_execution_model(refined_priced.copy(), "next_open", 20.0)

    control_sum = _summary("control", control_eval)
    refined_sum = _summary("refined", refined_eval)
    control_stress_sum = _summary("control", control_stress)
    refined_stress_sum = _summary("refined", refined_stress)
    refined_risk = _risk_cluster("refined", refined_eval)

    headline_pf_delta = _headline_profit_factor(refined_trades["return_pct"]) - _headline_profit_factor(
        control_trades["return_pct"]
    )

    return {
        "next_open_avg_return_delta_vs_control": float(refined_sum["avg_return"] - control_sum["avg_return"]),
        "next_open_profit_factor_delta_vs_control": float(refined_sum["profit_factor"] - control_sum["profit_factor"]),
        "next_open_max_drawdown": float(refined_sum["max_drawdown"]),
        "next_open_max_loss_streak": int(refined_risk["max_loss_streak"]),
        "worst_5trade_sum_next_open": float(refined_risk["worst_5trade_sum"]),
        "short_loss_share": float(refined_risk["short_loss_share"]),
        "removed_trade_over_removal_count": _load_removed_trade_over_removal_count(base_dir),
        "local_sensitivity_robust_case_count": _load_local_sensitivity_robust_case_count(base_dir),
        "headline_avg_return_delta_vs_control": float(
            refined_trades["return_pct"].mean() - control_trades["return_pct"].mean()
        ),
        "headline_profit_factor_delta_vs_control": float(headline_pf_delta),
        "next_open_20bps_cagr_refined": float(refined_stress_sum["cagr"]),
        "next_open_20bps_cagr_control": float(control_stress_sum["cagr"]),
        "next_open_20bps_pf_refined": float(refined_stress_sum["profit_factor"]),
        "next_open_20bps_pf_control": float(control_stress_sum["profit_factor"]),
    }
- def _compare_numeric(actual: float, warning_rule: str, hard_rule: str) -> str:
- if pd.isna(actual):
- return "missing_data"
- def parse(rule: str) -> tuple[str, float]:
- rule = str(rule).strip()
- if rule.startswith(("<=", ">=")):
- op = rule[:2]
- body = rule[2:]
- elif rule.startswith(("<", ">")):
- op = rule[:1]
- body = rule[1:]
- else:
- raise ValueError(rule)
- if body.endswith("%"):
- threshold = float(body[:-1]) / 100.0
- else:
- threshold = float(body)
- return op, threshold
- hard_op, hard_val = parse(hard_rule)
- warn_op, warn_val = parse(warning_rule)
- def hit(op: str, threshold: float) -> bool:
- if op == "<=":
- return actual <= threshold
- if op == ">=":
- return actual >= threshold
- if op == "<":
- return actual < threshold
- if op == ">":
- return actual > threshold
- raise ValueError(op)
- if hit(hard_op, hard_val):
- return "hard_breach"
- if hit(warn_op, warn_val):
- return "warning"
- return "ok"
def main() -> None:
    """Generate the daily signal artefacts for every configured branch.

    Steps, in order:

    1. Infer the initial capital from ``true_trades.csv`` and fetch daily
       data (with an intraday snapshot requested) from 2015-01-01 to today.
    2. Compute indicators and run each ``BRANCH_CONFIGS`` entry through
       ``_build_branch_state``.
    3. Write the signal snapshot (last 15 bars), branch status, historical
       trade details and monitor snapshot CSVs, the RC1 manifest JSON, and
       the markdown report.

    Every artefact is written twice: under its plain name next to this
    module, and under ``daily_reports/`` with the latest bar date appended
    to the file name.

    A branch that reports being in position without usable open-trade
    details (NaN holding days) is rendered as ``details unavailable`` in the
    report instead of raising on ``int(nan)``.
    """
    base_dir = Path(__file__).resolve().parent
    output_dir = base_dir / "daily_reports"
    output_dir.mkdir(exist_ok=True)

    initial_capital = _infer_initial_capital(base_dir)
    as_of_request_date = date.today().isoformat()

    engine = DragonIndicatorEngine(DragonIndicatorConfig(start_date="2015-01-01", end_date=as_of_request_date))
    raw = engine.fetch_daily_data(include_intraday_snapshot=True)
    fetch_meta = dict(engine.last_fetch_meta)
    snapshot_appended = bool(fetch_meta.get("intraday_snapshot_appended", False))
    snapshot_timestamp = fetch_meta.get("intraday_snapshot_timestamp") or ""
    historical_latest_bar_date = str(fetch_meta.get("historical_latest_bar_date") or "")
    data_mode = "intraday_snapshot" if snapshot_appended else "official_daily_bar"

    indicators = engine.compute(raw.reset_index(drop=False).rename(columns={"index": "date"}))
    indicators["date"] = pd.to_datetime(indicators["date"])
    latest_bar_date = indicators["date"].max().date().isoformat()

    def _write_csv_pair(frame: pd.DataFrame, stem: str) -> None:
        # Latest copy beside the module, dated copy in the archive folder.
        frame.to_csv(base_dir / f"{stem}.csv", index=False, encoding="utf-8-sig")
        frame.to_csv(output_dir / f"{stem}_{latest_bar_date}.csv", index=False, encoding="utf-8-sig")

    def _write_text_pair(text: str, stem: str, suffix: str) -> None:
        # Same pairing as the CSV helper, for plain-text artefacts.
        (base_dir / f"{stem}{suffix}").write_text(text, encoding="utf-8")
        (output_dir / f"{stem}_{latest_bar_date}{suffix}").write_text(text, encoding="utf-8")

    branch_runs = [
        _build_branch_state(branch_name, config_factory(), indicators)
        for branch_name, config_factory in BRANCH_CONFIGS
    ]
    branch_payload = {
        state["branch"]: {"events": events, "trades": trades, "state": state}
        for events, trades, state in branch_runs
    }

    refined_trades = branch_payload["alpha_first_glued_refined_hot_cap"]["trades"]
    control_trades = branch_payload["alpha_first_selective_veto"]["trades"]
    refined_trades = refined_trades[refined_trades["buy_date"] >= START_DATE].copy()
    control_trades = control_trades[control_trades["buy_date"] >= START_DATE].copy()
    refined_trades["entry_family"] = refined_trades["buy_reason"].map(_entry_family)
    control_trades["entry_family"] = control_trades["buy_reason"].map(_entry_family)

    recent_indicators = indicators.tail(15).copy()
    recent_indicators["date"] = recent_indicators["date"].dt.date.astype(str)
    _write_csv_pair(recent_indicators, "dragon_daily_signal_snapshot")

    branch_status = pd.DataFrame([payload["state"] for payload in branch_payload.values()])
    branch_status["data_mode"] = data_mode
    branch_status["snapshot_appended"] = snapshot_appended
    branch_status["snapshot_timestamp"] = snapshot_timestamp
    branch_status["historical_latest_bar_date"] = historical_latest_bar_date or latest_bar_date
    _write_csv_pair(branch_status, "dragon_daily_branch_status")

    historical_detail = pd.concat(
        [
            _build_historical_trade_details(branch, payload["trades"], initial_capital)
            for branch, payload in branch_payload.items()
        ],
        ignore_index=True,
        sort=False,
    )
    _write_csv_pair(historical_detail, "dragon_historical_trade_details")

    actuals = _metric_actuals(indicators, control_trades, refined_trades)
    template = _load_monitor_template(base_dir)
    template["actual_value"] = template["metric"].map(actuals)
    template["status"] = template.apply(
        lambda row: _compare_numeric(row["actual_value"], str(row["warning_threshold"]), str(row["hard_threshold"])),
        axis=1,
    )
    _write_csv_pair(template, "dragon_daily_monitor_snapshot")

    # Build the release config once and reuse it for both manifest fields.
    release_config = alpha_first_glued_refined_hot_cap_config()
    config_snapshot = {
        "release_version": "RC1",
        "branch": "alpha_first_glued_refined_hot_cap",
        "config": {**asdict(release_config), "disabled_rules": sorted(release_config.disabled_rules)},
        "as_of_request_date": as_of_request_date,
        "latest_bar_date": latest_bar_date,
        "data_mode": data_mode,
        "snapshot_appended": snapshot_appended,
        "snapshot_timestamp": snapshot_timestamp,
        "historical_latest_bar_date": historical_latest_bar_date or latest_bar_date,
    }
    manifest_text = json.dumps(config_snapshot, indent=2, ensure_ascii=False) + "\n"
    _write_text_pair(manifest_text, "dragon_daily_rc1_manifest", ".json")

    warning_count = int((template["status"] == "warning").sum())
    missing_data_count = int((template["status"] == "missing_data").sum())
    # Metrics with missing data are counted among the hard breaches as well.
    hard_count = int(template["status"].isin(["hard_breach", "missing_data"]).sum())

    if snapshot_appended:
        historical_line = f"- Historical latest official bar: `{historical_latest_bar_date}`"
        snapshot_line = f"- Snapshot timestamp: `{snapshot_timestamp}`"
        snapshot_rule_line = (
            "- Snapshot rule: `current market price is used as today's close for indicator and signal evaluation`"
        )
    else:
        historical_line = f"- Historical latest official bar: `{latest_bar_date}`"
        snapshot_line = "- Snapshot timestamp: `none`"
        snapshot_rule_line = "- Snapshot rule: `not used`"

    lines = [
        "# Dragon Daily Signal Report",
        "",
        f"- Request date: `{as_of_request_date}`",
        f"- Latest available market bar: `{latest_bar_date}`",
        f"- Data mode: `{data_mode}`",
        historical_line,
        snapshot_line,
        snapshot_rule_line,
        "- Instrument: `399673`",
        "- Forward default branch: `alpha_first_glued_refined_hot_cap`",
        "- Benchmark control branch: `alpha_first_selective_veto`",
        "",
        "## Latest Branch Status",
    ]

    for state in branch_status.to_dict("records"):
        if not bool(state["in_position"]):
            open_trade_line = "- open trade: `none`"
        elif pd.isna(state["open_holding_days"]):
            # In position but no entry details were reported for this branch.
            open_trade_line = "- open trade: `details unavailable`"
        else:
            open_trade_line = (
                f"- open trade: `{state['open_entry_date']}` `{state['open_entry_reason']}` | "
                f"holding `{int(state['open_holding_days'])}`d | open_return `{_format_pct(float(state['open_return_pct']))}`"
            )
        lines.extend(
            [
                f"### {state['branch']}",
                f"- evaluated_at `{state['as_of_timestamp']}`",
                f"- latest_close `{state['latest_close']:.3f}` | a1 `{state['latest_a1']:.4f}` | b1 `{state['latest_b1']:.4f}` | c1 `{state['latest_c1']:.2f}`",
                f"- latest markers: `KDJ buy={state['latest_kdj_buy']}` `KDJ sell={state['latest_kdj_sell']}` `QL buy={state['latest_ql_buy']}` `QL sell={state['latest_ql_sell']}`",
                f"- latest real event: `{state['latest_real_event_date']}` `{state['latest_real_event_side']}` `{state['latest_real_event_reason']}`",
                f"- events on latest bar: `{state['events_today'] if state['events_today'] else 'none'}`",
                f"- in_position: `{state['in_position']}`",
                open_trade_line,
                "",
            ]
        )

    lines.extend(
        [
            "## Monitor Snapshot",
            f"- warnings: `{warning_count}`",
            f"- hard breaches: `{hard_count}`",
            f"- missing data metrics: `{missing_data_count}`",
            f"- next_open avg_return delta vs control: `{_format_pct(float(actuals['next_open_avg_return_delta_vs_control']))}`",
            f"- next_open PF delta vs control: `{_format_num(float(actuals['next_open_profit_factor_delta_vs_control']))}`",
            f"- next_open max_drawdown refined: `{_format_pct(float(actuals['next_open_max_drawdown']))}`",
            f"- next_open max loss streak refined: `{int(actuals['next_open_max_loss_streak'])}`",
            f"- next_open + 20bps CAGR refined/control: `{_format_pct(float(actuals['next_open_20bps_cagr_refined']))}` / `{_format_pct(float(actuals['next_open_20bps_cagr_control']))}`",
            "",
            "## Outputs",
            "- `dragon_daily_signal_snapshot.csv`",
            "- `dragon_daily_branch_status.csv`",
            "- `dragon_daily_monitor_snapshot.csv`",
            "- `dragon_historical_trade_details.csv`",
            "- `dragon_daily_rc1_manifest.json`",
        ]
    )
    report_text = "\n".join(lines) + "\n"
    _write_text_pair(report_text, "dragon_daily_signal_report", ".md")
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|