| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- from __future__ import annotations
- from dataclasses import asdict, dataclass
- from typing import Any
- import pandas as pd
# Rollout decisions returned by ``evaluate_rollout``.
FORWARD_OK = "FORWARD_OK"  # every gate reported "ok"
HOLD_AND_REVIEW = "HOLD_AND_REVIEW"  # at least one "warning", no "hard_fail"
ROLLBACK_REVIEW_REQUIRED = "ROLLBACK_REVIEW_REQUIRED"  # at least one "hard_fail"
@dataclass(frozen=True)
class GateResult:
    """Immutable outcome of a single rollout gate check.

    All fields are plain strings so a result can be converted with
    ``dataclasses.asdict`` and written out without further conversion.
    """

    # Identifier of the gate, e.g. "data_freshness".
    gate: str
    # Outcome of the check: "ok", "warning" or "hard_fail".
    status: str
    # Observed value, already rendered as text.
    value: str
    # Human-readable description of the passing condition.
    threshold: str
    # Explanation of what the gate verifies.
    detail: str
    # Recommended follow-up; "none" when the gate passed.
    action: str
def _max_iso_date(values: pd.Series) -> str:
    """Return the largest non-null entry of *values* as text, or "" if there is none.

    Entries are converted to ``str`` before comparison, so the maximum is the
    lexicographic one (which matches chronological order for ISO-8601 dates).
    """
    as_text = values.dropna().astype(str)
    return "" if as_text.empty else str(as_text.max())
def _safe_int(value: Any) -> int:
    """Convert *value* to ``int``, mapping any missing scalar to 0.

    ``None``, float NaN, ``pd.NA`` and ``pd.NaT`` all count as missing.
    ``pd.NA`` matters in practice: it is what reductions such as ``.max()``
    return for an empty or all-null nullable-integer column.

    Raises:
        ValueError / TypeError: propagated from ``int()`` when *value* is
            present but not convertible (e.g. a non-numeric string).
    """
    if value is None:
        return 0
    # The is_scalar guard keeps pd.isna from returning an array for
    # list-like input, which would be ambiguous in a boolean context.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return 0
    return int(value)
