# dragon_robustness_report.py (15 KB)
  1. from __future__ import annotations
  2. from pathlib import Path
  3. import pandas as pd
  4. def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
  5. return pd.read_csv(base_dir / name, encoding="utf-8-sig")
  6. def _profit_factor(series: pd.Series) -> float:
  7. gross_profit = series[series > 0].sum()
  8. gross_loss = -series[series < 0].sum()
  9. if gross_loss == 0:
  10. return float("inf") if gross_profit > 0 else 0.0
  11. return float(gross_profit / gross_loss)
  12. def _holding_bucket(days: int) -> str:
  13. if days <= 5:
  14. return "00-05d"
  15. if days <= 10:
  16. return "06-10d"
  17. if days <= 20:
  18. return "11-20d"
  19. if days <= 40:
  20. return "21-40d"
  21. return "41d+"
  22. def _summarize(df: pd.DataFrame, group_cols: list[str]) -> pd.DataFrame:
  23. if group_cols:
  24. grouped = df.groupby(group_cols, dropna=False)
  25. else:
  26. grouped = df.groupby(lambda _: "ALL")
  27. rows: list[dict[str, object]] = []
  28. for key, group in grouped:
  29. if not isinstance(key, tuple):
  30. key = (key,)
  31. row = {col: val for col, val in zip(group_cols or ["scope"], key)}
  32. row["trades"] = int(len(group))
  33. row["win_rate"] = float((group["return_pct"] > 0).mean())
  34. row["avg_return"] = float(group["return_pct"].mean())
  35. row["median_return"] = float(group["return_pct"].median())
  36. row["profit_factor"] = _profit_factor(group["return_pct"])
  37. row["expectancy"] = float(group["return_pct"].mean())
  38. row["avg_holding_days"] = float(group["holding_days"].mean())
  39. row["avg_mfe_pct"] = float(group["mfe_pct"].mean())
  40. row["avg_mae_pct"] = float(group["mae_pct"].mean())
  41. row["avg_giveback_from_peak_pct"] = float(group["giveback_from_peak_pct"].mean())
  42. row["avg_exit_followthrough_5d_pct"] = float(group["exit_followthrough_5d_pct"].mean())
  43. row["avg_exit_rebound_5d_pct"] = float(group["exit_rebound_5d_pct"].mean())
  44. rows.append(row)
  45. return pd.DataFrame(rows)
  46. def _format_pct(value: float) -> str:
  47. if pd.isna(value):
  48. return "NA"
  49. if value == float("inf"):
  50. return "inf"
  51. return f"{value:.2%}"
  52. def _safe_value(df: pd.DataFrame, col: str) -> str:
  53. if df.empty:
  54. return "NA"
  55. value = df.iloc[0][col]
  56. if isinstance(value, float):
  57. return _format_pct(value) if "rate" in col or "return" in col or "pct" in col else f"{value:.2f}"
  58. return str(value)
  59. def _build_trade_quality(trades: pd.DataFrame, path_trace: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame:
  60. trades = trades.copy()
  61. path_trace = path_trace.copy()
  62. indicators = indicators.copy()
  63. trades["buy_dt"] = pd.to_datetime(trades["buy_date"])
  64. trades["sell_dt"] = pd.to_datetime(trades["sell_date"])
  65. trades["sell_year"] = trades["sell_dt"].dt.year
  66. trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket)
  67. indicators["dt"] = pd.to_datetime(indicators["date"])
  68. indicators = indicators.sort_values("dt").reset_index(drop=True)
  69. pos_lookup = {dt.date().isoformat(): idx for idx, dt in enumerate(indicators["dt"])}
  70. mfe_list: list[float] = []
  71. mae_list: list[float] = []
  72. giveback_list: list[float] = []
  73. exit_followthrough_list: list[float] = []
  74. exit_rebound_list: list[float] = []
  75. for _, trade in trades.iterrows():
  76. buy_date = trade["buy_date"]
  77. sell_date = trade["sell_date"]
  78. entry_price = float(trade["buy_price"])
  79. exit_price = float(trade["sell_price"])
  80. window = indicators[(indicators["dt"] >= trade["buy_dt"]) & (indicators["dt"] <= trade["sell_dt"])]
  81. max_high = float(window["high"].max())
  82. min_low = float(window["low"].min())
  83. mfe_list.append(max_high / entry_price - 1.0)
  84. mae_list.append(min_low / entry_price - 1.0)
  85. giveback_list.append(exit_price / max_high - 1.0)
  86. sell_idx = pos_lookup.get(sell_date)
  87. if sell_idx is None:
  88. exit_followthrough_list.append(float("nan"))
  89. exit_rebound_list.append(float("nan"))
  90. continue
  91. future = indicators.iloc[sell_idx + 1 : sell_idx + 6]
  92. if future.empty:
  93. exit_followthrough_list.append(float("nan"))
  94. exit_rebound_list.append(float("nan"))
  95. continue
  96. exit_followthrough_list.append(float(future["low"].min()) / exit_price - 1.0)
  97. exit_rebound_list.append(float(future["high"].max()) / exit_price - 1.0)
  98. trades["mfe_pct"] = mfe_list
  99. trades["mae_pct"] = mae_list
  100. trades["giveback_from_peak_pct"] = giveback_list
  101. trades["exit_followthrough_5d_pct"] = exit_followthrough_list
  102. trades["exit_rebound_5d_pct"] = exit_rebound_list
  103. merge_cols = [
  104. "buy_date",
  105. "sell_date",
  106. "market_state_layer",
  107. "entry_qualification_layer",
  108. "position_management_layer",
  109. "aux_context_layer",
  110. "aux_signal_count",
  111. "hold_aux_buy_count",
  112. "post_exit_aux_sell_count",
  113. "next_buy_date",
  114. "layer_path",
  115. ]
  116. return trades.merge(path_trace[merge_cols], on=["buy_date", "sell_date"], how="left")
  117. def _build_rule_stability(df: pd.DataFrame) -> pd.DataFrame:
  118. baseline = {
  119. "trades": int(len(df)),
  120. "win_rate": float((df["return_pct"] > 0).mean()),
  121. "avg_return": float(df["return_pct"].mean()),
  122. "profit_factor": _profit_factor(df["return_pct"]),
  123. }
  124. rows: list[dict[str, object]] = []
  125. for rule_type, col in [("entry_rule", "buy_reason"), ("exit_rule", "sell_reason")]:
  126. for rule_name, group in df.groupby(col):
  127. remaining = df[df[col] != rule_name]
  128. row = {
  129. "rule_type": rule_type,
  130. "rule_name": rule_name,
  131. "removed_trades": int(len(group)),
  132. "remaining_trades": int(len(remaining)),
  133. "baseline_trades": baseline["trades"],
  134. "baseline_win_rate": baseline["win_rate"],
  135. "baseline_avg_return": baseline["avg_return"],
  136. "baseline_profit_factor": baseline["profit_factor"],
  137. }
  138. if remaining.empty:
  139. row["remaining_win_rate"] = float("nan")
  140. row["remaining_avg_return"] = float("nan")
  141. row["remaining_profit_factor"] = float("nan")
  142. else:
  143. row["remaining_win_rate"] = float((remaining["return_pct"] > 0).mean())
  144. row["remaining_avg_return"] = float(remaining["return_pct"].mean())
  145. row["remaining_profit_factor"] = _profit_factor(remaining["return_pct"])
  146. row["delta_win_rate"] = row["remaining_win_rate"] - baseline["win_rate"]
  147. row["delta_avg_return"] = row["remaining_avg_return"] - baseline["avg_return"]
  148. row["delta_profit_factor"] = row["remaining_profit_factor"] - baseline["profit_factor"]
  149. rows.append(row)
  150. return pd.DataFrame(rows)
  151. def main() -> None:
  152. base_dir = Path(__file__).resolve().parent
  153. trades = _load_csv(base_dir, "dragon_strategy_trades.csv")
  154. path_trace = _load_csv(base_dir, "dragon_trade_path_trace.csv")
  155. indicators = _load_csv(base_dir, "dragon_indicator_snapshot.csv")
  156. quality = _build_trade_quality(trades, path_trace, indicators)
  157. quality.to_csv(base_dir / "dragon_trade_quality.csv", index=False, encoding="utf-8-sig")
  158. baseline_summary = _summarize(quality, [])
  159. holding_summary = _summarize(quality, ["holding_bucket"]).sort_values("holding_bucket")
  160. yearly_summary = _summarize(quality, ["sell_year"]).sort_values("sell_year")
  161. state_summary = _summarize(quality, ["market_state_layer"]).sort_values("trades", ascending=False)
  162. entry_summary = _summarize(quality, ["buy_reason"]).sort_values("trades", ascending=False)
  163. exit_summary = _summarize(quality, ["sell_reason"]).sort_values("trades", ascending=False)
  164. path_summary = _summarize(quality, ["market_state_layer", "entry_qualification_layer", "position_management_layer"]).sort_values(
  165. "trades", ascending=False
  166. )
  167. split_summary = _summarize(
  168. quality.assign(sample_split=quality["sell_year"].apply(lambda x: "2016-2020" if x <= 2020 else "2021-2025")),
  169. ["sample_split"],
  170. ).sort_values("sample_split")
  171. stability = _build_rule_stability(quality).sort_values(["rule_type", "delta_avg_return"])
  172. group_frames = []
  173. for group_type, df in [
  174. ("holding_bucket", holding_summary),
  175. ("sell_year", yearly_summary),
  176. ("market_state_layer", state_summary),
  177. ("buy_reason", entry_summary),
  178. ("sell_reason", exit_summary),
  179. ("path_core", path_summary),
  180. ("sample_split", split_summary),
  181. ]:
  182. group_frames.append(df.assign(group_type=group_type))
  183. group_summary = pd.concat(group_frames, ignore_index=True, sort=False)
  184. group_summary.to_csv(base_dir / "dragon_trade_group_summary.csv", index=False, encoding="utf-8-sig")
  185. yearly_summary.to_csv(base_dir / "dragon_yearly_performance.csv", index=False, encoding="utf-8-sig")
  186. entry_summary.assign(rule_type="entry_rule").to_csv(
  187. base_dir / "dragon_rule_contribution_entry.csv", index=False, encoding="utf-8-sig"
  188. )
  189. exit_summary.assign(rule_type="exit_rule").to_csv(
  190. base_dir / "dragon_rule_contribution_exit.csv", index=False, encoding="utf-8-sig"
  191. )
  192. stability.to_csv(base_dir / "dragon_rule_stability.csv", index=False, encoding="utf-8-sig")
  193. best_entry = entry_summary[entry_summary["trades"] >= 3].sort_values("avg_return", ascending=False).head(3)
  194. weakest_entry = entry_summary[entry_summary["trades"] >= 3].sort_values("avg_return", ascending=True).head(3)
  195. best_exit = exit_summary[exit_summary["trades"] >= 3].sort_values("avg_exit_followthrough_5d_pct").head(3)
  196. weakest_exit = exit_summary[exit_summary["trades"] >= 3].sort_values("avg_exit_followthrough_5d_pct", ascending=False).head(3)
  197. worst_rule_removal = stability.sort_values("delta_avg_return").head(5)
  198. best_rule_removal = stability.sort_values("delta_avg_return", ascending=False).head(5)
  199. lines = [
  200. "# Dragon Robustness Report",
  201. "",
  202. "## Baseline",
  203. f"- trades: `{int(baseline_summary.iloc[0]['trades'])}`",
  204. f"- win_rate: `{_format_pct(float(baseline_summary.iloc[0]['win_rate']))}`",
  205. f"- avg_return: `{_format_pct(float(baseline_summary.iloc[0]['avg_return']))}`",
  206. f"- median_return: `{_format_pct(float(baseline_summary.iloc[0]['median_return']))}`",
  207. f"- profit_factor: `{baseline_summary.iloc[0]['profit_factor']:.2f}`",
  208. f"- avg_mfe: `{_format_pct(float(baseline_summary.iloc[0]['avg_mfe_pct']))}`",
  209. f"- avg_mae: `{_format_pct(float(baseline_summary.iloc[0]['avg_mae_pct']))}`",
  210. f"- avg_exit_followthrough_5d: `{_format_pct(float(baseline_summary.iloc[0]['avg_exit_followthrough_5d_pct']))}`",
  211. "",
  212. "## Holding-Bucket View",
  213. ]
  214. for _, row in holding_summary.iterrows():
  215. lines.append(
  216. f"- `{row['holding_bucket']}`: trades `{int(row['trades'])}`, win_rate `{_format_pct(float(row['win_rate']))}`, "
  217. f"avg_return `{_format_pct(float(row['avg_return']))}`, avg_mfe `{_format_pct(float(row['avg_mfe_pct']))}`, "
  218. f"avg_mae `{_format_pct(float(row['avg_mae_pct']))}`"
  219. )
  220. lines.extend(["", "## Yearly View"])
  221. for _, row in yearly_summary.iterrows():
  222. lines.append(
  223. f"- `{int(row['sell_year'])}`: trades `{int(row['trades'])}`, win_rate `{_format_pct(float(row['win_rate']))}`, "
  224. f"avg_return `{_format_pct(float(row['avg_return']))}`, profit_factor `{row['profit_factor']:.2f}`"
  225. )
  226. lines.extend(["", "## Sample Split"])
  227. for _, row in split_summary.iterrows():
  228. lines.append(
  229. f"- `{row['sample_split']}`: trades `{int(row['trades'])}`, win_rate `{_format_pct(float(row['win_rate']))}`, "
  230. f"avg_return `{_format_pct(float(row['avg_return']))}`, profit_factor `{row['profit_factor']:.2f}`"
  231. )
  232. lines.extend(["", "## Regime View"])
  233. for _, row in state_summary.iterrows():
  234. lines.append(
  235. f"- `{row['market_state_layer']}`: trades `{int(row['trades'])}`, avg_return `{_format_pct(float(row['avg_return']))}`, "
  236. f"profit_factor `{row['profit_factor']:.2f}`, avg_mae `{_format_pct(float(row['avg_mae_pct']))}`"
  237. )
  238. lines.extend(["", "## Best Entry Rules"])
  239. for _, row in best_entry.iterrows():
  240. lines.append(
  241. f"- `{row['buy_reason']}`: trades `{int(row['trades'])}`, avg_return `{_format_pct(float(row['avg_return']))}`, "
  242. f"win_rate `{_format_pct(float(row['win_rate']))}`, avg_mfe `{_format_pct(float(row['avg_mfe_pct']))}`"
  243. )
  244. lines.extend(["", "## Weakest Entry Rules"])
  245. for _, row in weakest_entry.iterrows():
  246. lines.append(
  247. f"- `{row['buy_reason']}`: trades `{int(row['trades'])}`, avg_return `{_format_pct(float(row['avg_return']))}`, "
  248. f"win_rate `{_format_pct(float(row['win_rate']))}`, avg_mae `{_format_pct(float(row['avg_mae_pct']))}`"
  249. )
  250. lines.extend(["", "## Best Exit Rules"])
  251. for _, row in best_exit.iterrows():
  252. lines.append(
  253. f"- `{row['sell_reason']}`: trades `{int(row['trades'])}`, avg_exit_followthrough_5d `{_format_pct(float(row['avg_exit_followthrough_5d_pct']))}`, "
  254. f"avg_return `{_format_pct(float(row['avg_return']))}`"
  255. )
  256. lines.extend(["", "## Weakest Exit Rules"])
  257. for _, row in weakest_exit.iterrows():
  258. lines.append(
  259. f"- `{row['sell_reason']}`: trades `{int(row['trades'])}`, avg_exit_followthrough_5d `{_format_pct(float(row['avg_exit_followthrough_5d_pct']))}`, "
  260. f"avg_return `{_format_pct(float(row['avg_return']))}`"
  261. )
  262. lines.extend(["", "## Realized Contribution Stress Test"])
  263. lines.append("- Interpretation: this removes realized trades by rule from the current trade set; it is not yet a full re-run stability test.")
  264. lines.append("- Worst removals for average return:")
  265. for _, row in worst_rule_removal.iterrows():
  266. lines.append(
  267. f"- `{row['rule_type']} / {row['rule_name']}`: removed `{int(row['removed_trades'])}` trades, "
  268. f"delta_avg_return `{_format_pct(float(row['delta_avg_return']))}`, delta_profit_factor `{row['delta_profit_factor']:.2f}`"
  269. )
  270. lines.append("- Best removals for average return:")
  271. for _, row in best_rule_removal.iterrows():
  272. lines.append(
  273. f"- `{row['rule_type']} / {row['rule_name']}`: removed `{int(row['removed_trades'])}` trades, "
  274. f"delta_avg_return `{_format_pct(float(row['delta_avg_return']))}`, delta_profit_factor `{row['delta_profit_factor']:.2f}`"
  275. )
  276. lines.extend(
  277. [
  278. "",
  279. "## Next Stage-3 Gaps",
  280. "- Threshold perturbation is not yet formalized because the current strategy logic is still hard-coded, not parameterized.",
  281. "- A true leave-one-rule-out stability test still needs rerun-able switches in `dragon_strategy.py` rather than ex-post trade deletion only.",
  282. ]
  283. )
  284. (base_dir / "dragon_robustness_report.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
# Build the report only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()