from __future__ import annotations import json from dataclasses import asdict from pathlib import Path import pandas as pd from dragon_shared import END_DATE, START_DATE, format_num as _format_num, format_pct as _format_pct, profit_factor from dragon_strategy import DragonRuleEngine from dragon_strategy_config import StrategyConfig def _load_indicator_snapshot(base_dir: Path) -> pd.DataFrame: df = pd.read_csv(base_dir / "dragon_indicator_snapshot.csv", encoding="utf-8-sig") df["date"] = pd.to_datetime(df["date"]) return df.set_index("date", drop=False) def _load_true_trade_events(base_dir: Path) -> pd.DataFrame: return pd.read_csv(base_dir / "true_trade_events.csv", encoding="utf-8-sig") def _holding_bucket(days: int) -> str: if days <= 5: return "00-05d" if days <= 10: return "06-10d" if days <= 20: return "11-20d" if days <= 40: return "21-40d" return "41d+" def _event_match(strategy_events: pd.DataFrame, workbook_events: pd.DataFrame, side: str) -> tuple[int, int, int]: wb = set(workbook_events[(workbook_events["side"] == side) & (workbook_events["layer"] == "real_trade")]["date"]) st = set(strategy_events[(strategy_events["side"] == side) & (strategy_events["layer"] == "real_trade")]["date"]) return len(wb & st), len(wb - st), len(st - wb) def _segment_stats(df: pd.DataFrame) -> dict[str, float | int]: if df.empty: return { "trades": 0, "win_rate": float("nan"), "avg_return": float("nan"), "profit_factor": float("nan"), "compounded_return": float("nan"), } returns = df["return_pct"].astype(float) return { "trades": int(len(df)), "win_rate": float((returns > 0).mean()), "avg_return": float(returns.mean()), "profit_factor": profit_factor(returns), "compounded_return": float((1.0 + returns).prod() - 1.0), } def _build_walk_forward(trades: pd.DataFrame, branch_name: str) -> pd.DataFrame: years = sorted(int(year) for year in trades["sell_year"].unique()) rows: list[dict[str, object]] = [] for idx, test_year in enumerate(years): if idx >= 1: train_years = years[:idx] train_df = trades[trades["sell_year"].isin(train_years)] test_df = trades[trades["sell_year"] == test_year] rows.append( { "branch": branch_name, "scheme": "anchored_expanding", "train_start_year": train_years[0], "train_end_year": train_years[-1], "test_year": test_year, **{f"train_{k}": v for k, v in _segment_stats(train_df).items()}, **{f"test_{k}": v for k, v in _segment_stats(test_df).items()}, } ) if idx >= 3: train_years = years[idx - 3 : idx] train_df = trades[trades["sell_year"].isin(train_years)] test_df = trades[trades["sell_year"] == test_year] rows.append( { "branch": branch_name, "scheme": "rolling_3y", "train_start_year": train_years[0], "train_end_year": train_years[-1], "test_year": test_year, **{f"train_{k}": v for k, v in _segment_stats(train_df).items()}, **{f"test_{k}": v for k, v in _segment_stats(test_df).items()}, } ) return pd.DataFrame(rows) def _run_branch( name: str, config: StrategyConfig, indicator_df: pd.DataFrame, workbook_events: pd.DataFrame, first_date: str, last_date: str, ) -> tuple[dict[str, object], pd.DataFrame, pd.DataFrame, pd.DataFrame]: engine = DragonRuleEngine(config=config) events, trades = engine.run(indicator_df) start = max(first_date, START_DATE) end = min(last_date, END_DATE) events = events[(events["date"] >= start) & (events["date"] <= end)].copy() trades = trades[ (trades["buy_date"] >= start) & (trades["buy_date"] <= end) & (trades["sell_date"] >= start) & (trades["sell_date"] <= end) ].copy() buy_overlap, buy_missing, buy_extra = _event_match(events, workbook_events, "BUY") sell_overlap, sell_missing, sell_extra = _event_match(events, workbook_events, "SELL") trades["branch"] = name trades["sell_dt"] = pd.to_datetime(trades["sell_date"]) trades["sell_year"] = trades["sell_dt"].dt.year.astype(int) trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket) returns = trades["return_pct"].astype(float) if not trades.empty else pd.Series(dtype=float) summary = { "branch": name, "trades": int(len(trades)), "win_rate": float((returns > 0).mean()) if not trades.empty else float("nan"), "avg_return": float(returns.mean()) if not trades.empty else float("nan"), "median_return": float(returns.median()) if not trades.empty else float("nan"), "profit_factor": profit_factor(returns) if not trades.empty else float("nan"), "real_buy_overlap": int(buy_overlap), "real_buy_missing": int(buy_missing), "real_buy_extra": int(buy_extra), "real_sell_overlap": int(sell_overlap), "real_sell_missing": int(sell_missing), "real_sell_extra": int(sell_extra), "short_00_05d_avg_return": float(trades[trades["holding_bucket"] == "00-05d"]["return_pct"].mean()), "short_06_10d_avg_return": float(trades[trades["holding_bucket"] == "06-10d"]["return_pct"].mean()), } bucket_rows: list[dict[str, object]] = [] for bucket, group in trades.groupby("holding_bucket", dropna=False): bucket_rows.append( { "branch": name, "holding_bucket": bucket, "trades": int(len(group)), "win_rate": float((group["return_pct"] > 0).mean()), "avg_return": float(group["return_pct"].mean()), "profit_factor": profit_factor(group["return_pct"]), } ) holding_df = pd.DataFrame(bucket_rows).sort_values("holding_bucket") walk_forward_df = _build_walk_forward(trades, name) return summary, trades, holding_df, walk_forward_df def _config_snapshot(config: StrategyConfig) -> dict[str, object]: snapshot = asdict(config) snapshot["disabled_rules"] = sorted(config.disabled_rules) return snapshot def main() -> None: base_dir = Path(__file__).resolve().parent indicator_df = _load_indicator_snapshot(base_dir) workbook_events = _load_true_trade_events(base_dir) first_date = workbook_events["date"].min() last_date = workbook_events["date"].max() workbook_config = StrategyConfig() alpha_config = workbook_config.with_updates( deep_oversold_selective_positive_b1_c1_max=15.3, deep_oversold_selective_shallow_c1_min=12.0, deep_oversold_selective_shallow_b1_min=-0.025, deep_oversold_selective_mixed_c1_max=10.2, deep_oversold_selective_mixed_require_no_ql=True, ) workbook_summary, workbook_trades, workbook_holding, workbook_walk = _run_branch( "workbook_preserving", workbook_config, indicator_df, workbook_events, first_date, last_date, ) alpha_summary, alpha_trades, alpha_holding, alpha_walk = _run_branch( "alpha_first_selective_veto", alpha_config, indicator_df, workbook_events, first_date, last_date, ) summary_df = pd.DataFrame([workbook_summary, alpha_summary]) baseline_row = summary_df[summary_df["branch"] == "workbook_preserving"].iloc[0] alpha_row = summary_df[summary_df["branch"] == "alpha_first_selective_veto"].iloc[0] comparison = pd.DataFrame( [ { "metric": col, "workbook_preserving": baseline_row[col], "alpha_first_selective_veto": alpha_row[col], "delta_alpha_minus_workbook": alpha_row[col] - baseline_row[col] if isinstance(alpha_row[col], (int, float)) and isinstance(baseline_row[col], (int, float)) else None, } for col in [ "trades", "win_rate", "avg_return", "median_return", "profit_factor", "real_buy_overlap", "real_sell_overlap", "short_00_05d_avg_return", "short_06_10d_avg_return", ] ] ) baseline_set = set(zip(workbook_trades["buy_date"], workbook_trades["sell_date"], workbook_trades["buy_reason"], workbook_trades["sell_reason"])) alpha_set = set(zip(alpha_trades["buy_date"], alpha_trades["sell_date"], alpha_trades["buy_reason"], alpha_trades["sell_reason"])) trade_diff_rows: list[dict[str, object]] = [] for row in sorted(baseline_set - alpha_set): trade_diff_rows.append( { "change_type": "removed_from_alpha", "buy_date": row[0], "sell_date": row[1], "buy_reason": row[2], "sell_reason": row[3], } ) for row in sorted(alpha_set - baseline_set): trade_diff_rows.append( { "change_type": "added_in_alpha", "buy_date": row[0], "sell_date": row[1], "buy_reason": row[2], "sell_reason": row[3], } ) trade_diff_df = pd.DataFrame(trade_diff_rows) combined_holding = pd.concat([workbook_holding, alpha_holding], ignore_index=True) combined_walk = pd.concat([workbook_walk, alpha_walk], ignore_index=True) summary_df.to_csv(base_dir / "dragon_alpha_first_branch_summary.csv", index=False, encoding="utf-8-sig") comparison.to_csv(base_dir / "dragon_alpha_first_branch_comparison.csv", index=False, encoding="utf-8-sig") combined_holding.to_csv(base_dir / "dragon_alpha_first_branch_holding_buckets.csv", index=False, encoding="utf-8-sig") combined_walk.to_csv(base_dir / "dragon_alpha_first_branch_walk_forward.csv", index=False, encoding="utf-8-sig") trade_diff_df.to_csv(base_dir / "dragon_alpha_first_branch_trade_diff.csv", index=False, encoding="utf-8-sig") (base_dir / "dragon_alpha_first_config_snapshot.json").write_text( json.dumps(_config_snapshot(alpha_config), indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) def _wf_stats(df: pd.DataFrame, scheme: str) -> tuple[int, int, float]: view = df[df["scheme"] == scheme] positive = int((view["test_avg_return"] > 0).sum()) if not view.empty else 0 total = int(len(view)) avg_oos = float(view["test_avg_return"].mean()) if not view.empty else float("nan") return positive, total, avg_oos wb_anchor_pos, wb_anchor_total, wb_anchor_avg = _wf_stats(workbook_walk, "anchored_expanding") af_anchor_pos, af_anchor_total, af_anchor_avg = _wf_stats(alpha_walk, "anchored_expanding") wb_roll_pos, wb_roll_total, wb_roll_avg = _wf_stats(workbook_walk, "rolling_3y") af_roll_pos, af_roll_total, af_roll_avg = _wf_stats(alpha_walk, "rolling_3y") lines = [ "# Dragon Alpha-First Branch Report", "", "## Branches", f"- Evaluation window: `{START_DATE}` to `{END_DATE}`.", "- `workbook_preserving`: official formal baseline, preserves workbook structure as much as possible.", "- `alpha_first_selective_veto`: research branch using the current best narrow deep-oversold veto package.", "", "## Headline Comparison", f"- workbook_preserving: trades `{int(baseline_row['trades'])}`, avg_return `{_format_pct(float(baseline_row['avg_return']))}`, profit_factor `{_format_num(float(baseline_row['profit_factor']))}`, real BUY / SELL `{int(baseline_row['real_buy_overlap'])}/{int(baseline_row['real_sell_overlap'])}`", f"- alpha_first_selective_veto: trades `{int(alpha_row['trades'])}`, avg_return `{_format_pct(float(alpha_row['avg_return']))}`, profit_factor `{_format_num(float(alpha_row['profit_factor']))}`, real BUY / SELL `{int(alpha_row['real_buy_overlap'])}/{int(alpha_row['real_sell_overlap'])}`", "", "## Short-Holding Impact", f"- `00-05d` avg_return: workbook `{_format_pct(float(baseline_row['short_00_05d_avg_return']))}` vs alpha-first `{_format_pct(float(alpha_row['short_00_05d_avg_return']))}`", f"- `06-10d` avg_return: workbook `{_format_pct(float(baseline_row['short_06_10d_avg_return']))}` vs alpha-first `{_format_pct(float(alpha_row['short_06_10d_avg_return']))}`", "", "## Walk-Forward Comparison", f"- Anchored expanding: workbook positive `{wb_anchor_pos}/{wb_anchor_total}`, avg test return `{_format_pct(wb_anchor_avg)}`; alpha-first positive `{af_anchor_pos}/{af_anchor_total}`, avg test return `{_format_pct(af_anchor_avg)}`", f"- Rolling 3Y: workbook positive `{wb_roll_pos}/{wb_roll_total}`, avg test return `{_format_pct(wb_roll_avg)}`; alpha-first positive `{af_roll_pos}/{af_roll_total}`, avg test return `{_format_pct(af_roll_avg)}`", "", "## Trade-Diff Summary", f"- trades removed from alpha-first vs workbook: `{int((trade_diff_df['change_type'] == 'removed_from_alpha').sum())}`", f"- trades added in alpha-first vs workbook: `{int((trade_diff_df['change_type'] == 'added_in_alpha').sum())}`", "- Key removed deep-oversold trades are the narrow pathological subset identified in Track A, not the full weak-subtype family.", "", "## Governance", "- Keep `workbook_preserving` as the official reconstruction baseline.", "- Keep `alpha_first_selective_veto` as the leading performance-oriented research branch.", "- Do not merge alpha-first veto rules back into the official baseline unless the objective explicitly changes from workbook preservation to alpha-first optimization.", "", "## Quant Judgment", "- Stage 3 is complete once both baselines are explicitly separated and reproducible.", "- The workbook-preserving baseline remains the authoritative reconstruction target.", "- The alpha-first branch now has a concrete candidate baseline with better trade quality and better short-holding behavior, at the cost of expected workbook alignment loss.", "- Future work should choose one branch explicitly before optimizing further; the main unresolved technical decision is governance, not missing analysis.", ] (base_dir / "dragon_alpha_first_baseline.md").write_text("\n".join(lines) + "\n", encoding="utf-8") if __name__ == "__main__": main()