from __future__ import annotations import json from dataclasses import asdict from pathlib import Path import pandas as pd from dragon_branch_configs import ( alpha_first_glued_refined_hot_cap_config, alpha_first_selective_veto_config, workbook_preserving_config, ) from dragon_shared import END_DATE, START_DATE, format_num as _format_num, format_pct as _format_pct, profit_factor from dragon_strategy import DragonRuleEngine from dragon_strategy_config import StrategyConfig def _load_indicator_snapshot(base_dir: Path) -> pd.DataFrame: df = pd.read_csv(base_dir / "dragon_indicator_snapshot.csv", encoding="utf-8-sig") df["date"] = pd.to_datetime(df["date"]) return df.sort_values("date").reset_index(drop=True) def _load_true_trade_events(base_dir: Path) -> pd.DataFrame: return pd.read_csv(base_dir / "true_trade_events.csv", encoding="utf-8-sig") def _holding_bucket(days: int) -> str: if days <= 5: return "00-05d" if days <= 10: return "06-10d" if days <= 20: return "11-20d" if days <= 40: return "21-40d" return "41d+" def _event_match(strategy_events: pd.DataFrame, workbook_events: pd.DataFrame, side: str) -> tuple[int, int, int]: wb = set(workbook_events[(workbook_events["side"] == side) & (workbook_events["layer"] == "real_trade")]["date"]) st = set(strategy_events[(strategy_events["side"] == side) & (strategy_events["layer"] == "real_trade")]["date"]) return len(wb & st), len(wb - st), len(st - wb) def _segment_stats(df: pd.DataFrame) -> dict[str, float | int]: if df.empty: return { "trades": 0, "win_rate": float("nan"), "avg_return": float("nan"), "profit_factor": float("nan"), "compounded_return": float("nan"), } returns = df["return_pct"].astype(float) return { "trades": int(len(df)), "win_rate": float((returns > 0).mean()), "avg_return": float(returns.mean()), "profit_factor": profit_factor(returns), "compounded_return": float((1.0 + returns).prod() - 1.0), } def _build_walk_forward(trades: pd.DataFrame, branch_name: str) -> pd.DataFrame: years = sorted(int(year) for year in trades["sell_year"].unique()) rows: list[dict[str, object]] = [] for idx, test_year in enumerate(years): if idx >= 1: train_years = years[:idx] train_df = trades[trades["sell_year"].isin(train_years)] test_df = trades[trades["sell_year"] == test_year] rows.append( { "branch": branch_name, "scheme": "anchored_expanding", "train_start_year": train_years[0], "train_end_year": train_years[-1], "test_year": test_year, **{f"train_{k}": v for k, v in _segment_stats(train_df).items()}, **{f"test_{k}": v for k, v in _segment_stats(test_df).items()}, } ) if idx >= 3: train_years = years[idx - 3 : idx] train_df = trades[trades["sell_year"].isin(train_years)] test_df = trades[trades["sell_year"] == test_year] rows.append( { "branch": branch_name, "scheme": "rolling_3y", "train_start_year": train_years[0], "train_end_year": train_years[-1], "test_year": test_year, **{f"train_{k}": v for k, v in _segment_stats(train_df).items()}, **{f"test_{k}": v for k, v in _segment_stats(test_df).items()}, } ) return pd.DataFrame(rows) def _build_trade_quality(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame: trades = trades.copy() trades["sell_dt"] = pd.to_datetime(trades["sell_date"]) trades["sell_year"] = trades["sell_dt"].dt.year.astype(int) trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket) indicator_by_date = indicators.set_index(indicators["date"].dt.date) buy_c1: list[float] = [] mfe_list: list[float] = [] mae_list: list[float] = [] for _, trade in trades.iterrows(): buy_date = pd.Timestamp(trade["buy_date"]).date() entry_price = float(trade["buy_price"]) buy_row = indicator_by_date.loc[buy_date] buy_c1.append(float(buy_row["c1"])) window = indicators[ (indicators["date"] >= pd.Timestamp(trade["buy_date"])) & (indicators["date"] <= pd.Timestamp(trade["sell_date"])) ] mfe_list.append(float(window["high"].max()) / entry_price - 1.0) mae_list.append(float(window["low"].min()) / entry_price - 1.0) trades["buy_c1"] = buy_c1 trades["mfe_pct"] = mfe_list trades["mae_pct"] = mae_list trades["regime_bucket"] = trades["buy_c1"].map(lambda x: "hot" if x >= 80 else "high_mid" if x >= 60 else "mid" if x >= 35 else "low") return trades def _run_branch( name: str, config: StrategyConfig, indicators: pd.DataFrame, workbook_events: pd.DataFrame, first_date: str, last_date: str, ) -> tuple[dict[str, object], pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: indexed = indicators.set_index("date", drop=False) engine = DragonRuleEngine(config=config) events, trades = engine.run(indexed) start = max(first_date, START_DATE) end = min(last_date, END_DATE) events = events[(events["date"] >= start) & (events["date"] <= end)].copy() trades = trades[ (trades["buy_date"] >= start) & (trades["buy_date"] <= end) & (trades["sell_date"] >= start) & (trades["sell_date"] <= end) ].copy() trades = _build_trade_quality(trades, indicators) buy_overlap, buy_missing, buy_extra = _event_match(events, workbook_events, "BUY") sell_overlap, sell_missing, sell_extra = _event_match(events, workbook_events, "SELL") returns = trades["return_pct"].astype(float) if not trades.empty else pd.Series(dtype=float) summary = { "branch": name, "trades": int(len(trades)), "win_rate": float((returns > 0).mean()) if not trades.empty else float("nan"), "avg_return": float(returns.mean()) if not trades.empty else float("nan"), "median_return": float(returns.median()) if not trades.empty else float("nan"), "profit_factor": profit_factor(returns) if not trades.empty else float("nan"), "avg_mfe": float(trades["mfe_pct"].mean()) if not trades.empty else float("nan"), "avg_mae": float(trades["mae_pct"].mean()) if not trades.empty else float("nan"), "real_buy_overlap": int(buy_overlap), "real_buy_missing": int(buy_missing), "real_buy_extra": int(buy_extra), "real_sell_overlap": int(sell_overlap), "real_sell_missing": int(sell_missing), "real_sell_extra": int(sell_extra), "short_00_05d_avg_return": float(trades[trades["holding_bucket"] == "00-05d"]["return_pct"].mean()), "short_06_10d_avg_return": float(trades[trades["holding_bucket"] == "06-10d"]["return_pct"].mean()), } def agg(df: pd.DataFrame, by: str, out: str) -> pd.DataFrame: view = ( df.groupby(by, dropna=False) .agg( trades=("buy_date", "count"), win_rate=("return_pct", lambda s: float((s > 0).mean())), avg_return=("return_pct", "mean"), profit_factor=("return_pct", profit_factor), ) .reset_index() .rename(columns={by: out}) ) view["branch"] = name return view holding = agg(trades, "holding_bucket", "holding_bucket") yearly = agg(trades, "sell_year", "sell_year") family = agg(trades, "buy_reason", "entry_family") regime = agg(trades, "regime_bucket", "regime_bucket") walk = _build_walk_forward(trades, name) return summary, trades, holding, yearly, family, regime, walk def _config_snapshot(config: StrategyConfig) -> dict[str, object]: snapshot = asdict(config) snapshot["disabled_rules"] = sorted(config.disabled_rules) return snapshot def _trade_set(df: pd.DataFrame) -> set[tuple[str, str, str, str]]: return set(zip(df["buy_date"], df["sell_date"], df["buy_reason"], df["sell_reason"])) def _trade_diff(source: pd.DataFrame, target: pd.DataFrame, removed_label: str, added_label: str) -> pd.DataFrame: source_set = _trade_set(source) target_set = _trade_set(target) rows: list[dict[str, object]] = [] for row in sorted(source_set - target_set): rows.append({"change_type": removed_label, "buy_date": row[0], "sell_date": row[1], "buy_reason": row[2], "sell_reason": row[3]}) for row in sorted(target_set - source_set): rows.append({"change_type": added_label, "buy_date": row[0], "sell_date": row[1], "buy_reason": row[2], "sell_reason": row[3]}) return pd.DataFrame(rows) def _wf_stats(df: pd.DataFrame, scheme: str) -> tuple[int, int, float]: view = df[df["scheme"] == scheme] positive = int((view["test_avg_return"] > 0).sum()) if not view.empty else 0 total = int(len(view)) avg_oos = float(view["test_avg_return"].mean()) if not view.empty else float("nan") return positive, total, avg_oos def main() -> None: base_dir = Path(__file__).resolve().parent indicators = _load_indicator_snapshot(base_dir) workbook_events = _load_true_trade_events(base_dir) first_date = workbook_events["date"].min() last_date = workbook_events["date"].max() branches = [ ("workbook_preserving", workbook_preserving_config()), ("alpha_first_selective_veto", alpha_first_selective_veto_config()), ("alpha_first_glued_refined_hot_cap", alpha_first_glued_refined_hot_cap_config()), ] summaries: list[dict[str, object]] = [] trades_by_branch: dict[str, pd.DataFrame] = {} holding_frames: list[pd.DataFrame] = [] yearly_frames: list[pd.DataFrame] = [] family_frames: list[pd.DataFrame] = [] regime_frames: list[pd.DataFrame] = [] walk_frames: list[pd.DataFrame] = [] for name, config in branches: summary, trades, holding, yearly, family, regime, walk = _run_branch( name, config, indicators, workbook_events, first_date, last_date, ) summaries.append(summary) trades_by_branch[name] = trades holding_frames.append(holding) yearly_frames.append(yearly) family_frames.append(family) regime_frames.append(regime) walk_frames.append(walk) summary_df = pd.DataFrame(summaries) summary_df.to_csv(base_dir / "dragon_glued_refined_branch_summary.csv", index=False, encoding="utf-8-sig") branch_lookup = {row["branch"]: row for row in summaries} workbook_row = branch_lookup["workbook_preserving"] alpha_row = branch_lookup["alpha_first_selective_veto"] refined_row = branch_lookup["alpha_first_glued_refined_hot_cap"] comparison_rows: list[dict[str, object]] = [] for metric in [ "trades", "win_rate", "avg_return", "median_return", "profit_factor", "avg_mfe", "avg_mae", "real_buy_overlap", "real_sell_overlap", "short_00_05d_avg_return", "short_06_10d_avg_return", ]: comparison_rows.append( { "metric": metric, "workbook_preserving": workbook_row[metric], "alpha_first_selective_veto": alpha_row[metric], "alpha_first_glued_refined_hot_cap": refined_row[metric], "delta_refined_minus_alpha": refined_row[metric] - alpha_row[metric], "delta_refined_minus_workbook": refined_row[metric] - workbook_row[metric], } ) pd.DataFrame(comparison_rows).to_csv(base_dir / "dragon_glued_refined_branch_comparison.csv", index=False, encoding="utf-8-sig") pd.concat(holding_frames, ignore_index=True).to_csv(base_dir / "dragon_glued_refined_holding_breakdown.csv", index=False, encoding="utf-8-sig") pd.concat(yearly_frames, ignore_index=True).to_csv(base_dir / "dragon_glued_refined_yearly_breakdown.csv", index=False, encoding="utf-8-sig") pd.concat(family_frames, ignore_index=True).to_csv(base_dir / "dragon_glued_refined_family_breakdown.csv", index=False, encoding="utf-8-sig") pd.concat(regime_frames, ignore_index=True).to_csv(base_dir / "dragon_glued_refined_regime_breakdown.csv", index=False, encoding="utf-8-sig") combined_walk = pd.concat(walk_frames, ignore_index=True) combined_walk.to_csv(base_dir / "dragon_glued_refined_branch_walk_forward.csv", index=False, encoding="utf-8-sig") diff_vs_alpha = _trade_diff( trades_by_branch["alpha_first_selective_veto"], trades_by_branch["alpha_first_glued_refined_hot_cap"], "removed_from_refined_vs_alpha", "added_in_refined_vs_alpha", ) diff_vs_alpha.to_csv(base_dir / "dragon_glued_refined_branch_trade_diff.csv", index=False, encoding="utf-8-sig") (base_dir / "dragon_glued_refined_branch_config_snapshot.json").write_text( json.dumps(_config_snapshot(alpha_first_glued_refined_hot_cap_config()), indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) af_anchor_pos, af_anchor_total, af_anchor_avg = _wf_stats(combined_walk[combined_walk["branch"] == "alpha_first_selective_veto"], "anchored_expanding") ref_anchor_pos, ref_anchor_total, ref_anchor_avg = _wf_stats(combined_walk[combined_walk["branch"] == "alpha_first_glued_refined_hot_cap"], "anchored_expanding") af_roll_pos, af_roll_total, af_roll_avg = _wf_stats(combined_walk[combined_walk["branch"] == "alpha_first_selective_veto"], "rolling_3y") ref_roll_pos, ref_roll_total, ref_roll_avg = _wf_stats(combined_walk[combined_walk["branch"] == "alpha_first_glued_refined_hot_cap"], "rolling_3y") removed_count = int((diff_vs_alpha["change_type"] == "removed_from_refined_vs_alpha").sum()) added_count = int((diff_vs_alpha["change_type"] == "added_in_refined_vs_alpha").sum()) yearly_all = pd.concat(yearly_frames, ignore_index=True) alpha_yearly = yearly_all[yearly_all["branch"] == "alpha_first_selective_veto"].copy() refined_yearly = yearly_all[yearly_all["branch"] == "alpha_first_glued_refined_hot_cap"].copy() yearly_merge = alpha_yearly.merge(refined_yearly, on="sell_year", suffixes=("_alpha", "_refined")) yearly_better = int((yearly_merge["avg_return_refined"] > yearly_merge["avg_return_alpha"]).sum()) upgrade_ready = ( refined_row["avg_return"] - alpha_row["avg_return"] >= 0.003 and refined_row["profit_factor"] - alpha_row["profit_factor"] >= 0.50 and ref_anchor_pos >= af_anchor_pos and ref_roll_pos >= af_roll_pos ) lines = [ "# Dragon Glued Refined Branch Review", "", "## Branches", "- `workbook_preserving`: official reconstruction baseline.", "- `alpha_first_selective_veto`: current formal alpha-first branch.", "- `alpha_first_glued_refined_hot_cap`: refined glued research candidate with `40 <= c1 < 75`, `b1 >= 0.10`, plus intact low weak-range veto.", "", "## Headline Comparison", f"- workbook_preserving: trades `{int(workbook_row['trades'])}`, avg_return `{_format_pct(float(workbook_row['avg_return']))}`, profit_factor `{_format_num(float(workbook_row['profit_factor']))}`, real BUY / SELL `{int(workbook_row['real_buy_overlap'])}/{int(workbook_row['real_sell_overlap'])}`", f"- alpha_first_selective_veto: trades `{int(alpha_row['trades'])}`, avg_return `{_format_pct(float(alpha_row['avg_return']))}`, profit_factor `{_format_num(float(alpha_row['profit_factor']))}`, real BUY / SELL `{int(alpha_row['real_buy_overlap'])}/{int(alpha_row['real_sell_overlap'])}`", f"- alpha_first_glued_refined_hot_cap: trades `{int(refined_row['trades'])}`, avg_return `{_format_pct(float(refined_row['avg_return']))}`, profit_factor `{_format_num(float(refined_row['profit_factor']))}`, real BUY / SELL `{int(refined_row['real_buy_overlap'])}/{int(refined_row['real_sell_overlap'])}`", "", "## Trade Quality", f"- avg MFE / MAE: alpha `{_format_pct(float(alpha_row['avg_mfe']))}` / `{_format_pct(float(alpha_row['avg_mae']))}` vs refined `{_format_pct(float(refined_row['avg_mfe']))}` / `{_format_pct(float(refined_row['avg_mae']))}`", f"- short bucket `00-05d`: alpha `{_format_pct(float(alpha_row['short_00_05d_avg_return']))}` vs refined `{_format_pct(float(refined_row['short_00_05d_avg_return']))}`", f"- short bucket `06-10d`: alpha `{_format_pct(float(alpha_row['short_06_10d_avg_return']))}` vs refined `{_format_pct(float(refined_row['short_06_10d_avg_return']))}`", "", "## Walk-Forward Comparison", f"- Anchored expanding: alpha `{af_anchor_pos}/{af_anchor_total}`, avg `{_format_pct(af_anchor_avg)}` vs refined `{ref_anchor_pos}/{ref_anchor_total}`, avg `{_format_pct(ref_anchor_avg)}`", f"- Rolling 3Y: alpha `{af_roll_pos}/{af_roll_total}`, avg `{_format_pct(af_roll_avg)}` vs refined `{ref_roll_pos}/{ref_roll_total}`, avg `{_format_pct(ref_roll_avg)}`", "", "## Trade-Diff Summary", f"- refined vs alpha-first: removed `{removed_count}`, added `{added_count}`", "- The refined branch is still a removal-driven candidate; improvement comes from deleting weak trades, not from adding a new complex trade tree.", "", "## Stability Read", f"- refined beats alpha on avg_return in `{yearly_better}` yearly buckets out of `{int(len(yearly_merge))}` overlapping sell years", f"- avg_return delta vs alpha: `{_format_pct(float(refined_row['avg_return'] - alpha_row['avg_return']))}`", f"- profit_factor delta vs alpha: `{_format_num(float(refined_row['profit_factor'] - alpha_row['profit_factor']))}`", f"- overlap delta vs alpha: BUY `{int(refined_row['real_buy_overlap'] - alpha_row['real_buy_overlap'])}` / SELL `{int(refined_row['real_sell_overlap'] - alpha_row['real_sell_overlap'])}`", "", "## Governance Judgment", f"- Upgrade gate status: `{'PASS' if upgrade_ready else 'PARTIAL_PASS'}` on headline quality and walk-forward thresholds", "- The refined branch is stronger than the current alpha-first baseline and stronger than the older full glued candidate.", "- The remaining blocker is governance: overlap loss is still large enough that promotion should be explicit rather than silent.", "- Recommended status: keep `alpha_first_selective_veto` as formal baseline; mark `alpha_first_glued_refined_hot_cap` as the leading next alpha-first candidate.", ] (base_dir / "dragon_glued_refined_branch_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8") if __name__ == "__main__": main()