dragon_forward_observation_pipeline.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. from __future__ import annotations
  2. from datetime import datetime
  3. import json
  4. from pathlib import Path
  5. import pandas as pd
  6. import dragon_daily_signal_pipeline as daily
  7. import dragon_html_reports as html_reports
  8. import dragon_rollout_governance_check as rollout_governance
  9. def _load_csv(path: Path) -> pd.DataFrame:
  10. if not path.exists():
  11. return pd.DataFrame()
  12. return pd.read_csv(path, encoding="utf-8-sig")
  13. def _write_csv(df: pd.DataFrame, path: Path) -> None:
  14. df.to_csv(path, index=False, encoding="utf-8-sig")
  15. def _append_dedup(existing: pd.DataFrame, new_rows: pd.DataFrame, key_cols: list[str]) -> pd.DataFrame:
  16. if existing.empty:
  17. out = new_rows.copy()
  18. else:
  19. out = pd.concat([existing, new_rows], ignore_index=True, sort=False)
  20. out = out.drop_duplicates(subset=key_cols, keep="last")
  21. return out.sort_values(key_cols).reset_index(drop=True)
  22. def _json_default(value: object) -> object:
  23. if isinstance(value, (pd.Timestamp, datetime)):
  24. return value.isoformat()
  25. if pd.isna(value):
  26. return None
  27. return value
  28. def _load_state(path: Path) -> dict[str, object]:
  29. if not path.exists():
  30. return {}
  31. return json.loads(path.read_text(encoding="utf-8"))
  32. def _build_monitor_summary(monitor_df: pd.DataFrame) -> dict[str, object]:
  33. return {
  34. "warning_count": int((monitor_df["status"] == "warning").sum()),
  35. "missing_data_count": int((monitor_df["status"] == "missing_data").sum()),
  36. "hard_breach_count": int(monitor_df["status"].isin(["hard_breach", "missing_data"]).sum()),
  37. "warning_metrics": monitor_df.loc[monitor_df["status"] == "warning", "metric"].tolist(),
  38. "missing_data_metrics": monitor_df.loc[monitor_df["status"] == "missing_data", "metric"].tolist(),
  39. "hard_breach_metrics": monitor_df.loc[monitor_df["status"].isin(["hard_breach", "missing_data"]), "metric"].tolist(),
  40. }
  41. def _branch_event_signature(row: pd.Series) -> str:
  42. return f"{row['latest_real_event_date']}|{row['latest_real_event_side']}|{row['latest_real_event_reason']}"
  43. def _build_signal_changes(
  44. branch_status: pd.DataFrame,
  45. prev_state: dict[str, object],
  46. latest_bar_date: str,
  47. monitor_summary: dict[str, object],
  48. ) -> pd.DataFrame:
  49. prev_branches = prev_state.get("branches", {}) if isinstance(prev_state, dict) else {}
  50. prev_monitor = prev_state.get("monitor_summary", {}) if isinstance(prev_state, dict) else {}
  51. rows: list[dict[str, object]] = []
  52. for _, row in branch_status.iterrows():
  53. branch = str(row["branch"])
  54. prev = prev_branches.get(branch, {})
  55. indicator_context = (
  56. f"close={float(row['latest_close']):.3f},"
  57. f"a1={float(row['latest_a1']):.4f},"
  58. f"b1={float(row['latest_b1']):.4f},"
  59. f"c1={float(row['latest_c1']):.2f}"
  60. )
  61. event_context = str(row["events_today"]) if isinstance(row["events_today"], str) and row["events_today"] else "none"
  62. if not prev:
  63. rows.append(
  64. {
  65. "latest_bar_date": latest_bar_date,
  66. "branch": branch,
  67. "change_type": "branch_initialized",
  68. "old_value": "",
  69. "new_value": _branch_event_signature(row),
  70. "reason": "first forward-observation snapshot for this branch",
  71. "event_context": event_context,
  72. "indicator_context": indicator_context,
  73. }
  74. )
  75. continue
  76. if bool(prev.get("in_position")) != bool(row["in_position"]):
  77. rows.append(
  78. {
  79. "latest_bar_date": latest_bar_date,
  80. "branch": branch,
  81. "change_type": "position_changed",
  82. "old_value": str(prev.get("in_position")),
  83. "new_value": str(bool(row["in_position"])),
  84. "reason": "position state changed between forward snapshots",
  85. "event_context": event_context,
  86. "indicator_context": indicator_context,
  87. }
  88. )
  89. prev_sig = f"{prev.get('latest_real_event_date', '')}|{prev.get('latest_real_event_side', '')}|{prev.get('latest_real_event_reason', '')}"
  90. new_sig = _branch_event_signature(row)
  91. if prev_sig != new_sig:
  92. rows.append(
  93. {
  94. "latest_bar_date": latest_bar_date,
  95. "branch": branch,
  96. "change_type": "latest_real_event_changed",
  97. "old_value": prev_sig,
  98. "new_value": new_sig,
  99. "reason": "latest real-trade event changed",
  100. "event_context": event_context,
  101. "indicator_context": indicator_context,
  102. }
  103. )
  104. if int(row["events_today_count"]) > 0:
  105. rows.append(
  106. {
  107. "latest_bar_date": latest_bar_date,
  108. "branch": branch,
  109. "change_type": "new_event_on_latest_bar",
  110. "old_value": "",
  111. "new_value": str(row["events_today"]),
  112. "reason": "new signal fired on the latest market bar",
  113. "event_context": event_context,
  114. "indicator_context": indicator_context,
  115. }
  116. )
  117. prev_warn = int(prev_monitor.get("warning_count", 0)) if isinstance(prev_monitor, dict) else 0
  118. prev_hard = int(prev_monitor.get("hard_breach_count", 0)) if isinstance(prev_monitor, dict) else 0
  119. prev_missing = int(prev_monitor.get("missing_data_count", 0)) if isinstance(prev_monitor, dict) else 0
  120. warn = int(monitor_summary["warning_count"])
  121. hard = int(monitor_summary["hard_breach_count"])
  122. missing = int(monitor_summary["missing_data_count"])
  123. if warn != prev_warn:
  124. rows.append(
  125. {
  126. "latest_bar_date": latest_bar_date,
  127. "branch": "system",
  128. "change_type": "monitor_warning_count_changed",
  129. "old_value": str(prev_warn),
  130. "new_value": str(warn),
  131. "reason": "warning count changed versus prior forward snapshot",
  132. "event_context": "",
  133. "indicator_context": "",
  134. }
  135. )
  136. if hard != prev_hard:
  137. rows.append(
  138. {
  139. "latest_bar_date": latest_bar_date,
  140. "branch": "system",
  141. "change_type": "monitor_hard_breach_count_changed",
  142. "old_value": str(prev_hard),
  143. "new_value": str(hard),
  144. "reason": "hard breach count changed versus prior forward snapshot",
  145. "event_context": "",
  146. "indicator_context": "",
  147. }
  148. )
  149. if missing != prev_missing:
  150. rows.append(
  151. {
  152. "latest_bar_date": latest_bar_date,
  153. "branch": "system",
  154. "change_type": "monitor_missing_data_count_changed",
  155. "old_value": str(prev_missing),
  156. "new_value": str(missing),
  157. "reason": "missing-data metric count changed versus prior forward snapshot",
  158. "event_context": "",
  159. "indicator_context": "",
  160. }
  161. )
  162. return pd.DataFrame(rows)
  163. def _divergence_level(row: dict[str, object]) -> str:
  164. if int(row["hard_breach_count"]) > 0:
  165. return "review_required"
  166. if not bool(row["same_position_flag"]):
  167. return "review_required"
  168. if not bool(row["same_latest_real_event_flag"]):
  169. return "material"
  170. if int(row["warning_count"]) > 0:
  171. return "mild"
  172. return "none"
  173. def _update_monitor_history(existing: pd.DataFrame, current: pd.DataFrame, latest_bar_date: str) -> pd.DataFrame:
  174. current = current.copy()
  175. current["latest_bar_date"] = latest_bar_date
  176. cols = [
  177. "latest_bar_date",
  178. "metric",
  179. "actual_value",
  180. "status",
  181. "warning_threshold",
  182. "hard_threshold",
  183. "scope",
  184. "cadence",
  185. "action_on_warning",
  186. "action_on_hard_breach",
  187. "rationale",
  188. ]
  189. current = current[cols]
  190. history = _append_dedup(existing, current, ["latest_bar_date", "metric"])
  191. history["warning_streak"] = 0
  192. history["hard_breach_streak"] = 0
  193. for metric, idx in history.groupby("metric", sort=False).groups.items():
  194. subset = history.loc[list(idx)].sort_values("latest_bar_date")
  195. warn = 0
  196. hard = 0
  197. for row_idx, row in subset.iterrows():
  198. warn = warn + 1 if row["status"] == "warning" else 0
  199. hard = hard + 1 if row["status"] in {"hard_breach", "missing_data"} else 0
  200. history.at[row_idx, "warning_streak"] = warn
  201. history.at[row_idx, "hard_breach_streak"] = hard
  202. return history.sort_values(["latest_bar_date", "metric"]).reset_index(drop=True)
  203. def _build_weekly_summary(observation_log: pd.DataFrame, change_log: pd.DataFrame, divergence_log: pd.DataFrame, monitor_history: pd.DataFrame) -> pd.DataFrame:
  204. if observation_log.empty:
  205. return pd.DataFrame()
  206. unique_dates = sorted(observation_log["latest_bar_date"].unique())[-5:]
  207. obs = observation_log[observation_log["latest_bar_date"].isin(unique_dates)].copy()
  208. changes = change_log[change_log["latest_bar_date"].isin(unique_dates)].copy() if not change_log.empty else pd.DataFrame()
  209. divergence = divergence_log[divergence_log["latest_bar_date"].isin(unique_dates)].copy() if not divergence_log.empty else pd.DataFrame()
  210. monitor = monitor_history[monitor_history["latest_bar_date"].isin(unique_dates)].copy() if not monitor_history.empty else pd.DataFrame()
  211. rows: list[dict[str, object]] = []
  212. for branch, group in obs.groupby("branch"):
  213. rows.append(
  214. {
  215. "window_start": unique_dates[0],
  216. "window_end": unique_dates[-1],
  217. "observation_days": int(len(group)),
  218. "branch": branch,
  219. "days_in_position": int(group["in_position"].astype(bool).sum()),
  220. "latest_real_event_changed_count": int(
  221. 0
  222. if changes.empty
  223. else len(changes[(changes["branch"] == branch) & (changes["change_type"] == "latest_real_event_changed")])
  224. ),
  225. "new_event_days": int(group["events_today_count"].fillna(0).astype(int).gt(0).sum()),
  226. "warning_days": 0,
  227. "hard_breach_days": 0,
  228. "material_divergence_days": 0,
  229. }
  230. )
  231. rows.append(
  232. {
  233. "window_start": unique_dates[0],
  234. "window_end": unique_dates[-1],
  235. "observation_days": int(len(unique_dates)),
  236. "branch": "system_monitor",
  237. "days_in_position": 0,
  238. "latest_real_event_changed_count": 0,
  239. "new_event_days": 0,
  240. "warning_days": int(0 if monitor.empty else monitor[monitor["status"] == "warning"]["latest_bar_date"].nunique()),
  241. "hard_breach_days": int(0 if monitor.empty else monitor[monitor["status"] == "hard_breach"]["latest_bar_date"].nunique()),
  242. "material_divergence_days": int(
  243. 0 if divergence.empty else divergence["divergence_level"].isin(["material", "review_required"]).sum()
  244. ),
  245. }
  246. )
  247. return pd.DataFrame(rows)
