from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any

import pandas as pd

FORWARD_OK = "FORWARD_OK"
HOLD_AND_REVIEW = "HOLD_AND_REVIEW"
ROLLBACK_REVIEW_REQUIRED = "ROLLBACK_REVIEW_REQUIRED"

# Branch assumed when the manifest does not name one.
_DEFAULT_BRANCH = "alpha_first_glued_refined_hot_cap"
# Branch reported as active whenever at least one gate hard-fails.
_ROLLBACK_BRANCH = "alpha_first_selective_veto"


@dataclass(frozen=True)
class GateResult:
    """Outcome of a single rollout gate check."""

    gate: str  # gate identifier, e.g. "data_freshness"
    status: str  # "ok", "warning" or "hard_fail"
    value: str  # observed value, rendered as text
    threshold: str  # textual description of the passing condition
    detail: str  # human-readable explanation of the gate
    action: str  # recommended follow-up, "none" when nothing is required


def _is_missing(value: Any) -> bool:
    """Return True for None and for any scalar pandas treats as missing."""
    if value is None:
        return True
    # pd.isna on a non-scalar returns an array, so only ask it about scalars.
    return bool(pd.api.types.is_scalar(value) and pd.isna(value))


def _max_iso_date(values: pd.Series) -> str:
    """Return the lexicographically largest non-null value as text, or ""."""
    cleaned = values.dropna().astype(str)
    if cleaned.empty:
        return ""
    return str(cleaned.max())


def _safe_int(value: Any) -> int:
    """Convert *value* to int, mapping missing values to 0.

    Missing covers None, NaN of any float width, pd.NA and pd.NaT, so the
    result of ``Series.max()`` on an empty or all-missing column is safe to
    pass in regardless of the column dtype.
    """
    if _is_missing(value):
        return 0
    return int(value)


def _safe_bool(value: Any, default: bool = True) -> bool:
    """Interpret *value* as a boolean flag, falling back to *default*.

    Accepts real booleans, the texts true/false, yes/no, 1/0 (case and
    surrounding whitespace ignored) and numeric forms equal to one or zero
    such as ``1.0`` / ``0.0``. Missing or unrecognised values yield *default*.
    """
    if _is_missing(value):
        return default
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in {"true", "1", "yes"}:
        return True
    if text in {"false", "0", "no"}:
        return False
    # Flag columns containing a missing value are promoted to float by pandas,
    # so 0/1 may arrive as 0.0/1.0; treat those the same as their int forms.
    try:
        number = float(text)
    except ValueError:
        return default
    if number == 1:
        return True
    if number == 0:
        return False
    return default


def _manifest_text(manifest: dict[str, Any], key: str, default: str) -> str:
    """Read *key* from *manifest* as text; absent or None yields *default*."""
    value = manifest.get(key, default)
    return default if value is None else str(value)


def _latest_monitor_slice(monitor_history: pd.DataFrame, latest_bar_date: str) -> pd.DataFrame:
    """Return a copy of the monitor rows whose latest_bar_date matches.

    An empty frame is returned when the history is empty, the date is blank,
    or the ``latest_bar_date`` column is absent.
    """
    if monitor_history.empty or not latest_bar_date:
        return pd.DataFrame()
    if "latest_bar_date" not in monitor_history.columns:
        return pd.DataFrame()
    matches = monitor_history["latest_bar_date"].astype(str) == str(latest_bar_date)
    return monitor_history[matches].copy()


def _latest_divergence_row(divergence_log: pd.DataFrame, latest_bar_date: str) -> dict[str, Any]:
    """Return the last divergence row for *latest_bar_date* as a dict.

    An empty dict is returned when the log is empty, the date is blank, the
    ``latest_bar_date`` column is absent, or no row matches.
    """
    if divergence_log.empty or not latest_bar_date:
        return {}
    if "latest_bar_date" not in divergence_log.columns:
        return {}
    matches = divergence_log["latest_bar_date"].astype(str) == str(latest_bar_date)
    rows = divergence_log[matches].copy()
    if rows.empty:
        return {}
    return rows.iloc[-1].to_dict()


