| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- from __future__ import annotations
- from pathlib import Path
- import pandas as pd
- from dragon_branch_configs import alpha_first_selective_veto_config
- from dragon_strategy import DragonRuleEngine
# Holding-period bucket labels treated as "short" holdings. main() keeps only
# trades whose ``holding_bucket`` (as produced by _holding_bucket) is in this set.
SHORT_BUCKETS = {"00-05d", "06-10d"}
def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
    """Read the CSV file ``name`` found in ``base_dir`` into a DataFrame.

    The file is decoded with the ``utf-8-sig`` codec, so a leading UTF-8
    byte-order mark, if present, is dropped rather than ending up in the
    first column name.
    """
    csv_path = base_dir.joinpath(name)
    frame = pd.read_csv(csv_path, encoding="utf-8-sig")
    return frame
def _holding_bucket(days: int) -> str:
    """Return the holding-period bucket label for a holding length in days.

    Buckets are checked from shortest to longest using inclusive upper
    bounds (5, 10, 20, 40 days); anything above 40 days is labelled
    ``"41d+"``.
    """
    inclusive_limits = (
        (5, "00-05d"),
        (10, "06-10d"),
        (20, "11-20d"),
        (40, "21-40d"),
    )
    # First bucket whose upper bound covers ``days``; fall back to the open-ended one.
    return next((label for limit, label in inclusive_limits if days <= limit), "41d+")
def _entry_family(reason: str) -> str:
    """Return the family part of an entry reason.

    The family is the text before the first ``":"``; a reason without a
    colon is returned unchanged.
    """
    family, _separator, _detail = reason.partition(":")
    return family
def _exit_family(reason: str) -> str:
    """Return the family part of an exit reason.

    The family is everything before the first ``":"``; when the reason
    contains no colon the whole string is the family.
    """
    colon_at = reason.find(":")
    if colon_at < 0:
        return reason
    return reason[:colon_at]
def _compute_forward_return(indicators: pd.DataFrame, date_str: str, base_price: float, days: int) -> float:
    """Return the close-to-``base_price`` return ``days`` rows after ``date_str``.

    Args:
        indicators: Frame with ``date_str``, ``row_id`` and ``close`` columns.
            ``row_id`` is used as a positional (``.iloc``) index, so it must
            equal each row's position in the frame.
        date_str: Date key to look up in ``indicators["date_str"]``; the first
            matching row is the anchor.
        base_price: Price the target close is compared against.
        days: Row offset from the anchor row.

    Returns:
        ``close[anchor + days] / base_price - 1.0``, or NaN when the date is
        not present or the target row lies outside the frame (before the
        first row or past the last one).
    """
    matches = indicators[indicators["date_str"] == date_str]
    if matches.empty:
        return float("nan")

    target_idx = int(matches["row_id"].iloc[0]) + days
    # Reject both ends: a negative position would not fail with .iloc, it would
    # silently wrap around and read a bar from the end of the frame.
    if not 0 <= target_idx < len(indicators):
        return float("nan")

    return float(indicators["close"].iloc[target_idx]) / base_price - 1.0
