# dragon_rollout_governance.py
  1. from __future__ import annotations
  2. from dataclasses import asdict, dataclass
  3. from typing import Any
  4. import pandas as pd
  5. FORWARD_OK = "FORWARD_OK"
  6. HOLD_AND_REVIEW = "HOLD_AND_REVIEW"
  7. ROLLBACK_REVIEW_REQUIRED = "ROLLBACK_REVIEW_REQUIRED"
  8. @dataclass(frozen=True)
  9. class GateResult:
  10. gate: str
  11. status: str
  12. value: str
  13. threshold: str
  14. detail: str
  15. action: str
  16. def _max_iso_date(values: pd.Series) -> str:
  17. cleaned = values.dropna().astype(str)
  18. if cleaned.empty:
  19. return ""
  20. return str(cleaned.max())
  21. def _safe_int(value: Any) -> int:
  22. if value is None or (isinstance(value, float) and pd.isna(value)):
  23. return 0
  24. return int(value)
  25. def _safe_bool(value: Any, default: bool = True) -> bool:
  26. if value is None or (isinstance(value, float) and pd.isna(value)):
  27. return default
  28. if isinstance(value, bool):
  29. return value
  30. text = str(value).strip().lower()
  31. if text in {"true", "1", "yes"}:
  32. return True
  33. if text in {"false", "0", "no"}:
  34. return False
  35. return default
  36. def _latest_monitor_slice(monitor_history: pd.DataFrame, latest_bar_date: str) -> pd.DataFrame:
  37. if monitor_history.empty or not latest_bar_date:
  38. return pd.DataFrame()
  39. if "latest_bar_date" not in monitor_history.columns:
  40. return pd.DataFrame()
  41. return monitor_history[monitor_history["latest_bar_date"].astype(str) == str(latest_bar_date)].copy()
  42. def _latest_divergence_row(divergence_log: pd.DataFrame, latest_bar_date: str) -> dict[str, Any]:
  43. if divergence_log.empty or not latest_bar_date:
  44. return {}
  45. if "latest_bar_date" not in divergence_log.columns:
  46. return {}
  47. rows = divergence_log[divergence_log["latest_bar_date"].astype(str) == str(latest_bar_date)].copy()
  48. if rows.empty:
  49. return {}
  50. return rows.iloc[-1].to_dict()
  51. def evaluate_rollout(
  52. manifest: dict[str, Any],
  53. monitor_history: pd.DataFrame,
  54. divergence_log: pd.DataFrame,
  55. monitor_health_report_exists: bool,
  56. ) -> tuple[str, list[GateResult], list[str], str]:
  57. latest_bar_date = str(manifest.get("latest_bar_date", ""))
  58. manifest_branch = str(manifest.get("branch", "alpha_first_glued_refined_hot_cap"))
  59. monitor_latest_date = _max_iso_date(monitor_history.get("latest_bar_date", pd.Series(dtype=str)))
  60. divergence_latest_date = _max_iso_date(divergence_log.get("latest_bar_date", pd.Series(dtype=str)))
  61. latest_monitor = _latest_monitor_slice(monitor_history=monitor_history, latest_bar_date=latest_bar_date)
  62. divergence_row = _latest_divergence_row(divergence_log=divergence_log, latest_bar_date=latest_bar_date)
  63. warning_count = _safe_int((latest_monitor["status"] == "warning").sum()) if not latest_monitor.empty else 0
  64. hard_count = (
  65. _safe_int(latest_monitor["status"].isin(["hard_breach", "missing_data"]).sum())
  66. if not latest_monitor.empty
  67. else 0
  68. )
  69. max_warning_streak = _safe_int(latest_monitor.get("warning_streak", pd.Series(dtype=float)).max())
  70. max_hard_streak = _safe_int(latest_monitor.get("hard_breach_streak", pd.Series(dtype=float)).max())
  71. divergence_level = str(divergence_row.get("divergence_level", "unknown")) if divergence_row else "unknown"
  72. same_position = _safe_bool(divergence_row.get("same_position_flag"), default=True) if divergence_row else True
  73. same_event = _safe_bool(divergence_row.get("same_latest_real_event_flag"), default=True) if divergence_row else True
  74. gates: list[GateResult] = []
  75. freshness_ok = bool(latest_bar_date) and latest_bar_date == monitor_latest_date and (
  76. not divergence_latest_date or latest_bar_date == divergence_latest_date
  77. )
  78. gates.append(
  79. GateResult(
  80. gate="data_freshness",
  81. status="ok" if freshness_ok else "hard_fail",
  82. value=f"manifest={latest_bar_date},monitor={monitor_latest_date or 'missing'},divergence={divergence_latest_date or 'missing'}",
  83. threshold="all dates aligned",
  84. detail="Daily manifest, monitor history, and divergence log must point to the same latest bar.",
  85. action="rerun forward pipeline before any decision" if not freshness_ok else "none",
  86. )
  87. )
  88. gates.append(
  89. GateResult(
  90. gate="monitor_health_report_present",
  91. status="ok" if monitor_health_report_exists else "hard_fail",
  92. value="present" if monitor_health_report_exists else "missing",
  93. threshold="present",
  94. detail="Rollout decision requires a generated monitor health report artifact.",
  95. action="rerun forward observation pipeline" if not monitor_health_report_exists else "none",
  96. )
  97. )
  98. gates.append(
  99. GateResult(
  100. gate="hard_breach_budget",
  101. status="ok" if hard_count == 0 else "hard_fail",
  102. value=str(hard_count),
  103. threshold="0",
  104. detail="Latest monitor snapshot should not contain hard breach or missing-data metrics.",
  105. action="freeze candidate and open rollback review" if hard_count > 0 else "none",
  106. )
  107. )
  108. gates.append(
  109. GateResult(
  110. gate="hard_breach_streak",
  111. status="ok" if max_hard_streak == 0 else "hard_fail",
  112. value=str(max_hard_streak),
  113. threshold="0",
  114. detail="Consecutive hard-breach streak must remain zero.",
  115. action="switch active branch to control and investigate root cause" if max_hard_streak > 0 else "none",
  116. )
  117. )
  118. warn_status = "ok"
  119. if warning_count > 1:
  120. warn_status = "warning"
  121. gates.append(
  122. GateResult(
  123. gate="warning_budget",
  124. status=warn_status,
  125. value=str(warning_count),
  126. threshold="<=1",
  127. detail="Latest warning count should stay low and bounded.",
  128. action="hold forward promotion until warning count recovers" if warn_status != "ok" else "none",
  129. )
  130. )
  131. streak_status = "ok"
  132. if max_warning_streak > 2:
  133. streak_status = "warning"
  134. gates.append(
  135. GateResult(
  136. gate="warning_streak",
  137. status=streak_status,
  138. value=str(max_warning_streak),
  139. threshold="<=2",
  140. detail="Warning streaks above two bars indicate persistent instability.",
  141. action="hold rollout and run targeted stress replay" if streak_status != "ok" else "none",
  142. )
  143. )
  144. divergence_status = "ok"
  145. if divergence_level == "review_required":
  146. divergence_status = "hard_fail"
  147. elif divergence_level in {"material", "unknown"}:
  148. divergence_status = "warning"
  149. gates.append(
  150. GateResult(
  151. gate="branch_divergence_level",
  152. status=divergence_status,
  153. value=divergence_level,
  154. threshold="none|mild",
  155. detail="Divergence should stay none/mild for routine forward tracking.",
  156. action=(
  157. "freeze candidate and perform rollback drill"
  158. if divergence_status == "hard_fail"
  159. else "review divergence context before continuing"
  160. if divergence_status == "warning"
  161. else "none"
  162. ),
  163. )
  164. )
  165. alignment_status = "ok" if (same_position and same_event) else "warning"
  166. gates.append(
  167. GateResult(
  168. gate="refined_control_alignment",
  169. status=alignment_status,
  170. value=f"same_position={same_position},same_event={same_event}",
  171. threshold="both true",
  172. detail="Forward candidate and control branch should remain aligned unless divergence is intentional.",
  173. action="review event diff and attribution pack" if alignment_status != "ok" else "none",
  174. )
  175. )
  176. hard_fail = [g.gate for g in gates if g.status == "hard_fail"]
  177. warning = [g.gate for g in gates if g.status == "warning"]
  178. if hard_fail:
  179. decision = ROLLBACK_REVIEW_REQUIRED
  180. elif warning:
  181. decision = HOLD_AND_REVIEW
  182. else:
  183. decision = FORWARD_OK
  184. if decision == ROLLBACK_REVIEW_REQUIRED:
  185. active_branch = "alpha_first_selective_veto"
  186. else:
  187. active_branch = manifest_branch
  188. reasons = [f"{g.gate}:{g.status}" for g in gates if g.status != "ok"]
  189. return decision, gates, reasons, active_branch
  190. def gate_rows(gates: list[GateResult]) -> list[dict[str, Any]]:
  191. return [asdict(g) for g in gates]