| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- from __future__ import annotations
- from pathlib import Path
- import sys
# ROOT is the parent of the directory that contains this file. It is put at
# the front of sys.path (unless already listed) before the imports below run.
# NOTE(review): presumably this lets the `backtest`, `config` and `data`
# packages resolve when the script is executed directly — confirm repo layout.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
- import argparse
- import json
- from typing import Any
- from backtest.frozen_walkforward import (
- normalize_hypothesis_candidates,
- run_frozen_walkforward,
- run_strategy_bundle,
- )
- from backtest.walkforward import WindowSpec, build_expanding_windows
- from config.loader import load_config
- from data.io import (
- evaluate_data_quality_gate,
- load_full_pit_data,
- )
- def _resolve_data_quality_settings(
- config: dict[str, Any],
- *,
- strict_cli: bool,
- min_coverage_cli: float | None,
- ) -> tuple[bool, float, list[str] | None, list[str] | None, dict[str, float]]:
- quality_cfg = config.get('data_quality', {})
- strict_mode = bool(quality_cfg.get('strict_mode_default', False)) or strict_cli
- default_min_coverage = float(quality_cfg.get('default_min_coverage', 0.95))
- if min_coverage_cli is not None:
- default_min_coverage = float(min_coverage_cli)
- critical_columns = [str(col).strip().lower() for col in quality_cfg.get('critical_columns', [])]
- blocking_columns = [str(col).strip().lower() for col in quality_cfg.get('blocking_columns', critical_columns)]
- column_min_coverage = {
- str(column).strip().lower(): float(value) for column, value in quality_cfg.get('column_min_coverage', {}).items()
- }
- return strict_mode, default_min_coverage, (critical_columns or None), (blocking_columns or None), column_min_coverage
- def _load_candidate_payload(path: str | None) -> list[dict[str, Any]] | None:
- if not path:
- return None
- with Path(path).open('r', encoding='utf-8') as fh:
- payload = json.load(fh)
- if not isinstance(payload, list):
- raise ValueError('Candidate file must be a JSON list of candidate objects.')
- return payload
def _resolve_frozen_settings(
    config: dict[str, Any],
    *,
    candidates_json: str | None,
    min_train_rows_cli: int | None,
    min_test_rows_cli: int | None,
) -> tuple[list[Any], int, int]:
    """Resolve the candidate set and row minimums for frozen validation.

    Args:
        config: Loaded configuration mapping; only ``frozen_validation`` is
            read.
        candidates_json: Optional path to a JSON candidate file. A non-empty
            payload from this file wins; otherwise the ``candidates`` entry
            of the config section is used.
        min_train_rows_cli: CLI override for ``min_train_rows`` (config
            default 120) when not ``None``.
        min_test_rows_cli: CLI override for ``min_test_rows`` (config
            default 40) when not ``None``.

    Returns:
        ``(candidates, min_train_rows, min_test_rows)`` where ``candidates``
        is the result of ``normalize_hypothesis_candidates``.
    """
    section = config.get('frozen_validation', {})

    # An absent or empty file payload falls back to the configured candidates.
    payload = _load_candidate_payload(candidates_json)
    if not payload:
        payload = section.get('candidates')
    candidates = normalize_hypothesis_candidates(payload)

    configured_train = int(section.get('min_train_rows', 120))
    configured_test = int(section.get('min_test_rows', 40))

    train_rows = configured_train if min_train_rows_cli is None else int(min_train_rows_cli)
    test_rows = configured_test if min_test_rows_cli is None else int(min_test_rows_cli)
    return candidates, train_rows, test_rows
- def _serialize_windows(windows: list[WindowSpec]) -> list[dict[str, str]]:
- return [
- {
- 'train_start': window.train_start,
- 'train_end': window.train_end,
- 'test_start': window.test_start,
- 'test_end': window.test_end,
- }
- for window in windows
- ]
def _resolve_walkforward_windows(config: dict[str, Any], raw_index) -> list[WindowSpec]:
    """Build the walk-forward windows described by ``frozen_validation``.

    Args:
        config: Loaded configuration mapping; only ``frozen_validation`` is
            read.
        raw_index: Index of the input data, passed unchanged to
            ``build_expanding_windows``.

    Returns:
        The windows returned by ``build_expanding_windows``.

    Raises:
        ValueError: If ``window_mode`` (stripped, lower-cased, default
            ``expanding``) is anything other than ``expanding``.
    """
    section = config.get('frozen_validation', {})
    mode = str(section.get('window_mode', 'expanding')).strip().lower()

    if mode == 'expanding':
        train_years = int(section.get('min_train_years', 2))
        test_years = int(section.get('test_years', 1))
        partial_ok = bool(section.get('allow_partial_last_test', True))
        return build_expanding_windows(
            raw_index,
            min_train_years=train_years,
            test_years=test_years,
            allow_partial_last_test=partial_ok,
        )

    raise ValueError(f'Unsupported window_mode: {mode}')
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser used by ``main``."""
    parser = argparse.ArgumentParser(
        description='Run frozen-hypothesis validation for the ChiNext 50 regime scaffold.'
    )
    parser.add_argument(
        '--pit-csv',
        '--data-csv',
        dest='pit_csv',
        type=str,
        required=True,
        help='Required CSV/parquet full PIT input keyed by date.',
    )
    parser.add_argument(
        '--strict-data',
        action='store_true',
        help='Fail fast when critical input columns breach coverage thresholds.',
    )
    parser.add_argument(
        '--min-coverage',
        type=float,
        default=None,
        help='Override the default minimum non-null coverage ratio for data quality gate.',
    )
    parser.add_argument(
        '--candidates-json',
        type=str,
        default=None,
        help='Optional JSON file describing frozen-validation candidate set.',
    )
    parser.add_argument(
        '--min-train-rows',
        type=int,
        default=None,
        help='Override minimum required rows for each training window.',
    )
    parser.add_argument(
        '--min-test-rows',
        type=int,
        default=None,
        help='Override minimum required rows for each test window.',
    )
    parser.add_argument('--config', type=str, default=None, help='Optional config YAML path.')
    parser.add_argument(
        '--output-dir',
        type=str,
        default='outputs/frozen_validation',
        help='Directory for validation artifacts.',
    )
    return parser


def _dump_json(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as indented, non-ASCII-escaped UTF-8 JSON."""
    with path.open('w', encoding='utf-8') as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)


