| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515 |
- from __future__ import annotations
- import argparse
- import json
- from pathlib import Path
- import sys
- from typing import Any
- ROOT = Path(__file__).resolve().parents[1]
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
- import pandas as pd
- from backtest.engine import compute_metrics
- from config.loader import load_config
- from data.io import load_point_in_time_panel
- from pipelines.regime_lite_support import (
- LITE_EXECUTION_PROFILES,
- LITE_POST_PROMOTION_REVIEW_DECISIONS,
- LITE_POST_PROMOTION_REVIEW_WINDOWS,
- apply_entry_specific_exit_overlay,
- build_empty_post_promotion_review,
- build_governance_context,
- build_post_promotion_review_window,
- evaluate_post_promotion_review,
- evaluate_runtime_health,
- executed_exposure_by_timing,
- resolve_execution_profile,
- resolve_rollback_reference_profile_id,
- run_backtest_with_execution,
- )
def _safe_series(df: pd.DataFrame, column: str, default: float) -> pd.Series:
    """Return ``df[column]`` as numbers, substituting ``default`` for gaps.

    Non-numeric entries are coerced to NaN and then replaced by ``default``.
    When ``column`` is absent, a constant float series aligned to ``df.index``
    is returned instead.
    """
    if column in df.columns:
        numeric = pd.to_numeric(df[column], errors='coerce')
        return numeric.fillna(default)
    return pd.Series(default, index=df.index, dtype=float)
