# dragon_followthrough_mid_exit_review.py
  1. from __future__ import annotations
  2. from pathlib import Path
  3. import pandas as pd
  4. from dragon_branch_configs import (
  5. alpha_first_glued_followthrough_mid_exit_probe_config,
  6. alpha_first_glued_followthrough_probe_config,
  7. alpha_first_glued_refined_hot_cap_config,
  8. )
  9. from dragon_execution_common import apply_execution_model, summary
  10. from dragon_indicators import DragonIndicatorConfig, DragonIndicatorEngine
  11. from dragon_shared import END_DATE, START_DATE, format_num as _format_num, format_pct as _format_pct
  12. from dragon_strategy import DragonRuleEngine
# Prefix matched against each trade's "buy_reason" (via str.startswith) to pick
# the followthrough re-entry trades that go into the trade-detail report.
REENTRY_REASON_PREFIX = "glued_followthrough_reentry_buy:confirmed_"
# File names of the artifacts main() writes into this script's own directory.
OUTPUT_BRANCH_SUMMARY = "dragon_followthrough_mid_exit_branch_summary.csv"
OUTPUT_TRADE_DETAILS = "dragon_followthrough_mid_exit_trade_details.csv"
OUTPUT_TRADE_DIFF = "dragon_followthrough_mid_exit_trade_diff.csv"
OUTPUT_REVIEW = "dragon_followthrough_mid_exit_review.md"
  18. def _load_history() -> pd.DataFrame:
  19. engine = DragonIndicatorEngine(DragonIndicatorConfig(start_date="2015-01-01", end_date=None))
  20. raw = engine.fetch_daily_data(include_intraday_snapshot=False)
  21. indicators = engine.compute(raw.reset_index(drop=False).rename(columns={"index": "date"}))
  22. indicators["date"] = pd.to_datetime(indicators["date"])
  23. return indicators.sort_values("date").reset_index(drop=True)
  24. def _release_window_trades(trades: pd.DataFrame) -> pd.DataFrame:
  25. return trades[
  26. (trades["buy_date"] >= START_DATE)
  27. & (trades["buy_date"] <= END_DATE)
  28. & (trades["sell_date"] >= START_DATE)
  29. & (trades["sell_date"] <= END_DATE)
  30. ].copy()
  31. def _add_execution_prices(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame:
  32. trades = trades.copy()
  33. lookup = indicators.set_index(indicators["date"].dt.date)
  34. next_by_date = {
  35. indicators.iloc[idx]["date"].date().isoformat(): indicators.iloc[idx + 1]
  36. for idx in range(len(indicators) - 1)
  37. }
  38. same_entry: list[float] = []
  39. same_exit: list[float] = []
  40. next_open_entry: list[float] = []
  41. next_open_exit: list[float] = []
  42. for _, trade in trades.iterrows():
  43. buy_row = lookup.loc[pd.Timestamp(trade["buy_date"]).date()]
  44. sell_row = lookup.loc[pd.Timestamp(trade["sell_date"]).date()]
  45. buy_next = next_by_date.get(trade["buy_date"])
  46. sell_next = next_by_date.get(trade["sell_date"])
  47. same_entry.append(float(buy_row["close"]))
  48. same_exit.append(float(sell_row["close"]))
  49. next_open_entry.append(float("nan") if buy_next is None else float(buy_next["open"]))
  50. next_open_exit.append(float("nan") if sell_next is None else float(sell_next["open"]))
  51. trades["exec_same_close_entry"] = same_entry
  52. trades["exec_same_close_exit"] = same_exit
  53. trades["exec_next_open_entry"] = next_open_entry
  54. trades["exec_next_open_exit"] = next_open_exit
  55. return trades
  56. def _run_branch(indicators: pd.DataFrame, name: str, config) -> tuple[pd.DataFrame, pd.DataFrame]:
  57. indexed = indicators.set_index("date", drop=False)
  58. _, trades = DragonRuleEngine(config=config).run(indexed)
  59. trades = _release_window_trades(trades)
  60. trades.insert(0, "branch", name)
  61. return indexed, trades
  62. def _branch_summary(indicators: pd.DataFrame, name: str, trades: pd.DataFrame) -> list[dict[str, object]]:
  63. exec_trades = _add_execution_prices(trades, indicators)
  64. same = summary(name, apply_execution_model(exec_trades, "same_close", 0.0))
  65. nxt = summary(name, apply_execution_model(exec_trades, "next_open", 0.0))
  66. return [
  67. {
  68. "branch": name,
  69. "execution_model": "same_close",
  70. "trades": int(same["trades"]),
  71. "win_rate": float(same["win_rate"]),
  72. "avg_return": float(same["avg_return"]),
  73. "profit_factor": float(same["profit_factor"]),
  74. "compounded_return": float(same["compounded_return"]),
  75. "max_drawdown": float(same["max_drawdown"]),
  76. },
  77. {
  78. "branch": name,
  79. "execution_model": "next_open",
  80. "trades": int(nxt["trades"]),
  81. "win_rate": float(nxt["win_rate"]),
  82. "avg_return": float(nxt["avg_return"]),
  83. "profit_factor": float(nxt["profit_factor"]),
  84. "compounded_return": float(nxt["compounded_return"]),
  85. "max_drawdown": float(nxt["max_drawdown"]),
  86. },
  87. ]
  88. def _reentry_trade_details(indicators: pd.DataFrame, branch: str, trades: pd.DataFrame) -> pd.DataFrame:
  89. details = trades[trades["buy_reason"].astype(str).str.startswith(REENTRY_REASON_PREFIX)].copy()
  90. if details.empty:
  91. return pd.DataFrame(
  92. columns=[
  93. "branch",
  94. "buy_date",
  95. "buy_reason",
  96. "sell_date",
  97. "sell_reason",
  98. "holding_days",
  99. "return_pct",
  100. "next_open_return_pct",
  101. "post_exit_max_close_return_10b",
  102. "post_exit_min_close_return_10b",
  103. ]
  104. )
  105. indicators = indicators.sort_values("date").reset_index(drop=True)
  106. date_to_pos = {row.date().isoformat(): idx for idx, row in enumerate(indicators["date"])}
  107. next_by_date = {
  108. indicators.iloc[idx]["date"].date().isoformat(): indicators.iloc[idx + 1]
  109. for idx in range(len(indicators) - 1)
  110. }
  111. rows: list[dict[str, object]] = []
  112. for _, trade in details.iterrows():
  113. sell_pos = date_to_pos[trade["sell_date"]]
  114. post10 = indicators.iloc[sell_pos + 1 : sell_pos + 11].copy()
  115. buy_next = next_by_date.get(trade["buy_date"])
  116. sell_next = next_by_date.get(trade["sell_date"])
  117. next_open_return = float("nan")
  118. if buy_next is not None and sell_next is not None:
  119. next_open_return = float(sell_next["open"]) / float(buy_next["open"]) - 1.0
  120. rows.append(
  121. {
  122. "branch": branch,
  123. "buy_date": trade["buy_date"],
  124. "buy_reason": trade["buy_reason"],
  125. "sell_date": trade["sell_date"],
  126. "sell_reason": trade["sell_reason"],
  127. "holding_days": int(trade["holding_days"]),
  128. "return_pct": float(trade["return_pct"]),
  129. "next_open_return_pct": next_open_return,
  130. "post_exit_max_close_return_10b": float(post10["close"].max()) / float(trade["sell_price"]) - 1.0 if not post10.empty else float("nan"),
  131. "post_exit_min_close_return_10b": float(post10["close"].min()) / float(trade["sell_price"]) - 1.0 if not post10.empty else float("nan"),
  132. }
  133. )
  134. return pd.DataFrame(rows)
  135. def _trade_diff(mid_probe: pd.DataFrame, mid_exit_probe: pd.DataFrame) -> pd.DataFrame:
  136. key_cols = ["buy_date", "sell_date", "buy_reason", "sell_reason"]
  137. left = {
  138. (row.buy_date, row.sell_date, row.buy_reason, row.sell_reason)
  139. for row in mid_probe.itertuples()
  140. }
  141. right = {
  142. (row.buy_date, row.sell_date, row.buy_reason, row.sell_reason)
  143. for row in mid_exit_probe.itertuples()
  144. }
  145. rows: list[dict[str, object]] = []
  146. for _, trade in mid_probe.iterrows():
  147. key = tuple(trade[col] for col in key_cols)
  148. if key not in right:
  149. rows.append({"status": "removed_vs_mid_probe", **trade.to_dict()})
  150. for _, trade in mid_exit_probe.iterrows():
  151. key = tuple(trade[col] for col in key_cols)
  152. if key not in left:
  153. rows.append({"status": "added_vs_mid_probe", **trade.to_dict()})
  154. return pd.DataFrame(rows)
  155. def _build_review(summary_df: pd.DataFrame, detail_df: pd.DataFrame, diff_df: pd.DataFrame) -> str:
  156. same = summary_df[summary_df["execution_model"] == "same_close"].set_index("branch")
  157. nxt = summary_df[summary_df["execution_model"] == "next_open"].set_index("branch")
  158. mid_probe_key = "alpha_first_glued_followthrough_probe"
  159. exit_probe_key = "alpha_first_glued_followthrough_mid_exit_probe"
  160. base_key = "alpha_first_glued_refined_hot_cap"
  161. lines = [
  162. "# Dragon Followthrough Mid Exit Review",
  163. "",
  164. "## Scope",
  165. "- focus: shadow-only exit treatment for `mid_zone_very_weak_b1` followthrough reentry",
  166. "- objective: test whether the current `knife_take_profit_2_glued` path is clipping a valid repaired trend too early",
  167. "",
  168. "## Branch Summary",
  169. f"- base same_close avg `{_format_pct(float(same.loc[base_key, 'avg_return']))}` | PF `{_format_num(float(same.loc[base_key, 'profit_factor']))}` | comp `{_format_pct(float(same.loc[base_key, 'compounded_return']))}`",
  170. f"- mid_probe same_close avg `{_format_pct(float(same.loc[mid_probe_key, 'avg_return']))}` | PF `{_format_num(float(same.loc[mid_probe_key, 'profit_factor']))}` | comp `{_format_pct(float(same.loc[mid_probe_key, 'compounded_return']))}`",
  171. f"- mid_exit_probe same_close avg `{_format_pct(float(same.loc[exit_probe_key, 'avg_return']))}` | PF `{_format_num(float(same.loc[exit_probe_key, 'profit_factor']))}` | comp `{_format_pct(float(same.loc[exit_probe_key, 'compounded_return']))}`",
  172. f"- base next_open avg `{_format_pct(float(nxt.loc[base_key, 'avg_return']))}` | PF `{_format_num(float(nxt.loc[base_key, 'profit_factor']))}` | comp `{_format_pct(float(nxt.loc[base_key, 'compounded_return']))}`",
  173. f"- mid_probe next_open avg `{_format_pct(float(nxt.loc[mid_probe_key, 'avg_return']))}` | PF `{_format_num(float(nxt.loc[mid_probe_key, 'profit_factor']))}` | comp `{_format_pct(float(nxt.loc[mid_probe_key, 'compounded_return']))}`",
  174. f"- mid_exit_probe next_open avg `{_format_pct(float(nxt.loc[exit_probe_key, 'avg_return']))}` | PF `{_format_num(float(nxt.loc[exit_probe_key, 'profit_factor']))}` | comp `{_format_pct(float(nxt.loc[exit_probe_key, 'compounded_return']))}`",
  175. "",
  176. "## Reentry Trade Detail",
  177. ]
  178. for row in detail_df.itertuples(index=False):
  179. lines.append(
  180. f"- `{row.branch}` | `{row.buy_date}` -> `{row.sell_date}` | same_close `{_format_pct(float(row.return_pct))}` | "
  181. f"next_open `{_format_pct(float(row.next_open_return_pct))}` | sell `{row.sell_reason}` | "
  182. f"post_exit_max_10b `{_format_pct(float(row.post_exit_max_close_return_10b))}`"
  183. )
  184. lines.extend(["", "## Trade Path Diff Vs Mid Probe"])
  185. for row in diff_df.itertuples(index=False):
  186. lines.append(
  187. f"- `{row.status}` | `{row.buy_date}` -> `{row.sell_date}` | `{row.buy_reason}` -> `{row.sell_reason}` | return `{_format_pct(float(row.return_pct))}`"
  188. )
  189. lines.extend(
  190. [
  191. "",
  192. "## Judgment",
  193. "- The narrow mid-exit probe is materially better than the original mid probe on the only confirmed followthrough sample.",
  194. "- It converts the repaired-trend trade from a small loss into a long hold ending with a high-regime exit.",
  195. "- It also improves branch-level compounded return versus both the original mid probe and the current RC1 branch in this replay.",
  196. "- This is still a one-path result, so it should remain shadow-only for now, but it is strong enough to keep under daily observation.",
  197. ]
  198. )
  199. return "\n".join(lines) + "\n"
  200. def main() -> None:
  201. base_dir = Path(__file__).resolve().parent
  202. indicators = _load_history()
  203. branches = [
  204. ("alpha_first_glued_refined_hot_cap", alpha_first_glued_refined_hot_cap_config()),
  205. ("alpha_first_glued_followthrough_probe", alpha_first_glued_followthrough_probe_config()),
  206. ("alpha_first_glued_followthrough_mid_exit_probe", alpha_first_glued_followthrough_mid_exit_probe_config()),
  207. ]
  208. summary_rows: list[dict[str, object]] = []
  209. detail_rows: list[pd.DataFrame] = []
  210. branch_trades: dict[str, pd.DataFrame] = {}
  211. for name, config in branches:
  212. _, trades = _run_branch(indicators, name, config)
  213. branch_trades[name] = trades
  214. summary_rows.extend(_branch_summary(indicators, name, trades))
  215. detail_rows.append(_reentry_trade_details(indicators, name, trades))
  216. summary_df = pd.DataFrame(summary_rows)
  217. detail_df = pd.concat(detail_rows, ignore_index=True)
  218. diff_df = _trade_diff(
  219. branch_trades["alpha_first_glued_followthrough_probe"],
  220. branch_trades["alpha_first_glued_followthrough_mid_exit_probe"],
  221. )
  222. review = _build_review(summary_df, detail_df, diff_df)
  223. summary_df.to_csv(base_dir / OUTPUT_BRANCH_SUMMARY, index=False, encoding="utf-8-sig")
  224. detail_df.to_csv(base_dir / OUTPUT_TRADE_DETAILS, index=False, encoding="utf-8-sig")
  225. diff_df.to_csv(base_dir / OUTPUT_TRADE_DIFF, index=False, encoding="utf-8-sig")
  226. (base_dir / OUTPUT_REVIEW).write_text(review, encoding="utf-8")
# Run the review only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()