dragon_alpha_first_baseline.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. from __future__ import annotations
  2. import json
  3. from dataclasses import asdict
  4. from pathlib import Path
  5. import pandas as pd
  6. from dragon_shared import END_DATE, START_DATE, format_num as _format_num, format_pct as _format_pct, profit_factor
  7. from dragon_strategy import DragonRuleEngine
  8. from dragon_strategy_config import StrategyConfig
  9. def _load_indicator_snapshot(base_dir: Path) -> pd.DataFrame:
  10. df = pd.read_csv(base_dir / "dragon_indicator_snapshot.csv", encoding="utf-8-sig")
  11. df["date"] = pd.to_datetime(df["date"])
  12. return df.set_index("date", drop=False)
  13. def _load_true_trade_events(base_dir: Path) -> pd.DataFrame:
  14. return pd.read_csv(base_dir / "true_trade_events.csv", encoding="utf-8-sig")
  15. def _holding_bucket(days: int) -> str:
  16. if days <= 5:
  17. return "00-05d"
  18. if days <= 10:
  19. return "06-10d"
  20. if days <= 20:
  21. return "11-20d"
  22. if days <= 40:
  23. return "21-40d"
  24. return "41d+"
  25. def _event_match(strategy_events: pd.DataFrame, workbook_events: pd.DataFrame, side: str) -> tuple[int, int, int]:
  26. wb = set(workbook_events[(workbook_events["side"] == side) & (workbook_events["layer"] == "real_trade")]["date"])
  27. st = set(strategy_events[(strategy_events["side"] == side) & (strategy_events["layer"] == "real_trade")]["date"])
  28. return len(wb & st), len(wb - st), len(st - wb)
  29. def _segment_stats(df: pd.DataFrame) -> dict[str, float | int]:
  30. if df.empty:
  31. return {
  32. "trades": 0,
  33. "win_rate": float("nan"),
  34. "avg_return": float("nan"),
  35. "profit_factor": float("nan"),
  36. "compounded_return": float("nan"),
  37. }
  38. returns = df["return_pct"].astype(float)
  39. return {
  40. "trades": int(len(df)),
  41. "win_rate": float((returns > 0).mean()),
  42. "avg_return": float(returns.mean()),
  43. "profit_factor": profit_factor(returns),
  44. "compounded_return": float((1.0 + returns).prod() - 1.0),
  45. }
  46. def _build_walk_forward(trades: pd.DataFrame, branch_name: str) -> pd.DataFrame:
  47. years = sorted(int(year) for year in trades["sell_year"].unique())
  48. rows: list[dict[str, object]] = []
  49. for idx, test_year in enumerate(years):
  50. if idx >= 1:
  51. train_years = years[:idx]
  52. train_df = trades[trades["sell_year"].isin(train_years)]
  53. test_df = trades[trades["sell_year"] == test_year]
  54. rows.append(
  55. {
  56. "branch": branch_name,
  57. "scheme": "anchored_expanding",
  58. "train_start_year": train_years[0],
  59. "train_end_year": train_years[-1],
  60. "test_year": test_year,
  61. **{f"train_{k}": v for k, v in _segment_stats(train_df).items()},
  62. **{f"test_{k}": v for k, v in _segment_stats(test_df).items()},
  63. }
  64. )
  65. if idx >= 3:
  66. train_years = years[idx - 3 : idx]
  67. train_df = trades[trades["sell_year"].isin(train_years)]
  68. test_df = trades[trades["sell_year"] == test_year]
  69. rows.append(
  70. {
  71. "branch": branch_name,
  72. "scheme": "rolling_3y",
  73. "train_start_year": train_years[0],
  74. "train_end_year": train_years[-1],
  75. "test_year": test_year,
  76. **{f"train_{k}": v for k, v in _segment_stats(train_df).items()},
  77. **{f"test_{k}": v for k, v in _segment_stats(test_df).items()},
  78. }
  79. )
  80. return pd.DataFrame(rows)
  81. def _run_branch(
  82. name: str,
  83. config: StrategyConfig,
  84. indicator_df: pd.DataFrame,
  85. workbook_events: pd.DataFrame,
  86. first_date: str,
  87. last_date: str,
  88. ) -> tuple[dict[str, object], pd.DataFrame, pd.DataFrame, pd.DataFrame]:
  89. engine = DragonRuleEngine(config=config)
  90. events, trades = engine.run(indicator_df)
  91. start = max(first_date, START_DATE)
  92. end = min(last_date, END_DATE)
  93. events = events[(events["date"] >= start) & (events["date"] <= end)].copy()
  94. trades = trades[
  95. (trades["buy_date"] >= start)
  96. & (trades["buy_date"] <= end)
  97. & (trades["sell_date"] >= start)
  98. & (trades["sell_date"] <= end)
  99. ].copy()
  100. buy_overlap, buy_missing, buy_extra = _event_match(events, workbook_events, "BUY")
  101. sell_overlap, sell_missing, sell_extra = _event_match(events, workbook_events, "SELL")
  102. trades["branch"] = name
  103. trades["sell_dt"] = pd.to_datetime(trades["sell_date"])
  104. trades["sell_year"] = trades["sell_dt"].dt.year.astype(int)
  105. trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket)
  106. returns = trades["return_pct"].astype(float) if not trades.empty else pd.Series(dtype=float)
  107. summary = {
  108. "branch": name,
  109. "trades": int(len(trades)),
  110. "win_rate": float((returns > 0).mean()) if not trades.empty else float("nan"),
  111. "avg_return": float(returns.mean()) if not trades.empty else float("nan"),
  112. "median_return": float(returns.median()) if not trades.empty else float("nan"),
  113. "profit_factor": profit_factor(returns) if not trades.empty else float("nan"),
  114. "real_buy_overlap": int(buy_overlap),
  115. "real_buy_missing": int(buy_missing),
  116. "real_buy_extra": int(buy_extra),
  117. "real_sell_overlap": int(sell_overlap),
  118. "real_sell_missing": int(sell_missing),
  119. "real_sell_extra": int(sell_extra),
  120. "short_00_05d_avg_return": float(trades[trades["holding_bucket"] == "00-05d"]["return_pct"].mean()),
  121. "short_06_10d_avg_return": float(trades[trades["holding_bucket"] == "06-10d"]["return_pct"].mean()),
  122. }
  123. bucket_rows: list[dict[str, object]] = []
  124. for bucket, group in trades.groupby("holding_bucket", dropna=False):
  125. bucket_rows.append(
  126. {
  127. "branch": name,
  128. "holding_bucket": bucket,
  129. "trades": int(len(group)),
  130. "win_rate": float((group["return_pct"] > 0).mean()),
  131. "avg_return": float(group["return_pct"].mean()),
  132. "profit_factor": profit_factor(group["return_pct"]),
  133. }
  134. )
  135. holding_df = pd.DataFrame(bucket_rows).sort_values("holding_bucket")
  136. walk_forward_df = _build_walk_forward(trades, name)
  137. return summary, trades, holding_df, walk_forward_df
  138. def _config_snapshot(config: StrategyConfig) -> dict[str, object]:
  139. snapshot = asdict(config)
  140. snapshot["disabled_rules"] = sorted(config.disabled_rules)
  141. return snapshot
  142. def main() -> None:
  143. base_dir = Path(__file__).resolve().parent
  144. indicator_df = _load_indicator_snapshot(base_dir)
  145. workbook_events = _load_true_trade_events(base_dir)
  146. first_date = workbook_events["date"].min()
  147. last_date = workbook_events["date"].max()
  148. workbook_config = StrategyConfig()
  149. alpha_config = workbook_config.with_updates(
  150. deep_oversold_selective_positive_b1_c1_max=15.3,
  151. deep_oversold_selective_shallow_c1_min=12.0,
  152. deep_oversold_selective_shallow_b1_min=-0.025,
  153. deep_oversold_selective_mixed_c1_max=10.2,
  154. deep_oversold_selective_mixed_require_no_ql=True,
  155. )
  156. workbook_summary, workbook_trades, workbook_holding, workbook_walk = _run_branch(
  157. "workbook_preserving",
  158. workbook_config,
  159. indicator_df,
  160. workbook_events,
  161. first_date,
  162. last_date,
  163. )
  164. alpha_summary, alpha_trades, alpha_holding, alpha_walk = _run_branch(
  165. "alpha_first_selective_veto",
  166. alpha_config,
  167. indicator_df,
  168. workbook_events,
  169. first_date,
  170. last_date,
  171. )
  172. summary_df = pd.DataFrame([workbook_summary, alpha_summary])
  173. baseline_row = summary_df[summary_df["branch"] == "workbook_preserving"].iloc[0]
  174. alpha_row = summary_df[summary_df["branch"] == "alpha_first_selective_veto"].iloc[0]
  175. comparison = pd.DataFrame(
  176. [
  177. {
  178. "metric": col,
  179. "workbook_preserving": baseline_row[col],
  180. "alpha_first_selective_veto": alpha_row[col],
  181. "delta_alpha_minus_workbook": alpha_row[col] - baseline_row[col]
  182. if isinstance(alpha_row[col], (int, float)) and isinstance(baseline_row[col], (int, float))
  183. else None,
  184. }
  185. for col in [
  186. "trades",
  187. "win_rate",
  188. "avg_return",
  189. "median_return",
  190. "profit_factor",
  191. "real_buy_overlap",
  192. "real_sell_overlap",
  193. "short_00_05d_avg_return",
  194. "short_06_10d_avg_return",
  195. ]
  196. ]
  197. )
  198. baseline_set = set(zip(workbook_trades["buy_date"], workbook_trades["sell_date"], workbook_trades["buy_reason"], workbook_trades["sell_reason"]))
  199. alpha_set = set(zip(alpha_trades["buy_date"], alpha_trades["sell_date"], alpha_trades["buy_reason"], alpha_trades["sell_reason"]))
  200. trade_diff_rows: list[dict[str, object]] = []
  201. for row in sorted(baseline_set - alpha_set):
  202. trade_diff_rows.append(
  203. {
  204. "change_type": "removed_from_alpha",
  205. "buy_date": row[0],
  206. "sell_date": row[1],
  207. "buy_reason": row[2],
  208. "sell_reason": row[3],
  209. }
  210. )
  211. for row in sorted(alpha_set - baseline_set):
  212. trade_diff_rows.append(
  213. {
  214. "change_type": "added_in_alpha",
  215. "buy_date": row[0],
  216. "sell_date": row[1],
  217. "buy_reason": row[2],
  218. "sell_reason": row[3],
  219. }
  220. )
  221. trade_diff_df = pd.DataFrame(trade_diff_rows)
  222. combined_holding = pd.concat([workbook_holding, alpha_holding], ignore_index=True)
  223. combined_walk = pd.concat([workbook_walk, alpha_walk], ignore_index=True)
  224. summary_df.to_csv(base_dir / "dragon_alpha_first_branch_summary.csv", index=False, encoding="utf-8-sig")
  225. comparison.to_csv(base_dir / "dragon_alpha_first_branch_comparison.csv", index=False, encoding="utf-8-sig")
  226. combined_holding.to_csv(base_dir / "dragon_alpha_first_branch_holding_buckets.csv", index=False, encoding="utf-8-sig")
  227. combined_walk.to_csv(base_dir / "dragon_alpha_first_branch_walk_forward.csv", index=False, encoding="utf-8-sig")
  228. trade_diff_df.to_csv(base_dir / "dragon_alpha_first_branch_trade_diff.csv", index=False, encoding="utf-8-sig")
  229. (base_dir / "dragon_alpha_first_config_snapshot.json").write_text(
  230. json.dumps(_config_snapshot(alpha_config), indent=2, ensure_ascii=False) + "\n",
  231. encoding="utf-8",
  232. )
  233. def _wf_stats(df: pd.DataFrame, scheme: str) -> tuple[int, int, float]:
  234. view = df[df["scheme"] == scheme]
  235. positive = int((view["test_avg_return"] > 0).sum()) if not view.empty else 0
  236. total = int(len(view))
  237. avg_oos = float(view["test_avg_return"].mean()) if not view.empty else float("nan")
  238. return positive, total, avg_oos
  239. wb_anchor_pos, wb_anchor_total, wb_anchor_avg = _wf_stats(workbook_walk, "anchored_expanding")
  240. af_anchor_pos, af_anchor_total, af_anchor_avg = _wf_stats(alpha_walk, "anchored_expanding")
  241. wb_roll_pos, wb_roll_total, wb_roll_avg = _wf_stats(workbook_walk, "rolling_3y")
  242. af_roll_pos, af_roll_total, af_roll_avg = _wf_stats(alpha_walk, "rolling_3y")
  243. lines = [
  244. "# Dragon Alpha-First Branch Report",
  245. "",
  246. "## Branches",
  247. f"- Evaluation window: `{START_DATE}` to `{END_DATE}`.",
  248. "- `workbook_preserving`: official formal baseline, preserves workbook structure as much as possible.",
  249. "- `alpha_first_selective_veto`: research branch using the current best narrow deep-oversold veto package.",
  250. "",
  251. "## Headline Comparison",
  252. f"- workbook_preserving: trades `{int(baseline_row['trades'])}`, avg_return `{_format_pct(float(baseline_row['avg_return']))}`, profit_factor `{_format_num(float(baseline_row['profit_factor']))}`, real BUY / SELL `{int(baseline_row['real_buy_overlap'])}/{int(baseline_row['real_sell_overlap'])}`",
  253. f"- alpha_first_selective_veto: trades `{int(alpha_row['trades'])}`, avg_return `{_format_pct(float(alpha_row['avg_return']))}`, profit_factor `{_format_num(float(alpha_row['profit_factor']))}`, real BUY / SELL `{int(alpha_row['real_buy_overlap'])}/{int(alpha_row['real_sell_overlap'])}`",
  254. "",
  255. "## Short-Holding Impact",
  256. f"- `00-05d` avg_return: workbook `{_format_pct(float(baseline_row['short_00_05d_avg_return']))}` vs alpha-first `{_format_pct(float(alpha_row['short_00_05d_avg_return']))}`",
  257. f"- `06-10d` avg_return: workbook `{_format_pct(float(baseline_row['short_06_10d_avg_return']))}` vs alpha-first `{_format_pct(float(alpha_row['short_06_10d_avg_return']))}`",
  258. "",
  259. "## Walk-Forward Comparison",
  260. f"- Anchored expanding: workbook positive `{wb_anchor_pos}/{wb_anchor_total}`, avg test return `{_format_pct(wb_anchor_avg)}`; alpha-first positive `{af_anchor_pos}/{af_anchor_total}`, avg test return `{_format_pct(af_anchor_avg)}`",
  261. f"- Rolling 3Y: workbook positive `{wb_roll_pos}/{wb_roll_total}`, avg test return `{_format_pct(wb_roll_avg)}`; alpha-first positive `{af_roll_pos}/{af_roll_total}`, avg test return `{_format_pct(af_roll_avg)}`",
  262. "",
  263. "## Trade-Diff Summary",
  264. f"- trades removed from alpha-first vs workbook: `{int((trade_diff_df['change_type'] == 'removed_from_alpha').sum())}`",
  265. f"- trades added in alpha-first vs workbook: `{int((trade_diff_df['change_type'] == 'added_in_alpha').sum())}`",
  266. "- Key removed deep-oversold trades are the narrow pathological subset identified in Track A, not the full weak-subtype family.",
  267. "",
  268. "## Governance",
  269. "- Keep `workbook_preserving` as the official reconstruction baseline.",
  270. "- Keep `alpha_first_selective_veto` as the leading performance-oriented research branch.",
  271. "- Do not merge alpha-first veto rules back into the official baseline unless the objective explicitly changes from workbook preservation to alpha-first optimization.",
  272. "",
  273. "## Quant Judgment",
  274. "- Stage 3 is complete once both baselines are explicitly separated and reproducible.",
  275. "- The workbook-preserving baseline remains the authoritative reconstruction target.",
  276. "- The alpha-first branch now has a concrete candidate baseline with better trade quality and better short-holding behavior, at the cost of expected workbook alignment loss.",
  277. "- Future work should choose one branch explicitly before optimizing further; the main unresolved technical decision is governance, not missing analysis.",
  278. ]
  279. (base_dir / "dragon_alpha_first_baseline.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
  280. if __name__ == "__main__":
  281. main()