from __future__ import annotations from dataclasses import asdict from datetime import date import json from pathlib import Path import pandas as pd from dragon_branch_configs import ( alpha_first_glued_followthrough_probe_config, alpha_first_glued_followthrough_mid_exit_probe_config, alpha_first_glued_refined_hot_cap_config, alpha_first_selective_veto_config, workbook_preserving_config, ) from dragon_execution_common import apply_execution_model as _apply_execution_model, risk_cluster as _risk_cluster, summary as _summary from dragon_indicators import DragonIndicatorConfig, DragonIndicatorEngine from dragon_shared import START_DATE, format_num as _format_num, format_pct as _format_pct from dragon_strategy import DragonRuleEngine BRANCH_CONFIGS = [ ("workbook_preserving", workbook_preserving_config), ("alpha_first_selective_veto", alpha_first_selective_veto_config), ("alpha_first_glued_refined_hot_cap", alpha_first_glued_refined_hot_cap_config), ("alpha_first_glued_followthrough_probe", alpha_first_glued_followthrough_probe_config), ("alpha_first_glued_followthrough_mid_exit_probe", alpha_first_glued_followthrough_mid_exit_probe_config), ] def _entry_family(reason: str) -> str: return str(reason).split(":", 1)[0] def _load_monitor_template(base_dir: Path) -> pd.DataFrame: return pd.read_csv(base_dir / "dragon_strategy_monitoring_template.csv", encoding="utf-8-sig") def _load_removed_trade_over_removal_count(base_dir: Path) -> float: path = base_dir / "dragon_glued_refined_removed_trade_attribution.csv" if not path.exists(): return float("nan") df = pd.read_csv(path, encoding="utf-8-sig") if "recommendation" not in df.columns: return float("nan") return float((df["recommendation"].astype(str) == "OVER_REMOVAL").sum()) def _load_local_sensitivity_robust_case_count(base_dir: Path) -> float: path = base_dir / "dragon_glued_refined_sensitivity.csv" if not path.exists(): return float("nan") df = pd.read_csv(path, encoding="utf-8-sig") if df.empty or "label" not in df.columns: return float("nan") candidate = df[df["label"] == "refined_candidate_baseline"].copy() if candidate.empty: return float("nan") candidate_row = candidate.iloc[0] neighborhood = df[~df["label"].isin(["current_alpha_control", "refined_candidate_baseline"])].copy() if neighborhood.empty: return float("nan") robust = neighborhood[ (neighborhood["avg_return"] >= float(candidate_row["avg_return"]) - 0.0015) & (neighborhood["profit_factor"] >= float(candidate_row["profit_factor"]) - 0.20) & (neighborhood["real_buy_overlap"] >= int(candidate_row["real_buy_overlap"]) - 1) & (neighborhood["real_sell_overlap"] >= int(candidate_row["real_sell_overlap"]) - 1) ] return float(len(robust)) def _infer_initial_capital(base_dir: Path) -> float: workbook_trades = pd.read_csv(base_dir / "true_trades.csv", encoding="utf-8-sig") if workbook_trades.empty: return 55450.0 first = workbook_trades.iloc[0] return float(first["ending_capital"]) / (1.0 + float(first["return_pct"])) def _build_branch_state(branch: str, config, indicators: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, object]]: engine = DragonRuleEngine(config=config) events, trades = engine.run(indicators.set_index("date", drop=False)) latest_date = indicators["date"].max().date().isoformat() latest_row = indicators.iloc[-1] latest_timestamp = pd.Timestamp(latest_row["date"]).isoformat(timespec="seconds") real_events = events[events["layer"] == "real_trade"].copy() latest_events = events[events["date"] == latest_date].copy() last_real = real_events.iloc[-1] if not real_events.empty else None in_position = bool(engine.context.in_position) open_trade = None if in_position and engine.context.entry_date is not None: open_trade = { "entry_date": engine.context.entry_date.isoformat(), "entry_price": float(engine.context.entry_price) if engine.context.entry_price is not None else None, "entry_reason": engine.context.entry_reason, "current_return_pct": float(latest_row["close"]) / float(engine.context.entry_price) - 1.0 if engine.context.entry_price else None, "holding_days": (latest_row["date"].date() - engine.context.entry_date).days, } state = { "branch": branch, "as_of_date": latest_date, "as_of_timestamp": latest_timestamp, "latest_close": float(latest_row["close"]), "latest_a1": float(latest_row["a1"]), "latest_b1": float(latest_row["b1"]), "latest_c1": float(latest_row["c1"]), "latest_kdj_buy": bool(latest_row["kdj_buy"]), "latest_kdj_sell": bool(latest_row["kdj_sell"]), "latest_ql_buy": bool(latest_row["ql_buy"]), "latest_ql_sell": bool(latest_row["ql_sell"]), "latest_real_event_date": "" if last_real is None else str(last_real["date"]), "latest_real_event_side": "" if last_real is None else str(last_real["side"]), "latest_real_event_reason": "" if last_real is None else str(last_real["reason"]), "events_today_count": int(len(latest_events)), "events_today": " | ".join( f"{row['side']}:{row['layer']}:{row['reason']}" for _, row in latest_events.iterrows() ), "in_position": in_position, "open_entry_date": "" if open_trade is None else str(open_trade["entry_date"]), "open_entry_reason": "" if open_trade is None else str(open_trade["entry_reason"]), "open_entry_price": float("nan") if open_trade is None else float(open_trade["entry_price"]), "open_holding_days": float("nan") if open_trade is None else int(open_trade["holding_days"]), "open_return_pct": float("nan") if open_trade is None else float(open_trade["current_return_pct"]), } return events, trades, state def _build_historical_trade_details(branch: str, trades: pd.DataFrame, initial_capital: float) -> pd.DataFrame: trades = trades.copy() trades = trades[trades["buy_date"] >= START_DATE].copy() if trades.empty: return pd.DataFrame( columns=[ "branch", "trade_no", "buy_date", "buy_price", "buy_reason", "sell_date", "sell_price", "sell_reason", "holding_days", "return_pct", "capital_before", "pnl_amount", "capital_after", ] ) capital_before: list[float] = [] pnl_amount: list[float] = [] capital_after: list[float] = [] running_capital = float(initial_capital) for _, row in trades.iterrows(): trade_ret = float(row["return_pct"]) capital_before.append(running_capital) pnl = running_capital * trade_ret pnl_amount.append(pnl) running_capital = running_capital + pnl capital_after.append(running_capital) trades = trades.reset_index(drop=True) trades.insert(0, "trade_no", trades.index + 1) trades.insert(0, "branch", branch) trades["capital_before"] = capital_before trades["pnl_amount"] = pnl_amount trades["capital_after"] = capital_after return trades[ [ "branch", "trade_no", "buy_date", "buy_price", "buy_reason", "sell_date", "sell_price", "sell_reason", "holding_days", "return_pct", "capital_before", "pnl_amount", "capital_after", ] ].copy() def _add_execution_prices(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame: trades = trades.copy() indicators = indicators.sort_values("date").reset_index(drop=True) lookup = indicators.set_index(indicators["date"].dt.date) next_by_date = { indicators.iloc[idx]["date"].date().isoformat(): indicators.iloc[idx + 1] for idx in range(len(indicators) - 1) } same_entry: list[float] = [] same_exit: list[float] = [] next_open_entry: list[float] = [] next_open_exit: list[float] = [] for _, trade in trades.iterrows(): buy_row = lookup.loc[pd.Timestamp(trade["buy_date"]).date()] sell_row = lookup.loc[pd.Timestamp(trade["sell_date"]).date()] buy_next = next_by_date.get(trade["buy_date"]) sell_next = next_by_date.get(trade["sell_date"]) same_entry.append(float(buy_row["close"])) same_exit.append(float(sell_row["close"])) next_open_entry.append(float("nan") if buy_next is None else float(buy_next["open"])) next_open_exit.append(float("nan") if sell_next is None else float(sell_next["open"])) trades["exec_same_close_entry"] = same_entry trades["exec_same_close_exit"] = same_exit trades["exec_next_open_entry"] = next_open_entry trades["exec_next_open_exit"] = next_open_exit return trades def _metric_actuals(indicators: pd.DataFrame, control_trades: pd.DataFrame, refined_trades: pd.DataFrame) -> dict[str, object]: control_eval = _apply_execution_model(_add_execution_prices(control_trades, indicators), "next_open", 0.0) refined_eval = _apply_execution_model(_add_execution_prices(refined_trades, indicators), "next_open", 0.0) control_stress = _apply_execution_model(_add_execution_prices(control_trades, indicators), "next_open", 20.0) refined_stress = _apply_execution_model(_add_execution_prices(refined_trades, indicators), "next_open", 20.0) control_sum = _summary("control", control_eval) refined_sum = _summary("refined", refined_eval) control_stress_sum = _summary("control", control_stress) refined_stress_sum = _summary("refined", refined_stress) refined_risk = _risk_cluster("refined", refined_eval) return { "next_open_avg_return_delta_vs_control": float(refined_sum["avg_return"] - control_sum["avg_return"]), "next_open_profit_factor_delta_vs_control": float(refined_sum["profit_factor"] - control_sum["profit_factor"]), "next_open_max_drawdown": float(refined_sum["max_drawdown"]), "next_open_max_loss_streak": int(refined_risk["max_loss_streak"]), "worst_5trade_sum_next_open": float(refined_risk["worst_5trade_sum"]), "short_loss_share": float(refined_risk["short_loss_share"]), "removed_trade_over_removal_count": _load_removed_trade_over_removal_count(Path(__file__).resolve().parent), "local_sensitivity_robust_case_count": _load_local_sensitivity_robust_case_count(Path(__file__).resolve().parent), "headline_avg_return_delta_vs_control": float(refined_trades["return_pct"].mean() - control_trades["return_pct"].mean()), "headline_profit_factor_delta_vs_control": float( (refined_trades[refined_trades["return_pct"] > 0]["return_pct"].sum() / -refined_trades[refined_trades["return_pct"] < 0]["return_pct"].sum()) - (control_trades[control_trades["return_pct"] > 0]["return_pct"].sum() / -control_trades[control_trades["return_pct"] < 0]["return_pct"].sum()) ), "next_open_20bps_cagr_refined": float(refined_stress_sum["cagr"]), "next_open_20bps_cagr_control": float(control_stress_sum["cagr"]), "next_open_20bps_pf_refined": float(refined_stress_sum["profit_factor"]), "next_open_20bps_pf_control": float(control_stress_sum["profit_factor"]), } def _compare_numeric(actual: float, warning_rule: str, hard_rule: str) -> str: if pd.isna(actual): return "missing_data" def parse(rule: str) -> tuple[str, float]: rule = str(rule).strip() if rule.startswith(("<=", ">=")): op = rule[:2] body = rule[2:] elif rule.startswith(("<", ">")): op = rule[:1] body = rule[1:] else: raise ValueError(rule) if body.endswith("%"): threshold = float(body[:-1]) / 100.0 else: threshold = float(body) return op, threshold hard_op, hard_val = parse(hard_rule) warn_op, warn_val = parse(warning_rule) def hit(op: str, threshold: float) -> bool: if op == "<=": return actual <= threshold if op == ">=": return actual >= threshold if op == "<": return actual < threshold if op == ">": return actual > threshold raise ValueError(op) if hit(hard_op, hard_val): return "hard_breach" if hit(warn_op, warn_val): return "warning" return "ok" def main() -> None: base_dir = Path(__file__).resolve().parent output_dir = base_dir / "daily_reports" output_dir.mkdir(exist_ok=True) initial_capital = _infer_initial_capital(base_dir) as_of_request_date = date.today().isoformat() engine = DragonIndicatorEngine(DragonIndicatorConfig(start_date="2015-01-01", end_date=as_of_request_date)) raw = engine.fetch_daily_data(include_intraday_snapshot=True) fetch_meta = dict(engine.last_fetch_meta) snapshot_appended = bool(fetch_meta.get("intraday_snapshot_appended", False)) snapshot_timestamp = fetch_meta.get("intraday_snapshot_timestamp") or "" historical_latest_bar_date = str(fetch_meta.get("historical_latest_bar_date") or "") data_mode = "intraday_snapshot" if snapshot_appended else "official_daily_bar" indicators = engine.compute(raw.reset_index(drop=False).rename(columns={"index": "date"})) indicators["date"] = pd.to_datetime(indicators["date"]) latest_bar_date = indicators["date"].max().date().isoformat() branch_runs = [ _build_branch_state(branch_name, config_factory(), indicators) for branch_name, config_factory in BRANCH_CONFIGS ] branch_payload = { state["branch"]: {"events": events, "trades": trades, "state": state} for events, trades, state in branch_runs } refined_trades = branch_payload["alpha_first_glued_refined_hot_cap"]["trades"] control_trades = branch_payload["alpha_first_selective_veto"]["trades"] refined_trades = refined_trades[refined_trades["buy_date"] >= START_DATE].copy() control_trades = control_trades[control_trades["buy_date"] >= START_DATE].copy() refined_trades["entry_family"] = refined_trades["buy_reason"].map(_entry_family) control_trades["entry_family"] = control_trades["buy_reason"].map(_entry_family) recent_indicators = indicators.tail(15).copy() recent_indicators["date"] = recent_indicators["date"].dt.date.astype(str) recent_indicators.to_csv(base_dir / "dragon_daily_signal_snapshot.csv", index=False, encoding="utf-8-sig") recent_indicators.to_csv(output_dir / f"dragon_daily_signal_snapshot_{latest_bar_date}.csv", index=False, encoding="utf-8-sig") branch_status = pd.DataFrame([payload["state"] for payload in branch_payload.values()]) branch_status["data_mode"] = data_mode branch_status["snapshot_appended"] = snapshot_appended branch_status["snapshot_timestamp"] = snapshot_timestamp branch_status["historical_latest_bar_date"] = historical_latest_bar_date or latest_bar_date branch_status.to_csv(base_dir / "dragon_daily_branch_status.csv", index=False, encoding="utf-8-sig") branch_status.to_csv(output_dir / f"dragon_daily_branch_status_{latest_bar_date}.csv", index=False, encoding="utf-8-sig") historical_detail = pd.concat( [ _build_historical_trade_details(branch, payload["trades"], initial_capital) for branch, payload in branch_payload.items() ], ignore_index=True, sort=False, ) historical_detail.to_csv(base_dir / "dragon_historical_trade_details.csv", index=False, encoding="utf-8-sig") historical_detail.to_csv( output_dir / f"dragon_historical_trade_details_{latest_bar_date}.csv", index=False, encoding="utf-8-sig", ) actuals = _metric_actuals(indicators, control_trades, refined_trades) template = _load_monitor_template(base_dir) template["actual_value"] = template["metric"].map(actuals) template["status"] = template.apply( lambda row: _compare_numeric(row["actual_value"], str(row["warning_threshold"]), str(row["hard_threshold"])), axis=1, ) template.to_csv(base_dir / "dragon_daily_monitor_snapshot.csv", index=False, encoding="utf-8-sig") template.to_csv(output_dir / f"dragon_daily_monitor_snapshot_{latest_bar_date}.csv", index=False, encoding="utf-8-sig") config_snapshot = { "release_version": "RC1", "branch": "alpha_first_glued_refined_hot_cap", "config": {**asdict(alpha_first_glued_refined_hot_cap_config()), "disabled_rules": sorted(alpha_first_glued_refined_hot_cap_config().disabled_rules)}, "as_of_request_date": as_of_request_date, "latest_bar_date": latest_bar_date, "data_mode": data_mode, "snapshot_appended": snapshot_appended, "snapshot_timestamp": snapshot_timestamp, "historical_latest_bar_date": historical_latest_bar_date or latest_bar_date, } (base_dir / "dragon_daily_rc1_manifest.json").write_text( json.dumps(config_snapshot, indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) (output_dir / f"dragon_daily_rc1_manifest_{latest_bar_date}.json").write_text( json.dumps(config_snapshot, indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) warning_count = int((template["status"] == "warning").sum()) missing_data_count = int((template["status"] == "missing_data").sum()) hard_count = int(template["status"].isin(["hard_breach", "missing_data"]).sum()) lines = [ "# Dragon Daily Signal Report", "", f"- Request date: `{as_of_request_date}`", f"- Latest available market bar: `{latest_bar_date}`", f"- Data mode: `{data_mode}`", ( f"- Historical latest official bar: `{historical_latest_bar_date}`" if snapshot_appended else f"- Historical latest official bar: `{latest_bar_date}`" ), ( f"- Snapshot timestamp: `{snapshot_timestamp}`" if snapshot_appended else "- Snapshot timestamp: `none`" ), ( "- Snapshot rule: `current market price is used as today's close for indicator and signal evaluation`" if snapshot_appended else "- Snapshot rule: `not used`" ), "- Instrument: `399673`", "- Forward default branch: `alpha_first_glued_refined_hot_cap`", "- Benchmark control branch: `alpha_first_selective_veto`", "", "## Latest Branch Status", ] for state in branch_status.to_dict("records"): lines.extend( [ f"### {state['branch']}", f"- evaluated_at `{state['as_of_timestamp']}`", f"- latest_close `{state['latest_close']:.3f}` | a1 `{state['latest_a1']:.4f}` | b1 `{state['latest_b1']:.4f}` | c1 `{state['latest_c1']:.2f}`", f"- latest markers: `KDJ buy={state['latest_kdj_buy']}` `KDJ sell={state['latest_kdj_sell']}` `QL buy={state['latest_ql_buy']}` `QL sell={state['latest_ql_sell']}`", f"- latest real event: `{state['latest_real_event_date']}` `{state['latest_real_event_side']}` `{state['latest_real_event_reason']}`", f"- events on latest bar: `{state['events_today'] if state['events_today'] else 'none'}`", f"- in_position: `{state['in_position']}`", ( f"- open trade: `{state['open_entry_date']}` `{state['open_entry_reason']}` | " f"holding `{int(state['open_holding_days'])}`d | open_return `{_format_pct(float(state['open_return_pct']))}`" if bool(state["in_position"]) else "- open trade: `none`" ), "", ] ) lines.extend( [ "## Monitor Snapshot", f"- warnings: `{warning_count}`", f"- hard breaches: `{hard_count}`", f"- missing data metrics: `{missing_data_count}`", f"- next_open avg_return delta vs control: `{_format_pct(float(actuals['next_open_avg_return_delta_vs_control']))}`", f"- next_open PF delta vs control: `{_format_num(float(actuals['next_open_profit_factor_delta_vs_control']))}`", f"- next_open max_drawdown refined: `{_format_pct(float(actuals['next_open_max_drawdown']))}`", f"- next_open max loss streak refined: `{int(actuals['next_open_max_loss_streak'])}`", f"- next_open + 20bps CAGR refined/control: `{_format_pct(float(actuals['next_open_20bps_cagr_refined']))}` / `{_format_pct(float(actuals['next_open_20bps_cagr_control']))}`", "", "## Outputs", "- `dragon_daily_signal_snapshot.csv`", "- `dragon_daily_branch_status.csv`", "- `dragon_daily_monitor_snapshot.csv`", "- `dragon_historical_trade_details.csv`", "- `dragon_daily_rc1_manifest.json`", ] ) (base_dir / "dragon_daily_signal_report.md").write_text("\n".join(lines) + "\n", encoding="utf-8") (output_dir / f"dragon_daily_signal_report_{latest_bar_date}.md").write_text("\n".join(lines) + "\n", encoding="utf-8") if __name__ == "__main__": main()