from __future__ import annotations from pathlib import Path import pandas as pd from dragon_indicators import DragonIndicatorConfig, DragonIndicatorEngine from dragon_strategy import DragonRuleEngine from dragon_strategy_config import StrategyConfig from dragon_workbook import DragonWorkbook def _find_workbook(base_dir: Path) -> Path: matches = sorted(base_dir.glob("*.xlsx")) if not matches: raise FileNotFoundError(f"No workbook found in {base_dir}") return matches[0] def _load_workbook_events(workbook_path: Path) -> pd.DataFrame: workbook = DragonWorkbook(workbook_path) return pd.DataFrame( [ { "date": event.date.isoformat(), "side": event.side, "layer": event.layer, "signal_reason": event.signal_reason, "note": event.note, } for event in workbook.split_layers() ] ) def _event_match_report(workbook_events: pd.DataFrame, strategy_events: pd.DataFrame, side: str, layer: str) -> dict[str, object]: wb = set(workbook_events[(workbook_events["side"] == side) & (workbook_events["layer"] == layer)]["date"]) st = set(strategy_events[(strategy_events["side"] == side) & (strategy_events["layer"] == layer)]["date"]) hit = wb & st return { "workbook": len(wb), "strategy": len(st), "overlap": len(hit), "missing": len(wb - st), "extra": len(st - wb), } def _profit_factor(series: pd.Series) -> float: gross_profit = series[series > 0].sum() gross_loss = -series[series < 0].sum() if gross_loss == 0: return float("inf") if gross_profit > 0 else 0.0 return float(gross_profit / gross_loss) def _trade_quality(trades: pd.DataFrame, indicator_df: pd.DataFrame) -> tuple[float, float]: if trades.empty: return float("nan"), float("nan") lookup = indicator_df.reset_index().rename(columns={"index": "dt"}) lookup["date_str"] = lookup["date"].dt.date.astype(str) pos_lookup = {date_str: idx for idx, date_str in enumerate(lookup["date_str"])} mfe_values: list[float] = [] mae_values: list[float] = [] for _, trade in trades.iterrows(): buy_idx = pos_lookup.get(trade["buy_date"]) sell_idx = pos_lookup.get(trade["sell_date"]) if buy_idx is None or sell_idx is None or sell_idx < buy_idx: continue window = lookup.iloc[buy_idx : sell_idx + 1] entry_price = float(trade["buy_price"]) mfe_values.append(float(window["high"].max()) / entry_price - 1.0) mae_values.append(float(window["low"].min()) / entry_price - 1.0) return ( float(pd.Series(mfe_values).mean()) if mfe_values else float("nan"), float(pd.Series(mae_values).mean()) if mae_values else float("nan"), ) def _run_single_experiment( label: str, config: StrategyConfig, workbook_events: pd.DataFrame, indicator_df: pd.DataFrame, first_workbook_date: str, last_workbook_date: str, ) -> dict[str, object]: strategy = DragonRuleEngine(config=config) events, trades = strategy.run(indicator_df) events = events[(events["date"] >= first_workbook_date) & (events["date"] <= last_workbook_date)].copy() trades = trades[ (trades["buy_date"] >= first_workbook_date) & (trades["buy_date"] <= last_workbook_date) & (trades["sell_date"] >= first_workbook_date) & (trades["sell_date"] <= last_workbook_date) ].copy() buy_match = _event_match_report(workbook_events, events, "BUY", "real_trade") sell_match = _event_match_report(workbook_events, events, "SELL", "real_trade") aux_buy_match = _event_match_report(workbook_events, events, "BUY", "aux_signal") aux_sell_match = _event_match_report(workbook_events, events, "SELL", "aux_signal") avg_mfe, avg_mae = _trade_quality(trades, indicator_df) win_rate = float((trades["return_pct"] > 0).mean()) if not trades.empty else float("nan") avg_return = float(trades["return_pct"].mean()) if not trades.empty else float("nan") median_return = float(trades["return_pct"].median()) if not trades.empty else float("nan") profit_factor = _profit_factor(trades["return_pct"]) if not trades.empty else float("nan") return { "experiment": label, "disabled_rules": "|".join(sorted(config.disabled_rules)), "aux_sell_same_side_once_per_cycle": config.aux_sell_same_side_once_per_cycle, "enable_knife_take_profit_2_wait_ql": config.enable_knife_take_profit_2_wait_ql, "trades": int(len(trades)), "win_rate": win_rate, "avg_return": avg_return, "median_return": median_return, "profit_factor": profit_factor, "avg_mfe_pct": avg_mfe, "avg_mae_pct": avg_mae, "real_buy_overlap": int(buy_match["overlap"]), "real_buy_missing": int(buy_match["missing"]), "real_buy_extra": int(buy_match["extra"]), "real_sell_overlap": int(sell_match["overlap"]), "real_sell_missing": int(sell_match["missing"]), "real_sell_extra": int(sell_match["extra"]), "aux_buy_overlap": int(aux_buy_match["overlap"]), "aux_buy_missing": int(aux_buy_match["missing"]), "aux_buy_extra": int(aux_buy_match["extra"]), "aux_sell_overlap": int(aux_sell_match["overlap"]), "aux_sell_missing": int(aux_sell_match["missing"]), "aux_sell_extra": int(aux_sell_match["extra"]), } def main() -> None: base_dir = Path(__file__).resolve().parent workbook_path = _find_workbook(base_dir) workbook_events = _load_workbook_events(workbook_path) first_workbook_date = pd.to_datetime(workbook_events["date"]).min().date().isoformat() last_workbook_date = pd.to_datetime(workbook_events["date"]).max().date().isoformat() engine = DragonIndicatorEngine(DragonIndicatorConfig(start_date="2015-01-01", end_date="2026-01-31")) indicator_df = engine.compute(engine.fetch_daily_data()) baseline_config = StrategyConfig() experiments: list[tuple[str, StrategyConfig]] = [ ("baseline", baseline_config), ("disable_entry_glued_buy", baseline_config.with_updates(disabled_rules={"glued_buy"})), ("disable_entry_deep_oversold_rebound_buy", baseline_config.with_updates(disabled_rules={"deep_oversold_rebound_buy"})), ("disable_entry_oversold_recovery_buy", baseline_config.with_updates(disabled_rules={"oversold_recovery_buy"})), ("disable_entry_post_sell_rebound_buy", baseline_config.with_updates(disabled_rules={"post_sell_rebound_buy"})), ("disable_entry_oversold_reversal_after_ql_buy", baseline_config.with_updates(disabled_rules={"oversold_reversal_after_ql_buy"})), ("disable_entry_non_glued_positive_expansion_buy", baseline_config.with_updates(disabled_rules={"non_glued_positive_expansion_buy"})), ("disable_entry_early_crash_probe_buy", baseline_config.with_updates(disabled_rules={"early_crash_probe_buy"})), ("disable_entry_dual_gold_resonance_buy", baseline_config.with_updates(disabled_rules={"dual_gold_resonance_buy"})), ("disable_exit_knife_take_profit_1", baseline_config.with_updates(disabled_rules={"knife_take_profit_1"})), ("disable_exit_knife_take_profit_2_glued", baseline_config.with_updates(disabled_rules={"knife_take_profit_2_glued"})), ("disable_exit_ql_mid_zone_take_profit", baseline_config.with_updates(disabled_rules={"ql_mid_zone_take_profit"})), ("disable_exit_high_regime_confirmed_exit_kdj", baseline_config.with_updates(disabled_rules={"high_regime_confirmed_exit:kdj_sell"})), ("disable_exit_predictive_b1_break_exit", baseline_config.with_updates(disabled_rules={"predictive_b1_break_exit"})), ("disable_exit_prewarning_reduction_exit", baseline_config.with_updates(disabled_rules={"prewarning_reduction_exit"})), ("disable_exit_crash_protection_exit", baseline_config.with_updates(disabled_rules={"crash_protection_exit"})), ("disable_aux_same_side_cycle_cap", baseline_config.with_updates(aux_sell_same_side_once_per_cycle=False)), ("disable_knife_take_profit_2_wait_ql", baseline_config.with_updates(enable_knife_take_profit_2_wait_ql=False)), ] rows = [ _run_single_experiment( label, config, workbook_events, indicator_df, first_workbook_date, last_workbook_date, ) for label, config in experiments ] result_df = pd.DataFrame(rows) baseline_row = result_df[result_df["experiment"] == "baseline"].iloc[0] for col in [ "trades", "win_rate", "avg_return", "median_return", "profit_factor", "avg_mfe_pct", "avg_mae_pct", "real_buy_overlap", "real_sell_overlap", "aux_sell_overlap", ]: result_df[f"delta_{col}"] = result_df[col] - baseline_row[col] result_df.to_csv(base_dir / "dragon_rule_ablation.csv", index=False, encoding="utf-8-sig") protected = result_df[ (result_df["experiment"] != "baseline") & (result_df["real_buy_overlap"] == baseline_row["real_buy_overlap"]) & (result_df["real_sell_overlap"] == baseline_row["real_sell_overlap"]) ].copy() protected_sorted = protected.sort_values("delta_avg_return", ascending=False) harmful_sorted = result_df[result_df["experiment"] != "baseline"].sort_values("delta_avg_return") lines = [ "# Dragon Rule Ablation", "", "## Baseline", f"- trades: `{int(baseline_row['trades'])}`", f"- win_rate: `{baseline_row['win_rate']:.2%}`", f"- avg_return: `{baseline_row['avg_return']:.2%}`", f"- profit_factor: `{baseline_row['profit_factor']:.2f}`", f"- real BUY overlap: `{int(baseline_row['real_buy_overlap'])}`", f"- real SELL overlap: `{int(baseline_row['real_sell_overlap'])}`", "", "## Protected Experiments", "- Interpretation: these experiments preserved current real-trade overlap and only changed quality or auxiliary behavior.", ] if protected_sorted.empty: lines.append("- None.") else: for _, row in protected_sorted.head(8).iterrows(): lines.append( f"- `{row['experiment']}`: delta_avg_return `{row['delta_avg_return']:.2%}`, " f"delta_profit_factor `{row['delta_profit_factor']:.2f}`, delta_aux_sell_overlap `{int(row['delta_aux_sell_overlap'])}`" ) lines.extend(["", "## Most Harmful Removals"]) for _, row in harmful_sorted.head(8).iterrows(): lines.append( f"- `{row['experiment']}`: delta_avg_return `{row['delta_avg_return']:.2%}`, " f"real BUY `{int(row['real_buy_overlap'])}`, real SELL `{int(row['real_sell_overlap'])}`, " f"delta_trades `{int(row['delta_trades'])}`" ) lines.extend(["", "## Best Removal Candidates"]) best_candidates = result_df[ (result_df["experiment"] != "baseline") & (result_df["delta_avg_return"] > 0) ].sort_values(["real_buy_overlap", "real_sell_overlap", "delta_avg_return"], ascending=[False, False, False]) for _, row in best_candidates.head(8).iterrows(): lines.append( f"- `{row['experiment']}`: delta_avg_return `{row['delta_avg_return']:.2%}`, " f"delta_profit_factor `{row['delta_profit_factor']:.2f}`, " f"real BUY `{int(row['real_buy_overlap'])}`, real SELL `{int(row['real_sell_overlap'])}`" ) (base_dir / "dragon_rule_ablation.md").write_text("\n".join(lines) + "\n", encoding="utf-8") if __name__ == "__main__": main()