def _compute_metrics(trade: pd.Series, indicators: pd.DataFrame) -> dict[str, float]:
    """Compute path metrics for one trade from the indicator bars.

    Args:
        trade: Record with ``buy_date``, ``sell_date``, ``buy_price`` and
            ``sell_price``.
        indicators: Frame with ``date_str``, ``row_id``, ``close``, ``high``
            and ``low`` columns; ``row_id`` is used as a positional index.

    Returns:
        An empty dict when the buy or sell date is not found in
        ``indicators``. Otherwise a dict with:

        * ``buy_plus_{1,2,3}d_return`` - close 1/2/3 rows after the buy row,
          relative to the buy price.
        * ``sell_plus_{3,5}d_followthrough`` - close 3/5 rows after the sell
          row, relative to the sell price (NaN past the end of the frame).
        * ``peak_before_exit`` / ``drawdown_before_exit`` - highest / lowest
          close between the buy and sell rows (inclusive) vs. the buy price.
        * ``mfe_pct`` / ``mae_pct`` - highest ``high`` / lowest ``low`` in the
          same window vs. the buy price (presumably maximum favorable /
          adverse excursion).
    """
    opened_on = trade["buy_date"]
    closed_on = trade["sell_date"]
    entry_price = float(trade["buy_price"])
    exit_price = float(trade["sell_price"])

    date_keys = indicators["date_str"]
    buy_hits = indicators[date_keys == opened_on]
    sell_hits = indicators[date_keys == closed_on]
    if buy_hits.empty or sell_hits.empty:
        return {}

    first_bar = int(buy_hits.iloc[0]["row_id"])
    last_bar = int(sell_hits.iloc[0]["row_id"])
    # Bars from the buy row through the sell row, both inclusive.
    held = indicators.iloc[first_bar : last_bar + 1]

    def _versus_entry(level) -> float:
        return float(level) / entry_price - 1.0

    peak_before_exit = _versus_entry(held["close"].max())
    drawdown_before_exit = _versus_entry(held["close"].min())
    mfe_pct = _versus_entry(held["high"].max())
    mae_pct = _versus_entry(held["low"].min())

    bar_count = len(indicators)
    followthrough = {}
    for horizon in (3, 5):
        position = last_bar + horizon
        if position < bar_count:
            followthrough[horizon] = float(indicators.iloc[position]["close"]) / exit_price - 1.0
        else:
            followthrough[horizon] = float("nan")

    metrics = {}
    for horizon in (1, 2, 3):
        metrics[f"buy_plus_{horizon}d_return"] = _compute_forward_return(
            indicators, opened_on, entry_price, horizon
        )
    metrics["sell_plus_3d_followthrough"] = followthrough[3]
    metrics["sell_plus_5d_followthrough"] = followthrough[5]
    metrics["peak_before_exit"] = peak_before_exit
    metrics["drawdown_before_exit"] = drawdown_before_exit
    metrics["mfe_pct"] = mfe_pct
    metrics["mae_pct"] = mae_pct
    return metrics
def _failure_shape(row: pd.Series) -> str:
    """Classify the path shape of one audited trade.

    Rules are applied in order; the first match wins:

    1. ``is_predictive_bridge_chain`` truthy -> ``"bridge_trade"``.
    2. ``return_pct >= 0`` and 3-day follow-through > 2% -> ``"exit_too_early"``.
    3. For ``return_pct < 0``:
       * held <= 5 days and ``peak_before_exit`` <= 1% -> ``"immediate_failure"``
       * ``peak_before_exit`` >= 2% -> ``"rebound_then_fail"``
       * 0 < ``peak_before_exit`` < 2% -> ``"small_profit_reversal"``
    4. 3-day follow-through > 3% or 5-day follow-through > 4%
       -> ``"exit_too_early"``.
    5. Otherwise ``"flat_noise"``.

    NaN metrics fail every comparison and therefore fall through.
    """
    if bool(row["is_predictive_bridge_chain"]):
        return "bridge_trade"

    realized = row["return_pct"]
    if realized >= 0:
        if row["sell_plus_3d_followthrough"] > 0.02:
            return "exit_too_early"
    elif realized < 0:
        quick_exit = row["holding_days"] <= 5
        peak = row["peak_before_exit"]
        if quick_exit and peak <= 0.01:
            return "immediate_failure"
        if peak >= 0.02:
            return "rebound_then_fail"
        if 0 < peak < 0.02:
            return "small_profit_reversal"

    rallied_after_exit = (
        row["sell_plus_3d_followthrough"] > 0.03
        or row["sell_plus_5d_followthrough"] > 0.04
    )
    return "exit_too_early" if rallied_after_exit else "flat_noise"
def _failure_root(row: pd.Series) -> str:
    """Map a trade's ``failure_shape`` to its root-cause label.

    A truthy ``is_predictive_bridge_chain`` always yields ``"bridge_trade"``.
    Otherwise the shape is translated through a fixed table, and any shape
    not in the table is reported as ``"mixed"``.
    """
    if bool(row["is_predictive_bridge_chain"]):
        return "bridge_trade"

    root_by_shape = {
        "exit_too_early": "exit_too_fast",
        "immediate_failure": "entry_bad",
        "rebound_then_fail": "hold_bad",
        "small_profit_reversal": "hold_bad",
    }
    return root_by_shape.get(row["failure_shape"], "mixed")
