| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612 |
- from __future__ import annotations
- import copy
- import json
- from dataclasses import dataclass
- from typing import Any, Callable, Iterable, Mapping, Sequence
- import pandas as pd
- from backtest.engine import compute_metrics, run_backtest
- from backtest.utility import core_utility, utility_from_metrics, utility_status
- from features.quality import enforce_feature_information_gate
- from backtest.walkforward import WindowSpec
- from features.pipeline import build_feature_table
- from model.policy import build_exposure_plan
- from model.scores import build_scores
- from model.state_machine import run_state_machine
@dataclass(frozen=True)
class HypothesisCandidate:
    """A named bundle of configuration overrides evaluated as one hypothesis.

    The dataclass is frozen, so neither attribute can be rebound after
    construction. The ``overrides`` dict itself remains a mutable object.
    """

    # Identifier used to label the candidate in walk-forward output.
    candidate_id: str
    # Nested mapping that is deep-merged over the base configuration.
    overrides: dict[str, Any]
# Built-in candidate ladder used when the caller supplies no candidates.
# Each entry overrides keys in the ``policy`` and ``trading`` config sections;
# ``baseline`` overrides nothing and therefore runs the base config as-is.
# NOTE(review): the numeric values are presumably per-state exposure targets
# and a daily exposure-change cap -- confirm against the policy/backtest code.
DEFAULT_HYPOTHESIS_CANDIDATES: tuple[HypothesisCandidate, ...] = (
    HypothesisCandidate(
        candidate_id='defensive',
        overrides={
            'policy': {
                'trend': 0.80,
                'euphoric_late': 0.30,
                'chop': 0.20,
                'repair_rebound_base': 0.30,
                'repair_rebound_max': 0.65,
            },
            'trading': {
                'max_daily_exposure_change': 0.20,
            },
        },
    ),
    HypothesisCandidate(candidate_id='baseline', overrides={}),
    HypothesisCandidate(
        candidate_id='balanced_capture',
        overrides={
            'policy': {
                'trend': 0.95,
                'euphoric_late': 0.65,
                'chop': 0.35,
                'repair_rebound_base': 0.40,
                'repair_rebound_max': 0.85,
            },
            'trading': {
                'max_daily_exposure_change': 0.30,
            },
        },
    ),
    HypothesisCandidate(
        candidate_id='pro_risk',
        overrides={
            'policy': {
                'trend': 1.00,
                'euphoric_late': 0.70,
                'chop': 0.45,
                'repair_rebound_base': 0.50,
                'repair_rebound_max': 0.95,
            },
            'trading': {
                'max_daily_exposure_change': 0.35,
            },
        },
    ),
)

# Callable contract for a strategy runner: it receives a data frame and a
# config dict and returns ``(frame, ledger frame, metrics dict)`` -- the same
# shape ``run_strategy_bundle`` returns.
StrategyRunner = Callable[
    [pd.DataFrame, dict[str, Any]],
    tuple[pd.DataFrame, pd.DataFrame, dict[str, float]],
]
- def _deep_merge_dict(base: Mapping[str, Any], overrides: Mapping[str, Any]) -> dict[str, Any]:
- out = copy.deepcopy(dict(base))
- for key, value in overrides.items():
- if isinstance(value, Mapping) and isinstance(out.get(key), Mapping):
- out[key] = _deep_merge_dict(dict(out[key]), value)
- else:
- out[key] = copy.deepcopy(value)
- return out
- def _resolve_utility(metrics: Mapping[str, float], config: Mapping[str, Any] | None = None) -> tuple[float, str]:
- evaluation_cfg = dict((config or {}).get('evaluation', {}))
- utility_total_score = float(
- metrics.get(
- 'utility_total_score',
- utility_from_metrics(
- dict(metrics),
- upside_target=float(evaluation_cfg.get('utility_upside_target', 0.55)),
- turnover_penalty_start=float(evaluation_cfg.get('utility_turnover_penalty_start', 8.0)),
- turnover_penalty_rate=float(evaluation_cfg.get('utility_turnover_penalty_rate', 0.010)),
- ),
- )
- )
- utility_state = str(metrics.get('utility_status', utility_status(utility_total_score)))
- return utility_total_score, utility_state
def run_strategy_bundle(df: pd.DataFrame, config: dict[str, Any]) -> tuple[pd.DataFrame, pd.DataFrame, dict[str, float]]:
    """Run the feature -> score -> state -> exposure -> backtest chain once.

    Args:
        df: Input frame handed to ``build_feature_table``.
        config: Configuration passed to the gate, state machine, exposure
            planner, backtest and utility resolution.

    Returns:
        ``(exposure_plan, ledger, metrics)`` where ``metrics`` is a copy of the
        backtest metrics with ``utility_total_score`` and ``utility_status``
        filled in.
    """
    feature_table = build_feature_table(df)
    # Called only for its effect; the return value is ignored here.
    # NOTE(review): presumably raises when the features fail the gate -- confirm.
    enforce_feature_information_gate(feature_table, config)

    score_table = build_scores(feature_table)
    state_table = run_state_machine(score_table, config)
    exposure_plan = build_exposure_plan(state_table, config)
    ledger, backtest_metrics = run_backtest(exposure_plan, config)

    resolved_score, resolved_status = _resolve_utility(backtest_metrics, config)
    enriched_metrics = dict(backtest_metrics)
    enriched_metrics.update(
        utility_total_score=resolved_score,
        utility_status=resolved_status,
    )
    return exposure_plan, ledger, enriched_metrics
