| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518 |
- from __future__ import annotations
- from datetime import datetime
- import json
- from pathlib import Path
- import pandas as pd
- import dragon_daily_signal_pipeline as daily
- import dragon_html_reports as html_reports
- import dragon_rollout_governance_check as rollout_governance
- def _load_csv(path: Path) -> pd.DataFrame:
- if not path.exists():
- return pd.DataFrame()
- return pd.read_csv(path, encoding="utf-8-sig")
- def _write_csv(df: pd.DataFrame, path: Path) -> None:
- df.to_csv(path, index=False, encoding="utf-8-sig")
- def _append_dedup(existing: pd.DataFrame, new_rows: pd.DataFrame, key_cols: list[str]) -> pd.DataFrame:
- if existing.empty:
- out = new_rows.copy()
- else:
- out = pd.concat([existing, new_rows], ignore_index=True, sort=False)
- out = out.drop_duplicates(subset=key_cols, keep="last")
- return out.sort_values(key_cols).reset_index(drop=True)
- def _json_default(value: object) -> object:
- if isinstance(value, (pd.Timestamp, datetime)):
- return value.isoformat()
- if pd.isna(value):
- return None
- return value
- def _load_state(path: Path) -> dict[str, object]:
- if not path.exists():
- return {}
- return json.loads(path.read_text(encoding="utf-8"))
- def _build_monitor_summary(monitor_df: pd.DataFrame) -> dict[str, object]:
- return {
- "warning_count": int((monitor_df["status"] == "warning").sum()),
- "missing_data_count": int((monitor_df["status"] == "missing_data").sum()),
- "hard_breach_count": int(monitor_df["status"].isin(["hard_breach", "missing_data"]).sum()),
- "warning_metrics": monitor_df.loc[monitor_df["status"] == "warning", "metric"].tolist(),
- "missing_data_metrics": monitor_df.loc[monitor_df["status"] == "missing_data", "metric"].tolist(),
- "hard_breach_metrics": monitor_df.loc[monitor_df["status"].isin(["hard_breach", "missing_data"]), "metric"].tolist(),
- }
- def _branch_event_signature(row: pd.Series) -> str:
- return f"{row['latest_real_event_date']}|{row['latest_real_event_side']}|{row['latest_real_event_reason']}"
def _signature_part(value: object) -> str:
    """Render one event-signature field as text, mapping missing scalars (None/NaN/NaT/NA) to ``""``."""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value)


def _event_identity(source: object) -> tuple[str, ...]:
    """Return a missing-value-insensitive ``(date, side, reason)`` key for change detection.

    ``source`` is either a branch-status row (``pd.Series``) or the per-branch
    dict restored from the JSON state file; both expose ``.get``.
    """
    return tuple(
        _signature_part(source.get(field))
        for field in ("latest_real_event_date", "latest_real_event_side", "latest_real_event_reason")
    )