def evaluate_rollout(
    manifest: dict[str, Any],
    monitor_history: pd.DataFrame,
    divergence_log: pd.DataFrame,
    monitor_health_report_exists: bool,
) -> tuple[str, list[GateResult], list[str], str]:
    """Evaluate every rollout gate and derive the overall decision.

    Args:
        manifest: Mapping read for ``latest_bar_date`` and ``branch``.
        monitor_history: Frame read for ``latest_bar_date``, ``status``,
            ``warning_streak`` and ``hard_breach_streak``.
        divergence_log: Frame read for ``latest_bar_date``,
            ``divergence_level``, ``same_position_flag`` and
            ``same_latest_real_event_flag``.
        monitor_health_report_exists: Whether the health report artifact exists.

    Returns:
        ``(decision, gates, reasons, active_branch)`` where *decision* is
        ROLLBACK_REVIEW_REQUIRED if any gate hard-fails, HOLD_AND_REVIEW if
        any gate warns, otherwise FORWARD_OK; *gates* lists all eight gate
        results in evaluation order; *reasons* holds ``"gate:status"`` for
        every non-ok gate; *active_branch* is the rollback branch on a
        rollback decision and the manifest branch otherwise.

    Raises:
        KeyError: If monitor rows exist for the latest bar but the frame has
            no ``status`` column.
    """
    # An explicit None in the manifest is treated like an absent key so that
    # the literal text "None" never ends up as a date or branch name.
    latest_bar_date = _manifest_text(manifest, "latest_bar_date", "")
    manifest_branch = _manifest_text(manifest, "branch", _DEFAULT_BRANCH)

    monitor_latest_date = _max_iso_date(monitor_history.get("latest_bar_date", pd.Series(dtype=str)))
    divergence_latest_date = _max_iso_date(divergence_log.get("latest_bar_date", pd.Series(dtype=str)))

    latest_monitor = _latest_monitor_slice(monitor_history=monitor_history, latest_bar_date=latest_bar_date)
    divergence_row = _latest_divergence_row(divergence_log=divergence_log, latest_bar_date=latest_bar_date)

    if latest_monitor.empty:
        warning_count = 0
        hard_count = 0
    else:
        statuses = latest_monitor["status"]
        warning_count = _safe_int((statuses == "warning").sum())
        hard_count = _safe_int(statuses.isin(["hard_breach", "missing_data"]).sum())
    max_warning_streak = _safe_int(latest_monitor.get("warning_streak", pd.Series(dtype=float)).max())
    max_hard_streak = _safe_int(latest_monitor.get("hard_breach_streak", pd.Series(dtype=float)).max())

    # A missing cell is reported as "unknown" rather than stringified to
    # "nan", which would otherwise slip through the divergence gate as ok.
    raw_level = divergence_row.get("divergence_level", "unknown")
    divergence_level = "unknown" if _is_missing(raw_level) else str(raw_level)
    same_position = _safe_bool(divergence_row.get("same_position_flag"), default=True)
    same_event = _safe_bool(divergence_row.get("same_latest_real_event_flag"), default=True)

    gates: list[GateResult] = []

    # The divergence log may be absent entirely; if present it must agree.
    freshness_ok = (
        bool(latest_bar_date)
        and latest_bar_date == monitor_latest_date
        and (not divergence_latest_date or latest_bar_date == divergence_latest_date)
    )
    gates.append(
        GateResult(
            gate="data_freshness",
            status="ok" if freshness_ok else "hard_fail",
            value=(
                f"manifest={latest_bar_date},"
                f"monitor={monitor_latest_date or 'missing'},"
                f"divergence={divergence_latest_date or 'missing'}"
            ),
            threshold="all dates aligned",
            detail="Daily manifest, monitor history, and divergence log must point to the same latest bar.",
            action="rerun forward pipeline before any decision" if not freshness_ok else "none",
        )
    )

    gates.append(
        GateResult(
            gate="monitor_health_report_present",
            status="ok" if monitor_health_report_exists else "hard_fail",
            value="present" if monitor_health_report_exists else "missing",
            threshold="present",
            detail="Rollout decision requires a generated monitor health report artifact.",
            action="rerun forward observation pipeline" if not monitor_health_report_exists else "none",
        )
    )

    gates.append(
        GateResult(
            gate="hard_breach_budget",
            status="ok" if hard_count == 0 else "hard_fail",
            value=str(hard_count),
            threshold="0",
            detail="Latest monitor snapshot should not contain hard breach or missing-data metrics.",
            action="freeze candidate and open rollback review" if hard_count > 0 else "none",
        )
    )

    gates.append(
        GateResult(
            gate="hard_breach_streak",
            status="ok" if max_hard_streak == 0 else "hard_fail",
            value=str(max_hard_streak),
            threshold="0",
            detail="Consecutive hard-breach streak must remain zero.",
            action="switch active branch to control and investigate root cause" if max_hard_streak > 0 else "none",
        )
    )

    warn_status = "warning" if warning_count > 1 else "ok"
    gates.append(
        GateResult(
            gate="warning_budget",
            status=warn_status,
            value=str(warning_count),
            threshold="<=1",
            detail="Latest warning count should stay low and bounded.",
            action="hold forward promotion until warning count recovers" if warn_status != "ok" else "none",
        )
    )

    streak_status = "warning" if max_warning_streak > 2 else "ok"
    gates.append(
        GateResult(
            gate="warning_streak",
            status=streak_status,
            value=str(max_warning_streak),
            threshold="<=2",
            detail="Warning streaks above two bars indicate persistent instability.",
            action="hold rollout and run targeted stress replay" if streak_status != "ok" else "none",
        )
    )

    # NOTE(review): any level other than these three is reported as ok,
    # including values outside the documented "none|mild" set - confirm
    # that this is intended.
    if divergence_level == "review_required":
        divergence_status = "hard_fail"
    elif divergence_level in {"material", "unknown"}:
        divergence_status = "warning"
    else:
        divergence_status = "ok"
    if divergence_status == "hard_fail":
        divergence_action = "freeze candidate and perform rollback drill"
    elif divergence_status == "warning":
        divergence_action = "review divergence context before continuing"
    else:
        divergence_action = "none"
    gates.append(
        GateResult(
            gate="branch_divergence_level",
            status=divergence_status,
            value=divergence_level,
            threshold="none|mild",
            detail="Divergence should stay none/mild for routine forward tracking.",
            action=divergence_action,
        )
    )

    alignment_status = "ok" if (same_position and same_event) else "warning"
    gates.append(
        GateResult(
            gate="refined_control_alignment",
            status=alignment_status,
            value=f"same_position={same_position},same_event={same_event}",
            threshold="both true",
            detail="Forward candidate and control branch should remain aligned unless divergence is intentional.",
            action="review event diff and attribution pack" if alignment_status != "ok" else "none",
        )
    )

    statuses_seen = {g.status for g in gates}
    if "hard_fail" in statuses_seen:
        decision = ROLLBACK_REVIEW_REQUIRED
    elif "warning" in statuses_seen:
        decision = HOLD_AND_REVIEW
    else:
        decision = FORWARD_OK

    active_branch = _ROLLBACK_BRANCH if decision == ROLLBACK_REVIEW_REQUIRED else manifest_branch
    reasons = [f"{g.gate}:{g.status}" for g in gates if g.status != "ok"]
    return decision, gates, reasons, active_branch


def gate_rows(gates: list[GateResult]) -> list[dict[str, Any]]:
    """Convert gate results into plain dicts, one per gate."""
    return [asdict(g) for g in gates]