def normalize_hypothesis_candidates(raw_candidates: Iterable[Mapping[str, Any]] | None) -> list[HypothesisCandidate]:
    """Validate raw candidate definitions and convert them to candidates.

    Args:
        raw_candidates: Iterable of mappings with an ``id`` (or
            ``candidate_id``) and an optional ``overrides`` mapping. ``None``
            selects a deep copy of ``DEFAULT_HYPOTHESIS_CANDIDATES``.

    Returns:
        A non-empty list of ``HypothesisCandidate`` objects whose overrides
        share no mutable state with the caller's input.

    Raises:
        ValueError: If an entry is not a mapping, an id is blank, overrides
            are not a mapping, no candidate is supplied, or ids are repeated.
    """
    if raw_candidates is None:
        return [copy.deepcopy(candidate) for candidate in DEFAULT_HYPOTHESIS_CANDIDATES]

    candidates: list[HypothesisCandidate] = []
    for idx, item in enumerate(raw_candidates):
        # Reject non-mapping entries with the same error type as the other
        # validation failures instead of an AttributeError from ``.get``.
        if not isinstance(item, Mapping):
            raise ValueError(f'Candidate index {idx} must be an object.')
        # ``id`` wins over ``candidate_id``; a positional name is the last resort.
        candidate_id = str(item.get('id', item.get('candidate_id', f'candidate_{idx + 1}'))).strip()
        if not candidate_id:
            raise ValueError(f'Candidate index {idx} is missing an id.')
        overrides_raw = item.get('overrides', {})
        if not isinstance(overrides_raw, Mapping):
            raise ValueError(f'Candidate {candidate_id} overrides must be an object.')
        # Deep copy so nested override dicts are isolated from the caller's
        # input, matching the deep copy applied to the built-in defaults.
        candidates.append(
            HypothesisCandidate(candidate_id=candidate_id, overrides=copy.deepcopy(dict(overrides_raw)))
        )

    if not candidates:
        raise ValueError('At least one hypothesis candidate is required.')
    ids = [candidate.candidate_id for candidate in candidates]
    if len(set(ids)) != len(ids):
        raise ValueError(f'Duplicate candidate ids found: {ids}')
    return candidates
def _candidate_config(base_config: Mapping[str, Any], candidate: HypothesisCandidate) -> dict[str, Any]:
    """Build the effective config for ``candidate``.

    The candidate's overrides are deep-merged over ``base_config`` and the
    candidate id is recorded under the ``_candidate_id`` key of the result.
    """
    return {
        **_deep_merge_dict(base_config, candidate.overrides),
        '_candidate_id': candidate.candidate_id,
    }