def _build_signal_changes(
    branch_status: pd.DataFrame,
    prev_state: dict[str, object],
    latest_bar_date: str,
    monitor_summary: dict[str, object],
) -> pd.DataFrame:
    """Diff the current branch and monitor snapshot against the previous run's state.

    Args:
        branch_status: one row per branch (``branch``, ``in_position``,
            ``latest_real_event_*``, ``events_today[_count]``, ``latest_close``,
            ``latest_a1``/``b1``/``c1``, ...).
        prev_state: payload of the previous state file; ``branches`` maps branch
            name to that branch's previous row, ``monitor_summary`` holds the
            previous counts. Anything that is not a dict is treated as no state.
        latest_bar_date: bar date stamped onto every emitted row.
        monitor_summary: this run's ``warning_count``/``hard_breach_count``/
            ``missing_data_count``.

    Returns:
        One row per detected change with columns latest_bar_date, branch,
        change_type, old_value, new_value, reason, event_context,
        indicator_context. The DataFrame is empty (no columns) when nothing changed.
    """
    prev_branches = (prev_state.get("branches") or {}) if isinstance(prev_state, dict) else {}
    prev_monitor = prev_state.get("monitor_summary", {}) if isinstance(prev_state, dict) else {}
    if not isinstance(prev_monitor, dict):
        prev_monitor = {}
    rows: list[dict[str, object]] = []

    def record(
        branch: str,
        change_type: str,
        old_value: str,
        new_value: str,
        reason: str,
        event_context: str = "",
        indicator_context: str = "",
    ) -> None:
        # Key order here fixes the column order of the change log.
        rows.append(
            {
                "latest_bar_date": latest_bar_date,
                "branch": branch,
                "change_type": change_type,
                "old_value": old_value,
                "new_value": new_value,
                "reason": reason,
                "event_context": event_context,
                "indicator_context": indicator_context,
            }
        )

    for _, row in branch_status.iterrows():
        branch = str(row["branch"])
        prev = prev_branches.get(branch, {})
        indicator_context = (
            f"close={float(row['latest_close']):.3f},"
            f"a1={float(row['latest_a1']):.4f},"
            f"b1={float(row['latest_b1']):.4f},"
            f"c1={float(row['latest_c1']):.2f}"
        )
        events_today = row["events_today"]
        event_context = str(events_today) if isinstance(events_today, str) and events_today else "none"
        new_sig = _branch_event_signature(row)

        if not prev:
            record(
                branch,
                "branch_initialized",
                "",
                new_sig,
                "first forward-observation snapshot for this branch",
                event_context,
                indicator_context,
            )
            continue

        if bool(prev.get("in_position")) != bool(row["in_position"]):
            record(
                branch,
                "position_changed",
                str(prev.get("in_position")),
                str(bool(row["in_position"])),
                "position state changed between forward snapshots",
                event_context,
                indicator_context,
            )

        # The state file stores NaN as null, so a rebuilt previous signature reads
        # "None" while the current CSV row reads "nan". Comparing normalized keys
        # stops that from being reported as a change on every run.
        if _event_identity(prev) != _event_identity(row):
            prev_sig = f"{prev.get('latest_real_event_date', '')}|{prev.get('latest_real_event_side', '')}|{prev.get('latest_real_event_reason', '')}"
            record(
                branch,
                "latest_real_event_changed",
                prev_sig,
                new_sig,
                "latest real-trade event changed",
                event_context,
                indicator_context,
            )

        events_today_count = row["events_today_count"]
        # A blank count (NaN) means no events rather than a crash in int().
        if pd.notna(events_today_count) and int(events_today_count) > 0:
            record(
                branch,
                "new_event_on_latest_bar",
                "",
                str(events_today),
                "new signal fired on the latest market bar",
                event_context,
                indicator_context,
            )

    monitor_checks = (
        ("warning_count", "monitor_warning_count_changed", "warning count changed versus prior forward snapshot"),
        ("hard_breach_count", "monitor_hard_breach_count_changed", "hard breach count changed versus prior forward snapshot"),
        ("missing_data_count", "monitor_missing_data_count_changed", "missing-data metric count changed versus prior forward snapshot"),
    )
    for summary_key, change_type, reason in monitor_checks:
        previous = int(prev_monitor.get(summary_key, 0))
        current = int(monitor_summary[summary_key])
        if current != previous:
            record("system", change_type, str(previous), str(current), reason)

    return pd.DataFrame(rows)