def main() -> None:
    """Command-line entry point for the frozen-validation run.

    Steps, in order:
      1. Parse arguments and create the output directory.
      2. Load the config and the input data.
      3. Evaluate the data quality gate and write
         ``data_quality_summary.json``; the summary is written before any
         failure is raised so it is available for inspection.
      4. Run the frozen walk-forward and the full-sample strategy bundle.
      5. Write ``frozen_validation_board.csv`` and
         ``frozen_validation_summary.json``.

    Raises:
        ValueError: If the quality summary reports ``blocking``.
    """
    cli = _build_arg_parser().parse_args()

    artifacts_dir = Path(cli.output_dir)
    artifacts_dir.mkdir(parents=True, exist_ok=True)

    config = load_config(cli.config)
    raw = load_full_pit_data(cli.pit_csv)

    quality_settings = _resolve_data_quality_settings(
        config,
        strict_cli=cli.strict_data,
        min_coverage_cli=cli.min_coverage,
    )
    strict_mode, min_coverage, critical_columns, blocking_columns, column_min_coverage = quality_settings

    quality_summary = evaluate_data_quality_gate(
        raw,
        strict=strict_mode,
        critical_columns=critical_columns,
        blocking_columns=blocking_columns,
        default_min_coverage=min_coverage,
        column_min_coverage=column_min_coverage,
    )
    _dump_json(artifacts_dir / 'data_quality_summary.json', quality_summary)

    if quality_summary['blocking']:
        # Prefer the 'errors' entries; fall back to 'breaches' when empty/absent.
        offending = quality_summary.get('errors') or quality_summary['breaches']
        breached = ', '.join([entry['column'] for entry in offending])
        raise ValueError(f'Data quality gate failed in strict mode. Breached columns: {breached}')

    # Record the resolved strict flag in the config under '_runtime'.
    runtime_cfg = config.setdefault('_runtime', {})
    runtime_cfg['strict_feature_gate'] = strict_mode

    candidates, min_train_rows, min_test_rows = _resolve_frozen_settings(
        config,
        candidates_json=cli.candidates_json,
        min_train_rows_cli=cli.min_train_rows,
        min_test_rows_cli=cli.min_test_rows,
    )
    windows = _resolve_walkforward_windows(config, raw.index)

    board, fs = run_frozen_walkforward(
        raw=raw,
        config=config,
        windows=windows,
        candidates=candidates,
        min_train_rows=min_train_rows,
        min_test_rows=min_test_rows,
    )
    full_metrics = run_strategy_bundle(raw, config)[2]

    # Key order is preserved because it determines the layout of the JSON file.
    summary = {
        'window_count': int(fs['total_windows']),
        'processed_window_count': int(fs['processed_window_count']),
        'skipped_window_count': int(fs['skipped_window_count']),
        'positive_window_ratio': float(fs['positive_window_ratio']),
        'selected_candidate_distribution': dict(fs['selected_candidate_distribution']),
        'window_status_counts': dict(fs['window_status_counts']),
        'selection_mode_distribution': dict(fs.get('selection_mode_distribution', {})),
        'windows_with_hard_pass_candidate_count': int(fs.get('windows_with_hard_pass_candidate_count', 0)),
        'windows_without_hard_pass_candidate_count': int(
            fs.get('windows_without_hard_pass_candidate_count', 0)
        ),
        'hard_pass_window_ratio': float(fs.get('hard_pass_window_ratio', 0.0)),
        'candidate_selection': dict(fs.get('candidate_selection', {})),
        'candidate_ids': list(fs['candidate_ids']),
        'min_train_rows': int(fs['min_train_rows']),
        'min_test_rows': int(fs['min_test_rows']),
        'windows': _serialize_windows(windows),
        'full_sample_metrics': full_metrics,
    }

    board.to_csv(artifacts_dir / 'frozen_validation_board.csv', index=False)
    _dump_json(artifacts_dir / 'frozen_validation_summary.json', summary)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|