- def _prefixed_metrics(prefix: str, metrics: Mapping[str, Any]) -> dict[str, Any]:
- out: dict[str, Any] = {}
- for key, value in metrics.items():
- if isinstance(value, (int, float)):
- out[f'{prefix}_{key}'] = float(value)
- else:
- out[f'{prefix}_{key}'] = value
- return out
- def _compute_window_metrics(ledger: pd.DataFrame, config: Mapping[str, Any] | None = None) -> dict[str, float]:
- required_columns = {'strategy_return_net', 'asset_exec_return', 'turnover'}
- if not required_columns.issubset(ledger.columns):
- raise ValueError(f'Ledger is missing required columns: {sorted(required_columns - set(ledger.columns))}')
- metrics = compute_metrics(
- strategy_returns=ledger['strategy_return_net'],
- benchmark_returns=ledger['asset_exec_return'],
- turnover=ledger['turnover'],
- )
- utility_total_score, utility_state = _resolve_utility(metrics, config)
- out_metrics = dict(metrics)
- out_metrics['utility_total_score'] = utility_total_score
- out_metrics['utility_status'] = utility_state
- return out_metrics
- def _window_row_base(window: WindowSpec) -> dict[str, Any]:
- return {
- 'train_start': window.train_start,
- 'train_end': window.train_end,
- 'test_start': window.test_start,
- 'test_end': window.test_end,
- }
- def _clip(value: float, lower: float, upper: float) -> float:
- return float(min(max(value, lower), upper))
- def _safe_float(value: Any, default: float = 0.0) -> float:
- try:
- return float(value)
- except (TypeError, ValueError):
- return float(default)
- def _resolve_candidate_selection_settings(config: Mapping[str, Any]) -> dict[str, Any]:
- frozen_cfg = dict((config or {}).get('frozen_validation', {}))
- evaluation_cfg = dict((config or {}).get('evaluation', {}))
- cfg = dict(frozen_cfg.get('candidate_selection', {}))
- return {
- 'use_hard_constraints': bool(cfg.get('use_hard_constraints', True)),
- 'upside_capture_min': float(cfg.get('upside_capture_min', 0.28)),
- 'max_drawdown_ratio_vs_benchmark': float(cfg.get('max_drawdown_ratio_vs_benchmark', 0.72)),
- 'annual_turnover_soft_max': float(cfg.get('annual_turnover_soft_max', 18.0)),
- 'annual_return_override_abs': float(cfg.get('annual_return_override_abs', 0.05)),
- 'annual_return_override_ratio': float(cfg.get('annual_return_override_ratio', 0.40)),
- 'return_ratio_weight': float(cfg.get('return_ratio_weight', 0.30)),
- 'upside_weight': float(cfg.get('upside_weight', 0.30)),
- 'drawdown_weight': float(cfg.get('drawdown_weight', 0.20)),
- 'sharpe_delta_weight': float(cfg.get('sharpe_delta_weight', 0.10)),
- 'stability_weight': float(cfg.get('stability_weight', 0.10)),
- 'turnover_penalty_per_unit': float(cfg.get('turnover_penalty_per_unit', 0.015)),
- 'score_cap': float(cfg.get('score_cap', 1.2)),
- 'upside_target': float(cfg.get('upside_target', 0.45)),
- 'drawdown_improvement_target': float(cfg.get('drawdown_improvement_target', 0.35)),
- 'sharpe_delta_shift': float(cfg.get('sharpe_delta_shift', 0.05)),
- 'sharpe_delta_scale': float(cfg.get('sharpe_delta_scale', 0.15)),
- 'turnover_penalty_start': float(cfg.get('turnover_penalty_start', 12.0)),
- 'core_utility_floor': float(cfg.get('core_utility_floor', cfg.get('utility_floor', -0.05))),
- 'core_utility_target': float(cfg.get('core_utility_target', cfg.get('utility_target', 0.10))),
- 'utility_upside_target': float(evaluation_cfg.get('utility_upside_target', 0.55)),
- 'fallback_mode': str(cfg.get('fallback_mode', 'closest_to_feasible_frontier')).strip().lower(),
- }
def _compute_selection_score(metrics: Mapping[str, Any], settings: Mapping[str, Any]) -> tuple[float, dict[str, float]]:
    """Blend training metrics into one candidate-selection score.

    Five component scores, each clipped to ``[0, score_cap]``, are combined
    with the configured weights and a linear turnover penalty is subtracted.

    Args:
        metrics: Candidate metrics. Missing or non-numeric entries are read as
            ``0.0`` via ``_safe_float``. A ``core_utility_score`` entry, when
            present, is used as-is; otherwise it is derived with
            ``core_utility``.
        settings: Resolved selection settings (weights, targets, caps).

    Returns:
        ``(score, components)`` where ``components`` exposes each intermediate
        value for diagnostics.
    """
    annual_return = _safe_float(metrics.get('annual_return'))
    benchmark_return = _safe_float(metrics.get('benchmark_return'))
    upside_capture = _safe_float(metrics.get('upside_capture'))
    max_drawdown = _safe_float(metrics.get('max_drawdown'))
    benchmark_max_drawdown = _safe_float(metrics.get('benchmark_max_drawdown'))
    sharpe_delta = _safe_float(metrics.get('sharpe_delta'))
    annual_turnover = _safe_float(metrics.get('annual_turnover'))

    score_cap = float(settings['score_cap'])
    # Denominators are floored at 1e-12 to avoid division by zero.
    upside_target = max(float(settings['upside_target']), 1e-12)
    drawdown_target = max(float(settings['drawdown_improvement_target']), 1e-12)
    sharpe_scale = max(float(settings['sharpe_delta_scale']), 1e-12)

    # Relative return is used only when the benchmark return exceeds 0.05;
    # otherwise the return is measured against a fixed 0.10.
    if benchmark_return > 0.05:
        return_ratio = _clip(annual_return / benchmark_return, 0.0, score_cap)
    else:
        return_ratio = _clip(annual_return / 0.10, 0.0, score_cap)

    # Upside capture earns credit only above a fixed 0.15 floor.
    upside_score = _clip((upside_capture - 0.15) / max(upside_target - 0.15, 1e-12), 0.0, score_cap)

    if benchmark_max_drawdown > 1e-12:
        drawdown_improvement = (benchmark_max_drawdown - max_drawdown) / benchmark_max_drawdown
    else:
        drawdown_improvement = 0.0

    # Derive the core utility only when the metrics do not already carry it,
    # rather than computing a fallback that would be thrown away.
    if 'core_utility_score' in metrics:
        core_utility_value = _safe_float(metrics['core_utility_score'])
    else:
        core_utility_value = _safe_float(
            core_utility(
                sharpe_delta=sharpe_delta,
                drawdown_improvement=drawdown_improvement,
                upside_capture=upside_capture,
                upside_target=float(settings['utility_upside_target']),
            )
        )

    drawdown_score = _clip(drawdown_improvement / drawdown_target, 0.0, score_cap)
    sharpe_delta_score = _clip((sharpe_delta + float(settings['sharpe_delta_shift'])) / sharpe_scale, 0.0, score_cap)
    # Position of the core utility between its configured floor and target.
    stability_score = _clip(
        (core_utility_value - float(settings['core_utility_floor']))
        / max(float(settings['core_utility_target']) - float(settings['core_utility_floor']), 1e-12),
        0.0,
        score_cap,
    )
    # Only turnover above ``turnover_penalty_start`` is penalised.
    turnover_penalty = max(0.0, annual_turnover - float(settings['turnover_penalty_start'])) * float(
        settings['turnover_penalty_per_unit']
    )

    score = (
        float(settings['return_ratio_weight']) * return_ratio
        + float(settings['upside_weight']) * upside_score
        + float(settings['drawdown_weight']) * drawdown_score
        + float(settings['sharpe_delta_weight']) * sharpe_delta_score
        + float(settings['stability_weight']) * stability_score
        - turnover_penalty
    )
    return score, {
        'return_ratio': return_ratio,
        'upside_score': upside_score,
        'drawdown_score': drawdown_score,
        'sharpe_delta_score': sharpe_delta_score,
        'core_utility_value': core_utility_value,
        'stability_score': stability_score,
        'turnover_penalty': turnover_penalty,
    }