def _safe_bool(value: Any, default: bool = True) -> bool:
    """Interpret *value* as a boolean flag, falling back to *default*.

    Real ``bool`` values are returned unchanged. Missing scalars (``None``,
    NaN, ``pd.NA``) and unrecognised text yield *default*. Everything else is
    compared case-insensitively, after stripping whitespace, against the
    literals true/1/1.0/yes and false/0/0.0/no.

    The ``1.0`` / ``0.0`` spellings cover 0/1 flag columns that became float
    because they contain a NaN; without them a false flag stored as ``0.0``
    would silently resolve to *default*.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    # The is_scalar guard keeps pd.isna from returning an array for list-likes.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return default
    text = str(value).strip().lower()
    if text in {"true", "1", "1.0", "yes"}:
        return True
    if text in {"false", "0", "0.0", "no"}:
        return False
    return default
def _latest_monitor_slice(monitor_history: pd.DataFrame, latest_bar_date: str) -> pd.DataFrame:
    """Return a copy of the monitor rows recorded for *latest_bar_date*.

    An empty, column-less frame is returned when the history is empty, the
    date is blank, or the history has no ``latest_bar_date`` column. Dates are
    compared as strings.
    """
    if (
        monitor_history.empty
        or not latest_bar_date
        or "latest_bar_date" not in monitor_history.columns
    ):
        return pd.DataFrame()
    on_latest_bar = monitor_history["latest_bar_date"].astype(str) == str(latest_bar_date)
    return monitor_history.loc[on_latest_bar].copy()
def _latest_divergence_row(divergence_log: pd.DataFrame, latest_bar_date: str) -> dict[str, Any]:
    """Return the last divergence-log row for *latest_bar_date* as a dict.

    Yields ``{}`` when the log is empty, the date is blank, the log has no
    ``latest_bar_date`` column, or no row matches. Dates are compared as
    strings; when several rows match, the one appearing last wins.
    """
    if (
        divergence_log.empty
        or not latest_bar_date
        or "latest_bar_date" not in divergence_log.columns
    ):
        return {}
    on_latest_bar = divergence_log["latest_bar_date"].astype(str) == str(latest_bar_date)
    matches = divergence_log[on_latest_bar]
    return {} if matches.empty else matches.iloc[-1].to_dict()
def evaluate_rollout(
    manifest: dict[str, Any],
    monitor_history: pd.DataFrame,
    divergence_log: pd.DataFrame,
    monitor_health_report_exists: bool,
) -> tuple[str, list[GateResult], list[str], str]:
    """Run every rollout gate and derive the forward/hold/rollback decision.

    Args:
        manifest: Daily manifest; the ``latest_bar_date`` and ``branch`` keys
            are read. A missing, null or empty ``branch`` falls back to
            ``"alpha_first_glued_refined_hot_cap"``.
        monitor_history: Monitor rows. Columns read are ``latest_bar_date``,
            ``status``, ``warning_streak`` and ``hard_breach_streak``.
        divergence_log: Divergence rows. Columns read are ``latest_bar_date``,
            ``divergence_level``, ``same_position_flag`` and
            ``same_latest_real_event_flag``.
        monitor_health_report_exists: Whether the monitor health report
            artifact has been generated.

    Returns:
        ``(decision, gates, reasons, active_branch)`` where ``decision`` is
        ``ROLLBACK_REVIEW_REQUIRED`` if any gate is ``"hard_fail"``,
        ``HOLD_AND_REVIEW`` if any gate is ``"warning"``, else ``FORWARD_OK``;
        ``gates`` holds all eight results in evaluation order; ``reasons``
        lists ``"<gate>:<status>"`` for every non-ok gate; and
        ``active_branch`` is ``"alpha_first_selective_veto"`` on rollback and
        the manifest branch otherwise.
    """
    # `or` instead of a .get() default: a key that is present but null would
    # otherwise be stringified to the literal "None".
    latest_bar_date = str(manifest.get("latest_bar_date") or "")
    manifest_branch = str(manifest.get("branch") or "alpha_first_glued_refined_hot_cap")

    monitor_latest_date = _max_iso_date(monitor_history.get("latest_bar_date", pd.Series(dtype=str)))
    divergence_latest_date = _max_iso_date(divergence_log.get("latest_bar_date", pd.Series(dtype=str)))
    latest_monitor = _latest_monitor_slice(monitor_history=monitor_history, latest_bar_date=latest_bar_date)
    divergence_row = _latest_divergence_row(divergence_log=divergence_log, latest_bar_date=latest_bar_date)

    if latest_monitor.empty:
        warning_count = 0
        hard_count = 0
    elif "status" not in latest_monitor.columns:
        # Fail closed: rows without any status cannot be shown healthy, so
        # each one is counted as missing data instead of raising KeyError.
        warning_count = 0
        hard_count = len(latest_monitor)
    else:
        statuses = latest_monitor["status"]
        warning_count = _safe_int((statuses == "warning").sum())
        hard_count = _safe_int(statuses.isin(["hard_breach", "missing_data"]).sum())

    # .max() of an absent/empty column is NaN, which _safe_int maps to 0.
    max_warning_streak = _safe_int(latest_monitor.get("warning_streak", pd.Series(dtype=float)).max())
    max_hard_streak = _safe_int(latest_monitor.get("hard_breach_streak", pd.Series(dtype=float)).max())

    # A null level must not be stringified to "nan"/"None": that text matches
    # no known level and would slip through the divergence gate as "ok".
    raw_level = divergence_row.get("divergence_level")
    level_missing = raw_level is None or (pd.api.types.is_scalar(raw_level) and pd.isna(raw_level))
    divergence_level = "unknown" if level_missing else str(raw_level)
    # With no divergence row both lookups yield None, i.e. the default True.
    same_position = _safe_bool(divergence_row.get("same_position_flag"), default=True)
    same_event = _safe_bool(divergence_row.get("same_latest_real_event_flag"), default=True)

    # An absent divergence log does not break freshness on its own; it shows
    # up as divergence level "unknown" in the divergence gate instead.
    freshness_ok = (
        bool(latest_bar_date)
        and latest_bar_date == monitor_latest_date
        and (not divergence_latest_date or latest_bar_date == divergence_latest_date)
    )

    warn_status = "warning" if warning_count > 1 else "ok"
    streak_status = "warning" if max_warning_streak > 2 else "ok"

    if divergence_level == "review_required":
        divergence_status = "hard_fail"
        divergence_action = "freeze candidate and perform rollback drill"
    elif divergence_level in {"material", "unknown"}:
        divergence_status = "warning"
        divergence_action = "review divergence context before continuing"
    else:
        divergence_status = "ok"
        divergence_action = "none"

    alignment_status = "ok" if (same_position and same_event) else "warning"

    gates: list[GateResult] = [
        GateResult(
            gate="data_freshness",
            status="ok" if freshness_ok else "hard_fail",
            value=f"manifest={latest_bar_date},monitor={monitor_latest_date or 'missing'},divergence={divergence_latest_date or 'missing'}",
            threshold="all dates aligned",
            detail="Daily manifest, monitor history, and divergence log must point to the same latest bar.",
            action="none" if freshness_ok else "rerun forward pipeline before any decision",
        ),
        GateResult(
            gate="monitor_health_report_present",
            status="ok" if monitor_health_report_exists else "hard_fail",
            value="present" if monitor_health_report_exists else "missing",
            threshold="present",
            detail="Rollout decision requires a generated monitor health report artifact.",
            action="none" if monitor_health_report_exists else "rerun forward observation pipeline",
        ),
        GateResult(
            gate="hard_breach_budget",
            status="ok" if hard_count == 0 else "hard_fail",
            value=str(hard_count),
            threshold="0",
            detail="Latest monitor snapshot should not contain hard breach or missing-data metrics.",
            action="freeze candidate and open rollback review" if hard_count > 0 else "none",
        ),
        GateResult(
            gate="hard_breach_streak",
            status="ok" if max_hard_streak == 0 else "hard_fail",
            value=str(max_hard_streak),
            threshold="0",
            detail="Consecutive hard-breach streak must remain zero.",
            action="switch active branch to control and investigate root cause" if max_hard_streak > 0 else "none",
        ),
        GateResult(
            gate="warning_budget",
            status=warn_status,
            value=str(warning_count),
            threshold="<=1",
            detail="Latest warning count should stay low and bounded.",
            action="hold forward promotion until warning count recovers" if warn_status != "ok" else "none",
        ),
        GateResult(
            gate="warning_streak",
            status=streak_status,
            value=str(max_warning_streak),
            threshold="<=2",
            detail="Warning streaks above two bars indicate persistent instability.",
            action="hold rollout and run targeted stress replay" if streak_status != "ok" else "none",
        ),
        GateResult(
            gate="branch_divergence_level",
            status=divergence_status,
            value=divergence_level,
            threshold="none|mild",
            detail="Divergence should stay none/mild for routine forward tracking.",
            action=divergence_action,
        ),
        GateResult(
            gate="refined_control_alignment",
            status=alignment_status,
            value=f"same_position={same_position},same_event={same_event}",
            threshold="both true",
            detail="Forward candidate and control branch should remain aligned unless divergence is intentional.",
            action="review event diff and attribution pack" if alignment_status != "ok" else "none",
        ),
    ]

    # A single hard failure outranks any number of warnings.
    seen_statuses = {g.status for g in gates}
    if "hard_fail" in seen_statuses:
        decision = ROLLBACK_REVIEW_REQUIRED
    elif "warning" in seen_statuses:
        decision = HOLD_AND_REVIEW
    else:
        decision = FORWARD_OK

    if decision == ROLLBACK_REVIEW_REQUIRED:
        active_branch = "alpha_first_selective_veto"
    else:
        active_branch = manifest_branch

    reasons = [f"{g.gate}:{g.status}" for g in gates if g.status != "ok"]
    return decision, gates, reasons, active_branch
def gate_rows(gates: list[GateResult]) -> list[dict[str, Any]]:
    """Convert each gate result into a plain dict keyed by field name."""
    return list(map(asdict, gates))
|