def main() -> None:
    """Run one forward-observation cycle next to this script.

    Steps, in order: run the daily signal pipeline, load its branch-status and
    monitor-snapshot CSVs plus the manifest JSON, append to the observation,
    signal-change, divergence and monitor-history logs, rebuild the weekly
    summary, write four Markdown reports, save the state JSON used for the
    next run's diff, copy the log CSVs into ``forward_reports/`` stamped with
    the latest bar date, and finally run the rollout-governance check and the
    HTML report generator.

    All files are read from and written to the directory containing this file.
    """
    base_dir = Path(__file__).resolve().parent
    forward_dir = base_dir / "forward_reports"
    forward_dir.mkdir(exist_ok=True)
    # NOTE(review): presumably daily.main() refreshes the three input files
    # read just below — confirm against dragon_daily_signal_pipeline.
    daily.main()
    branch_status = _load_csv(base_dir / "dragon_daily_branch_status.csv")
    monitor_snapshot = _load_csv(base_dir / "dragon_daily_monitor_snapshot.csv")
    manifest_path = base_dir / "dragon_daily_rc1_manifest.json"
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    latest_bar_date = str(manifest["latest_bar_date"])
    request_date = str(manifest["as_of_request_date"])
    run_ts = datetime.now().isoformat(timespec="seconds")
    monitor_summary = _build_monitor_summary(monitor_snapshot)

    # --- Observation log: one row per (latest_bar_date, branch). ---
    observation_rows = branch_status.copy()
    observation_rows["run_timestamp"] = run_ts
    observation_rows["request_date"] = request_date
    observation_rows["latest_bar_date"] = latest_bar_date
    observation_rows["monitor_warning_count"] = int(monitor_summary["warning_count"])
    observation_rows["monitor_hard_breach_count"] = int(monitor_summary["hard_breach_count"])
    observation_rows["monitor_missing_data_count"] = int(monitor_summary["missing_data_count"])
    observation_rows["latest_real_event"] = observation_rows.apply(_branch_event_signature, axis=1)
    observation_cols = [
        "run_timestamp",
        "request_date",
        "latest_bar_date",
        "branch",
        "in_position",
        "latest_real_event_date",
        "latest_real_event_side",
        "latest_real_event_reason",
        "latest_real_event",
        "events_today_count",
        "events_today",
        "latest_close",
        "latest_a1",
        "latest_b1",
        "latest_c1",
        "open_entry_date",
        "open_entry_reason",
        "open_holding_days",
        "open_return_pct",
        "monitor_warning_count",
        "monitor_hard_breach_count",
        "monitor_missing_data_count",
    ]
    observation_rows = observation_rows[observation_cols]
    observation_log_path = base_dir / "dragon_forward_observation_log.csv"
    existing_obs = _load_csv(observation_log_path)
    # A re-run for the same bar date replaces that date's rows (keep="last").
    observation_log = _append_dedup(existing_obs, observation_rows, ["latest_bar_date", "branch"])
    _write_csv(observation_log, observation_log_path)

    # --- Signal change log: diff against the state saved by the previous run. ---
    state_path = base_dir / "dragon_forward_observation_state.json"
    prev_state = _load_state(state_path)
    change_log_path = base_dir / "dragon_signal_change_log.csv"
    existing_changes = _load_csv(change_log_path)
    change_rows = _build_signal_changes(branch_status, prev_state, latest_bar_date, monitor_summary)
    if not change_rows.empty:
        change_log = _append_dedup(existing_changes, change_rows, ["latest_bar_date", "branch", "change_type", "new_value"])
    else:
        change_log = existing_changes
    _write_csv(change_log, change_log_path)

    # --- Divergence between the two named branches. ---
    # .iloc[0] raises IndexError if either branch is absent from branch_status.
    refined = branch_status[branch_status["branch"] == "alpha_first_glued_refined_hot_cap"].iloc[0]
    control = branch_status[branch_status["branch"] == "alpha_first_selective_veto"].iloc[0]
    # metric -> actual_value, skipping metrics whose value is missing.
    actuals = {
        row["metric"]: row["actual_value"]
        for _, row in monitor_snapshot.iterrows()
        if pd.notna(row["actual_value"])
    }
    divergence_row = {
        "latest_bar_date": latest_bar_date,
        "request_date": request_date,
        "same_position_flag": bool(refined["in_position"]) == bool(control["in_position"]),
        "same_latest_real_event_flag": _branch_event_signature(refined) == _branch_event_signature(control),
        "refined_in_position": bool(refined["in_position"]),
        "control_in_position": bool(control["in_position"]),
        "refined_latest_event": _branch_event_signature(refined),
        "control_latest_event": _branch_event_signature(control),
        "next_open_avg_return_delta": float(actuals.get("next_open_avg_return_delta_vs_control", float("nan"))),
        "next_open_pf_delta": float(actuals.get("next_open_profit_factor_delta_vs_control", float("nan"))),
        "next_open_max_drawdown_refined": float(actuals.get("next_open_max_drawdown", float("nan"))),
        "next_open_max_loss_streak_refined": int(actuals.get("next_open_max_loss_streak", 0)),
        "warning_count": int(monitor_summary["warning_count"]),
        "hard_breach_count": int(monitor_summary["hard_breach_count"]),
        "missing_data_count": int(monitor_summary["missing_data_count"]),
    }
    divergence_row["divergence_level"] = _divergence_level(divergence_row)
    divergence_df = pd.DataFrame([divergence_row])
    divergence_log_path = base_dir / "dragon_branch_divergence_log.csv"
    existing_div = _load_csv(divergence_log_path)
    divergence_log = _append_dedup(existing_div, divergence_df, ["latest_bar_date"])
    _write_csv(divergence_log, divergence_log_path)

    # --- Monitor history (with streak columns) and weekly summary. ---
    monitor_history_path = base_dir / "dragon_monitor_history.csv"
    existing_monitor = _load_csv(monitor_history_path)
    monitor_history = _update_monitor_history(existing_monitor, monitor_snapshot, latest_bar_date)
    _write_csv(monitor_history, monitor_history_path)
    weekly_summary = _build_weekly_summary(observation_log, change_log, divergence_log, monitor_history)
    weekly_summary_path = base_dir / "dragon_forward_weekly_summary.csv"
    _write_csv(weekly_summary, weekly_summary_path)

    # --- Markdown report: signal changes recorded for the latest bar. ---
    latest_changes = change_log[change_log["latest_bar_date"] == latest_bar_date].copy() if not change_log.empty else pd.DataFrame()
    change_lines = [
        "# Dragon Signal Change Review",
        "",
        f"- latest_bar_date: `{latest_bar_date}`",
        f"- change_count: `{len(latest_changes)}`",
        "",
    ]
    if latest_changes.empty:
        change_lines.append("- No state-change record was generated for the latest bar.")
    else:
        for _, row in latest_changes.iterrows():
            change_lines.extend(
                [
                    f"## {row['branch']} / {row['change_type']}",
                    f"- old: `{row['old_value']}`",
                    f"- new: `{row['new_value']}`",
                    f"- reason: {row['reason']}",
                    f"- event_context: `{row['event_context']}`",
                    f"- indicator_context: `{row['indicator_context']}`",
                    "",
                ]
            )
    (base_dir / "dragon_signal_change_review.md").write_text("\n".join(change_lines) + "\n", encoding="utf-8")

    # --- Markdown report: branch divergence, plus the last 10 log entries. ---
    div_lines = [
        "# Dragon Branch Divergence Report",
        "",
        f"- latest_bar_date: `{latest_bar_date}`",
        f"- divergence_level: `{divergence_row['divergence_level']}`",
        f"- same_position_flag: `{divergence_row['same_position_flag']}`",
        f"- same_latest_real_event_flag: `{divergence_row['same_latest_real_event_flag']}`",
        f"- refined_latest_event: `{divergence_row['refined_latest_event']}`",
        f"- control_latest_event: `{divergence_row['control_latest_event']}`",
        f"- next_open_avg_return_delta: `{daily._format_pct(float(divergence_row['next_open_avg_return_delta']))}`",
        f"- next_open_pf_delta: `{daily._format_num(float(divergence_row['next_open_pf_delta']))}`",
        f"- warning_count: `{divergence_row['warning_count']}`",
        f"- hard_breach_count: `{divergence_row['hard_breach_count']}`",
        f"- missing_data_count: `{divergence_row['missing_data_count']}`",
        "",
        "## Recent Log",
    ]
    for _, row in divergence_log.tail(10).iterrows():
        div_lines.append(
            f"- `{row['latest_bar_date']}`: level `{row['divergence_level']}`, same_position `{row['same_position_flag']}`, same_event `{row['same_latest_real_event_flag']}`"
        )
    (base_dir / "dragon_branch_divergence_report.md").write_text("\n".join(div_lines) + "\n", encoding="utf-8")

    # --- Markdown report: monitor health for the latest bar. ---
    health_lines = [
        "# Dragon Monitor Health Report",
        "",
        f"- latest_bar_date: `{latest_bar_date}`",
        f"- warning_count: `{monitor_summary['warning_count']}`",
        f"- hard_breach_count: `{monitor_summary['hard_breach_count']}`",
        f"- missing_data_count: `{monitor_summary['missing_data_count']}`",
        "",
        "## Latest Metrics",
    ]
    latest_monitor = monitor_history[monitor_history["latest_bar_date"] == latest_bar_date].copy()
    for _, row in latest_monitor.iterrows():
        health_lines.append(
            f"- `{row['metric']}`: actual `{row['actual_value']}` | status `{row['status']}` | warning_streak `{int(row['warning_streak'])}` | hard_breach_streak `{int(row['hard_breach_streak'])}`"
        )
    (base_dir / "dragon_monitor_health_report.md").write_text("\n".join(health_lines) + "\n", encoding="utf-8")

    # --- Markdown report: weekly review. ---
    weekly_lines = [
        "# Dragon Forward Weekly Review",
        "",
        f"- latest_window_end: `{latest_bar_date}`",
        "",
    ]
    if weekly_summary.empty:
        weekly_lines.append("- Weekly summary is empty.")
    else:
        for _, row in weekly_summary.iterrows():
            weekly_lines.extend(
                [
                    f"## {row['branch']}",
                    f"- window: `{row['window_start']}` -> `{row['window_end']}`",
                    f"- observation_days: `{int(row['observation_days'])}`",
                    f"- days_in_position: `{int(row['days_in_position'])}`",
                    f"- latest_real_event_changed_count: `{int(row['latest_real_event_changed_count'])}`",
                    f"- new_event_days: `{int(row['new_event_days'])}`",
                    f"- warning_days: `{int(row['warning_days'])}`",
                    f"- hard_breach_days: `{int(row['hard_breach_days'])}`",
                    f"- material_divergence_days: `{int(row['material_divergence_days'])}`",
                    "",
                ]
            )
    (base_dir / "dragon_forward_weekly_review.md").write_text("\n".join(weekly_lines) + "\n", encoding="utf-8")

    # --- Save state last, so it becomes the baseline for the next run's diff. ---
    state_payload = {
        "last_run_timestamp": run_ts,
        "request_date": request_date,
        "latest_bar_date": latest_bar_date,
        "branches": {
            str(row["branch"]): {
                key: _json_default(row[key])
                for key in branch_status.columns
            }
            for _, row in branch_status.iterrows()
        },
        "monitor_summary": monitor_summary,
        "divergence": {key: _json_default(value) for key, value in divergence_row.items()},
    }
    state_path.write_text(json.dumps(state_payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")

    # --- Archive each log CSV as <stem>_<latest_bar_date><suffix>. ---
    archive_paths = [
        observation_log_path,
        change_log_path,
        divergence_log_path,
        monitor_history_path,
        weekly_summary_path,
    ]
    for path in archive_paths:
        archived = forward_dir / f"{path.stem}_{latest_bar_date}{path.suffix}"
        if path.exists():
            # The archive is re-serialised through pandas, not byte-copied.
            path_df = _load_csv(path)
            _write_csv(path_df, archived)

    # Downstream steps run after the logs, reports and state are written.
    rollout_governance.main()
    html_reports.main()
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()