def _evaluate_hard_constraints(metrics: Mapping[str, Any], settings: Mapping[str, Any]) -> tuple[bool, list[str]]:
    """Check ``metrics`` against the hard selection constraints.

    Three checks are applied: a minimum upside capture, a maximum ratio of
    strategy drawdown to benchmark drawdown (skipped when the benchmark
    drawdown is not above 1e-12), and a turnover cap that is waived when the
    annual return reaches the override threshold.

    Returns:
        ``(passed, reasons)`` where ``reasons`` lists the code of every
        violated constraint and ``passed`` is true when the list is empty.
    """

    def metric(name: str) -> float:
        return _safe_float(metrics.get(name))

    failures: list[str] = []

    if metric('upside_capture') < float(settings['upside_capture_min']):
        failures.append('upside_capture_below_min')

    benchmark_drawdown = metric('benchmark_max_drawdown')
    if (
        benchmark_drawdown > 1e-12
        and metric('max_drawdown') / benchmark_drawdown > float(settings['max_drawdown_ratio_vs_benchmark'])
    ):
        failures.append('drawdown_ratio_above_max')

    turnover_limit = float(settings['annual_turnover_soft_max'])
    # High turnover is tolerated once the return clears the larger of an
    # absolute floor and a fraction of the (non-negative) benchmark return.
    override_floor = max(
        float(settings['annual_return_override_abs']),
        float(settings['annual_return_override_ratio']) * max(metric('benchmark_return'), 0.0),
    )
    if metric('annual_turnover') > turnover_limit and metric('annual_return') < override_floor:
        failures.append('turnover_above_soft_max_without_return_override')

    return not failures, failures
