# dragon_short_holding_audit.py
  1. from __future__ import annotations
  2. from pathlib import Path
  3. import pandas as pd
  4. from dragon_branch_configs import alpha_first_selective_veto_config
  5. from dragon_strategy import DragonRuleEngine
  6. SHORT_BUCKETS = {"00-05d", "06-10d"}
  7. def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
  8. return pd.read_csv(base_dir / name, encoding="utf-8-sig")
  9. def _holding_bucket(days: int) -> str:
  10. if days <= 5:
  11. return "00-05d"
  12. if days <= 10:
  13. return "06-10d"
  14. if days <= 20:
  15. return "11-20d"
  16. if days <= 40:
  17. return "21-40d"
  18. return "41d+"
  19. def _entry_family(reason: str) -> str:
  20. return reason.split(":", 1)[0]
  21. def _exit_family(reason: str) -> str:
  22. return reason.split(":", 1)[0]
  23. def _compute_forward_return(indicators: pd.DataFrame, date_str: str, base_price: float, days: int) -> float:
  24. row = indicators[indicators["date_str"] == date_str]
  25. if row.empty:
  26. return float("nan")
  27. idx = int(row.iloc[0]["row_id"])
  28. target_idx = idx + days
  29. if target_idx >= len(indicators):
  30. return float("nan")
  31. return float(indicators.iloc[target_idx]["close"]) / base_price - 1.0
  32. def _compute_metrics(trade: pd.Series, indicators: pd.DataFrame) -> dict[str, float]:
  33. buy_date = trade["buy_date"]
  34. sell_date = trade["sell_date"]
  35. entry_price = float(trade["buy_price"])
  36. exit_price = float(trade["sell_price"])
  37. buy_row = indicators[indicators["date_str"] == buy_date]
  38. sell_row = indicators[indicators["date_str"] == sell_date]
  39. if buy_row.empty or sell_row.empty:
  40. return {}
  41. buy_idx = int(buy_row.iloc[0]["row_id"])
  42. sell_idx = int(sell_row.iloc[0]["row_id"])
  43. window = indicators.iloc[buy_idx : sell_idx + 1].copy()
  44. peak_close = float(window["close"].max())
  45. trough_close = float(window["close"].min())
  46. peak_before_exit = peak_close / entry_price - 1.0
  47. drawdown_before_exit = trough_close / entry_price - 1.0
  48. mfe_pct = float(window["high"].max()) / entry_price - 1.0
  49. mae_pct = float(window["low"].min()) / entry_price - 1.0
  50. sell_plus_3d = float("nan")
  51. sell_plus_5d = float("nan")
  52. if sell_idx + 3 < len(indicators):
  53. sell_plus_3d = float(indicators.iloc[sell_idx + 3]["close"]) / exit_price - 1.0
  54. if sell_idx + 5 < len(indicators):
  55. sell_plus_5d = float(indicators.iloc[sell_idx + 5]["close"]) / exit_price - 1.0
  56. return {
  57. "buy_plus_1d_return": _compute_forward_return(indicators, buy_date, entry_price, 1),
  58. "buy_plus_2d_return": _compute_forward_return(indicators, buy_date, entry_price, 2),
  59. "buy_plus_3d_return": _compute_forward_return(indicators, buy_date, entry_price, 3),
  60. "sell_plus_3d_followthrough": sell_plus_3d,
  61. "sell_plus_5d_followthrough": sell_plus_5d,
  62. "peak_before_exit": peak_before_exit,
  63. "drawdown_before_exit": drawdown_before_exit,
  64. "mfe_pct": mfe_pct,
  65. "mae_pct": mae_pct,
  66. }
  67. def _failure_shape(row: pd.Series) -> str:
  68. if bool(row["is_predictive_bridge_chain"]):
  69. return "bridge_trade"
  70. if row["return_pct"] >= 0 and row["sell_plus_3d_followthrough"] > 0.02:
  71. return "exit_too_early"
  72. if row["return_pct"] < 0:
  73. if row["holding_days"] <= 5 and row["peak_before_exit"] <= 0.01:
  74. return "immediate_failure"
  75. if row["peak_before_exit"] >= 0.02:
  76. return "rebound_then_fail"
  77. if 0 < row["peak_before_exit"] < 0.02:
  78. return "small_profit_reversal"
  79. if row["sell_plus_3d_followthrough"] > 0.03 or row["sell_plus_5d_followthrough"] > 0.04:
  80. return "exit_too_early"
  81. return "flat_noise"
  82. def _failure_root(row: pd.Series) -> str:
  83. if bool(row["is_predictive_bridge_chain"]):
  84. return "bridge_trade"
  85. if row["failure_shape"] == "exit_too_early":
  86. return "exit_too_fast"
  87. if row["failure_shape"] == "immediate_failure":
  88. return "entry_bad"
  89. if row["failure_shape"] in {"rebound_then_fail", "small_profit_reversal"}:
  90. return "hold_bad"
  91. return "mixed"
  92. def main() -> None:
  93. base_dir = Path(__file__).resolve().parent
  94. indicators = _load_csv(base_dir, "dragon_indicator_snapshot.csv")
  95. indicators["date"] = pd.to_datetime(indicators["date"])
  96. indicators = indicators.sort_values("date").reset_index(drop=True)
  97. indicators["date_str"] = indicators["date"].dt.date.astype(str)
  98. indicators["row_id"] = indicators.index
  99. path_trace = _load_csv(base_dir, "dragon_trade_path_trace.csv")
  100. workbook_events = _load_csv(base_dir, "true_trade_events.csv")
  101. first_date = workbook_events["date"].min()
  102. last_date = workbook_events["date"].max()
  103. engine = DragonRuleEngine(alpha_first_selective_veto_config())
  104. events, trades = engine.run(indicators.set_index("date", drop=False))
  105. trades = trades[
  106. (trades["buy_date"] >= first_date)
  107. & (trades["buy_date"] <= last_date)
  108. & (trades["sell_date"] >= first_date)
  109. & (trades["sell_date"] <= last_date)
  110. ].copy()
  111. trades["entry_family"] = trades["buy_reason"].astype(str).map(_entry_family)
  112. trades["exit_family"] = trades["sell_reason"].astype(str).map(_exit_family)
  113. trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket)
  114. audit = trades[trades["holding_bucket"].isin(SHORT_BUCKETS)].copy()
  115. merge_cols = [
  116. "buy_date",
  117. "sell_date",
  118. "market_state_layer",
  119. "entry_qualification_layer",
  120. "position_management_layer",
  121. "aux_context_layer",
  122. ]
  123. audit = audit.merge(path_trace[merge_cols], on=["buy_date", "sell_date"], how="left")
  124. metric_rows = []
  125. for _, trade in audit.iterrows():
  126. metrics = _compute_metrics(trade, indicators)
  127. metric_rows.append(metrics)
  128. metric_df = pd.DataFrame(metric_rows)
  129. audit = pd.concat([audit.reset_index(drop=True), metric_df], axis=1)
  130. audit["is_deep_oversold_family"] = audit["entry_family"].eq("deep_oversold_rebound_buy")
  131. audit["is_post_sell_rebound_family"] = audit["entry_family"].eq("post_sell_rebound_buy")
  132. audit["is_predictive_bridge_chain"] = audit["buy_reason"].eq("predictive_error_reentry_buy") | audit["sell_reason"].eq("predictive_b1_break_exit")
  133. audit["failure_shape"] = audit.apply(_failure_shape, axis=1)
  134. audit["failure_root"] = audit.apply(_failure_root, axis=1)
  135. audit = audit[
  136. [
  137. "buy_date",
  138. "sell_date",
  139. "buy_reason",
  140. "sell_reason",
  141. "entry_family",
  142. "exit_family",
  143. "holding_days",
  144. "holding_bucket",
  145. "return_pct",
  146. "mfe_pct",
  147. "mae_pct",
  148. "market_state_layer",
  149. "entry_qualification_layer",
  150. "position_management_layer",
  151. "aux_context_layer",
  152. "is_deep_oversold_family",
  153. "is_post_sell_rebound_family",
  154. "is_predictive_bridge_chain",
  155. "buy_plus_1d_return",
  156. "buy_plus_2d_return",
  157. "buy_plus_3d_return",
  158. "sell_plus_3d_followthrough",
  159. "sell_plus_5d_followthrough",
  160. "peak_before_exit",
  161. "drawdown_before_exit",
  162. "failure_shape",
  163. "failure_root",
  164. ]
  165. ].sort_values(["holding_bucket", "return_pct", "buy_date"])
  166. audit.to_csv(base_dir / "dragon_short_holding_audit.csv", index=False, encoding="utf-8-sig")
  167. failure_summary = (
  168. audit.groupby(["holding_bucket", "failure_root", "failure_shape"], dropna=False)
  169. .agg(
  170. trades=("buy_date", "count"),
  171. avg_return=("return_pct", "mean"),
  172. avg_mfe=("mfe_pct", "mean"),
  173. avg_mae=("mae_pct", "mean"),
  174. )
  175. .reset_index()
  176. .sort_values(["holding_bucket", "trades", "avg_return"], ascending=[True, False, True])
  177. )
  178. family_summary = (
  179. audit.groupby(["holding_bucket", "entry_family"], dropna=False)
  180. .agg(
  181. trades=("buy_date", "count"),
  182. win_rate=("return_pct", lambda s: float((s > 0).mean())),
  183. avg_return=("return_pct", "mean"),
  184. avg_mfe=("mfe_pct", "mean"),
  185. avg_mae=("mae_pct", "mean"),
  186. )
  187. .reset_index()
  188. .sort_values(["holding_bucket", "avg_return", "trades"], ascending=[True, True, False])
  189. )
  190. lines = [
  191. "# Dragon Short Holding Review",
  192. "",
  193. "- Branch: `alpha_first_selective_veto`.",
  194. "- Scope: only `00-05d` and `06-10d` trades.",
  195. f"- audited short trades: `{int(len(audit))}`",
  196. f"- `00-05d` avg_return: `{audit[audit['holding_bucket'] == '00-05d']['return_pct'].mean():.2%}`",
  197. f"- `06-10d` avg_return: `{audit[audit['holding_bucket'] == '06-10d']['return_pct'].mean():.2%}`",
  198. "",
  199. "## Failure Root Summary",
  200. ]
  201. for _, row in failure_summary.iterrows():
  202. lines.append(
  203. f"- `{row['holding_bucket']} / {row['failure_root']} / {row['failure_shape']}`: trades `{int(row['trades'])}`, "
  204. f"avg_return `{row['avg_return']:.2%}`, avg_mfe `{row['avg_mfe']:.2%}`, avg_mae `{row['avg_mae']:.2%}`"
  205. )
  206. worst_families = family_summary.groupby("holding_bucket", group_keys=False).head(5)
  207. lines.extend(["", "## Weak Entry Families"])
  208. for _, row in worst_families.iterrows():
  209. lines.append(
  210. f"- `{row['holding_bucket']} / {row['entry_family']}`: trades `{int(row['trades'])}`, win_rate `{row['win_rate']:.2%}`, "
  211. f"avg_return `{row['avg_return']:.2%}`, avg_mfe `{row['avg_mfe']:.2%}`, avg_mae `{row['avg_mae']:.2%}`"
  212. )
  213. bridge_count = int(audit["is_predictive_bridge_chain"].sum())
  214. deep_count = int(audit["is_deep_oversold_family"].sum())
  215. post_sell_count = int(audit["is_post_sell_rebound_family"].sum())
  216. entry_bad = int((audit["failure_root"] == "entry_bad").sum())
  217. hold_bad = int((audit["failure_root"] == "hold_bad").sum())
  218. exit_fast = int((audit["failure_root"] == "exit_too_fast").sum())
  219. lines.extend(
  220. [
  221. "",
  222. "## Quant Judgment",
  223. f"- Deep-oversold short trades: `{deep_count}`; post-sell-rebound short trades: `{post_sell_count}`; bridge trades: `{bridge_count}`.",
  224. f"- Root split: entry_bad `{entry_bad}`, hold_bad `{hold_bad}`, exit_too_fast `{exit_fast}`.",
  225. "- The next experiment pack should prioritize the dominant drag family and separate bad-entry veto from early-exit extension.",
  226. ]
  227. )
  228. (base_dir / "dragon_short_holding_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
  229. if __name__ == "__main__":
  230. main()