def _compute_lite_signals(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with the four lite regime signal columns appended.

    Windows are counted in rows, not calendar days.

    * ``trend_score_lite``: ``close`` over its 20-row mean (min 5 rows) minus 1;
      0.0 where the mean is unavailable.
    * ``stress_score_lite``: 20-row std (min 5 rows) of 1-row close returns,
      divided by the expanding median (min 20 rows) of that std. Zero medians
      are treated as missing and the last valid median is carried forward;
      1.0 where no baseline exists yet.
    * ``breadth_proxy_lite``: ``pct_constituents_above_20dma`` minus 0.5
      (0.0 when the column is absent or a value is missing).
    * ``drawdown_20_lite``: ``close`` over its 20-row max (min 5 rows) minus 1;
      0.0 where unavailable.

    ``df`` itself is not modified.
    """
    out = df.copy()
    # NOTE(review): missing/non-numeric closes become 0.0 here, which makes the
    # return on that row -1 and on the following row inf; confirm upstream data
    # never has gaps in ``close``.
    close = _safe_series(out, 'close', 0.0)
    ret_1 = close.pct_change().fillna(0.0)
    ma_20 = close.rolling(20, min_periods=5).mean()
    trend_score = close / ma_20 - 1.0
    stress_raw = ret_1.rolling(20, min_periods=5).std().fillna(0.0)
    # Mark zero baselines with float NaN rather than pd.NA: pd.NA is not a float
    # and can upcast the series to object dtype, dragging the ffill, the division
    # and the fillna below onto object arrays. NaN keeps everything float64.
    stress_base = stress_raw.expanding(min_periods=20).median().replace(0.0, float('nan')).ffill()
    stress_score = (stress_raw / stress_base).fillna(1.0)
    breadth_20 = _safe_series(out, 'pct_constituents_above_20dma', 0.5)
    breadth_proxy = breadth_20 - 0.5
    drawdown_20 = close / close.rolling(20, min_periods=5).max() - 1.0
    out['trend_score_lite'] = trend_score.fillna(0.0)
    out['stress_score_lite'] = stress_score
    out['breadth_proxy_lite'] = breadth_proxy.fillna(0.0)
    out['drawdown_20_lite'] = drawdown_20.fillna(0.0)
    return out
def _classify_state(
    row: pd.Series,
    *,
    risk_off_drawdown: float,
    risk_off_stress: float,
    trend_score_min: float,
    trend_breadth_min: float,
    trend_stress_max: float,
) -> str:
    """Label one signal row as 'risk_off', 'trend' or 'chop'.

    'risk_off' takes precedence. It applies when the 20-row drawdown is at or
    below ``risk_off_drawdown``, or stress is at or above ``risk_off_stress``.
    Otherwise the row is 'trend' only when the trend score and breadth proxy
    reach their minimums and stress does not exceed ``trend_stress_max``.
    Every other row is 'chop'. Conditions are evaluated lazily, left to right.
    """
    risk_off = (
        row['drawdown_20_lite'] <= risk_off_drawdown
        or row['stress_score_lite'] >= risk_off_stress
    )
    if risk_off:
        return 'risk_off'
    trending = (
        row['trend_score_lite'] >= trend_score_min
        and row['breadth_proxy_lite'] >= trend_breadth_min
        and row['stress_score_lite'] <= trend_stress_max
    )
    return 'trend' if trending else 'chop'
def _bounded_targets(base_exposure: pd.Series, *, max_step: float, min_exposure: float = 0.0, max_exposure: float = 1.0) -> pd.Series:
    """Rate-limit an exposure path so it moves at most ``max_step`` per row.

    Walks ``base_exposure`` in order, starting from an implied prior exposure
    of 0.0. Each value is clamped into ``[prev - max_step, prev + max_step]``
    intersected with ``[min_exposure, max_exposure]``. Missing values are
    treated as 0.0.

    Args:
        base_exposure: Desired exposure per row.
        max_step: Largest allowed absolute change between consecutive rows.
        min_exposure: Lower bound for every output value.
        max_exposure: Upper bound for every output value.

    Returns:
        Float series aligned to ``base_exposure.index``.

    Raises:
        ValueError: if ``max_step`` is negative or ``min_exposure`` exceeds
            ``max_exposure``. Either would invert the clamp band and let values
            escape the requested bounds.

    NOTE: when ``min_exposure`` is more than ``max_step`` above the previous
    value, the step limit wins and the output may sit below ``min_exposure``
    until the path catches up.
    """
    if max_step < 0:
        raise ValueError(f'max_step must be non-negative, got {max_step!r}')
    if min_exposure > max_exposure:
        raise ValueError(
            f'min_exposure ({min_exposure!r}) must not exceed max_exposure ({max_exposure!r})'
        )
    targets: list[float] = []
    prev = 0.0
    for value in base_exposure.fillna(0.0).astype(float):
        lower = max(min_exposure, prev - max_step)
        upper = min(max_exposure, prev + max_step)
        bounded = float(min(max(value, lower), upper))
        targets.append(bounded)
        prev = bounded
    return pd.Series(targets, index=base_exposure.index, dtype=float)
def _trend_reentry_speed(ledger: pd.DataFrame, threshold: float = 0.60) -> dict[str, int | float | None]:
    """Measure how fast executed exposure recovers after each entry into 'trend'.

    A trend entry is a row whose ``state`` is 'trend' while the previous row's
    state is not. The first row counts as an entry if it is 'trend', and
    missing states read as 'chop'. For each entry, the delay is the number of
    rows from the entry row to the first row at or after it whose
    ``executed_exposure`` is >= ``threshold`` (0 when the entry row itself
    qualifies). Entries that never reach the threshold are excluded from the
    average. Delays are counted in rows and positionally, so index labels,
    including duplicates, do not matter.

    Returns:
        ``trend_entry_event_count``: number of entries found.
        ``trend_entry_reached_count``: entries that reached ``threshold``.
        ``avg_days_to_reach_exposure_after_trend_entry``: mean delay, or None
            when no entry reached the threshold.
    """
    state = ledger['state'].fillna('chop').astype(str)
    executed = pd.to_numeric(ledger['executed_exposure'], errors='coerce').fillna(0.0)
    entry_mask = (state == 'trend') & (state.shift(1).fillna('chop') != 'trend')
    entry_positions = [pos for pos, is_entry in enumerate(entry_mask.to_numpy()) if is_entry]
    if not entry_positions:
        return {
            'trend_entry_event_count': 0,
            'trend_entry_reached_count': 0,
            'avg_days_to_reach_exposure_after_trend_entry': None,
        }
    # next_hit[pos] is the first position >= pos at/above threshold, or None.
    # A single backward pass replaces one forward scan per entry (O(n) overall).
    at_threshold = (executed >= float(threshold)).to_numpy()
    next_hit: list[int | None] = [None] * len(at_threshold)
    upcoming: int | None = None
    for pos in range(len(at_threshold) - 1, -1, -1):
        if at_threshold[pos]:
            upcoming = pos
        next_hit[pos] = upcoming
    delays = [next_hit[start] - start for start in entry_positions if next_hit[start] is not None]
    avg_delay = float(sum(delays) / len(delays)) if delays else None
    return {
        'trend_entry_event_count': int(len(entry_positions)),
        'trend_entry_reached_count': int(len(delays)),
        'avg_days_to_reach_exposure_after_trend_entry': avg_delay,
    }
def _window_info(df: pd.DataFrame, *, label: str | None = None) -> dict[str, Any]:
    """Summarise a dated frame as its row count and first/last calendar date.

    Dates are ISO ``YYYY-MM-DD`` strings from the index minimum and maximum,
    or None for an empty frame. The index values must support ``.date()``
    (e.g. Timestamps). When ``label`` is given, it is stored under 'label'.
    """
    row_total = int(len(df))
    if row_total:
        first_day = df.index.min().date().isoformat()
        last_day = df.index.max().date().isoformat()
    else:
        first_day = last_day = None
    summary: dict[str, Any] = {'row_count': row_total, 'date_start': first_day, 'date_end': last_day}
    if label is None:
        return summary
    summary['label'] = str(label)
    return summary
def _slice_review_windows(ledger: pd.DataFrame, window_sizes: tuple[int, ...]) -> list[dict[str, Any]]:
    """Cut trailing review windows of the requested row counts from ``ledger``.

    Sizes larger than the ledger are skipped. Each result entry holds the
    ``recent_<size>d`` label, the tail slice under 'data', and its
    ``_window_info`` summary under 'window', in the order of ``window_sizes``.
    """
    available = len(ledger)
    result: list[dict[str, Any]] = []
    for raw_size in window_sizes:
        size = int(raw_size)
        if size > available:
            continue
        tail = ledger.tail(size)
        name = f'recent_{size}d'
        result.append({'label': name, 'data': tail, 'window': _window_info(tail, label=name)})
    return result
def _metrics_from_window(window_df: pd.DataFrame, config: dict[str, Any]) -> dict[str, Any]:
    """Compute backtest metrics plus trend re-entry speed for one ledger slice.

    ``window_df`` must carry the ledger columns read here:
    strategy_return_net, asset_exec_return, turnover and tracking_difference,
    plus state and executed_exposure for the re-entry statistics. Every value
    returned by ``compute_metrics`` is cast to float.

    The annualization factor is ``config['trading']['annualization']``. It
    defaults to 252 when ``config`` is falsy, or when the 'trading' section or
    the key is missing. A 'trading' section that is present but null also gets
    the default.
    """
    # ``.get('trading', {})`` returns None for a present-but-empty ``trading:``
    # YAML section; ``or {}`` treats missing and null the same way.
    trading_cfg = (config or {}).get('trading') or {}
    metrics = compute_metrics(
        strategy_returns=window_df['strategy_return_net'],
        benchmark_returns=window_df['asset_exec_return'],
        turnover=window_df['turnover'],
        tracking_difference=window_df['tracking_difference'],
        annualization=int(trading_cfg.get('annualization', 252)),
    )
    return {
        **{k: float(v) for k, v in metrics.items()},
        **_trend_reentry_speed(window_df),
    }
def _metric_delta(active_metrics: dict[str, Any], rollback_metrics: dict[str, Any], key: str) -> Any:
    """Return active minus rollback for ``key``, or None if either side has no value."""
    active_value = active_metrics.get(key)
    rollback_value = rollback_metrics.get(key)
    if active_value is None or rollback_value is None:
        return None
    return active_value - rollback_value


def _build_state_conditioned_view(
    active_window_df: pd.DataFrame,
    rollback_window_df: pd.DataFrame,
    config: dict[str, Any],
    *,
    window_label: str,
) -> dict[str, Any]:
    """Compare active vs rollback metrics within each lite state of a window.

    States are taken from ``active_window_df['state']`` and visited in the
    fixed order risk_off, chop, trend. A segment is emitted only when both
    frames contain rows for that state. Each segment holds both metric sets
    (from ``_metrics_from_window``) and their deltas for annual_return,
    max_drawdown, annual_turnover and avg_days_to_reach_exposure_after_trend_entry.

    ``status`` is 'ok' when at least one segment was built, otherwise
    'not_run'. Missing frames (None) also yield 'not_run' instead of raising,
    so callers without a recent window get a well-formed view.
    """
    delta_keys = (
        'annual_return',
        'max_drawdown',
        'annual_turnover',
        'avg_days_to_reach_exposure_after_trend_entry',
    )

    def _view(segments: list[dict[str, Any]]) -> dict[str, Any]:
        return {
            'view_name': 'state_conditioned_view',
            'status': 'ok' if segments else 'not_run',
            'basis': 'lite_states_present_in_recent_primary_window',
            'window_label': window_label,
            'segments': segments,
        }

    if active_window_df is None or rollback_window_df is None:
        return _view([])
    present = set(active_window_df['state'].dropna().astype(str).unique().tolist())
    states = [state for state in ('risk_off', 'chop', 'trend') if state in present]
    segments: list[dict[str, Any]] = []
    for state in states:
        active_state_df = active_window_df.loc[active_window_df['state'].astype(str) == state]
        rollback_state_df = rollback_window_df.loc[rollback_window_df['state'].astype(str) == state]
        if active_state_df.empty or rollback_state_df.empty:
            continue
        active_metrics = _metrics_from_window(active_state_df, config)
        rollback_metrics = _metrics_from_window(rollback_state_df, config)
        segments.append(
            {
                'state': state,
                'row_count': int(len(active_state_df)),
                'active_metrics': active_metrics,
                'rollback_reference_metrics': rollback_metrics,
                'delta_vs_rollback_reference': {
                    key: _metric_delta(active_metrics, rollback_metrics, key) for key in delta_keys
                },
            }
        )
    return _view(segments)
def _build_report_markdown(summary: dict[str, Any]) -> str:
    """Render ``summary`` as the human-readable Markdown run report.

    Required keys: metrics, execution_profile, governance, input,
    selected_profile_id, state_mix, mean_target_exposure and
    max_daily_step_observed. runtime_health and post_promotion_review are
    optional; their fields fall back to placeholders ('unknown', 'not_run',
    False, empty containers). Sections are separated by one blank line and
    the text ends with a newline.
    """
    metric_values = summary['metrics']
    profile_info = summary['execution_profile']
    gov = summary['governance']
    health = summary.get('runtime_health', {})
    review = summary.get('post_promotion_review', {})

    def item(name: str, value: Any) -> str:
        # One bullet with the value rendered as inline code.
        return f'- {name}: `{value}`'

    def as_json(value: Any, *, sort_keys: bool = True) -> str:
        return json.dumps(value, ensure_ascii=False, sort_keys=sort_keys)

    source = summary['input']
    sections: list[list[str]] = [
        ['# Regime Lite Report'],
        [
            item('input_path', source['pit_path']),
            item('row_count', source['row_count']),
            f"- date_range: `{source['date_start']}` to `{source['date_end']}`",
        ],
        [
            '## Execution Profile',
            item('selected_profile_id', summary['selected_profile_id']),
            item('source_variant_id', profile_info['source_variant_id']),
            item('timing_mode', profile_info['timing_mode']),
            item('overlay_mode', profile_info['overlay_mode']),
            item('adaptive_hold_mode', profile_info['adaptive_hold_mode']),
            item('adaptive_hold_context', as_json(profile_info['adaptive_hold_context'])),
        ],
        [
            '## Core Metrics',
            *(
                item(key, f'{metric_values[key]:.4f}')
                for key in ('annual_return', 'max_drawdown', 'sharpe', 'annual_turnover')
            ),
        ],
        [
            '## State Mix',
            item('state_mix', as_json(summary['state_mix'])),
        ],
        [
            '## Exposure',
            item('mean_target_exposure', f"{summary['mean_target_exposure']:.4f}"),
            item('max_daily_step_observed', f"{summary['max_daily_step_observed']:.4f}"),
        ],
        [
            '## Governance',
            *(
                item(key, gov[key])
                for key in ('active_profile_id', 'rollback_reference_profile_id', 'operating_mode')
            ),
        ],
        [
            '## Runtime Health',
            item('status', health.get('status', 'unknown')),
            item('recommended_action', health.get('recommended_action', 'unknown')),
            item('reasons', as_json(health.get('reason_lines', []), sort_keys=False)),
        ],
        [
            '## Post-Promotion Review',
            item('decision', review.get('decision', 'not_run')),
            item(
                'remains_preferred_over_rollback_reference',
                review.get('remains_preferred_over_rollback_reference', False),
            ),
            item('evidence_context_split', as_json(review.get('evidence_context_split', {}))),
            item('latest_window', as_json(review.get('latest_window', {}))),
            item('review_windows', as_json(review.get('review_windows', []))),
            item('segmented_diagnostics', as_json(review.get('segmented_diagnostics', {}))),
            item('reasons', as_json(review.get('reason_lines', []), sort_keys=False)),
        ],
    ]
    rendered: list[str] = []
    for section in sections:
        if rendered:
            rendered.append('')
        rendered.extend(section)
    return '\n'.join(rendered) + '\n'
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the regime-lite command line (``sys.argv[1:]`` when ``argv`` is None).

    Exits through ``parser.error`` when ``--max-daily-step`` is negative.
    A negative step inverts the rate limiter's band and lets targets leave [0, 1].
    """
    parser = argparse.ArgumentParser(description='Run a minimal 3-state regime pipeline for small-account operations.')
    parser.add_argument('--pit-csv', type=str, required=True, help='PIT CSV/parquet input with daily bars.')
    parser.add_argument('--output-dir', type=str, default='outputs/regime_lite', help='Output directory.')
    parser.add_argument(
        '--profile',
        type=str,
        default='baseline',
        choices=sorted(LITE_EXECUTION_PROFILES.keys()),
        help='Named lite execution profile to apply.',
    )
    parser.add_argument('--risk-off-exposure', type=float, default=0.0)
    parser.add_argument('--chop-exposure', type=float, default=0.35)
    parser.add_argument('--trend-exposure', type=float, default=0.80)
    parser.add_argument('--max-daily-step', type=float, default=0.20)
    parser.add_argument('--risk-off-drawdown', type=float, default=-0.08)
    parser.add_argument('--risk-off-stress', type=float, default=1.25)
    parser.add_argument('--trend-score-min', type=float, default=0.02)
    parser.add_argument('--trend-breadth-min', type=float, default=0.0)
    parser.add_argument('--trend-stress-max', type=float, default=0.85)
    parser.add_argument('--config', type=str, default=None, help='Optional config YAML path for trading settings.')
    args = parser.parse_args(argv)
    if args.max_daily_step < 0:
        parser.error('--max-daily-step must be non-negative.')
    return args


def _apply_profile_targets(
    frame: pd.DataFrame,
    base_target: pd.Series,
    profile: dict[str, Any],
    *,
    chop_exposure: float,
    max_daily_step: float,
) -> tuple[pd.Series, Any]:
    """Turn step-bounded state targets into one profile's final and executed exposure.

    Applies the profile's entry-specific exit overlay to ``base_target``, then
    re-applies the daily step bound (within [0, 1]) to the overlaid path. The
    result is converted to executed exposure using the profile's timing mode.

    Returns:
        ``(final_target, executed)``: the bounded overlaid targets and the
        value ``executed_exposure_by_timing`` returns for them.
    """
    overlay = apply_entry_specific_exit_overlay(
        frame,
        base_target,
        mode=str(profile['overlay_mode']),
        hold_days=int(profile['hold_days']),
        hold_floor=float(profile['hold_floor']),
        stop_drawdown=float(profile['stop_drawdown']),
        stop_stress=float(profile['stop_stress']),
        chop_exposure=float(chop_exposure),
        adaptive_hold_mode=str(profile.get('adaptive_hold_mode', 'none')),
    )
    final_target = _bounded_targets(
        overlay,
        max_step=float(max_daily_step),
        min_exposure=0.0,
        max_exposure=1.0,
    )
    executed = executed_exposure_by_timing(final_target, str(profile['timing_mode']))
    return final_target, executed


def _run_post_promotion_review(
    *,
    ledger: pd.DataFrame,
    rollback_ledger: pd.DataFrame,
    config: dict[str, Any],
    governance: Any,
    active_health_metrics: dict[str, Any],
    rollback_health_metrics: dict[str, Any],
) -> Any:
    """Compare the active ledger with the rollback reference and evaluate the review.

    Builds per-window evidence for each size in
    LITE_POST_PROMOTION_REVIEW_WINDOWS that fits in ``ledger``, plus a
    full-history comparison and a state-conditioned view over the first
    fitting (primary) window. All of these go to
    ``evaluate_post_promotion_review``.
    """
    recent_window_evidence: list[Any] = []
    primary_pair: tuple[pd.DataFrame, pd.DataFrame] | None = None
    for window_spec in _slice_review_windows(ledger, LITE_POST_PROMOTION_REVIEW_WINDOWS):
        label = str(window_spec['label'])
        active_window_df = window_spec['data']
        active_window_metrics = _metrics_from_window(active_window_df, config)
        # Align the rollback slice to the same trailing row count as the active one.
        rollback_window_df = rollback_ledger.tail(window_spec['window']['row_count'])
        rollback_window_metrics = _metrics_from_window(rollback_window_df, config)
        if primary_pair is None:
            primary_pair = (active_window_df, rollback_window_df)
        recent_window_evidence.append(
            build_post_promotion_review_window(
                {**window_spec['window'], 'label': label},
                active_window_metrics,
                rollback_window_metrics,
            )
        )
    full_history_reference = build_post_promotion_review_window(
        _window_info(ledger, label='full_history_reference'),
        active_health_metrics,
        rollback_health_metrics,
    )
    if primary_pair is None:
        # No review window fits in the ledger. Empty slices make the state view
        # report 'not_run' instead of failing on a missing frame.
        primary_pair = (ledger.iloc[:0], rollback_ledger.iloc[:0])
    state_conditioned_view = _build_state_conditioned_view(
        primary_pair[0],
        primary_pair[1],
        config,
        window_label=str(recent_window_evidence[0]['window']['label']) if recent_window_evidence else 'recent_primary_window',
    )
    return evaluate_post_promotion_review(
        governance_context=governance,
        recent_window_evidence=recent_window_evidence,
        full_history_reference=full_history_reference,
        state_conditioned_view=state_conditioned_view,
    )


def _write_json(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as indented UTF-8 JSON, keeping non-ASCII text as-is."""
    with path.open('w', encoding='utf-8') as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)


def main() -> None:
    """Run the regime-lite pipeline end to end and write its artifacts.

    Steps:
        1. Parse CLI options and load the PIT panel.
        2. Derive lite signals and a 3-state label per row, then map states to
           step-bounded target exposure.
        3. Apply the selected execution profile, and the rollback reference
           profile when it differs, both starting from those same targets.
        4. Backtest both, then evaluate runtime health and the post-promotion
           review.

    Files written to ``--output-dir``: regime_lite_daily_ledger.csv,
    regime_lite_summary.json, regime_lite_runtime_health.json,
    regime_lite_post_promotion_review.json and regime_lite_report.md.

    Raises:
        ValueError: if the PIT input lacks a ``close`` column or has no rows.
    """
    args = _parse_args()
    chop_exposure = float(args.chop_exposure)
    max_daily_step = float(args.max_daily_step)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    raw = load_point_in_time_panel(args.pit_csv)
    if 'close' not in raw.columns:
        raise ValueError('PIT input must include close column for regime-lite runtime.')
    if raw.empty:
        raise ValueError('PIT input has no rows; regime-lite runtime needs at least one bar.')

    enriched = _compute_lite_signals(raw)
    enriched['state'] = enriched.apply(
        _classify_state,
        axis=1,
        risk_off_drawdown=float(args.risk_off_drawdown),
        risk_off_stress=float(args.risk_off_stress),
        trend_score_min=float(args.trend_score_min),
        trend_breadth_min=float(args.trend_breadth_min),
        trend_stress_max=float(args.trend_stress_max),
    )
    mapping = {
        'risk_off': float(args.risk_off_exposure),
        'chop': chop_exposure,
        'trend': float(args.trend_exposure),
    }
    enriched['base_exposure'] = enriched['state'].map(mapping).fillna(chop_exposure).astype(float)
    enriched['target_exposure'] = _bounded_targets(
        enriched['base_exposure'],
        max_step=max_daily_step,
        min_exposure=0.0,
        max_exposure=1.0,
    )

    profile = resolve_execution_profile(args.profile)
    active_profile_id = str(profile['profile_id'])
    rollback_reference_profile_id = resolve_rollback_reference_profile_id(active_profile_id)
    needs_rollback = active_profile_id != rollback_reference_profile_id
    governance = build_governance_context(
        active_profile_id=active_profile_id,
        rollback_reference_profile_id=rollback_reference_profile_id,
        operating_mode='normal',
        decision_rationale_inputs={
            'selected_profile_id': active_profile_id,
            'source_variant_id': str(profile['source_variant_id']),
            'timing_mode': str(profile['timing_mode']),
            'overlay_mode': str(profile['overlay_mode']),
            'adaptive_hold_mode': str(profile.get('adaptive_hold_mode', 'none')),
        },
    )

    # Both profiles start from the same pre-overlay targets. The rollback
    # reference must not be built on top of the active profile's overlaid
    # output, or it would inherit the active overlay and the comparison would
    # not isolate the profile difference. Copy so that the later column
    # assignment cannot alias these values.
    base_target = enriched['target_exposure'].copy()
    active_target, executed = _apply_profile_targets(
        enriched,
        base_target,
        profile,
        chop_exposure=chop_exposure,
        max_daily_step=max_daily_step,
    )
    rollback_target = None
    rollback_executed = None
    if needs_rollback:
        rollback_profile = resolve_execution_profile(rollback_reference_profile_id)
        rollback_target, rollback_executed = _apply_profile_targets(
            enriched,
            base_target,
            rollback_profile,
            chop_exposure=chop_exposure,
            max_daily_step=max_daily_step,
        )
    enriched['target_exposure'] = active_target

    config = load_config(args.config)
    ledger, metrics = run_backtest_with_execution(enriched, config, executed)
    ledger.index.name = 'date'
    active_health_metrics = {
        **{k: float(v) for k, v in metrics.items()},
        **_trend_reentry_speed(ledger),
    }

    rollback_ledger = None
    rollback_health_metrics = None
    if needs_rollback:
        rollback_plan = enriched.copy()
        rollback_plan['target_exposure'] = rollback_target
        rollback_ledger, rollback_metrics = run_backtest_with_execution(rollback_plan, config, rollback_executed)
        rollback_health_metrics = {
            **{k: float(v) for k, v in rollback_metrics.items()},
            **_trend_reentry_speed(rollback_ledger),
        }

    runtime_health = evaluate_runtime_health(
        governance_context=governance,
        active_metrics=active_health_metrics,
        rollback_metrics=rollback_health_metrics,
    )
    if rollback_ledger is not None and rollback_health_metrics is not None:
        post_promotion_review = _run_post_promotion_review(
            ledger=ledger,
            rollback_ledger=rollback_ledger,
            config=config,
            governance=governance,
            active_health_metrics=active_health_metrics,
            rollback_health_metrics=rollback_health_metrics,
        )
    else:
        post_promotion_review = build_empty_post_promotion_review(
            governance_context=governance,
            reason_lines=['PASS: active profile is already the rollback reference; post-promotion review not required.'],
        )

    ledger.to_csv(output_dir / 'regime_lite_daily_ledger.csv')
    state_mix = ledger['state'].fillna('unknown').astype(str).value_counts(normalize=True).sort_index()
    target_diff = ledger['target_exposure'].diff().abs().fillna(0.0)
    summary = {
        'input': {
            'pit_path': str(args.pit_csv),
            'row_count': int(len(ledger)),
            'date_start': ledger.index.min().date().isoformat() if len(ledger) else None,
            'date_end': ledger.index.max().date().isoformat() if len(ledger) else None,
        },
        'selected_profile_id': active_profile_id,
        'execution_profile': {
            'source_variant_id': str(profile['source_variant_id']),
            'timing_mode': str(profile['timing_mode']),
            'overlay_mode': str(profile['overlay_mode']),
            'adaptive_hold_mode': str(profile.get('adaptive_hold_mode', 'none')),
            'adaptive_hold_context': dict(profile.get('adaptive_hold_context', {})),
            'hold_days': int(profile['hold_days']),
            'hold_floor': float(profile['hold_floor']),
            'stop_drawdown': float(profile['stop_drawdown']),
            'stop_stress': float(profile['stop_stress']),
            'rollback_reference_profile_id': rollback_reference_profile_id,
        },
        'governance': governance,
        'params': {
            'risk_off_exposure': float(args.risk_off_exposure),
            'chop_exposure': chop_exposure,
            'trend_exposure': float(args.trend_exposure),
            'max_daily_step': max_daily_step,
            'risk_off_drawdown': float(args.risk_off_drawdown),
            'risk_off_stress': float(args.risk_off_stress),
            'trend_score_min': float(args.trend_score_min),
            'trend_breadth_min': float(args.trend_breadth_min),
            'trend_stress_max': float(args.trend_stress_max),
        },
        'metrics': {k: float(v) for k, v in metrics.items()},
        'runtime_health': runtime_health,
        'post_promotion_review': post_promotion_review,
        'state_mix': {state: float(weight) for state, weight in state_mix.items()},
        'mean_target_exposure': float(ledger['target_exposure'].mean()) if len(ledger) else 0.0,
        'max_daily_step_observed': float(target_diff.max()) if len(target_diff) else 0.0,
        'breadth_proxy_source': (
            'pct_constituents_above_20dma' if 'pct_constituents_above_20dma' in raw.columns else 'neutral_fallback_0.5'
        ),
    }
    _write_json(output_dir / 'regime_lite_summary.json', summary)
    _write_json(output_dir / 'regime_lite_runtime_health.json', runtime_health)
    _write_json(output_dir / 'regime_lite_post_promotion_review.json', post_promotion_review)
    (output_dir / 'regime_lite_report.md').write_text(_build_report_markdown(summary), encoding='utf-8')


if __name__ == '__main__':
    main()
|