def _constraint_distance(metrics: Mapping[str, Any], settings: Mapping[str, Any]) -> tuple[float, dict[str, float]]:
    """Measure how far ``metrics`` are from satisfying the hard constraints.

    Each gap is the relative shortfall or excess against its limit (zero when
    the constraint holds). The distance is the fixed-weight sum
    ``0.50 * upside + 0.30 * drawdown + 0.20 * turnover``.

    Returns:
        ``(violation_distance, gaps)`` with ``gaps`` keyed by ``upside_gap``,
        ``drawdown_gap`` and ``turnover_gap``.
    """

    def metric(name: str) -> float:
        return _safe_float(metrics.get(name))

    # Limits are floored at 1e-12 because they are used as denominators.
    min_upside = max(float(settings['upside_capture_min']), 1e-12)
    max_ratio = max(float(settings['max_drawdown_ratio_vs_benchmark']), 1e-12)
    turnover_limit = max(float(settings['annual_turnover_soft_max']), 1e-12)
    override_floor = max(
        float(settings['annual_return_override_abs']),
        float(settings['annual_return_override_ratio']) * max(metric('benchmark_return'), 0.0),
    )

    upside_gap = max(0.0, min_upside - metric('upside_capture')) / min_upside

    benchmark_drawdown = metric('benchmark_max_drawdown')
    if benchmark_drawdown > 1e-12:
        observed_ratio = metric('max_drawdown') / benchmark_drawdown
    else:
        observed_ratio = 0.0
    drawdown_gap = max(0.0, observed_ratio - max_ratio) / max_ratio

    turnover = metric('annual_turnover')
    # Turnover only counts as a violation when the return override is not met.
    turnover_breached = turnover > turnover_limit and metric('annual_return') < override_floor
    turnover_gap = (turnover - turnover_limit) / turnover_limit if turnover_breached else 0.0

    distance = 0.50 * upside_gap + 0.30 * drawdown_gap + 0.20 * turnover_gap
    return float(distance), {
        'upside_gap': float(upside_gap),
        'drawdown_gap': float(drawdown_gap),
        'turnover_gap': float(turnover_gap),
    }