- def _divergence_level(row: dict[str, object]) -> str:
- if int(row["hard_breach_count"]) > 0:
- return "review_required"
- if not bool(row["same_position_flag"]):
- return "review_required"
- if not bool(row["same_latest_real_event_flag"]):
- return "material"
- if int(row["warning_count"]) > 0:
- return "mild"
- return "none"
def _update_monitor_history(existing: pd.DataFrame, current: pd.DataFrame, latest_bar_date: str) -> pd.DataFrame:
    """Fold the current monitor snapshot into the history and rebuild streak counters.

    The snapshot is stamped with ``latest_bar_date`` and trimmed to the history
    columns. It is then merged so that each (latest_bar_date, metric) pair keeps
    only its newest row. Streaks are recomputed from scratch for every metric
    by walking that metric's rows in date order:

    * ``warning_streak``: consecutive recorded rows, ending at this one, whose
      status is ``"warning"``.
    * ``hard_breach_streak``: consecutive recorded rows whose status is
      ``"hard_breach"`` or ``"missing_data"``.

    Returns the history sorted by (latest_bar_date, metric) with a fresh 0..n-1 index.
    """
    history_cols = [
        "latest_bar_date",
        "metric",
        "actual_value",
        "status",
        "warning_threshold",
        "hard_threshold",
        "scope",
        "cadence",
        "action_on_warning",
        "action_on_hard_breach",
        "rationale",
    ]
    snapshot = current.copy()
    snapshot["latest_bar_date"] = latest_bar_date
    history = _append_dedup(existing, snapshot[history_cols], ["latest_bar_date", "metric"])

    history["warning_streak"] = 0
    history["hard_breach_streak"] = 0
    breach_statuses = {"hard_breach", "missing_data"}
    for _, metric_rows in history.groupby("metric", sort=False):
        warning_run = 0
        breach_run = 0
        dated_statuses = metric_rows.sort_values("latest_bar_date")["status"]
        for label, status in dated_statuses.items():
            warning_run = warning_run + 1 if status == "warning" else 0
            breach_run = breach_run + 1 if status in breach_statuses else 0
            history.at[label, "warning_streak"] = warning_run
            history.at[label, "hard_breach_streak"] = breach_run

    return history.sort_values(["latest_bar_date", "metric"]).reset_index(drop=True)
