from __future__ import annotations from pathlib import Path import pandas as pd from dragon_branch_configs import alpha_first_selective_veto_config from dragon_strategy import DragonRuleEngine SHORT_BUCKETS = {"00-05d", "06-10d"} def _load_csv(base_dir: Path, name: str) -> pd.DataFrame: return pd.read_csv(base_dir / name, encoding="utf-8-sig") def _holding_bucket(days: int) -> str: if days <= 5: return "00-05d" if days <= 10: return "06-10d" if days <= 20: return "11-20d" if days <= 40: return "21-40d" return "41d+" def _entry_family(reason: str) -> str: return reason.split(":", 1)[0] def _exit_family(reason: str) -> str: return reason.split(":", 1)[0] def _compute_forward_return(indicators: pd.DataFrame, date_str: str, base_price: float, days: int) -> float: row = indicators[indicators["date_str"] == date_str] if row.empty: return float("nan") idx = int(row.iloc[0]["row_id"]) target_idx = idx + days if target_idx >= len(indicators): return float("nan") return float(indicators.iloc[target_idx]["close"]) / base_price - 1.0 def _compute_metrics(trade: pd.Series, indicators: pd.DataFrame) -> dict[str, float]: buy_date = trade["buy_date"] sell_date = trade["sell_date"] entry_price = float(trade["buy_price"]) exit_price = float(trade["sell_price"]) buy_row = indicators[indicators["date_str"] == buy_date] sell_row = indicators[indicators["date_str"] == sell_date] if buy_row.empty or sell_row.empty: return {} buy_idx = int(buy_row.iloc[0]["row_id"]) sell_idx = int(sell_row.iloc[0]["row_id"]) window = indicators.iloc[buy_idx : sell_idx + 1].copy() peak_close = float(window["close"].max()) trough_close = float(window["close"].min()) peak_before_exit = peak_close / entry_price - 1.0 drawdown_before_exit = trough_close / entry_price - 1.0 mfe_pct = float(window["high"].max()) / entry_price - 1.0 mae_pct = float(window["low"].min()) / entry_price - 1.0 sell_plus_3d = float("nan") sell_plus_5d = float("nan") if sell_idx + 3 < len(indicators): sell_plus_3d = float(indicators.iloc[sell_idx + 3]["close"]) / exit_price - 1.0 if sell_idx + 5 < len(indicators): sell_plus_5d = float(indicators.iloc[sell_idx + 5]["close"]) / exit_price - 1.0 return { "buy_plus_1d_return": _compute_forward_return(indicators, buy_date, entry_price, 1), "buy_plus_2d_return": _compute_forward_return(indicators, buy_date, entry_price, 2), "buy_plus_3d_return": _compute_forward_return(indicators, buy_date, entry_price, 3), "sell_plus_3d_followthrough": sell_plus_3d, "sell_plus_5d_followthrough": sell_plus_5d, "peak_before_exit": peak_before_exit, "drawdown_before_exit": drawdown_before_exit, "mfe_pct": mfe_pct, "mae_pct": mae_pct, } def _failure_shape(row: pd.Series) -> str: if bool(row["is_predictive_bridge_chain"]): return "bridge_trade" if row["return_pct"] >= 0 and row["sell_plus_3d_followthrough"] > 0.02: return "exit_too_early" if row["return_pct"] < 0: if row["holding_days"] <= 5 and row["peak_before_exit"] <= 0.01: return "immediate_failure" if row["peak_before_exit"] >= 0.02: return "rebound_then_fail" if 0 < row["peak_before_exit"] < 0.02: return "small_profit_reversal" if row["sell_plus_3d_followthrough"] > 0.03 or row["sell_plus_5d_followthrough"] > 0.04: return "exit_too_early" return "flat_noise" def _failure_root(row: pd.Series) -> str: if bool(row["is_predictive_bridge_chain"]): return "bridge_trade" if row["failure_shape"] == "exit_too_early": return "exit_too_fast" if row["failure_shape"] == "immediate_failure": return "entry_bad" if row["failure_shape"] in {"rebound_then_fail", "small_profit_reversal"}: return "hold_bad" return "mixed" def main() -> None: base_dir = Path(__file__).resolve().parent indicators = _load_csv(base_dir, "dragon_indicator_snapshot.csv") indicators["date"] = pd.to_datetime(indicators["date"]) indicators = indicators.sort_values("date").reset_index(drop=True) indicators["date_str"] = indicators["date"].dt.date.astype(str) indicators["row_id"] = indicators.index path_trace = _load_csv(base_dir, "dragon_trade_path_trace.csv") workbook_events = _load_csv(base_dir, "true_trade_events.csv") first_date = workbook_events["date"].min() last_date = workbook_events["date"].max() engine = DragonRuleEngine(alpha_first_selective_veto_config()) events, trades = engine.run(indicators.set_index("date", drop=False)) trades = trades[ (trades["buy_date"] >= first_date) & (trades["buy_date"] <= last_date) & (trades["sell_date"] >= first_date) & (trades["sell_date"] <= last_date) ].copy() trades["entry_family"] = trades["buy_reason"].astype(str).map(_entry_family) trades["exit_family"] = trades["sell_reason"].astype(str).map(_exit_family) trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket) audit = trades[trades["holding_bucket"].isin(SHORT_BUCKETS)].copy() merge_cols = [ "buy_date", "sell_date", "market_state_layer", "entry_qualification_layer", "position_management_layer", "aux_context_layer", ] audit = audit.merge(path_trace[merge_cols], on=["buy_date", "sell_date"], how="left") metric_rows = [] for _, trade in audit.iterrows(): metrics = _compute_metrics(trade, indicators) metric_rows.append(metrics) metric_df = pd.DataFrame(metric_rows) audit = pd.concat([audit.reset_index(drop=True), metric_df], axis=1) audit["is_deep_oversold_family"] = audit["entry_family"].eq("deep_oversold_rebound_buy") audit["is_post_sell_rebound_family"] = audit["entry_family"].eq("post_sell_rebound_buy") audit["is_predictive_bridge_chain"] = audit["buy_reason"].eq("predictive_error_reentry_buy") | audit["sell_reason"].eq("predictive_b1_break_exit") audit["failure_shape"] = audit.apply(_failure_shape, axis=1) audit["failure_root"] = audit.apply(_failure_root, axis=1) audit = audit[ [ "buy_date", "sell_date", "buy_reason", "sell_reason", "entry_family", "exit_family", "holding_days", "holding_bucket", "return_pct", "mfe_pct", "mae_pct", "market_state_layer", "entry_qualification_layer", "position_management_layer", "aux_context_layer", "is_deep_oversold_family", "is_post_sell_rebound_family", "is_predictive_bridge_chain", "buy_plus_1d_return", "buy_plus_2d_return", "buy_plus_3d_return", "sell_plus_3d_followthrough", "sell_plus_5d_followthrough", "peak_before_exit", "drawdown_before_exit", "failure_shape", "failure_root", ] ].sort_values(["holding_bucket", "return_pct", "buy_date"]) audit.to_csv(base_dir / "dragon_short_holding_audit.csv", index=False, encoding="utf-8-sig") failure_summary = ( audit.groupby(["holding_bucket", "failure_root", "failure_shape"], dropna=False) .agg( trades=("buy_date", "count"), avg_return=("return_pct", "mean"), avg_mfe=("mfe_pct", "mean"), avg_mae=("mae_pct", "mean"), ) .reset_index() .sort_values(["holding_bucket", "trades", "avg_return"], ascending=[True, False, True]) ) family_summary = ( audit.groupby(["holding_bucket", "entry_family"], dropna=False) .agg( trades=("buy_date", "count"), win_rate=("return_pct", lambda s: float((s > 0).mean())), avg_return=("return_pct", "mean"), avg_mfe=("mfe_pct", "mean"), avg_mae=("mae_pct", "mean"), ) .reset_index() .sort_values(["holding_bucket", "avg_return", "trades"], ascending=[True, True, False]) ) lines = [ "# Dragon Short Holding Review", "", "- Branch: `alpha_first_selective_veto`.", "- Scope: only `00-05d` and `06-10d` trades.", f"- audited short trades: `{int(len(audit))}`", f"- `00-05d` avg_return: `{audit[audit['holding_bucket'] == '00-05d']['return_pct'].mean():.2%}`", f"- `06-10d` avg_return: `{audit[audit['holding_bucket'] == '06-10d']['return_pct'].mean():.2%}`", "", "## Failure Root Summary", ] for _, row in failure_summary.iterrows(): lines.append( f"- `{row['holding_bucket']} / {row['failure_root']} / {row['failure_shape']}`: trades `{int(row['trades'])}`, " f"avg_return `{row['avg_return']:.2%}`, avg_mfe `{row['avg_mfe']:.2%}`, avg_mae `{row['avg_mae']:.2%}`" ) worst_families = family_summary.groupby("holding_bucket", group_keys=False).head(5) lines.extend(["", "## Weak Entry Families"]) for _, row in worst_families.iterrows(): lines.append( f"- `{row['holding_bucket']} / {row['entry_family']}`: trades `{int(row['trades'])}`, win_rate `{row['win_rate']:.2%}`, " f"avg_return `{row['avg_return']:.2%}`, avg_mfe `{row['avg_mfe']:.2%}`, avg_mae `{row['avg_mae']:.2%}`" ) bridge_count = int(audit["is_predictive_bridge_chain"].sum()) deep_count = int(audit["is_deep_oversold_family"].sum()) post_sell_count = int(audit["is_post_sell_rebound_family"].sum()) entry_bad = int((audit["failure_root"] == "entry_bad").sum()) hold_bad = int((audit["failure_root"] == "hold_bad").sum()) exit_fast = int((audit["failure_root"] == "exit_too_fast").sum()) lines.extend( [ "", "## Quant Judgment", f"- Deep-oversold short trades: `{deep_count}`; post-sell-rebound short trades: `{post_sell_count}`; bridge trades: `{bridge_count}`.", f"- Root split: entry_bad `{entry_bad}`, hold_bad `{hold_bad}`, exit_too_fast `{exit_fast}`.", "- The next experiment pack should prioritize the dominant drag family and separate bad-entry veto from early-exit extension.", ] ) (base_dir / "dragon_short_holding_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8") if __name__ == "__main__": main()