def _evaluate_train_candidates(
    runner: StrategyRunner,
    train_slice: pd.DataFrame,
    config: Mapping[str, Any],
    candidate_list: Sequence[HypothesisCandidate],
    selection_settings: Mapping[str, Any],
) -> list[dict[str, Any]]:
    """Run every candidate on the training slice and collect its diagnostics."""
    evaluations: list[dict[str, Any]] = []
    for candidate in candidate_list:
        candidate_config = _candidate_config(config, candidate)
        _, _, train_metrics_raw = runner(train_slice, candidate_config)
        train_metrics = dict(train_metrics_raw)
        # Resolve utility with the candidate's own config so train utility
        # uses the same evaluation settings as the test-window utility.
        utility_value, _ = _resolve_utility(train_metrics, candidate_config)
        train_metrics['utility_total_score'] = utility_value
        train_metrics['utility_status'] = utility_status(utility_value)
        hard_pass, hard_fail_reasons = _evaluate_hard_constraints(train_metrics, selection_settings)
        score_value, score_components = _compute_selection_score(train_metrics, selection_settings)
        violation_distance, violation_components = _constraint_distance(train_metrics, selection_settings)
        evaluations.append(
            {
                'candidate': candidate,
                'metrics': train_metrics,
                'utility': utility_value,
                'hard_pass': hard_pass,
                'hard_fail_reasons': hard_fail_reasons,
                'selection_score': score_value,
                'selection_score_components': score_components,
                'violation_distance': violation_distance,
                'violation_components': violation_components,
            }
        )
    return evaluations


def _select_train_candidate(
    evaluations: Sequence[dict[str, Any]],
    selection_settings: Mapping[str, Any],
) -> tuple[dict[str, Any] | None, str]:
    """Pick the winning evaluation and report which selection mode was used.

    Comparisons are strict, so the first evaluation wins ties and an
    evaluation whose score is NaN is never selected.
    """
    use_hard_constraints = bool(selection_settings['use_hard_constraints'])
    ranking_pool = (
        [item for item in evaluations if item['hard_pass']]
        if use_hard_constraints
        else evaluations
    )
    selected: dict[str, Any] | None = None

    if ranking_pool:
        best_score = float('-inf')
        for item in ranking_pool:
            score_value = float(item['selection_score'])
            if score_value > best_score:
                best_score = score_value
                selected = item
        return selected, 'constraint_score'

    fallback_mode = str(selection_settings.get('fallback_mode', 'closest_to_feasible_frontier')).strip().lower()
    if fallback_mode == 'closest_to_feasible_frontier':
        best_fallback = float('-inf')
        best_score = float('-inf')
        best_utility = float('-inf')
        for item in evaluations:
            score_value = float(item['selection_score'])
            utility_value = float(item['utility'])
            # Smaller violation distance dominates; the score adds a nudge.
            fallback_score = -float(item['violation_distance']) + 0.25 * score_value
            improves = fallback_score > best_fallback
            if not improves and fallback_score == best_fallback:
                # Ties are broken by selection score, then by utility.
                improves = score_value > best_score or (
                    score_value == best_score and utility_value > best_utility
                )
            if improves:
                best_fallback = fallback_score
                best_score = score_value
                best_utility = utility_value
                selected = item
        return selected, 'frontier_fallback_no_hard_pass'

    best_utility = float('-inf')
    for item in evaluations:
        utility_value = float(item['utility'])
        if utility_value > best_utility:
            best_utility = utility_value
            selected = item
    return selected, 'utility_fallback_no_hard_pass'


