from __future__ import annotations import json from pathlib import Path import pandas as pd from dragon_branch_configs import alpha_first_glued_refined_hot_cap_config from dragon_execution_common import apply_execution_model, summary from dragon_rc1_golden_baseline import _load_indicator_snapshot from dragon_shared import END_DATE, START_DATE, format_num, format_pct from dragon_strategy import DragonRuleEngine from dragon_strategy_config import StrategyConfig def _bounded_trades(trades: pd.DataFrame) -> pd.DataFrame: return trades[ (trades["buy_date"] >= START_DATE) & (trades["buy_date"] <= END_DATE) & (trades["sell_date"] >= START_DATE) & (trades["sell_date"] <= END_DATE) ].copy() def _trade_key(frame: pd.DataFrame) -> pd.Series: return ( frame["buy_date"].astype(str) + "|" + frame["sell_date"].astype(str) + "|" + frame["buy_reason"].astype(str) + "|" + frame["sell_reason"].astype(str) ) def _trade_diff(baseline: pd.DataFrame, candidate: pd.DataFrame, label: str) -> pd.DataFrame: base = baseline.copy() cand = candidate.copy() base["trade_key"] = _trade_key(base) cand["trade_key"] = _trade_key(cand) removed = base.loc[~base["trade_key"].isin(cand["trade_key"])].copy() removed["diff_type"] = "REMOVED" added = cand.loc[~cand["trade_key"].isin(base["trade_key"])].copy() added["diff_type"] = "ADDED" diff = pd.concat([removed, added], ignore_index=True) diff.insert(0, "experiment", label) return diff def _add_execution_prices(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame: result = trades.copy() ind = indicators.copy().sort_values("date").reset_index(drop=True) ind["date"] = pd.to_datetime(ind["date"]) open_col = "open" if "open" in ind.columns else "close" close_col = "close" next_by_date: dict[pd.Timestamp, pd.Series] = {} for idx in range(len(ind) - 1): next_by_date[pd.Timestamp(ind.iloc[idx]["date"])] = ind.iloc[idx + 1] next_open_entry: list[float] = [] next_open_exit: list[float] = [] next_close_entry: list[float] = [] next_close_exit: list[float] = [] same_close_entry: list[float] = [] same_close_exit: list[float] = [] for _, row in result.iterrows(): buy_date = pd.Timestamp(row["buy_date"]) sell_date = pd.Timestamp(row["sell_date"]) buy_next = next_by_date.get(buy_date) sell_next = next_by_date.get(sell_date) next_open_entry.append(float("nan") if buy_next is None else float(buy_next[open_col])) next_open_exit.append(float("nan") if sell_next is None else float(sell_next[open_col])) next_close_entry.append(float("nan") if buy_next is None else float(buy_next[close_col])) next_close_exit.append(float("nan") if sell_next is None else float(sell_next[close_col])) same_close_entry.append(float(row["buy_price"])) same_close_exit.append(float(row["sell_price"])) result["exec_same_close_entry"] = same_close_entry result["exec_same_close_exit"] = same_close_exit result["exec_next_open_entry"] = next_open_entry result["exec_next_open_exit"] = next_open_exit result["exec_next_close_entry"] = next_close_entry result["exec_next_close_exit"] = next_close_exit result["entry_family"] = result["buy_reason_family"].astype(str) return result def _run_variant(indexed: pd.DataFrame, config: StrategyConfig) -> pd.DataFrame: _, trades = DragonRuleEngine(config=config).run(indexed) return _bounded_trades(trades) def main() -> None: base_dir = Path(__file__).resolve().parent indexed, source = _load_indicator_snapshot(base_dir) indicators = indexed.reset_index(drop=False).rename(columns={"index": "date"}) baseline_cfg = alpha_first_glued_refined_hot_cap_config() deep_oversold_cfg = baseline_cfg.with_updates( deep_oversold_confirm_weak_with_ql=True, deep_oversold_block_shallow_false_start_without_ql=True, deep_oversold_selective_positive_b1_c1_max=15.3, ) predictive_bridge_only_cfg = baseline_cfg.with_updates( disabled_rules=frozenset(set(baseline_cfg.disabled_rules) | {"predictive_b1_break_exit", "predictive_error_reentry_buy"}), ) variants: dict[str, StrategyConfig] = { "rc1_baseline": baseline_cfg, "deep_oversold_confirmation_v2": deep_oversold_cfg, "predictive_bridge_only": predictive_bridge_only_cfg, } trades_by_variant = {name: _run_variant(indexed, cfg) for name, cfg in variants.items()} summary_rows: list[dict[str, object]] = [] for name, trades in trades_by_variant.items(): returns = trades["return_pct"].astype(float) summary_rows.append( { "experiment": name, "trades": int(len(trades)), "win_rate": float((returns > 0).mean()) if not trades.empty else float("nan"), "avg_return": float(returns.mean()) if not trades.empty else float("nan"), "median_return": float(returns.median()) if not trades.empty else float("nan"), "compounded_return": float((1.0 + returns).prod() - 1.0) if not trades.empty else float("nan"), } ) summary_df = pd.DataFrame(summary_rows).sort_values("experiment").reset_index(drop=True) summary_df.to_csv(base_dir / "dragon_weak_family_experiment_summary.csv", index=False, encoding="utf-8-sig") baseline_trades = trades_by_variant["rc1_baseline"] diff_rows = [] for name, trades in trades_by_variant.items(): if name == "rc1_baseline": continue diff_rows.append(_trade_diff(baseline_trades, trades, label=name)) diff_df = pd.concat(diff_rows, ignore_index=True) if diff_rows else pd.DataFrame() if not diff_df.empty: diff_df.to_csv(base_dir / "dragon_weak_family_trade_diff.csv", index=False, encoding="utf-8-sig") stress_rows: list[dict[str, object]] = [] for name, trades in trades_by_variant.items(): enriched = _add_execution_prices(trades, indicators) for model in ["same_close", "next_open", "next_close"]: model_trades = apply_execution_model(enriched, model, 0.0) stress_rows.append(summary(name, model_trades)) if model == "next_open": stressed = apply_execution_model(enriched, model, 20.0) stress_rows.append(summary(name, stressed)) stress_df = pd.DataFrame(stress_rows).rename(columns={"branch": "experiment"}) stress_df = stress_df.sort_values(["experiment", "execution_model", "cost_bps_side"]).reset_index(drop=True) stress_df.to_csv(base_dir / "dragon_weak_family_execution_stress.csv", index=False, encoding="utf-8-sig") (base_dir / "dragon_weak_family_experiment_config_snapshot.json").write_text( json.dumps( { "indicator_source": source, "evaluation_window": {"start": START_DATE, "end": END_DATE}, "variants": {name: cfg.__dict__ for name, cfg in variants.items()}, }, ensure_ascii=False, indent=2, default=list, ), encoding="utf-8", ) lines = [ "# Dragon Weak Family Experiment Review", "", f"- indicator source: `{source}`", f"- window: `{START_DATE} -> {END_DATE}`", "", "## Summary", ] for _, row in summary_df.iterrows(): lines.append( "- " f"{row['experiment']}: " f"trades `{int(row['trades'])}`, " f"win_rate `{format_pct(float(row['win_rate']))}`, " f"avg_return `{format_pct(float(row['avg_return']))}`, " f"compounded `{format_pct(float(row['compounded_return']))}`" ) if not diff_df.empty: lines.extend(["", "## Added/Removed Trades vs RC1"]) for experiment, group in diff_df.groupby("experiment", dropna=False): added = int((group["diff_type"] == "ADDED").sum()) removed = int((group["diff_type"] == "REMOVED").sum()) avg_added = float(group.loc[group["diff_type"] == "ADDED", "return_pct"].mean()) if added > 0 else float("nan") avg_removed = float(group.loc[group["diff_type"] == "REMOVED", "return_pct"].mean()) if removed > 0 else float("nan") lines.append( "- " f"{experiment}: added `{added}` (avg `{format_pct(avg_added)}`), " f"removed `{removed}` (avg `{format_pct(avg_removed)}`)" ) lines.extend(["", "## Execution Stress (next_open + 20 bps/side)"]) stress_20 = stress_df[(stress_df["execution_model"] == "next_open") & (stress_df["cost_bps_side"] == 20.0)] for _, row in stress_20.iterrows(): lines.append( "- " f"{row['experiment']}: " f"avg_return `{format_pct(float(row['avg_return']))}`, " f"PF `{format_num(float(row['profit_factor']))}`, " f"max_dd `{format_pct(float(row['max_drawdown']))}`" ) (base_dir / "dragon_weak_family_experiment.md").write_text("\n".join(lines) + "\n", encoding="utf-8") if __name__ == "__main__": main()