def main() -> None:
    """Audit the short-holding trades of the ``alpha_first_selective_veto`` branch.

    All files are read from and written to the directory containing this
    script:

    1. Load ``dragon_indicator_snapshot.csv``, sort it by ``date`` and add the
       ``date_str`` and positional ``row_id`` helper columns.
    2. Run ``DragonRuleEngine`` with ``alpha_first_selective_veto_config()``
       and keep the trades whose buy and sell dates both lie between the
       minimum and maximum ``date`` of ``true_trade_events.csv``.
    3. Keep only trades whose holding bucket is in ``SHORT_BUCKETS``, left-join
       the layer columns from ``dragon_trade_path_trace.csv`` on
       ``buy_date``/``sell_date``, and attach per-trade metrics plus the
       ``failure_shape`` / ``failure_root`` labels.
    4. Write ``dragon_short_holding_audit.csv`` and the markdown summary
       ``dragon_short_holding_review.md``.
    """
    base_dir = Path(__file__).resolve().parent

    indicators = _load_csv(base_dir, "dragon_indicator_snapshot.csv")
    indicators["date"] = pd.to_datetime(indicators["date"])
    indicators = indicators.sort_values("date").reset_index(drop=True)
    indicators["date_str"] = indicators["date"].dt.date.astype(str)
    # row_id mirrors the positional index after sorting; the metric helpers
    # feed it to .iloc.
    indicators["row_id"] = indicators.index

    path_trace = _load_csv(base_dir, "dragon_trade_path_trace.csv")
    workbook_events = _load_csv(base_dir, "true_trade_events.csv")
    first_date = workbook_events["date"].min()
    last_date = workbook_events["date"].max()

    engine = DragonRuleEngine(alpha_first_selective_veto_config())
    # Only the trades are audited here; the engine's first return value is unused.
    _events, trades = engine.run(indicators.set_index("date", drop=False))
    trades = trades[
        (trades["buy_date"] >= first_date)
        & (trades["buy_date"] <= last_date)
        & (trades["sell_date"] >= first_date)
        & (trades["sell_date"] <= last_date)
    ].copy()
    trades["entry_family"] = trades["buy_reason"].astype(str).map(_entry_family)
    trades["exit_family"] = trades["sell_reason"].astype(str).map(_exit_family)
    trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket)

    audit = trades[trades["holding_bucket"].isin(SHORT_BUCKETS)].copy()
    merge_cols = [
        "buy_date",
        "sell_date",
        "market_state_layer",
        "entry_qualification_layer",
        "position_management_layer",
        "aux_context_layer",
    ]
    audit = audit.merge(path_trace[merge_cols], on=["buy_date", "sell_date"], how="left")

    # Keys produced by _compute_metrics, in its output order.
    metric_columns = [
        "buy_plus_1d_return",
        "buy_plus_2d_return",
        "buy_plus_3d_return",
        "sell_plus_3d_followthrough",
        "sell_plus_5d_followthrough",
        "peak_before_exit",
        "drawdown_before_exit",
        "mfe_pct",
        "mae_pct",
    ]
    metric_rows = [_compute_metrics(trade, indicators) for _, trade in audit.iterrows()]
    # _compute_metrics returns {} for trades whose dates are not in the
    # indicator frame. If that happens for every trade the raw frame has no
    # columns at all, and the column lookups below would raise KeyError;
    # reindexing guarantees the metric columns exist (filled with NaN).
    metric_df = pd.DataFrame(metric_rows).reindex(columns=metric_columns)
    audit = pd.concat([audit.reset_index(drop=True), metric_df], axis=1)

    audit["is_deep_oversold_family"] = audit["entry_family"].eq("deep_oversold_rebound_buy")
    audit["is_post_sell_rebound_family"] = audit["entry_family"].eq("post_sell_rebound_buy")
    audit["is_predictive_bridge_chain"] = audit["buy_reason"].eq("predictive_error_reentry_buy") | audit[
        "sell_reason"
    ].eq("predictive_b1_break_exit")
    audit["failure_shape"] = audit.apply(_failure_shape, axis=1)
    # _failure_root reads the failure_shape column, so it must run second.
    audit["failure_root"] = audit.apply(_failure_root, axis=1)

    output_columns = [
        "buy_date",
        "sell_date",
        "buy_reason",
        "sell_reason",
        "entry_family",
        "exit_family",
        "holding_days",
        "holding_bucket",
        "return_pct",
        "mfe_pct",
        "mae_pct",
        "market_state_layer",
        "entry_qualification_layer",
        "position_management_layer",
        "aux_context_layer",
        "is_deep_oversold_family",
        "is_post_sell_rebound_family",
        "is_predictive_bridge_chain",
        "buy_plus_1d_return",
        "buy_plus_2d_return",
        "buy_plus_3d_return",
        "sell_plus_3d_followthrough",
        "sell_plus_5d_followthrough",
        "peak_before_exit",
        "drawdown_before_exit",
        "failure_shape",
        "failure_root",
    ]
    audit = audit[output_columns].sort_values(["holding_bucket", "return_pct", "buy_date"])
    audit.to_csv(base_dir / "dragon_short_holding_audit.csv", index=False, encoding="utf-8-sig")

    # Per bucket: most frequent failure combinations first, worst return breaking ties.
    failure_summary = (
        audit.groupby(["holding_bucket", "failure_root", "failure_shape"], dropna=False)
        .agg(
            trades=("buy_date", "count"),
            avg_return=("return_pct", "mean"),
            avg_mfe=("mfe_pct", "mean"),
            avg_mae=("mae_pct", "mean"),
        )
        .reset_index()
        .sort_values(["holding_bucket", "trades", "avg_return"], ascending=[True, False, True])
    )
    # Per bucket: entry families ordered from lowest average return upwards.
    family_summary = (
        audit.groupby(["holding_bucket", "entry_family"], dropna=False)
        .agg(
            trades=("buy_date", "count"),
            win_rate=("return_pct", lambda s: float((s > 0).mean())),
            avg_return=("return_pct", "mean"),
            avg_mfe=("mfe_pct", "mean"),
            avg_mae=("mae_pct", "mean"),
        )
        .reset_index()
        .sort_values(["holding_bucket", "avg_return", "trades"], ascending=[True, True, False])
    )

    lines = [
        "# Dragon Short Holding Review",
        "",
        "- Branch: `alpha_first_selective_veto`.",
        "- Scope: only `00-05d` and `06-10d` trades.",
        f"- audited short trades: `{len(audit)}`",
        f"- `00-05d` avg_return: `{audit[audit['holding_bucket'] == '00-05d']['return_pct'].mean():.2%}`",
        f"- `06-10d` avg_return: `{audit[audit['holding_bucket'] == '06-10d']['return_pct'].mean():.2%}`",
        "",
        "## Failure Root Summary",
    ]
    lines.extend(
        f"- `{row['holding_bucket']} / {row['failure_root']} / {row['failure_shape']}`: trades `{int(row['trades'])}`, "
        f"avg_return `{row['avg_return']:.2%}`, avg_mfe `{row['avg_mfe']:.2%}`, avg_mae `{row['avg_mae']:.2%}`"
        for _, row in failure_summary.iterrows()
    )

    # family_summary is already sorted worst-first, so head(5) per bucket
    # picks the five weakest entry families of each bucket.
    worst_families = family_summary.groupby("holding_bucket", group_keys=False).head(5)
    lines.extend(["", "## Weak Entry Families"])
    lines.extend(
        f"- `{row['holding_bucket']} / {row['entry_family']}`: trades `{int(row['trades'])}`, win_rate `{row['win_rate']:.2%}`, "
        f"avg_return `{row['avg_return']:.2%}`, avg_mfe `{row['avg_mfe']:.2%}`, avg_mae `{row['avg_mae']:.2%}`"
        for _, row in worst_families.iterrows()
    )

    bridge_count = int(audit["is_predictive_bridge_chain"].sum())
    deep_count = int(audit["is_deep_oversold_family"].sum())
    post_sell_count = int(audit["is_post_sell_rebound_family"].sum())
    entry_bad = int((audit["failure_root"] == "entry_bad").sum())
    hold_bad = int((audit["failure_root"] == "hold_bad").sum())
    exit_fast = int((audit["failure_root"] == "exit_too_fast").sum())
    lines.extend(
        [
            "",
            "## Quant Judgment",
            f"- Deep-oversold short trades: `{deep_count}`; post-sell-rebound short trades: `{post_sell_count}`; bridge trades: `{bridge_count}`.",
            f"- Root split: entry_bad `{entry_bad}`, hold_bad `{hold_bad}`, exit_too_fast `{exit_fast}`.",
            "- The next experiment pack should prioritize the dominant drag family and separate bad-entry veto from early-exit extension.",
        ]
    )
    (base_dir / "dragon_short_holding_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
- if __name__ == "__main__":
- main()
|