def _summarize_frozen_board(
    board: pd.DataFrame,
    *,
    total_windows: int,
    candidate_list: Sequence[HypothesisCandidate],
    min_train_rows: int,
    min_test_rows: int,
    selection_settings: Mapping[str, Any],
) -> dict[str, Any]:
    """Aggregate the per-window board into the walk-forward summary dict."""
    ok_board = board[board['status'] == 'ok'].copy() if 'status' in board.columns else pd.DataFrame()
    has_ok_rows = not ok_board.empty

    selected_distribution = (
        ok_board['selected_candidate_id'].value_counts().to_dict() if 'selected_candidate_id' in ok_board.columns else {}
    )
    status_counts = board['status'].value_counts().to_dict() if 'status' in board.columns else {}
    selection_mode_distribution = (
        ok_board['selection_mode'].value_counts().to_dict() if has_ok_rows and 'selection_mode' in ok_board.columns else {}
    )
    windows_with_hard_pass_candidate_count = (
        int((ok_board['train_candidate_hard_pass_count'] > 0).sum())
        if has_ok_rows and 'train_candidate_hard_pass_count' in ok_board.columns
        else 0
    )
    hard_pass_window_ratio = (
        float(windows_with_hard_pass_candidate_count / len(ok_board))
        if len(ok_board) > 0
        else 0.0
    )
    positive_window_ratio = (
        float((ok_board['test_utility_total_score'] > 0.0).mean())
        if has_ok_rows and 'test_utility_total_score' in ok_board.columns
        else 0.0
    )
    # Violation distances of the windows that had to use a fallback mode.
    fallback_distance_distribution = (
        ok_board.loc[
            ok_board['selection_mode'].isin({'frontier_fallback_no_hard_pass', 'utility_fallback_no_hard_pass'}),
            'selected_train_violation_distance',
        ]
        .dropna()
        .tolist()
        if has_ok_rows
        and 'selection_mode' in ok_board.columns
        and 'selected_train_violation_distance' in ok_board.columns
        else []
    )

    return {
        'total_windows': int(total_windows),
        'processed_window_count': int(len(ok_board)),
        'skipped_window_count': int(max(total_windows - len(ok_board), 0)),
        'positive_window_ratio': positive_window_ratio,
        'positive_window_ratio_role': 'diagnostic_only',
        'primary_acceptance_metrics': ['primary_window_success_ratio', 'hard_pass_window_ratio'],
        'selected_candidate_distribution': selected_distribution,
        'window_status_counts': status_counts,
        'selection_mode_distribution': selection_mode_distribution,
        'windows_with_hard_pass_candidate_count': windows_with_hard_pass_candidate_count,
        'windows_without_hard_pass_candidate_count': int(max(len(ok_board) - windows_with_hard_pass_candidate_count, 0)),
        'hard_pass_window_ratio': hard_pass_window_ratio,
        'fallback_distance_distribution': [float(x) for x in fallback_distance_distribution],
        'candidate_ids': [candidate.candidate_id for candidate in candidate_list],
        'min_train_rows': int(min_train_rows),
        'min_test_rows': int(min_test_rows),
        'candidate_selection': selection_settings,
    }


