dragon_glued_veto_attribution.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. from __future__ import annotations
  2. from pathlib import Path
  3. from typing import Optional
  4. import pandas as pd
  5. from dragon_branch_configs import alpha_first_glued_selective_veto_config, alpha_first_selective_veto_config
  6. from dragon_strategy import DragonRuleEngine
  7. def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
  8. return pd.read_csv(base_dir / name, encoding="utf-8-sig")
  9. def _load_indicator_snapshot(base_dir: Path) -> pd.DataFrame:
  10. df = _load_csv(base_dir, "dragon_indicator_snapshot.csv")
  11. df["date"] = pd.to_datetime(df["date"])
  12. return df.sort_values("date").reset_index(drop=True)
  13. def _profit_factor(series: pd.Series) -> float:
  14. gross_profit = series[series > 0].sum()
  15. gross_loss = -series[series < 0].sum()
  16. if gross_loss == 0:
  17. return float("inf") if gross_profit > 0 else 0.0
  18. return float(gross_profit / gross_loss)
  19. def _pct(value: Optional[float]) -> str:
  20. if value is None or pd.isna(value):
  21. return "n/a"
  22. if value == float("inf"):
  23. return "inf"
  24. return f"{float(value):.2%}"
  25. def _fmt_num(value: Optional[float]) -> str:
  26. if value is None or pd.isna(value):
  27. return "n/a"
  28. if value == float("inf"):
  29. return "inf"
  30. return f"{float(value):.2f}"
  31. def _holding_bucket(days: int) -> str:
  32. if days <= 5:
  33. return "00-05d"
  34. if days <= 10:
  35. return "06-10d"
  36. if days <= 20:
  37. return "11-20d"
  38. if days <= 40:
  39. return "21-40d"
  40. return "41d+"
  41. def _build_trade_quality(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame:
  42. trades = trades.copy()
  43. trades["buy_dt"] = pd.to_datetime(trades["buy_date"])
  44. trades["sell_dt"] = pd.to_datetime(trades["sell_date"])
  45. trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket)
  46. pos_lookup = {dt.date().isoformat(): idx for idx, dt in enumerate(indicators["date"])}
  47. buy_a1: list[float] = []
  48. buy_b1: list[float] = []
  49. buy_c1: list[float] = []
  50. sell_a1: list[float] = []
  51. sell_b1: list[float] = []
  52. sell_c1: list[float] = []
  53. mfe_list: list[float] = []
  54. mae_list: list[float] = []
  55. exit_followthrough_list: list[float] = []
  56. exit_rebound_list: list[float] = []
  57. entry_forward_5d_list: list[float] = []
  58. indicator_by_date = indicators.set_index(indicators["date"].dt.date)
  59. for _, trade in trades.iterrows():
  60. buy_date = pd.Timestamp(trade["buy_date"]).date()
  61. sell_date = pd.Timestamp(trade["sell_date"]).date()
  62. entry_price = float(trade["buy_price"])
  63. exit_price = float(trade["sell_price"])
  64. buy_row = indicator_by_date.loc[buy_date]
  65. sell_row = indicator_by_date.loc[sell_date]
  66. buy_a1.append(float(buy_row["a1"]))
  67. buy_b1.append(float(buy_row["b1"]))
  68. buy_c1.append(float(buy_row["c1"]))
  69. sell_a1.append(float(sell_row["a1"]))
  70. sell_b1.append(float(sell_row["b1"]))
  71. sell_c1.append(float(sell_row["c1"]))
  72. window = indicators[(indicators["date"] >= pd.Timestamp(trade["buy_date"])) & (indicators["date"] <= pd.Timestamp(trade["sell_date"]))]
  73. max_high = float(window["high"].max())
  74. min_low = float(window["low"].min())
  75. mfe_list.append(max_high / entry_price - 1.0)
  76. mae_list.append(min_low / entry_price - 1.0)
  77. sell_idx = pos_lookup[trade["sell_date"]]
  78. future = indicators.iloc[sell_idx + 1 : sell_idx + 6]
  79. if future.empty:
  80. exit_followthrough_list.append(float("nan"))
  81. exit_rebound_list.append(float("nan"))
  82. else:
  83. exit_followthrough_list.append(float(future["low"].min()) / exit_price - 1.0)
  84. exit_rebound_list.append(float(future["high"].max()) / exit_price - 1.0)
  85. buy_idx = pos_lookup[trade["buy_date"]]
  86. entry_future = indicators.iloc[buy_idx + 1 : buy_idx + 6]
  87. if entry_future.empty:
  88. entry_forward_5d_list.append(float("nan"))
  89. else:
  90. entry_forward_5d_list.append(float(entry_future["close"].iloc[-1]) / entry_price - 1.0)
  91. trades["buy_a1"] = buy_a1
  92. trades["buy_b1"] = buy_b1
  93. trades["buy_c1"] = buy_c1
  94. trades["sell_a1"] = sell_a1
  95. trades["sell_b1"] = sell_b1
  96. trades["sell_c1"] = sell_c1
  97. trades["mfe_pct"] = mfe_list
  98. trades["mae_pct"] = mae_list
  99. trades["exit_followthrough_5d_pct"] = exit_followthrough_list
  100. trades["exit_rebound_5d_pct"] = exit_rebound_list
  101. trades["entry_forward_5d_pct"] = entry_forward_5d_list
  102. return trades
  103. def _run_branch(indicators: pd.DataFrame, config) -> pd.DataFrame:
  104. indicator_indexed = indicators.set_index("date", drop=False)
  105. engine = DragonRuleEngine(config=config)
  106. _, trades = engine.run(indicator_indexed)
  107. trades = trades.copy()
  108. return _build_trade_quality(trades, indicators)
  109. def _trade_key(df: pd.DataFrame) -> set[tuple[str, str, str, str]]:
  110. return set(zip(df["buy_date"], df["sell_date"], df["buy_reason"], df["sell_reason"]))
  111. def _veto_bucket(c1: float, b1: float) -> str:
  112. if c1 >= 40 and b1 >= 0.10:
  113. return "hot_positive_b1"
  114. if 23 <= c1 < 28 and b1 <= 0.02:
  115. return "low_weak_range"
  116. return "other"
  117. def _recommendation(row: pd.Series) -> tuple[str, str]:
  118. ret = float(row["return_pct"])
  119. holding = int(row["holding_days"])
  120. mfe = float(row["mfe_pct"])
  121. replacement_ret = row.get("replacement_return_pct")
  122. if ret > 0:
  123. if ret <= 0.01 and holding <= 15:
  124. if pd.notna(replacement_ret) and float(replacement_ret) <= ret:
  125. return "OBSERVE_VETO", "原交易仅微利,但当前替代路径没有更强,建议进一步细化 hot 过滤而不是直接全盘保留。"
  126. return "OBSERVE_VETO", "原交易为微利短单,暂不应视为强 alpha,先保留为观察样本。"
  127. return "OVER_VETO", "原交易本身为明显盈利单,当前过滤过度。"
  128. if holding <= 10 and ret < 0 and float(row["exit_followthrough_5d_pct"]) <= 0:
  129. return "KEEP_VETO", "短持仓亏损且卖出后继续走弱,属于应优先清理的噪音单。"
  130. if ret < 0 and mfe <= 0.02:
  131. return "KEEP_VETO", "持仓期间几乎没有有效盈利空间,删除逻辑合理。"
  132. return "KEEP_VETO", "总体为负收益短单,保留 veto 更符合当前 alpha-first 目标。"
  133. def _summary_line(group: pd.DataFrame) -> str:
  134. if group.empty:
  135. return "n/a"
  136. return (
  137. f"trades `{len(group)}`, win_rate `{_pct(float((group['return_pct'] > 0).mean()))}`, "
  138. f"avg_return `{_pct(float(group['return_pct'].mean()))}`, avg_mfe `{_pct(float(group['mfe_pct'].mean()))}`, "
  139. f"avg_mae `{_pct(float(group['mae_pct'].mean()))}`, avg_holding `{float(group['holding_days'].mean()):.2f}`"
  140. )
  141. def main() -> None:
  142. base_dir = Path(__file__).resolve().parent
  143. indicators = _load_indicator_snapshot(base_dir)
  144. workbook_events = _load_csv(base_dir, "true_trade_events.csv")
  145. alpha_trades = _run_branch(indicators, alpha_first_selective_veto_config())
  146. glued_trades = _run_branch(indicators, alpha_first_glued_selective_veto_config())
  147. workbook_buy = set(workbook_events[(workbook_events["layer"] == "real_trade") & (workbook_events["side"] == "BUY")]["date"])
  148. workbook_sell = set(workbook_events[(workbook_events["layer"] == "real_trade") & (workbook_events["side"] == "SELL")]["date"])
  149. alpha_set = _trade_key(alpha_trades)
  150. glued_set = _trade_key(glued_trades)
  151. removed = pd.DataFrame(
  152. sorted(alpha_set - glued_set),
  153. columns=["buy_date", "sell_date", "buy_reason", "sell_reason"],
  154. )
  155. rows: list[dict[str, object]] = []
  156. for _, removed_row in removed.iterrows():
  157. match = alpha_trades[
  158. (alpha_trades["buy_date"] == removed_row["buy_date"])
  159. & (alpha_trades["sell_date"] == removed_row["sell_date"])
  160. & (alpha_trades["buy_reason"] == removed_row["buy_reason"])
  161. & (alpha_trades["sell_reason"] == removed_row["sell_reason"])
  162. ]
  163. if match.empty:
  164. continue
  165. trade = match.iloc[0]
  166. sell_dt = pd.Timestamp(trade["sell_date"])
  167. replacement = glued_trades[
  168. (pd.to_datetime(glued_trades["buy_date"]) > sell_dt)
  169. & (pd.to_datetime(glued_trades["buy_date"]) <= sell_dt + pd.Timedelta(days=10))
  170. ].sort_values("buy_date")
  171. replacement_row = replacement.iloc[0] if not replacement.empty else None
  172. rec, rec_reason = _recommendation(
  173. pd.Series(
  174. {
  175. **trade.to_dict(),
  176. "replacement_return_pct": None if replacement_row is None else float(replacement_row["return_pct"]),
  177. }
  178. )
  179. )
  180. rows.append(
  181. {
  182. "buy_date": trade["buy_date"],
  183. "sell_date": trade["sell_date"],
  184. "buy_reason": trade["buy_reason"],
  185. "sell_reason": trade["sell_reason"],
  186. "holding_bucket": trade["holding_bucket"],
  187. "holding_days": int(trade["holding_days"]),
  188. "return_pct": float(trade["return_pct"]),
  189. "mfe_pct": float(trade["mfe_pct"]),
  190. "mae_pct": float(trade["mae_pct"]),
  191. "entry_forward_5d_pct": float(trade["entry_forward_5d_pct"]),
  192. "exit_followthrough_5d_pct": float(trade["exit_followthrough_5d_pct"]),
  193. "exit_rebound_5d_pct": float(trade["exit_rebound_5d_pct"]),
  194. "buy_a1": float(trade["buy_a1"]),
  195. "buy_b1": float(trade["buy_b1"]),
  196. "buy_c1": float(trade["buy_c1"]),
  197. "sell_a1": float(trade["sell_a1"]),
  198. "sell_b1": float(trade["sell_b1"]),
  199. "sell_c1": float(trade["sell_c1"]),
  200. "veto_bucket": _veto_bucket(float(trade["buy_c1"]), float(trade["buy_b1"])),
  201. "buy_aligned_with_workbook": trade["buy_date"] in workbook_buy,
  202. "sell_aligned_with_workbook": trade["sell_date"] in workbook_sell,
  203. "replacement_buy_date": "" if replacement_row is None else str(replacement_row["buy_date"]),
  204. "replacement_sell_date": "" if replacement_row is None else str(replacement_row["sell_date"]),
  205. "replacement_buy_reason": "" if replacement_row is None else str(replacement_row["buy_reason"]),
  206. "replacement_sell_reason": "" if replacement_row is None else str(replacement_row["sell_reason"]),
  207. "replacement_return_pct": float("nan") if replacement_row is None else float(replacement_row["return_pct"]),
  208. "replacement_gap_days": float("nan")
  209. if replacement_row is None
  210. else int((pd.Timestamp(replacement_row["buy_date"]) - sell_dt).days),
  211. "recommendation": rec,
  212. "recommendation_reason": rec_reason,
  213. }
  214. )
  215. attribution = pd.DataFrame(rows).sort_values(["veto_bucket", "buy_date"]).reset_index(drop=True)
  216. attribution.to_csv(base_dir / "dragon_glued_veto_attribution.csv", index=False, encoding="utf-8-sig")
  217. bucket_summary = (
  218. attribution.groupby("veto_bucket", dropna=False)
  219. .agg(
  220. trades=("buy_date", "count"),
  221. win_rate=("return_pct", lambda s: float((s > 0).mean())),
  222. avg_return=("return_pct", "mean"),
  223. avg_mfe=("mfe_pct", "mean"),
  224. avg_mae=("mae_pct", "mean"),
  225. avg_holding_days=("holding_days", "mean"),
  226. keep_veto_count=("recommendation", lambda s: int((s == "KEEP_VETO").sum())),
  227. observe_veto_count=("recommendation", lambda s: int((s == "OBSERVE_VETO").sum())),
  228. over_veto_count=("recommendation", lambda s: int((s == "OVER_VETO").sum())),
  229. )
  230. .reset_index()
  231. .sort_values(["trades", "avg_return"], ascending=[False, True])
  232. )
  233. bucket_summary.to_csv(base_dir / "dragon_glued_veto_bucket_summary.csv", index=False, encoding="utf-8-sig")
  234. lines = [
  235. "# Dragon Glued Veto Attribution Review",
  236. "",
  237. "## Snapshot",
  238. f"- removed trades vs current alpha-first: `{len(attribution)}`",
  239. f"- total avg_return of removed set: `{_pct(float(attribution['return_pct'].mean()))}`",
  240. f"- total win_rate of removed set: `{_pct(float((attribution['return_pct'] > 0).mean()))}`",
  241. f"- removed-set profit_factor: `{_fmt_num(_profit_factor(attribution['return_pct']))}`",
  242. "",
  243. "## Bucket Summary",
  244. ]
  245. for _, row in bucket_summary.iterrows():
  246. lines.append(
  247. f"- `{row['veto_bucket']}`: trades `{int(row['trades'])}`, win_rate `{_pct(float(row['win_rate']))}`, "
  248. f"avg_return `{_pct(float(row['avg_return']))}`, avg_mfe `{_pct(float(row['avg_mfe']))}`, "
  249. f"avg_mae `{_pct(float(row['avg_mae']))}`, avg_holding `{float(row['avg_holding_days']):.2f}`, "
  250. f"KEEP/OBSERVE/OVER = `{int(row['keep_veto_count'])}/{int(row['observe_veto_count'])}/{int(row['over_veto_count'])}`"
  251. )
  252. lines.extend(
  253. [
  254. "",
  255. "## Quant Judgment",
  256. f"- `low_weak_range`: {_summary_line(attribution[attribution['veto_bucket'] == 'low_weak_range'])}.",
  257. f"- `hot_positive_b1`: {_summary_line(attribution[attribution['veto_bucket'] == 'hot_positive_b1'])}.",
  258. "- `low_weak_range` is now a clean promotion candidate: all removed trades are short, losing, and there is no positive sample in this bucket.",
  259. "- `hot_positive_b1` is directionally correct but not fully clean: most removed trades are weak, but one micro-profit sample remains and should be used as the first refinement target.",
  260. "- Immediate next research action: keep the low bucket intact, and narrow the hot bucket rather than rolling back the whole glued veto branch.",
  261. "",
  262. "## Detailed Cards",
  263. ]
  264. )
  265. for _, row in attribution.iterrows():
  266. replacement = "none"
  267. if isinstance(row["replacement_buy_date"], str) and row["replacement_buy_date"]:
  268. replacement = (
  269. f"{row['replacement_buy_date']} -> {row['replacement_sell_date']} / "
  270. f"{row['replacement_buy_reason']} -> {row['replacement_sell_reason']} / "
  271. f"{_pct(row['replacement_return_pct'])}"
  272. )
  273. lines.extend(
  274. [
  275. f"### {row['buy_date']} -> {row['sell_date']}",
  276. f"- Bucket: `{row['veto_bucket']}`",
  277. f"- Trade: `{row['buy_reason']} -> {row['sell_reason']}` | `{int(row['holding_days'])}` days | return `{_pct(row['return_pct'])}`",
  278. f"- MFE / MAE: `{_pct(row['mfe_pct'])}` / `{_pct(row['mae_pct'])}`",
  279. f"- Entry 5d / Exit followthrough 5d: `{_pct(row['entry_forward_5d_pct'])}` / `{_pct(row['exit_followthrough_5d_pct'])}`",
  280. f"- Entry indicators: `a1={float(row['buy_a1']):.4f}` `b1={float(row['buy_b1']):.4f}` `c1={float(row['buy_c1']):.2f}`",
  281. f"- Workbook aligned: buy `{bool(row['buy_aligned_with_workbook'])}` / sell `{bool(row['sell_aligned_with_workbook'])}`",
  282. f"- Candidate replacement within 10d after exit: `{replacement}`",
  283. f"- Recommendation: `{row['recommendation']}` | {row['recommendation_reason']}",
  284. "",
  285. ]
  286. )
  287. (base_dir / "dragon_glued_veto_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
  288. if __name__ == "__main__":
  289. main()