Bladeren bron

Add RC1 execution runtime decoupling and rollout governance

erwin 1 maand geleden
bovenliggende
commit
916c0b2894
19 gewijzigde bestanden met toevoegingen van 1009 en 148 verwijderingen
  1. 20 0
      research/dragon/v2/MEMORY.md
  2. 128 0
      research/dragon/v2/dragon_execution_runtime.py
  3. 2 0
      research/dragon/v2/dragon_forward_observation_pipeline.py
  4. 1 1
      research/dragon/v2/dragon_rc1_golden_manifest.json
  5. 228 0
      research/dragon/v2/dragon_rollout_governance.py
  6. 170 0
      research/dragon/v2/dragon_rollout_governance_check.py
  7. 76 0
      research/dragon/v2/dragon_rollout_governance_report.md
  8. 28 0
      research/dragon/v2/dragon_rollout_rollback_runbook.md
  9. 14 0
      research/dragon/v2/dragon_rollout_state.json
  10. 3 147
      research/dragon/v2/dragon_strategy.py
  11. 28 0
      research/dragon/v2/memory/2026-04-09.md
  12. 3 0
      research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/.openspec.yaml
  13. 54 0
      research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/acceptance-summary.md
  14. 54 0
      research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/design.md
  15. 27 0
      research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/proposal.md
  16. 17 0
      research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/specs/execution-runtime-decoupling/spec.md
  17. 24 0
      research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/specs/rollout-governance-gates/spec.md
  18. 24 0
      research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/tasks.md
  19. 108 0
      research/dragon/v2/tests/test_rollout_governance.py

+ 20 - 0
research/dragon/v2/MEMORY.md

@@ -1089,3 +1089,23 @@
 - attribution mapping still has no unknown reason mapping,
 - full test suite reached `14` passing tests,
 - daily signal pipeline smoke still passes.
+- Continued with a third OpenSpec execution round:
+- `openspec/changes/execution-decoupling-rollout-governance/`.
+- Main engineering outcomes:
+- decoupled compatibility runtime loop from `dragon_strategy.py` into `dragon_execution_runtime.py`,
+- added rollout gate engine and checker (`dragon_rollout_governance.py`, `dragon_rollout_governance_check.py`),
+- integrated checker into `dragon_forward_observation_pipeline.py`,
+- rollout checker now outputs explicit operational decision artifacts each run.
+- New rollout artifacts:
+- `dragon_rollout_state.json`
+- `dragon_rollout_governance_snapshot.csv`
+- `dragon_rollout_governance_report.md`
+- `dragon_rollout_rollback_runbook.md`
+- Added test `tests/test_rollout_governance.py`; full suite now `17` tests passing.
+- Revalidated guardrails after this round:
+- golden core hashes unchanged (`events 8965d1...a331`, `trades 1298be...cb97`),
+- daily and forward pipelines both passed smoke reruns.
+- latest rollout decision artifact shows:
+- `decision=FORWARD_OK`
+- `active_branch=alpha_first_glued_refined_hot_cap`
+- `fallback_branch=alpha_first_selective_veto`.

+ 128 - 0
research/dragon/v2/dragon_execution_runtime.py