def run_frozen_walkforward(
    raw: pd.DataFrame,
    config: Mapping[str, Any],
    windows: Sequence[WindowSpec],
    *,
    candidates: Sequence[HypothesisCandidate] | None = None,
    min_train_rows: int = 120,
    min_test_rows: int = 40,
    strategy_runner: StrategyRunner | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Select a candidate per window on train data and score it on test data.

    For each window every candidate is run on the training slice and one is
    selected (by constrained score, or by a fallback when no candidate passes
    the hard constraints). The selected candidate is then re-run on the
    combined train+test span and only the test part of its ledger is scored.

    Args:
        raw: Input frame, sliced by label with each window's boundaries.
        config: Base configuration; candidate overrides are merged over it.
        windows: Train/test window specifications.
        candidates: Candidates to evaluate. ``None`` selects
            ``DEFAULT_HYPOTHESIS_CANDIDATES``.
        min_train_rows: Minimum training rows required to process a window.
        min_test_rows: Minimum test rows required to process a window.
        strategy_runner: Optional runner; defaults to ``run_strategy_bundle``.

    Returns:
        ``(board, summary)``: one board row per window (including skipped
        ones, flagged by ``status``) and an aggregate summary dict.

    Raises:
        ValueError: If a minimum row count is not positive, or if
            ``candidates`` is given but empty.
    """
    if min_train_rows <= 0:
        raise ValueError('min_train_rows must be positive.')
    if min_test_rows <= 0:
        raise ValueError('min_test_rows must be positive.')
    # Only ``None`` selects the defaults; an explicitly empty sequence is an
    # error rather than a silent fallback (the check below used to be dead).
    candidate_list = list(DEFAULT_HYPOTHESIS_CANDIDATES if candidates is None else candidates)
    if not candidate_list:
        raise ValueError('At least one candidate is required for frozen walk-forward.')

    runner = strategy_runner or run_strategy_bundle
    selection_settings = _resolve_candidate_selection_settings(config)

    rows: list[dict[str, Any]] = []
    for window in windows:
        train_slice = raw.loc[window.train_start:window.train_end].copy()
        test_slice = raw.loc[window.test_start:window.test_end].copy()
        row = _window_row_base(window)
        row['train_rows'] = int(len(train_slice))
        row['test_rows'] = int(len(test_slice))
        row['candidate_count'] = int(len(candidate_list))
        # Every window yields exactly one row; later steps only fill it in.
        rows.append(row)

        if len(train_slice) < min_train_rows:
            row['status'] = 'skipped_insufficient_train'
            continue
        if len(test_slice) < min_test_rows:
            row['status'] = 'skipped_insufficient_test'
            continue

        evaluations = _evaluate_train_candidates(runner, train_slice, config, candidate_list, selection_settings)
        selected, selection_mode = _select_train_candidate(evaluations, selection_settings)
        if selected is None:
            row['status'] = 'skipped_no_candidate'
            continue

        selected_candidate = selected['candidate']
        hard_pass_count = int(sum(1 for item in evaluations if bool(item['hard_pass'])))
        ranking_brief = [
            {
                'candidate_id': item['candidate'].candidate_id,
                'hard_pass': bool(item['hard_pass']),
                'selection_score': float(item['selection_score']),
                'train_utility_total_score': float(item['utility']),
                'hard_fail_reasons': list(item['hard_fail_reasons']),
                'violation_distance': float(item['violation_distance']),
            }
            for item in evaluations
        ]
        ranking_brief.sort(key=lambda x: (-x['hard_pass'], -x['selection_score'], -x['train_utility_total_score']))

        # Re-run on train+test, then keep only the test part of the ledger.
        # NOTE(review): presumably so the test window starts from warmed-up
        # state rather than a cold start -- confirm.
        combined_slice = raw.loc[window.train_start:window.test_end].copy()
        candidate_config = _candidate_config(config, selected_candidate)
        _, combined_ledger, _ = runner(combined_slice, candidate_config)
        frozen_test_ledger = combined_ledger.loc[window.test_start:window.test_end].copy()
        if len(frozen_test_ledger) < min_test_rows:
            row['status'] = 'skipped_insufficient_test'
            continue

        test_metrics = _compute_window_metrics(frozen_test_ledger, candidate_config)
        row.update(
            {
                'status': 'ok',
                'selected_candidate_id': selected_candidate.candidate_id,
                'selection_mode': selection_mode,
                'train_candidate_hard_pass_count': hard_pass_count,
                'train_candidate_total_count': int(len(evaluations)),
                'selected_train_selection_score': float(selected['selection_score']),
                'selected_train_hard_pass': bool(selected['hard_pass']),
                'selected_train_constraint_failures': json.dumps(
                    list(selected['hard_fail_reasons']),
                    ensure_ascii=False,
                    sort_keys=True,
                ),
                'selected_train_violation_distance': float(selected['violation_distance']),
                'selected_train_violation_components': json.dumps(
                    dict(selected['violation_components']),
                    ensure_ascii=False,
                    sort_keys=True,
                ),
                'train_candidate_rankings': json.dumps(ranking_brief, ensure_ascii=False, sort_keys=True),
                'selected_candidate_overrides': json.dumps(
                    selected_candidate.overrides,
                    ensure_ascii=False,
                    sort_keys=True,
                ),
            }
        )
        row.update(_prefixed_metrics('train', selected['metrics']))
        row.update(_prefixed_metrics('test', test_metrics))

    board = pd.DataFrame(rows)
    if board.empty:
        # Keep the ``status`` column available even when no window was given.
        board = pd.DataFrame(columns=['status'])

    summary = _summarize_frozen_board(
        board,
        total_windows=len(windows),
        candidate_list=candidate_list,
        min_train_rows=min_train_rows,
        min_test_rows=min_test_rows,
        selection_settings=selection_settings,
    )
    return board, summary
|