| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- from __future__ import annotations
- import json
- from pathlib import Path
- import pandas as pd
- from dragon_branch_configs import alpha_first_glued_refined_hot_cap_config
- from dragon_execution_common import apply_execution_model, summary
- from dragon_rc1_golden_baseline import _load_indicator_snapshot
- from dragon_shared import END_DATE, START_DATE, format_num, format_pct
- from dragon_strategy import DragonRuleEngine
- from dragon_strategy_config import StrategyConfig
def _bounded_trades(trades: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of the trades that open and close inside the evaluation window.

    A row is kept only when both its ``buy_date`` and its ``sell_date`` lie in
    ``[START_DATE, END_DATE]`` (both bounds inclusive).
    """
    entered = trades["buy_date"]
    exited = trades["sell_date"]
    buy_in_window = (entered >= START_DATE) & (entered <= END_DATE)
    sell_in_window = (exited >= START_DATE) & (exited <= END_DATE)
    return trades[buy_in_window & sell_in_window].copy()
def _trade_key(frame: pd.DataFrame) -> pd.Series:
    """Build one pipe-delimited identity string per trade row.

    The key joins ``buy_date``, ``sell_date``, ``buy_reason`` and
    ``sell_reason`` (each cast to ``str``, in that order) with ``"|"``.
    The returned Series shares the index of ``frame``.
    """
    first_field, *remaining_fields = ("buy_date", "sell_date", "buy_reason", "sell_reason")
    key = frame[first_field].astype(str)
    for field in remaining_fields:
        key = key + "|" + frame[field].astype(str)
    return key
def _trade_diff(baseline: pd.DataFrame, candidate: pd.DataFrame, label: str) -> pd.DataFrame:
    """List the trades that differ between ``baseline`` and ``candidate``.

    Trades are matched on the string produced by ``_trade_key``. Rows found
    only in ``baseline`` are tagged ``diff_type == "REMOVED"``; rows found only
    in ``candidate`` are tagged ``"ADDED"``. The result stacks removed rows
    first, then added rows, with ``label`` inserted as the leading
    ``experiment`` column. Neither input frame is modified.
    """
    reference = baseline.assign(trade_key=_trade_key(baseline))
    challenger = candidate.assign(trade_key=_trade_key(candidate))

    only_in_reference = ~reference["trade_key"].isin(challenger["trade_key"])
    only_in_challenger = ~challenger["trade_key"].isin(reference["trade_key"])

    dropped = reference.loc[only_in_reference].assign(diff_type="REMOVED")
    gained = challenger.loc[only_in_challenger].assign(diff_type="ADDED")

    combined = pd.concat([dropped, gained], ignore_index=True)
    combined.insert(0, "experiment", label)
    return combined
def _add_execution_prices(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame:
    """Attach the entry/exit prices needed by each execution model to every trade.

    Args:
        trades: One row per trade with ``buy_date``, ``sell_date``,
            ``buy_price``, ``sell_price`` and ``buy_reason_family`` columns.
        indicators: One row per bar with a ``date`` column, a ``close`` column
            and optionally an ``open`` column.

    Returns:
        A copy of ``trades`` with these extra columns:

        * ``exec_same_close_entry`` / ``exec_same_close_exit``: the trade's own
          ``buy_price`` / ``sell_price`` as floats.
        * ``exec_next_open_entry`` / ``exec_next_open_exit``: the open of the
          bar that follows the buy / sell date (the close is used when
          ``indicators`` has no ``open`` column).
        * ``exec_next_close_entry`` / ``exec_next_close_exit``: the close of
          the bar that follows the buy / sell date.
        * ``entry_family``: ``buy_reason_family`` cast to ``str``.

        A "next" price is NaN when the trade date is not present in
        ``indicators`` or is its final bar. Neither input is modified.
    """
    result = trades.copy()

    ind = indicators.copy()
    # Parse the dates BEFORE sorting: sorting raw strings orders them
    # lexicographically, which is only chronological for ISO-formatted text
    # and would otherwise make "the next bar" point at the wrong row.
    ind["date"] = pd.to_datetime(ind["date"])
    ind = ind.sort_values("date").reset_index(drop=True)

    open_col = "open" if "open" in ind.columns else "close"
    close_col = "close"

    # Map each bar date (except the last) to the prices of the following bar.
    # When a date occurs more than once, the later occurrence wins.
    bar_dates = ind["date"].iloc[:-1]
    next_open_by_date: dict[pd.Timestamp, float] = dict(zip(bar_dates, ind[open_col].iloc[1:]))
    next_close_by_date: dict[pd.Timestamp, float] = dict(zip(bar_dates, ind[close_col].iloc[1:]))

    nan = float("nan")
    buy_dates = [pd.Timestamp(value) for value in result["buy_date"]]
    sell_dates = [pd.Timestamp(value) for value in result["sell_date"]]

    result["exec_same_close_entry"] = [float(price) for price in result["buy_price"]]
    result["exec_same_close_exit"] = [float(price) for price in result["sell_price"]]
    result["exec_next_open_entry"] = [float(next_open_by_date.get(day, nan)) for day in buy_dates]
    result["exec_next_open_exit"] = [float(next_open_by_date.get(day, nan)) for day in sell_dates]
    result["exec_next_close_entry"] = [float(next_close_by_date.get(day, nan)) for day in buy_dates]
    result["exec_next_close_exit"] = [float(next_close_by_date.get(day, nan)) for day in sell_dates]
    result["entry_family"] = result["buy_reason_family"].astype(str)
    return result
def _run_variant(indexed: pd.DataFrame, config: StrategyConfig) -> pd.DataFrame:
    """Run the rule engine under ``config`` and return only the in-window trades.

    The engine's ``run`` result is unpacked as a pair; its second element (the
    trades frame) is passed through ``_bounded_trades`` and the first element
    is discarded.
    """
    engine = DragonRuleEngine(config=config)
    _unused, all_trades = engine.run(indexed)
    return _bounded_trades(all_trades)
def main() -> None:
    """Run every experiment variant and write the comparison artifacts.

    All outputs are written into the directory containing this file:

    * ``dragon_weak_family_experiment_summary.csv``: per-variant trade stats.
    * ``dragon_weak_family_trade_diff.csv``: trades added/removed versus the
      ``rc1_baseline`` variant (written only when there is at least one row).
    * ``dragon_weak_family_execution_stress.csv``: per-variant results under
      each execution model, plus ``next_open`` at 20 bps per side.
    * ``dragon_weak_family_experiment_config_snapshot.json``: the indicator
      source, evaluation window and each variant's config attributes.
    * ``dragon_weak_family_experiment.md``: a Markdown review of the above.
    """
    out_dir = Path(__file__).resolve().parent
    indexed, source = _load_indicator_snapshot(out_dir)
    indicators = indexed.reset_index(drop=False).rename(columns={"index": "date"})

    # --- variant definitions -------------------------------------------------
    baseline_cfg = alpha_first_glued_refined_hot_cap_config()
    extra_disabled = {"predictive_b1_break_exit", "predictive_error_reentry_buy"}
    variants: dict[str, StrategyConfig] = {
        "rc1_baseline": baseline_cfg,
        "deep_oversold_confirmation_v2": baseline_cfg.with_updates(
            deep_oversold_confirm_weak_with_ql=True,
            deep_oversold_block_shallow_false_start_without_ql=True,
            deep_oversold_selective_positive_b1_c1_max=15.3,
        ),
        "predictive_bridge_only": baseline_cfg.with_updates(
            disabled_rules=frozenset(set(baseline_cfg.disabled_rules) | extra_disabled),
        ),
    }

    trades_by_variant: dict[str, pd.DataFrame] = {}
    for variant_name, variant_cfg in variants.items():
        trades_by_variant[variant_name] = _run_variant(indexed, variant_cfg)

    # --- per-variant summary -------------------------------------------------
    nan = float("nan")
    summary_rows: list[dict[str, object]] = []
    for variant_name, variant_trades in trades_by_variant.items():
        returns = variant_trades["return_pct"].astype(float)
        if variant_trades.empty:
            stats = {
                "win_rate": nan,
                "avg_return": nan,
                "median_return": nan,
                "compounded_return": nan,
            }
        else:
            stats = {
                "win_rate": float((returns > 0).mean()),
                "avg_return": float(returns.mean()),
                "median_return": float(returns.median()),
                "compounded_return": float((1.0 + returns).prod() - 1.0),
            }
        summary_rows.append(
            {"experiment": variant_name, "trades": int(len(variant_trades)), **stats}
        )
    summary_df = pd.DataFrame(summary_rows)
    summary_df = summary_df.sort_values("experiment").reset_index(drop=True)
    summary_df.to_csv(
        out_dir / "dragon_weak_family_experiment_summary.csv",
        index=False,
        encoding="utf-8-sig",
    )

    # --- trade diff versus the baseline --------------------------------------
    baseline_trades = trades_by_variant["rc1_baseline"]
    diff_frames = [
        _trade_diff(baseline_trades, variant_trades, label=variant_name)
        for variant_name, variant_trades in trades_by_variant.items()
        if variant_name != "rc1_baseline"
    ]
    diff_df = pd.concat(diff_frames, ignore_index=True) if diff_frames else pd.DataFrame()
    has_diff = not diff_df.empty
    if has_diff:
        diff_df.to_csv(
            out_dir / "dragon_weak_family_trade_diff.csv",
            index=False,
            encoding="utf-8-sig",
        )

    # --- execution stress ----------------------------------------------------
    # Each model runs at zero cost; next_open additionally runs at 20 bps/side.
    scenarios = [
        ("same_close", 0.0),
        ("next_open", 0.0),
        ("next_open", 20.0),
        ("next_close", 0.0),
    ]
    stress_rows: list[dict[str, object]] = []
    for variant_name, variant_trades in trades_by_variant.items():
        enriched = _add_execution_prices(variant_trades, indicators)
        for model, cost_bps in scenarios:
            executed = apply_execution_model(enriched, model, cost_bps)
            stress_rows.append(summary(variant_name, executed))
    stress_df = pd.DataFrame(stress_rows).rename(columns={"branch": "experiment"})
    stress_df = stress_df.sort_values(["experiment", "execution_model", "cost_bps_side"])
    stress_df = stress_df.reset_index(drop=True)
    stress_df.to_csv(
        out_dir / "dragon_weak_family_execution_stress.csv",
        index=False,
        encoding="utf-8-sig",
    )

    # --- config snapshot -----------------------------------------------------
    snapshot_path = out_dir / "dragon_weak_family_experiment_config_snapshot.json"
    snapshot = {
        "indicator_source": source,
        "evaluation_window": {"start": START_DATE, "end": END_DATE},
        "variants": {name: cfg.__dict__ for name, cfg in variants.items()},
    }
    # default=list turns otherwise non-serializable iterables (e.g. the
    # frozenset of disabled rules) into JSON arrays.
    snapshot_text = json.dumps(snapshot, ensure_ascii=False, indent=2, default=list)
    snapshot_path.write_text(snapshot_text, encoding="utf-8")

    # --- markdown review -----------------------------------------------------
    lines = [
        "# Dragon Weak Family Experiment Review",
        "",
        f"- indicator source: `{source}`",
        f"- window: `{START_DATE} -> {END_DATE}`",
        "",
        "## Summary",
    ]
    for _, srow in summary_df.iterrows():
        trade_count = int(srow["trades"])
        win_rate = format_pct(float(srow["win_rate"]))
        avg_return = format_pct(float(srow["avg_return"]))
        compounded = format_pct(float(srow["compounded_return"]))
        lines.append(
            f"- {srow['experiment']}: trades `{trade_count}`, win_rate `{win_rate}`, "
            f"avg_return `{avg_return}`, compounded `{compounded}`"
        )

    if has_diff:
        lines.append("")
        lines.append("## Added/Removed Trades vs RC1")
        for experiment, group in diff_df.groupby("experiment", dropna=False):
            is_added = group["diff_type"] == "ADDED"
            is_removed = group["diff_type"] == "REMOVED"
            added = int(is_added.sum())
            removed = int(is_removed.sum())
            avg_added = float(group.loc[is_added, "return_pct"].mean()) if added > 0 else nan
            avg_removed = float(group.loc[is_removed, "return_pct"].mean()) if removed > 0 else nan
            lines.append(
                f"- {experiment}: added `{added}` (avg `{format_pct(avg_added)}`), "
                f"removed `{removed}` (avg `{format_pct(avg_removed)}`)"
            )

    lines.append("")
    lines.append("## Execution Stress (next_open + 20 bps/side)")
    is_next_open = stress_df["execution_model"] == "next_open"
    is_20_bps = stress_df["cost_bps_side"] == 20.0
    for _, xrow in stress_df[is_next_open & is_20_bps].iterrows():
        stressed_avg = format_pct(float(xrow["avg_return"]))
        profit_factor = format_num(float(xrow["profit_factor"]))
        max_drawdown = format_pct(float(xrow["max_drawdown"]))
        lines.append(
            f"- {xrow['experiment']}: avg_return `{stressed_avg}`, "
            f"PF `{profit_factor}`, max_dd `{max_drawdown}`"
        )

    report_path = out_dir / "dragon_weak_family_experiment.md"
    report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
# Run the experiment only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|