@@ -0,0 +1,128 @@
+from __future__ import annotations
+
+from typing import Any, Optional
+
+import pandas as pd
+
+from dragon_rule_catalog import classify_entry_reason, classify_exit_reason
+
+
+def _event_payload(row: pd.Series, side: str, layer: str, reason: str) -> dict[str, object]:
+    return {
+        "date": row.name.date().isoformat(),
+        "side": side,
+        "layer": layer,
+        "reason": reason,
+        "close": float(row["close"]),
+        "a1": float(row["a1"]),
+        "b1": float(row["b1"]),
+        "c1": float(row["c1"]),
+        "kdj_buy": bool(row["kdj_buy"]),
+        "kdj_sell": bool(row["kdj_sell"]),
+        "ql_buy": bool(row["ql_buy"]),
+        "ql_sell": bool(row["ql_sell"]),
+    }
+
+
+def _trade_payload(engine: Any, row: pd.Series, reason: str) -> dict[str, object]:
+    return {
+        "buy_date": engine.context.entry_date.isoformat() if engine.context.entry_date else "",
+        "buy_price": engine.context.entry_price,
+        "buy_reason": engine.context.entry_reason,
+        "sell_date": row.name.date().isoformat(),
+        "sell_price": float(row["close"]),
+        "sell_reason": reason,
+        "holding_days": engine._holding_days(row.name.date()),
+        "return_pct": (
+            float(row["close"]) / engine.context.entry_price - 1
+            if engine.context.entry_price
+            else None
+        ),
+    }
+
+
+def _enrich_reason_metadata(engine: Any, events_df: pd.DataFrame, trades_df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
+    if not events_df.empty:
+        event_decisions = [
+            engine._build_decision(
+                side=str(row["side"]),
+                layer=str(row["layer"]),
+                reason=str(row["reason"]),
+            )
+            for _, row in events_df.iterrows()
+        ]
+        events_df["reason_layer"] = [d.reason.layer.value if d.reason is not None else "unknown" for d in event_decisions]
+        events_df["reason_family"] = [d.reason.family.value if d.reason is not None else "unknown" for d in event_decisions]
+        events_df["reason_code"] = [d.reason.code if d.reason is not None else "" for d in event_decisions]
+
+    if not trades_df.empty:
+        buy_meta = trades_df["buy_reason"].map(classify_entry_reason)
+        sell_meta = trades_df["sell_reason"].map(classify_exit_reason)
+        trades_df["buy_reason_layer"] = [meta.layer.value for meta in buy_meta]
+        trades_df["buy_reason_family"] = [meta.family.value for meta in buy_meta]
+        trades_df["buy_reason_code"] = [meta.code for meta in buy_meta]
+        trades_df["sell_reason_layer"] = [meta.layer.value for meta in sell_meta]
+        trades_df["sell_reason_family"] = [meta.family.value for meta in sell_meta]
+        trades_df["sell_reason_code"] = [meta.code for meta in sell_meta]
+
+    return events_df, trades_df
+
+
+def run_compat_execution(engine: Any, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
+    """
+    Execute the legacy-compatible strategy loop.
+
+    The engine object is expected to provide the same internal hooks as
+    DragonRuleEngine. This keeps the behavior path unchanged while physically
+    decoupling execution runtime from strategy rule declarations.
+    """
+
+    engine.context = engine.context.__class__()
+    events: list[dict[str, object]] = []
+    trades: list[dict[str, object]] = []
+    prev_row: Optional[pd.Series] = None
+
+    for _, row in df.iterrows():
+        engine._record_cross_counters(row)
+        engine._update_position_counters(row)
+        engine._update_pending_states(row)
+        just_bought = False
+
+        if (not engine.context.in_position) or bool(row["kdj_buy"] or row["ql_buy"]):
+            action, reason = engine._buy_decision(row, prev_row)
+            if action == "BUY":
+                engine._post_real_buy(row, reason)
+                just_bought = True
+                events.append(_event_payload(row, side="BUY", layer="real_trade", reason=reason))
+            elif action == "AUX_BUY":
+                engine._post_aux_buy(row)
+                events.append(_event_payload(row, side="BUY", layer="aux_signal", reason=reason))
+
+        state_aux_sell_candidate = (not engine.context.in_position) and engine._should_emit_state_aux_sell(row)
+        if not just_bought and (engine.context.in_position or bool(row["kdj_sell"] or row["ql_sell"]) or state_aux_sell_candidate):
+            if engine.context.in_position and bool(row["kdj_sell"] or row["ql_sell"]):
+                engine.context.sell_signal_count += 1
+                if bool(row["kdj_sell"]):
+                    engine.context.kdj_sell_signal_count += 1
+                if bool(row["ql_sell"]):
+                    engine.context.ql_sell_signal_count += 1
+                if float(row["b1"]) < 0:
+                    engine.context.b1_negative_sell_count += 1
+            action, reason = engine._sell_decision(row, prev_row)
+            if action == "SELL":
+                trades.append(_trade_payload(engine=engine, row=row, reason=reason))
+                engine.context.first_exit_checked = True
+                events.append(_event_payload(row, side="SELL", layer="real_trade", reason=reason))
+                engine._post_real_sell(row, reason)
+            elif action == "AUX_SELL":
+                if engine.context.in_position:
+                    engine.context.first_exit_checked = True
+                engine._post_aux_sell(row, reason)
+                events.append(_event_payload(row, side="SELL", layer="aux_signal", reason=reason))
+
+        prev_row = row
+
+    events_df = pd.DataFrame(events)
+    trades_df = pd.DataFrame(trades)
+    return _enrich_reason_metadata(engine=engine, events_df=events_df, trades_df=trades_df)
+

+ 2 - 0
research/dragon/v2/dragon_forward_observation_pipeline.py

@@ -8,6 +8,7 @@ import pandas as pd
 
 import dragon_daily_signal_pipeline as daily
 import dragon_html_reports as html_reports
+import dragon_rollout_governance_check as rollout_governance
 
 
 def _load_csv(path: Path) -> pd.DataFrame:
@@ -509,6 +510,7 @@ def main() -> None:
             path_df = _load_csv(path)
             _write_csv(path_df, archived)
 
+    rollout_governance.main()
     html_reports.main()
 
 

+ 1 - 1
research/dragon/v2/dragon_rc1_golden_manifest.json

@@ -1,5 +1,5 @@
 {
-  "generated_at": "2026-04-09T09:14:52",
+  "generated_at": "2026-04-09T10:26:03",
   "release_version": "RC1",
   "branch": "alpha_first_glued_refined_hot_cap",
   "evaluation_window": {

+ 228 - 0
research/dragon/v2/dragon_rollout_governance.py

@@ -0,0 +1,228 @@
+from __future__ import annotations
+
+from dataclasses import asdict, dataclass
+from typing import Any
+
+import pandas as pd
+
+
+FORWARD_OK = "FORWARD_OK"
+HOLD_AND_REVIEW = "HOLD_AND_REVIEW"
+ROLLBACK_REVIEW_REQUIRED = "ROLLBACK_REVIEW_REQUIRED"
+
+
+@dataclass(frozen=True)
+class GateResult:
+    gate: str
+    status: str
+    value: str
+    threshold: str
+    detail: str
+    action: str
+
+
+def _max_iso_date(values: pd.Series) -> str:
+    cleaned = values.dropna().astype(str)
+    if cleaned.empty:
+        return ""
+    return str(cleaned.max())
+
+
+def _safe_int(value: Any) -> int:
+    if value is None or (isinstance(value, float) and pd.isna(value)):
+        return 0
+    return int(value)
+
+
+def _safe_bool(value: Any, default: bool = True) -> bool:
+    if value is None or (isinstance(value, float) and pd.isna(value)):
+        return default
+    if isinstance(value, bool):
+        return value
+    text = str(value).strip().lower()
+    if text in {"true", "1", "yes"}:
+        return True
+    if text in {"false", "0", "no"}:
+        return False
+    return default
+
+
+def _latest_monitor_slice(monitor_history: pd.DataFrame, latest_bar_date: str) -> pd.DataFrame:
+    if monitor_history.empty or not latest_bar_date:
+        return pd.DataFrame()
+    if "latest_bar_date" not in monitor_history.columns:
+        return pd.DataFrame()
+    return monitor_history[monitor_history["latest_bar_date"].astype(str) == str(latest_bar_date)].copy()
+
+
+def _latest_divergence_row(divergence_log: pd.DataFrame, latest_bar_date: str) -> dict[str, Any]:
+    if divergence_log.empty or not latest_bar_date:
+        return {}
+    if "latest_bar_date" not in divergence_log.columns:
+        return {}
+    rows = divergence_log[divergence_log["latest_bar_date"].astype(str) == str(latest_bar_date)].copy()
+    if rows.empty:
+        return {}
+    return rows.iloc[-1].to_dict()
+
+
+def evaluate_rollout(
+    manifest: dict[str, Any],
+    monitor_history: pd.DataFrame,
+    divergence_log: pd.DataFrame,
+    monitor_health_report_exists: bool,
+) -> tuple[str, list[GateResult], list[str], str]:
+    latest_bar_date = str(manifest.get("latest_bar_date", ""))
+    manifest_branch = str(manifest.get("branch", "alpha_first_glued_refined_hot_cap"))
+
+    monitor_latest_date = _max_iso_date(monitor_history.get("latest_bar_date", pd.Series(dtype=str)))
+    divergence_latest_date = _max_iso_date(divergence_log.get("latest_bar_date", pd.Series(dtype=str)))
+    latest_monitor = _latest_monitor_slice(monitor_history=monitor_history, latest_bar_date=latest_bar_date)
+    divergence_row = _latest_divergence_row(divergence_log=divergence_log, latest_bar_date=latest_bar_date)
+
+    warning_count = _safe_int((latest_monitor["status"] == "warning").sum()) if not latest_monitor.empty else 0
+    hard_count = (
+        _safe_int(latest_monitor["status"].isin(["hard_breach", "missing_data"]).sum())
+        if not latest_monitor.empty
+        else 0
+    )
+    max_warning_streak = _safe_int(latest_monitor.get("warning_streak", pd.Series(dtype=float)).max())
+    max_hard_streak = _safe_int(latest_monitor.get("hard_breach_streak", pd.Series(dtype=float)).max())
+
+    divergence_level = str(divergence_row.get("divergence_level", "unknown")) if divergence_row else "unknown"
+    same_position = _safe_bool(divergence_row.get("same_position_flag"), default=True) if divergence_row else True
+    same_event = _safe_bool(divergence_row.get("same_latest_real_event_flag"), default=True) if divergence_row else True
+
+    gates: list[GateResult] = []
+
+    freshness_ok = bool(latest_bar_date) and latest_bar_date == monitor_latest_date and (
+        not divergence_latest_date or latest_bar_date == divergence_latest_date
+    )
+    gates.append(
+        GateResult(
+            gate="data_freshness",
+            status="ok" if freshness_ok else "hard_fail",
+            value=f"manifest={latest_bar_date},monitor={monitor_latest_date or 'missing'},divergence={divergence_latest_date or 'missing'}",
+            threshold="all dates aligned",
+            detail="Daily manifest, monitor history, and divergence log must point to the same latest bar.",
+            action="rerun forward pipeline before any decision" if not freshness_ok else "none",
+        )
+    )
+
+    gates.append(
+        GateResult(
+            gate="monitor_health_report_present",
+            status="ok" if monitor_health_report_exists else "hard_fail",
+            value="present" if monitor_health_report_exists else "missing",
+            threshold="present",
+            detail="Rollout decision requires a generated monitor health report artifact.",
+            action="rerun forward observation pipeline" if not monitor_health_report_exists else "none",
+        )
+    )
+
+    gates.append(
+        GateResult(
+            gate="hard_breach_budget",
+            status="ok" if hard_count == 0 else "hard_fail",
+            value=str(hard_count),
+            threshold="0",
+            detail="Latest monitor snapshot should not contain hard breach or missing-data metrics.",
+            action="freeze candidate and open rollback review" if hard_count > 0 else "none",
+        )
+    )
+
+    gates.append(
+        GateResult(
+            gate="hard_breach_streak",
+            status="ok" if max_hard_streak == 0 else "hard_fail",
+            value=str(max_hard_streak),
+            threshold="0",
+            detail="Consecutive hard-breach streak must remain zero.",
+            action="switch active branch to control and investigate root cause" if max_hard_streak > 0 else "none",
+        )
+    )
+
+    warn_status = "ok"
+    if warning_count > 1:
+        warn_status = "warning"
+    gates.append(
+        GateResult(
+            gate="warning_budget",
+            status=warn_status,
+            value=str(warning_count),
+            threshold="<=1",
+            detail="Latest warning count should stay low and bounded.",
+            action="hold forward promotion until warning count recovers" if warn_status != "ok" else "none",
+        )
+    )
+
+    streak_status = "ok"
+    if max_warning_streak > 2:
+        streak_status = "warning"
+    gates.append(
+        GateResult(
+            gate="warning_streak",
+            status=streak_status,
+            value=str(max_warning_streak),
+            threshold="<=2",
+            detail="Warning streaks above two bars indicate persistent instability.",
+            action="hold rollout and run targeted stress replay" if streak_status != "ok" else "none",
+        )
+    )
+
+    divergence_status = "ok"
+    if divergence_level == "review_required":
+        divergence_status = "hard_fail"
+    elif divergence_level in {"material", "unknown"}:
+        divergence_status = "warning"
+    gates.append(
+        GateResult(
+            gate="branch_divergence_level",
+            status=divergence_status,
+            value=divergence_level,
+            threshold="none|mild",
+            detail="Divergence should stay none/mild for routine forward tracking.",
+            action=(
+                "freeze candidate and perform rollback drill"
+                if divergence_status == "hard_fail"
+                else "review divergence context before continuing"
+                if divergence_status == "warning"
+                else "none"
+            ),
+        )
+    )
+
+    alignment_status = "ok" if (same_position and same_event) else "warning"
+    gates.append(
+        GateResult(
+            gate="refined_control_alignment",
+            status=alignment_status,
+            value=f"same_position={same_position},same_event={same_event}",
+            threshold="both true",
+            detail="Forward candidate and control branch should remain aligned unless divergence is intentional.",
+            action="review event diff and attribution pack" if alignment_status != "ok" else "none",
+        )
+    )
+
+    hard_fail = [g.gate for g in gates if g.status == "hard_fail"]
+    warning = [g.gate for g in gates if g.status == "warning"]
+
+    if hard_fail:
+        decision = ROLLBACK_REVIEW_REQUIRED
+    elif warning:
+        decision = HOLD_AND_REVIEW
+    else:
+        decision = FORWARD_OK
+
+    if decision == ROLLBACK_REVIEW_REQUIRED:
+        active_branch = "alpha_first_selective_veto"
+    else:
+        active_branch = manifest_branch
+
+    reasons = [f"{g.gate}:{g.status}" for g in gates if g.status != "ok"]
+    return decision, gates, reasons, active_branch
+
+
+def gate_rows(gates: list[GateResult]) -> list[dict[str, Any]]:
+    return [asdict(g) for g in gates]
+

+ 170 - 0
research/dragon/v2/dragon_rollout_governance_check.py

@@ -0,0 +1,170 @@
+from __future__ import annotations
+
+from datetime import datetime
+import json
+from pathlib import Path
+
+import pandas as pd
+
+from dragon_rollout_governance import (
+    FORWARD_OK,
+    HOLD_AND_REVIEW,
+    ROLLBACK_REVIEW_REQUIRED,
+    evaluate_rollout,
+    gate_rows,
+)
+
+
+def _load_json(path: Path) -> dict[str, object]:
+    if not path.exists():
+        return {}
+    return json.loads(path.read_text(encoding="utf-8"))
+
+
+def _load_csv(path: Path) -> pd.DataFrame:
+    if not path.exists():
+        return pd.DataFrame()
+    return pd.read_csv(path, encoding="utf-8-sig")
+
+
+def main() -> None:
+    base_dir = Path(__file__).resolve().parent
+    manifest_path = base_dir / "dragon_daily_rc1_manifest.json"
+    monitor_history_path = base_dir / "dragon_monitor_history.csv"
+    divergence_log_path = base_dir / "dragon_branch_divergence_log.csv"
+    monitor_health_path = base_dir / "dragon_monitor_health_report.md"
+
+    manifest = _load_json(manifest_path)
+    monitor_history = _load_csv(monitor_history_path)
+    divergence_log = _load_csv(divergence_log_path)
+
+    decision, gates, reasons, active_branch = evaluate_rollout(
+        manifest=manifest,
+        monitor_history=monitor_history,
+        divergence_log=divergence_log,
+        monitor_health_report_exists=monitor_health_path.exists(),
+    )
+
+    fallback_branch = "alpha_first_selective_veto"
+    latest_bar_date = str(manifest.get("latest_bar_date", ""))
+    request_date = str(manifest.get("as_of_request_date", ""))
+    generated_at = datetime.now().isoformat(timespec="seconds")
+
+    snapshot = pd.DataFrame(gate_rows(gates))
+    snapshot.to_csv(base_dir / "dragon_rollout_governance_snapshot.csv", index=False, encoding="utf-8-sig")
+
+    gate_status_counts = snapshot["status"].value_counts().to_dict() if not snapshot.empty else {}
+    decision_payload = {
+        "generated_at": generated_at,
+        "request_date": request_date,
+        "latest_bar_date": latest_bar_date,
+        "release_version": manifest.get("release_version", "RC1"),
+        "candidate_branch": manifest.get("branch", "alpha_first_glued_refined_hot_cap"),
+        "fallback_branch": fallback_branch,
+        "active_branch": active_branch,
+        "decision": decision,
+        "gate_status_counts": gate_status_counts,
+        "reasons": reasons,
+    }
+    (base_dir / "dragon_rollout_state.json").write_text(
+        json.dumps(decision_payload, indent=2, ensure_ascii=False) + "\n",
+        encoding="utf-8",
+    )
+
+    lines = [
+        "# Dragon Rollout Governance Report",
+        "",
+        f"- generated_at: `{generated_at}`",
+        f"- request_date: `{request_date}`",
+        f"- latest_bar_date: `{latest_bar_date}`",
+        f"- candidate_branch: `{decision_payload['candidate_branch']}`",
+        f"- fallback_branch: `{fallback_branch}`",
+        f"- decision: `{decision}`",
+        f"- active_branch: `{active_branch}`",
+        "",
+        "## Gate Summary",
+        f"- ok: `{int(gate_status_counts.get('ok', 0))}`",
+        f"- warning: `{int(gate_status_counts.get('warning', 0))}`",
+        f"- hard_fail: `{int(gate_status_counts.get('hard_fail', 0))}`",
+        "",
+        "## Gate Details",
+    ]
+
+    if snapshot.empty:
+        lines.append("- No gate rows were generated.")
+    else:
+        for _, row in snapshot.iterrows():
+            lines.extend(
+                [
+                    f"### {row['gate']}",
+                    f"- status: `{row['status']}`",
+                    f"- value: `{row['value']}`",
+                    f"- threshold: `{row['threshold']}`",
+                    f"- detail: {row['detail']}",
+                    f"- action: {row['action']}",
+                    "",
+                ]
+            )
+
+    if reasons:
+        lines.extend(
+            [
+                "## Decision Reasons",
+                *(f"- `{item}`" for item in reasons),
+                "",
+            ]
+        )
+
+    lines.extend(
+        [
+            "## Artifacts",
+            "- `dragon_rollout_state.json`",
+            "- `dragon_rollout_governance_snapshot.csv`",
+            "- `dragon_rollout_rollback_runbook.md`",
+        ]
+    )
+    (base_dir / "dragon_rollout_governance_report.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
+
+    runbook_lines = [
+        "# Dragon Rollback Runbook",
+        "",
+        "## Trigger Conditions",
+        "- Any gate in `dragon_rollout_governance_snapshot.csv` with `status = hard_fail`.",
+        "- Decision equals `ROLLBACK_REVIEW_REQUIRED`.",
+        "- Persistent warning state (`HOLD_AND_REVIEW`) for more than two bars.",
+        "",
+        "## Immediate Actions",
+        "1. Freeze candidate branch decisions for live-facing guidance.",
+        "2. Switch active branch to control branch `alpha_first_selective_veto`.",
+        "3. Re-run forward chain and verify monitor status:",
+        "   `py -3 dragon_forward_observation_pipeline.py`",
+        "4. Inspect latest artifacts:",
+        "   - `dragon_daily_monitor_snapshot.csv`",
+        "   - `dragon_monitor_health_report.md`",
+        "   - `dragon_branch_divergence_report.md`",
+        "",
+        "## Recovery Checklist",
+        "1. Confirm all hard-fail gates are cleared.",
+        "2. Confirm warning streaks return to threshold range.",
+        "3. Confirm branch divergence drops to `none` or `mild`.",
+        "4. Regenerate rollout report and ensure decision is `FORWARD_OK` before reactivating candidate branch.",
+        "",
+        "## Current Snapshot",
+        f"- latest_bar_date: `{latest_bar_date}`",
+        f"- decision: `{decision}`",
+        f"- recommended_active_branch: `{active_branch}`",
+        f"- fallback_branch: `{fallback_branch}`",
+    ]
+    (base_dir / "dragon_rollout_rollback_runbook.md").write_text("\n".join(runbook_lines) + "\n", encoding="utf-8")
+
+    if decision == FORWARD_OK:
+        return
+    if decision == HOLD_AND_REVIEW:
+        return
+    if decision == ROLLBACK_REVIEW_REQUIRED:
+        return
+
+
+if __name__ == "__main__":
+    main()
+

+ 76 - 0
research/dragon/v2/dragon_rollout_governance_report.md

@@ -0,0 +1,76 @@
+# Dragon Rollout Governance Report
+
+- generated_at: `2026-04-09T10:26:14`
+- request_date: `2026-04-09`
+- latest_bar_date: `2026-04-09`
+- candidate_branch: `alpha_first_glued_refined_hot_cap`
+- fallback_branch: `alpha_first_selective_veto`
+- decision: `FORWARD_OK`
+- active_branch: `alpha_first_glued_refined_hot_cap`
+
+## Gate Summary
+- ok: `8`
+- warning: `0`
+- hard_fail: `0`
+
+## Gate Details
+### data_freshness
+- status: `ok`
+- value: `manifest=2026-04-09,monitor=2026-04-09,divergence=2026-04-09`
+- threshold: `all dates aligned`
+- detail: Daily manifest, monitor history, and divergence log must point to the same latest bar.
+- action: none
+
+### monitor_health_report_present
+- status: `ok`
+- value: `present`
+- threshold: `present`
+- detail: Rollout decision requires a generated monitor health report artifact.
+- action: none
+
+### hard_breach_budget
+- status: `ok`
+- value: `0`
+- threshold: `0`
+- detail: Latest monitor snapshot should not contain hard breach or missing-data metrics.
+- action: none
+
+### hard_breach_streak
+- status: `ok`
+- value: `0`
+- threshold: `0`
+- detail: Consecutive hard-breach streak must remain zero.
+- action: none
+
+### warning_budget
+- status: `ok`
+- value: `0`
+- threshold: `<=1`
+- detail: Latest warning count should stay low and bounded.
+- action: none
+
+### warning_streak
+- status: `ok`
+- value: `0`
+- threshold: `<=2`
+- detail: Warning streaks above two bars indicate persistent instability.
+- action: none
+
+### branch_divergence_level
+- status: `ok`
+- value: `none`
+- threshold: `none|mild`
+- detail: Divergence should stay none/mild for routine forward tracking.
+- action: none
+
+### refined_control_alignment
+- status: `ok`
+- value: `same_position=True,same_event=True`
+- threshold: `both true`
+- detail: Forward candidate and control branch should remain aligned unless divergence is intentional.
+- action: none
+
+## Artifacts
+- `dragon_rollout_state.json`
+- `dragon_rollout_governance_snapshot.csv`
+- `dragon_rollout_rollback_runbook.md`

+ 28 - 0
research/dragon/v2/dragon_rollout_rollback_runbook.md

@@ -0,0 +1,28 @@
+# Dragon Rollback Runbook
+
+## Trigger Conditions
+- Any gate in `dragon_rollout_governance_snapshot.csv` with `status = hard_fail`.
+- Decision equals `ROLLBACK_REVIEW_REQUIRED`.
+- Persistent warning state (`HOLD_AND_REVIEW`) for more than two bars.
+
+## Immediate Actions
+1. Freeze candidate branch decisions for live-facing guidance.
+2. Switch active branch to control branch `alpha_first_selective_veto`.
+3. Re-run forward chain and verify monitor status:
+   `py -3 dragon_forward_observation_pipeline.py`
+4. Inspect latest artifacts:
+   - `dragon_daily_monitor_snapshot.csv`
+   - `dragon_monitor_health_report.md`
+   - `dragon_branch_divergence_report.md`
+
+## Recovery Checklist
+1. Confirm all hard-fail gates are cleared.
+2. Confirm warning streaks return to threshold range.
+3. Confirm branch divergence drops to `none` or `mild`.
+4. Regenerate rollout report and ensure decision is `FORWARD_OK` before reactivating candidate branch.
+
+## Current Snapshot
+- latest_bar_date: `2026-04-09`
+- decision: `FORWARD_OK`
+- recommended_active_branch: `alpha_first_glued_refined_hot_cap`
+- fallback_branch: `alpha_first_selective_veto`

+ 14 - 0
research/dragon/v2/dragon_rollout_state.json

@@ -0,0 +1,14 @@
+{
+  "generated_at": "2026-04-09T10:26:14",
+  "request_date": "2026-04-09",
+  "latest_bar_date": "2026-04-09",
+  "release_version": "RC1",
+  "candidate_branch": "alpha_first_glued_refined_hot_cap",
+  "fallback_branch": "alpha_first_selective_veto",
+  "active_branch": "alpha_first_glued_refined_hot_cap",
+  "decision": "FORWARD_OK",
+  "gate_status_counts": {
+    "ok": 8
+  },
+  "reasons": []
+}

+ 3 - 147
research/dragon/v2/dragon_strategy.py

@@ -1220,153 +1220,9 @@ class DragonRuleEngine:
                 self.context.flat_post_exit_kdj_emitted = True
 
     def run(self, df: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
-        self.context = StrategyContext()
-        events: list[dict[str, object]] = []
-        trades: list[dict[str, object]] = []
-        prev_row: Optional[pd.Series] = None
-
-        for _, row in df.iterrows():
-            self._record_cross_counters(row)
-            self._update_position_counters(row)
-            self._update_pending_states(row)
-            just_bought = False
-
-            if (not self.context.in_position) or bool(row["kdj_buy"] or row["ql_buy"]):
-                action, reason = self._buy_decision(row, prev_row)
-                if action == "BUY":
-                    self._post_real_buy(row, reason)
-                    just_bought = True
-                    events.append(
-                        {
-                            "date": row.name.date().isoformat(),
-                            "side": "BUY",
-                            "layer": "real_trade",
-                            "reason": reason,
-                            "close": float(row["close"]),
-                            "a1": float(row["a1"]),
-                            "b1": float(row["b1"]),
-                            "c1": float(row["c1"]),
-                            "kdj_buy": bool(row["kdj_buy"]),
-                            "kdj_sell": bool(row["kdj_sell"]),
-                            "ql_buy": bool(row["ql_buy"]),
-                            "ql_sell": bool(row["ql_sell"]),
-                        }
-                    )
-                elif action == "AUX_BUY":
-                    self._post_aux_buy(row)
-                    events.append(
-                        {
-                            "date": row.name.date().isoformat(),
-                            "side": "BUY",
-                            "layer": "aux_signal",
-                            "reason": reason,
-                            "close": float(row["close"]),
-                            "a1": float(row["a1"]),
-                            "b1": float(row["b1"]),
-                            "c1": float(row["c1"]),
-                            "kdj_buy": bool(row["kdj_buy"]),
-                            "kdj_sell": bool(row["kdj_sell"]),
-                            "ql_buy": bool(row["ql_buy"]),
-                            "ql_sell": bool(row["ql_sell"]),
-                        }
-                    )
-
-            state_aux_sell_candidate = (not self.context.in_position) and self._should_emit_state_aux_sell(row)
-            if not just_bought and (self.context.in_position or bool(row["kdj_sell"] or row["ql_sell"]) or state_aux_sell_candidate):
-                if self.context.in_position and bool(row["kdj_sell"] or row["ql_sell"]):
-                    self.context.sell_signal_count += 1
-                    if bool(row["kdj_sell"]):
-                        self.context.kdj_sell_signal_count += 1
-                    if bool(row["ql_sell"]):
-                        self.context.ql_sell_signal_count += 1
-                    if float(row["b1"]) < 0:
-                        self.context.b1_negative_sell_count += 1
-                action, reason = self._sell_decision(row, prev_row)
-                if action == "SELL":
-                    trades.append(
-                        {
-                            "buy_date": self.context.entry_date.isoformat() if self.context.entry_date else "",
-                            "buy_price": self.context.entry_price,
-                            "buy_reason": self.context.entry_reason,
-                            "sell_date": row.name.date().isoformat(),
-                            "sell_price": float(row["close"]),
-                            "sell_reason": reason,
-                            "holding_days": self._holding_days(row.name.date()),
-                            "return_pct": (
-                                float(row["close"]) / self.context.entry_price - 1
-                                if self.context.entry_price
-                                else None
-                            ),
-                        }
-                    )
-                    self.context.first_exit_checked = True
-                    events.append(
-                        {
-                            "date": row.name.date().isoformat(),
-                            "side": "SELL",
-                            "layer": "real_trade",
-                            "reason": reason,
-                            "close": float(row["close"]),
-                            "a1": float(row["a1"]),
-                            "b1": float(row["b1"]),
-                            "c1": float(row["c1"]),
-                            "kdj_buy": bool(row["kdj_buy"]),
-                            "kdj_sell": bool(row["kdj_sell"]),
-                            "ql_buy": bool(row["ql_buy"]),
-                            "ql_sell": bool(row["ql_sell"]),
-                        }
-                    )
-                    self._post_real_sell(row, reason)
-                elif action == "AUX_SELL":
-                    if self.context.in_position:
-                        self.context.first_exit_checked = True
-                    self._post_aux_sell(row, reason)
-                    events.append(
-                        {
-                            "date": row.name.date().isoformat(),
-                            "side": "SELL",
-                            "layer": "aux_signal",
-                            "reason": reason,
-                            "close": float(row["close"]),
-                            "a1": float(row["a1"]),
-                            "b1": float(row["b1"]),
-                            "c1": float(row["c1"]),
-                            "kdj_buy": bool(row["kdj_buy"]),
-                            "kdj_sell": bool(row["kdj_sell"]),
-                            "ql_buy": bool(row["ql_buy"]),
-                            "ql_sell": bool(row["ql_sell"]),
-                        }
-                    )
-
-            prev_row = row
-
-        events_df = pd.DataFrame(events)
-        trades_df = pd.DataFrame(trades)
-
-        if not events_df.empty:
-            event_decisions = [
-                self._build_decision(
-                    side=str(row["side"]),
-                    layer=str(row["layer"]),
-                    reason=str(row["reason"]),
-                )
-                for _, row in events_df.iterrows()
-            ]
-            events_df["reason_layer"] = [d.reason.layer.value if d.reason is not None else "unknown" for d in event_decisions]
-            events_df["reason_family"] = [d.reason.family.value if d.reason is not None else "unknown" for d in event_decisions]
-            events_df["reason_code"] = [d.reason.code if d.reason is not None else "" for d in event_decisions]
-
-        if not trades_df.empty:
-            buy_meta = trades_df["buy_reason"].map(classify_entry_reason)
-            sell_meta = trades_df["sell_reason"].map(classify_exit_reason)
-            trades_df["buy_reason_layer"] = [meta.layer.value for meta in buy_meta]
-            trades_df["buy_reason_family"] = [meta.family.value for meta in buy_meta]
-            trades_df["buy_reason_code"] = [meta.code for meta in buy_meta]
-            trades_df["sell_reason_layer"] = [meta.layer.value for meta in sell_meta]
-            trades_df["sell_reason_family"] = [meta.family.value for meta in sell_meta]
-            trades_df["sell_reason_code"] = [meta.code for meta in sell_meta]
-
-        return events_df, trades_df
+        from dragon_execution_runtime import run_compat_execution
+
+        return run_compat_execution(self, df)
 
 
 def run_with_layered_engine(df: pd.DataFrame, config: Optional[StrategyConfig] = None) -> tuple[pd.DataFrame, pd.DataFrame]:

+ 28 - 0
research/dragon/v2/memory/2026-04-09.md

@@ -91,3 +91,31 @@
 - layer attribution remains `no unknown reason mapping`,
 - full test suite now `14` tests passed,
 - daily signal pipeline smoke (`--as-of 2026-04-08`) passed.
+- Continued OpenSpec execution with a third change pack:
+- `openspec/changes/execution-decoupling-rollout-governance/`
+- Added proposal/design/tasks/spec docs and acceptance summary for this change.
+- Implemented execution-level decoupling:
+- new module `dragon_execution_runtime.py`
+- `dragon_strategy.py` now delegates `run(...)` compatibility loop to the runtime module.
+- Implemented rollout-governance hardening:
+- new module `dragon_rollout_governance.py`
+- new checker `dragon_rollout_governance_check.py`
+- forward pipeline now auto-runs checker:
+- `dragon_forward_observation_pipeline.py` calls rollout checker before HTML refresh.
+- rollout checker now publishes explicit operational decision artifacts each run.
+- New governance artifacts now generated:
+- `dragon_rollout_state.json`
+- `dragon_rollout_governance_snapshot.csv`
+- `dragon_rollout_governance_report.md`
+- `dragon_rollout_rollback_runbook.md`
+- Added rollout governance tests:
+- `tests/test_rollout_governance.py`
+- Validation reruns after this wave:
+- `py -3 -m unittest discover -s tests -v` -> `17` tests passed.
+- `py -3 dragon_rc1_golden_baseline.py` -> core hashes unchanged.
+- `py -3 dragon_daily_signal_pipeline.py` -> passed.
+- `py -3 dragon_forward_observation_pipeline.py` -> passed.
+- latest rollout decision snapshot after validation:
+- decision `FORWARD_OK`
+- active branch `alpha_first_glued_refined_hot_cap`
+- fallback branch `alpha_first_selective_veto`

+ 3 - 0
research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/.openspec.yaml

@@ -0,0 +1,3 @@
+schema: spec-driven
+created: 2026-04-09
+

+ 54 - 0
research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/acceptance-summary.md

@@ -0,0 +1,54 @@
+# Execution Decoupling + Rollout Governance - Acceptance Summary
+
+Date: 2026-04-09
+
+## 1) Scope Executed
+
+- Added decoupled compatibility runtime module:
+  - `dragon_execution_runtime.py`
+- `DragonRuleEngine.run(...)` now delegates to runtime module (behavior-preserving intent).
+- Added rollout governance modules:
+  - `dragon_rollout_governance.py`
+  - `dragon_rollout_governance_check.py`
+- Integrated rollout checker into forward pipeline:
+  - `dragon_forward_observation_pipeline.py`
+
+## 2) Guardrails
+
+- RC1 golden baseline rerun completed.
+- Core hashes unchanged:
+  - events: `8965d1b539a998d7d0aff04432aa2a47cf30ee40df013b9d8b7eb66a3d50a331`
+  - trades: `1298be56b0898266b0b854d62a979c00c20b01629393c82bb8c804faf852cb97`
+- RC1 summary unchanged:
+  - `trade_count = 91`
+  - `event_count = 272`
+  - `win_rate = 52.75%`
+  - `avg_return = 3.42%`
+- Rule attribution audit remains clean:
+  - `dragon_rule_layer_attribution.md` reports `no unknown reason mapping`.
+
+## 3) Tests
+
+- Added:
+  - `tests/test_rollout_governance.py`
+- Validation:
+  - `py -3 -m unittest discover -s tests -v`
+  - result: `17` tests passed (`OK`).
+
+## 4) Pipeline Smoke
+
+- `py -3 dragon_daily_signal_pipeline.py` succeeded.
+- `py -3 dragon_forward_observation_pipeline.py` succeeded.
+
+## 5) New Governance Artifacts
+
+- `dragon_rollout_state.json`
+- `dragon_rollout_governance_snapshot.csv`
+- `dragon_rollout_governance_report.md`
+- `dragon_rollout_rollback_runbook.md`
+
+Latest decision snapshot after smoke run:
+
+- decision: `FORWARD_OK`
+- active_branch: `alpha_first_glued_refined_hot_cap`
+- fallback_branch: `alpha_first_selective_veto`

+ 54 - 0
research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/design.md

@@ -0,0 +1,54 @@
+## Context
+
+`dragon/v2` already has RC1 golden guardrails, layered reason metadata, and weak-family predicate isolation. The remaining governance gaps are:
+
+- runtime loop is still monolithic inside the compatibility facade,
+- rollout health requires manually reading several files instead of one explicit decision state.
+
+## Goals / Non-Goals
+
+**Goals**
+- Decouple runtime execution loop from `dragon_strategy.py` without changing behavior.
+- Produce explicit rollout decision artifacts from existing monitor sources.
+- Keep RC1 compatibility outputs and schemas stable.
+
+**Non-Goals**
+- Retune RC1 thresholds.
+- Change branch trade paths in this change.
+- Replace existing monitor metrics or thresholds.
+
+## Decisions
+
+### 1. Runtime extraction without behavior rewrite
+
+Move execution-loop mechanics (event/trade payload assembly and reason-metadata enrichment) into a dedicated module, and keep `DragonRuleEngine` decision hooks intact.
+
+### 2. Gate-based rollout decision model
+
+Rollout decision uses deterministic gates across:
+- latest data freshness alignment,
+- monitor hard/warning conditions and streaks,
+- divergence level and refined-control alignment.
+
+### 3. Branch fallback policy
+
+When rollout decision is `ROLLBACK_REVIEW_REQUIRED`, recommended active branch becomes `alpha_first_selective_veto`; otherwise keep candidate branch from the daily manifest.
+
+## Risks / Trade-offs
+
+- [Runtime extraction drift] -> covered by RC1 golden regression and no-silent-path checks.
+- [Over-strict governance gates] -> start with warning-tier HOLD path before hard rollback.
+- [Artifact dependency missing] -> missing monitor health artifact is a hard-fail gate.
+
+## Migration Plan
+
+1. Add compatibility runtime module and delegate `DragonRuleEngine.run(...)`.
+2. Add rollout governance evaluation module and checker script.
+3. Integrate rollout checker into forward pipeline.
+4. Add tests for gate evaluation logic.
+5. Re-run golden, full tests, daily pipeline smoke, and forward pipeline smoke.
+
+## Rollback Plan
+
+- If golden hash or no-silent-path checks fail, revert runtime extraction commit.
+- If governance checker causes pipeline regression, disable checker integration and keep standalone execution.

+ 27 - 0
research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/proposal.md

@@ -0,0 +1,27 @@
+## Why
+
+The previous OpenSpec rounds isolated weak-family predicates and added layered metadata, but runtime execution is still physically embedded in `dragon_strategy.py`, and rollout decisions still rely on manual interpretation of multiple monitoring files. This keeps engineering risk higher than needed for RC1 forward operations.
+
+## What Changes
+
+- Physically decouple the compatibility execution runtime loop from `dragon_strategy.py` into a dedicated runtime module while preserving behavior.
+- Add rollout governance gates that evaluate `dragon_daily_rc1_manifest.json`, `dragon_monitor_history.csv`, and `dragon_branch_divergence_log.csv` into one machine-readable decision.
+- Generate a rollback runbook and state artifact so forward operation has explicit freeze/fallback guidance.
+- Integrate rollout governance generation into the forward observation pipeline.
+
+## Capabilities
+
+### New Capabilities
+- `execution-runtime-decoupling`: Dedicated compatibility runtime module for event/trade loop execution.
+- `rollout-governance-gates`: Unified rollout gate evaluation with `FORWARD_OK` / `HOLD_AND_REVIEW` / `ROLLBACK_REVIEW_REQUIRED`.
+- `rollback-runbook-artifacts`: Persistent rollback procedure outputs tied to latest monitoring snapshot.
+
+### Modified Capabilities
+- `dragon-strategy-compatibility-facade`: Delegates runtime execution to a decoupled module.
+- `forward-observation-governance`: Automatically regenerates rollout governance artifacts on each forward run.
+
+## Impact
+
+- Affected code: `dragon_strategy.py`, new execution runtime module, forward pipeline, rollout governance checker.
+- Affected artifacts: `dragon_rollout_state.json`, `dragon_rollout_governance_snapshot.csv`, `dragon_rollout_governance_report.md`, `dragon_rollout_rollback_runbook.md`.
+- Expected runtime impact: no intended RC1 path change; governance outputs become deterministic and reproducible.

+ 17 - 0
research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/specs/execution-runtime-decoupling/spec.md

@@ -0,0 +1,17 @@
+## ADDED Requirements
+
+### Requirement: Compatibility execution runtime SHALL be physically decoupled
+The strategy compatibility execution loop SHALL be implemented in a dedicated runtime module and called by `DragonRuleEngine.run(...)` instead of remaining fully inlined inside `dragon_strategy.py`.
+
+#### Scenario: Strategy run executes through runtime module
+- **WHEN** `DragonRuleEngine.run(...)` is called on the RC1 dataset
+- **THEN** execution is delegated to the decoupled runtime module
+- **AND** output schema remains compatible with existing event/trade consumers
+
+### Requirement: Runtime extraction SHALL preserve RC1 behavior
+Execution-runtime extraction SHALL keep RC1 golden-core hashes and release-window summary metrics unchanged unless an explicit behavior-change proposal is approved.
+
+#### Scenario: Runtime extraction merged
+- **WHEN** golden regression is executed after extraction
+- **THEN** core hashes and summary metrics match the approved RC1 manifest
+

+ 24 - 0
research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/specs/rollout-governance-gates/spec.md

@@ -0,0 +1,24 @@
+## ADDED Requirements
+
+### Requirement: Rollout governance SHALL produce a deterministic decision artifact
+Rollout governance SHALL evaluate daily manifest + monitor history + divergence log and emit one explicit decision state: `FORWARD_OK`, `HOLD_AND_REVIEW`, or `ROLLBACK_REVIEW_REQUIRED`.
+
+#### Scenario: Governance checker runs after forward pipeline
+- **WHEN** rollout checker is executed on latest artifacts
+- **THEN** it writes a machine-readable decision artifact with gate results and active/fallback branch guidance
+
+### Requirement: Hard-fail gate conditions SHALL force rollback review state
+If hard-fail gate conditions are met (including hard breaches, missing critical artifacts, or review-required divergence), governance decision SHALL be `ROLLBACK_REVIEW_REQUIRED`.
+
+#### Scenario: Hard breach appears in latest monitor snapshot
+- **WHEN** latest monitor row includes `hard_breach` or `missing_data`
+- **THEN** decision becomes `ROLLBACK_REVIEW_REQUIRED`
+- **AND** recommended active branch switches to control fallback branch
+
+### Requirement: Rollout checker SHALL publish rollback runbook output
+Each governance run SHALL output a rollback runbook artifact linked to the latest decision state.
+
+#### Scenario: Governance checker finishes
+- **WHEN** checker writes decision and gate snapshot
+- **THEN** rollback runbook is generated with trigger conditions and recovery checklist
+

+ 24 - 0
research/dragon/v2/openspec/changes/execution-decoupling-rollout-governance/tasks.md

@@ -0,0 +1,24 @@
+## 1. Execution Runtime Decoupling
+
+- [x] 1.1 Add `dragon_execution_runtime.py` with compatibility event/trade loop and metadata enrichment.
+- [x] 1.2 Delegate `DragonRuleEngine.run(...)` to runtime module without changing engine hooks.
+- [x] 1.3 Keep layered engine compatibility behavior unchanged.
+
+## 2. Rollout Governance Hardening
+
+- [x] 2.1 Add `dragon_rollout_governance.py` for gate evaluation and decision constants.
+- [x] 2.2 Add `dragon_rollout_governance_check.py` to generate state/report/runbook artifacts.
+- [x] 2.3 Integrate governance checker into `dragon_forward_observation_pipeline.py`.
+- [x] 2.4 Publish rollout decision artifacts for operational consumption (`dragon_rollout_state.json` + markdown report).
+
+## 3. Tests And Validation
+
+- [x] 3.1 Add rollout governance unit tests.
+- [x] 3.2 Re-run full unittest suite.
+- [x] 3.3 Re-run RC1 golden baseline guardrail.
+- [x] 3.4 Re-run daily signal pipeline smoke.
+- [x] 3.5 Re-run forward observation pipeline smoke.
+
+## 4. Acceptance
+
+- [x] 4.1 Publish acceptance summary in this OpenSpec change folder.

+ 108 - 0
research/dragon/v2/tests/test_rollout_governance.py

@@ -0,0 +1,108 @@
+from __future__ import annotations
+
+import unittest
+
+import pandas as pd
+
+from dragon_rollout_governance import (
+    FORWARD_OK,
+    HOLD_AND_REVIEW,
+    ROLLBACK_REVIEW_REQUIRED,
+    evaluate_rollout,
+)
+
+
+class TestRolloutGovernance(unittest.TestCase):
+    def _manifest(self, latest_bar_date: str = "2026-04-08") -> dict[str, object]:
+        return {
+            "release_version": "RC1",
+            "branch": "alpha_first_glued_refined_hot_cap",
+            "as_of_request_date": latest_bar_date,
+            "latest_bar_date": latest_bar_date,
+        }
+
+    def _monitor_history(self, latest_bar_date: str = "2026-04-08", status: str = "ok") -> pd.DataFrame:
+        return pd.DataFrame(
+            [
+                {
+                    "latest_bar_date": latest_bar_date,
+                    "metric": "next_open_avg_return_delta_vs_control",
+                    "actual_value": 0.01,
+                    "status": status,
+                    "warning_streak": 0 if status == "ok" else 3,
+                    "hard_breach_streak": 0 if status != "hard_breach" else 1,
+                }
+            ]
+        )
+
+    def _divergence_log(self, latest_bar_date: str = "2026-04-08", divergence_level: str = "none") -> pd.DataFrame:
+        return pd.DataFrame(
+            [
+                {
+                    "latest_bar_date": latest_bar_date,
+                    "divergence_level": divergence_level,
+                    "same_position_flag": True,
+                    "same_latest_real_event_flag": True,
+                }
+            ]
+        )
+
+    def test_forward_ok_when_all_gates_pass(self) -> None:
+        decision, gates, reasons, active_branch = evaluate_rollout(
+            manifest=self._manifest(),
+            monitor_history=self._monitor_history(),
+            divergence_log=self._divergence_log(),
+            monitor_health_report_exists=True,
+        )
+        self.assertEqual(decision, FORWARD_OK)
+        self.assertEqual(reasons, [])
+        self.assertEqual(active_branch, "alpha_first_glued_refined_hot_cap")
+        self.assertTrue(all(g.status == "ok" for g in gates))
+
+    def test_hard_breach_requires_rollback_review(self) -> None:
+        decision, gates, reasons, active_branch = evaluate_rollout(
+            manifest=self._manifest(),
+            monitor_history=self._monitor_history(status="hard_breach"),
+            divergence_log=self._divergence_log(),
+            monitor_health_report_exists=True,
+        )
+        self.assertEqual(decision, ROLLBACK_REVIEW_REQUIRED)
+        self.assertTrue(any("hard_breach_budget" in reason for reason in reasons))
+        self.assertEqual(active_branch, "alpha_first_selective_veto")
+        self.assertTrue(any(g.status == "hard_fail" for g in gates))
+
+    def test_material_divergence_or_warning_moves_to_hold(self) -> None:
+        monitor = pd.DataFrame(
+            [
+                {
+                    "latest_bar_date": "2026-04-08",
+                    "metric": "next_open_avg_return_delta_vs_control",
+                    "actual_value": 0.01,
+                    "status": "warning",
+                    "warning_streak": 1,
+                    "hard_breach_streak": 0,
+                },
+                {
+                    "latest_bar_date": "2026-04-08",
+                    "metric": "next_open_profit_factor_delta_vs_control",
+                    "actual_value": 0.20,
+                    "status": "warning",
+                    "warning_streak": 1,
+                    "hard_breach_streak": 0,
+                },
+            ]
+        )
+        decision, gates, reasons, _ = evaluate_rollout(
+            manifest=self._manifest(),
+            monitor_history=monitor,
+            divergence_log=self._divergence_log(divergence_level="material"),
+            monitor_health_report_exists=True,
+        )
+        self.assertEqual(decision, HOLD_AND_REVIEW)
+        self.assertTrue(any(g.status == "warning" for g in gates))
+        self.assertTrue(any(reason.startswith("warning_budget") or reason.startswith("branch_divergence_level") for reason in reasons))
+
+
+if __name__ == "__main__":
+    unittest.main()
+