- def _build_weekly_summary(observation_log: pd.DataFrame, change_log: pd.DataFrame, divergence_log: pd.DataFrame, monitor_history: pd.DataFrame) -> pd.DataFrame:
- if observation_log.empty:
- return pd.DataFrame()
- unique_dates = sorted(observation_log["latest_bar_date"].unique())[-5:]
- obs = observation_log[observation_log["latest_bar_date"].isin(unique_dates)].copy()
- changes = change_log[change_log["latest_bar_date"].isin(unique_dates)].copy() if not change_log.empty else pd.DataFrame()
- divergence = divergence_log[divergence_log["latest_bar_date"].isin(unique_dates)].copy() if not divergence_log.empty else pd.DataFrame()
- monitor = monitor_history[monitor_history["latest_bar_date"].isin(unique_dates)].copy() if not monitor_history.empty else pd.DataFrame()
- rows: list[dict[str, object]] = []
- for branch, group in obs.groupby("branch"):
- rows.append(
- {
- "window_start": unique_dates[0],
- "window_end": unique_dates[-1],
- "observation_days": int(len(group)),
- "branch": branch,
- "days_in_position": int(group["in_position"].astype(bool).sum()),
- "latest_real_event_changed_count": int(
- 0
- if changes.empty
- else len(changes[(changes["branch"] == branch) & (changes["change_type"] == "latest_real_event_changed")])
- ),
- "new_event_days": int(group["events_today_count"].fillna(0).astype(int).gt(0).sum()),
- "warning_days": 0,
- "hard_breach_days": 0,
- "material_divergence_days": 0,
- }
- )
- rows.append(
- {
- "window_start": unique_dates[0],
- "window_end": unique_dates[-1],
- "observation_days": int(len(unique_dates)),
- "branch": "system_monitor",
- "days_in_position": 0,
- "latest_real_event_changed_count": 0,
- "new_event_days": 0,
- "warning_days": int(0 if monitor.empty else monitor[monitor["status"] == "warning"]["latest_bar_date"].nunique()),
- "hard_breach_days": int(0 if monitor.empty else monitor[monitor["status"] == "hard_breach"]["latest_bar_date"].nunique()),
- "material_divergence_days": int(
- 0 if divergence.empty else divergence["divergence_level"].isin(["material", "review_required"]).sum()
- ),
- }
- )
- return pd.DataFrame(rows)
def main() -> None:
    """Run one forward-observation cycle on top of the daily RC1 pipeline.

    Steps, in order (later steps read files written by earlier ones):
      1. Call ``daily.main()``. Then load the branch status CSV, the monitor
         snapshot CSV and the RC1 manifest JSON from this script's directory.
      2. Append/dedup the observation log, signal change log, branch divergence
         log and monitor history CSVs, and rebuild the weekly summary CSV.
      3. Write Markdown reports (signal changes, branch divergence, monitor
         health, weekly review) for the latest bar.
      4. Persist the state snapshot that the next run diffs against.
      5. Copy the CSV logs into ``forward_reports/``, tagged with the bar date.
      6. Call ``rollout_governance.main()`` and ``html_reports.main()``.

    Raises:
        ValueError: if the refined or control branch is absent from the branch status file.
    """
    import shutil  # local import: used only for byte-exact archiving below

    base_dir = Path(__file__).resolve().parent
    forward_dir = base_dir / "forward_reports"
    forward_dir.mkdir(exist_ok=True)
    # NOTE(review): presumably regenerates the dragon_daily_* inputs read below; confirm.
    daily.main()

    # --- Inputs
    branch_status = _load_csv(base_dir / "dragon_daily_branch_status.csv")
    monitor_snapshot = _load_csv(base_dir / "dragon_daily_monitor_snapshot.csv")
    manifest_path = base_dir / "dragon_daily_rc1_manifest.json"
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    latest_bar_date = str(manifest["latest_bar_date"])
    request_date = str(manifest["as_of_request_date"])
    run_ts = datetime.now().isoformat(timespec="seconds")
    monitor_summary = _build_monitor_summary(monitor_snapshot)

    # --- Observation log: keyed by (latest_bar_date, branch), so rerunning for
    # the same bar replaces that bar's rows instead of duplicating them.
    observation_rows = branch_status.copy()
    observation_rows["run_timestamp"] = run_ts
    observation_rows["request_date"] = request_date
    observation_rows["latest_bar_date"] = latest_bar_date
    observation_rows["monitor_warning_count"] = int(monitor_summary["warning_count"])
    observation_rows["monitor_hard_breach_count"] = int(monitor_summary["hard_breach_count"])
    observation_rows["monitor_missing_data_count"] = int(monitor_summary["missing_data_count"])
    observation_rows["latest_real_event"] = observation_rows.apply(_branch_event_signature, axis=1)
    observation_cols = [
        "run_timestamp",
        "request_date",
        "latest_bar_date",
        "branch",
        "in_position",
        "latest_real_event_date",
        "latest_real_event_side",
        "latest_real_event_reason",
        "latest_real_event",
        "events_today_count",
        "events_today",
        "latest_close",
        "latest_a1",
        "latest_b1",
        "latest_c1",
        "open_entry_date",
        "open_entry_reason",
        "open_holding_days",
        "open_return_pct",
        "monitor_warning_count",
        "monitor_hard_breach_count",
        "monitor_missing_data_count",
    ]
    observation_rows = observation_rows[observation_cols]
    observation_log_path = base_dir / "dragon_forward_observation_log.csv"
    existing_obs = _load_csv(observation_log_path)
    observation_log = _append_dedup(existing_obs, observation_rows, ["latest_bar_date", "branch"])
    _write_csv(observation_log, observation_log_path)

    # --- Signal change log. The previous state must be read here, before this
    # run overwrites the state file near the end of main().
    state_path = base_dir / "dragon_forward_observation_state.json"
    prev_state = _load_state(state_path)
    change_log_path = base_dir / "dragon_signal_change_log.csv"
    existing_changes = _load_csv(change_log_path)
    change_rows = _build_signal_changes(branch_status, prev_state, latest_bar_date, monitor_summary)
    if not change_rows.empty:
        change_log = _append_dedup(existing_changes, change_rows, ["latest_bar_date", "branch", "change_type", "new_value"])
    else:
        change_log = existing_changes
    _write_csv(change_log, change_log_path)

    # --- Divergence between the refined candidate branch and the control branch.
    def _branch_row(branch_name: str) -> pd.Series:
        # Fail with a clear message instead of an opaque IndexError from .iloc[0].
        matches = branch_status[branch_status["branch"] == branch_name]
        if matches.empty:
            raise ValueError(f"branch {branch_name!r} not found in dragon_daily_branch_status.csv")
        return matches.iloc[0]

    refined = _branch_row("alpha_first_glued_refined_hot_cap")
    control = _branch_row("alpha_first_selective_veto")
    actuals = {
        row["metric"]: row["actual_value"]
        for _, row in monitor_snapshot.iterrows()
        if pd.notna(row["actual_value"])
    }
    divergence_row = {
        "latest_bar_date": latest_bar_date,
        "request_date": request_date,
        "same_position_flag": bool(refined["in_position"]) == bool(control["in_position"]),
        "same_latest_real_event_flag": _branch_event_signature(refined) == _branch_event_signature(control),
        "refined_in_position": bool(refined["in_position"]),
        "control_in_position": bool(control["in_position"]),
        "refined_latest_event": _branch_event_signature(refined),
        "control_latest_event": _branch_event_signature(control),
        "next_open_avg_return_delta": float(actuals.get("next_open_avg_return_delta_vs_control", float("nan"))),
        "next_open_pf_delta": float(actuals.get("next_open_profit_factor_delta_vs_control", float("nan"))),
        "next_open_max_drawdown_refined": float(actuals.get("next_open_max_drawdown", float("nan"))),
        "next_open_max_loss_streak_refined": int(actuals.get("next_open_max_loss_streak", 0)),
        "warning_count": int(monitor_summary["warning_count"]),
        "hard_breach_count": int(monitor_summary["hard_breach_count"]),
        "missing_data_count": int(monitor_summary["missing_data_count"]),
    }
    divergence_row["divergence_level"] = _divergence_level(divergence_row)
    divergence_df = pd.DataFrame([divergence_row])
    divergence_log_path = base_dir / "dragon_branch_divergence_log.csv"
    existing_div = _load_csv(divergence_log_path)
    divergence_log = _append_dedup(existing_div, divergence_df, ["latest_bar_date"])
    _write_csv(divergence_log, divergence_log_path)

    # --- Monitor history with per-metric streaks, then the rolling weekly summary.
    monitor_history_path = base_dir / "dragon_monitor_history.csv"
    existing_monitor = _load_csv(monitor_history_path)
    monitor_history = _update_monitor_history(existing_monitor, monitor_snapshot, latest_bar_date)
    _write_csv(monitor_history, monitor_history_path)
    weekly_summary = _build_weekly_summary(observation_log, change_log, divergence_log, monitor_history)
    weekly_summary_path = base_dir / "dragon_forward_weekly_summary.csv"
    _write_csv(weekly_summary, weekly_summary_path)

    # --- Markdown: signal change review for the latest bar.
    latest_changes = change_log[change_log["latest_bar_date"] == latest_bar_date].copy() if not change_log.empty else pd.DataFrame()
    change_lines = [
        "# Dragon Signal Change Review",
        "",
        f"- latest_bar_date: `{latest_bar_date}`",
        f"- change_count: `{len(latest_changes)}`",
        "",
    ]
    if latest_changes.empty:
        change_lines.append("- No state-change record was generated for the latest bar.")
    else:
        for _, row in latest_changes.iterrows():
            change_lines.extend(
                [
                    f"## {row['branch']} / {row['change_type']}",
                    f"- old: `{row['old_value']}`",
                    f"- new: `{row['new_value']}`",
                    f"- reason: {row['reason']}",
                    f"- event_context: `{row['event_context']}`",
                    f"- indicator_context: `{row['indicator_context']}`",
                    "",
                ]
            )
    (base_dir / "dragon_signal_change_review.md").write_text("\n".join(change_lines) + "\n", encoding="utf-8")

    # --- Markdown: branch divergence report (current row plus the last 10 log rows).
    div_lines = [
        "# Dragon Branch Divergence Report",
        "",
        f"- latest_bar_date: `{latest_bar_date}`",
        f"- divergence_level: `{divergence_row['divergence_level']}`",
        f"- same_position_flag: `{divergence_row['same_position_flag']}`",
        f"- same_latest_real_event_flag: `{divergence_row['same_latest_real_event_flag']}`",
        f"- refined_latest_event: `{divergence_row['refined_latest_event']}`",
        f"- control_latest_event: `{divergence_row['control_latest_event']}`",
        f"- next_open_avg_return_delta: `{daily._format_pct(float(divergence_row['next_open_avg_return_delta']))}`",
        f"- next_open_pf_delta: `{daily._format_num(float(divergence_row['next_open_pf_delta']))}`",
        f"- warning_count: `{divergence_row['warning_count']}`",
        f"- hard_breach_count: `{divergence_row['hard_breach_count']}`",
        f"- missing_data_count: `{divergence_row['missing_data_count']}`",
        "",
        "## Recent Log",
    ]
    for _, row in divergence_log.tail(10).iterrows():
        div_lines.append(
            f"- `{row['latest_bar_date']}`: level `{row['divergence_level']}`, same_position `{row['same_position_flag']}`, same_event `{row['same_latest_real_event_flag']}`"
        )
    (base_dir / "dragon_branch_divergence_report.md").write_text("\n".join(div_lines) + "\n", encoding="utf-8")

    # --- Markdown: monitor health for the latest bar.
    health_lines = [
        "# Dragon Monitor Health Report",
        "",
        f"- latest_bar_date: `{latest_bar_date}`",
        f"- warning_count: `{monitor_summary['warning_count']}`",
        f"- hard_breach_count: `{monitor_summary['hard_breach_count']}`",
        f"- missing_data_count: `{monitor_summary['missing_data_count']}`",
        "",
        "## Latest Metrics",
    ]
    latest_monitor = monitor_history[monitor_history["latest_bar_date"] == latest_bar_date].copy()
    for _, row in latest_monitor.iterrows():
        health_lines.append(
            f"- `{row['metric']}`: actual `{row['actual_value']}` | status `{row['status']}` | warning_streak `{int(row['warning_streak'])}` | hard_breach_streak `{int(row['hard_breach_streak'])}`"
        )
    (base_dir / "dragon_monitor_health_report.md").write_text("\n".join(health_lines) + "\n", encoding="utf-8")

    # --- Markdown: weekly review.
    weekly_lines = [
        "# Dragon Forward Weekly Review",
        "",
        f"- latest_window_end: `{latest_bar_date}`",
        "",
    ]
    if weekly_summary.empty:
        weekly_lines.append("- Weekly summary is empty.")
    else:
        for _, row in weekly_summary.iterrows():
            weekly_lines.extend(
                [
                    f"## {row['branch']}",
                    f"- window: `{row['window_start']}` -> `{row['window_end']}`",
                    f"- observation_days: `{int(row['observation_days'])}`",
                    f"- days_in_position: `{int(row['days_in_position'])}`",
                    f"- latest_real_event_changed_count: `{int(row['latest_real_event_changed_count'])}`",
                    f"- new_event_days: `{int(row['new_event_days'])}`",
                    f"- warning_days: `{int(row['warning_days'])}`",
                    f"- hard_breach_days: `{int(row['hard_breach_days'])}`",
                    f"- material_divergence_days: `{int(row['material_divergence_days'])}`",
                    "",
                ]
            )
    (base_dir / "dragon_forward_weekly_review.md").write_text("\n".join(weekly_lines) + "\n", encoding="utf-8")

    # --- State snapshot that the next run diffs against.
    state_payload = {
        "last_run_timestamp": run_ts,
        "request_date": request_date,
        "latest_bar_date": latest_bar_date,
        "branches": {
            str(row["branch"]): {
                key: _json_default(row[key])
                for key in branch_status.columns
            }
            for _, row in branch_status.iterrows()
        },
        "monitor_summary": monitor_summary,
        "divergence": {key: _json_default(value) for key, value in divergence_row.items()},
    }
    # Write to a sibling temp file and atomically swap it in, so an interrupted
    # run cannot leave a truncated state file that breaks the next run's JSON load.
    tmp_state_path = state_path.with_name(state_path.name + ".tmp")
    tmp_state_path.write_text(json.dumps(state_payload, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    tmp_state_path.replace(state_path)

    # --- Archive byte-exact copies tagged with the bar date. Copying, rather than
    # re-parsing and re-writing through pandas, keeps each archive identical to
    # the live file and cannot fail on a CSV that has no columns.
    archive_paths = [
        observation_log_path,
        change_log_path,
        divergence_log_path,
        monitor_history_path,
        weekly_summary_path,
    ]
    for path in archive_paths:
        if path.exists():
            shutil.copyfile(path, forward_dir / f"{path.stem}_{latest_bar_date}{path.suffix}")

    rollout_governance.main()
    html_reports.main()
if __name__ == "__main__":
    # Run the forward-observation cycle only when executed as a script, not on import.
    main()
|