| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- from __future__ import annotations
- from pathlib import Path
- import sys
- from typing import Any, Mapping
# Put the directory one level above this script's folder at the front of
# sys.path (once), so the project-local imports that follow can be resolved
# when the file is executed directly as a script.
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
- import argparse
- import copy
- import json
- import pandas as pd
- from backtest.frozen_walkforward import run_strategy_bundle
- from config.loader import load_config
- from data.io import evaluate_data_quality_gate, load_full_pit_data
- def _resolve_data_quality_settings(
- config: dict[str, Any],
- *,
- strict_cli: bool,
- min_coverage_cli: float | None,
- ) -> tuple[bool, float, list[str] | None, list[str] | None, dict[str, float]]:
- quality_cfg = config.get('data_quality', {})
- strict_mode = bool(quality_cfg.get('strict_mode_default', False)) or strict_cli
- default_min_coverage = float(quality_cfg.get('default_min_coverage', 0.95))
- if min_coverage_cli is not None:
- default_min_coverage = float(min_coverage_cli)
- critical_columns = [str(col).strip().lower() for col in quality_cfg.get('critical_columns', [])]
- blocking_columns = [str(col).strip().lower() for col in quality_cfg.get('blocking_columns', critical_columns)]
- column_min_coverage = {
- str(column).strip().lower(): float(value) for column, value in quality_cfg.get('column_min_coverage', {}).items()
- }
- return strict_mode, default_min_coverage, (critical_columns or None), (blocking_columns or None), column_min_coverage
- def _deep_merge_dict(base: Mapping[str, Any], overrides: Mapping[str, Any]) -> dict[str, Any]:
- out = copy.deepcopy(dict(base))
- for key, value in overrides.items():
- if isinstance(value, Mapping) and isinstance(out.get(key), Mapping):
- out[key] = _deep_merge_dict(dict(out[key]), value)
- else:
- out[key] = copy.deepcopy(value)
- return out
- def _parse_float_list(raw: str, *, label: str) -> list[float]:
- values: list[float] = []
- for item in raw.split(','):
- text = item.strip()
- if not text:
- continue
- values.append(float(text))
- if not values:
- raise ValueError(f'{label} must include at least one float value.')
- return values
- def _calibration_score(metrics: Mapping[str, Any]) -> float:
- utility = float(metrics.get('utility_total_score', 0.0))
- annual_return = float(metrics.get('annual_return', 0.0))
- upside_capture = float(metrics.get('upside_capture', 0.0))
- tracking_abs = float(metrics.get('tracking_diff_abs_mean', 0.0))
- tracking_p95 = float(metrics.get('tracking_error_20_p95', 0.0))
- max_drawdown = float(metrics.get('max_drawdown', 0.0))
- return (
- 0.60 * utility
- + 0.25 * annual_return
- + 0.15 * upside_capture
- - 0.50 * max_drawdown
- - 2.0 * max(0.0, tracking_p95 - 0.003)
- - 1.0 * max(0.0, tracking_abs - 0.001)
- )
def main() -> None:
    """Grid-search two execution-cost parameters and write the results to disk.

    Steps, in order:

    1. Parse the CLI arguments and create the output directory.
    2. Load the config and the PIT data, evaluate the data-quality gate and
       write its summary to ``data_quality_summary.json``.
    3. For every (cost multiplier, gap slippage factor) pair, run the strategy
       bundle with those two ``trading`` settings overridden and score the
       returned metrics with ``_calibration_score``.
    4. Write the score-sorted grid to ``execution_calibration_grid.csv`` and
       the best pair plus the top five rows to
       ``execution_calibration_recommendation.json``.

    Candidates with equal scores keep their grid order (multipliers outer,
    gap factors inner, both as given on the command line), so the recommended
    pair is deterministic when scores tie.

    Raises:
        ValueError: If the quality summary is flagged as blocking, or if one
            of the candidate lists cannot be parsed.
    """
    parser = argparse.ArgumentParser(description='Calibrate execution constraint parameters on full PIT data.')
    parser.add_argument('--pit-csv', '--data-csv', dest='pit_csv', type=str, required=True, help='Required CSV/parquet full PIT input keyed by date.')
    parser.add_argument('--strict-data', action='store_true', help='Fail fast when blocking quality breaches are detected.')
    parser.add_argument('--min-coverage', type=float, default=None, help='Override default minimum non-null coverage ratio.')
    parser.add_argument('--cost-multipliers', type=str, default='1.0,1.25,1.5,1.75', help='Comma-separated extreme_day_cost_multiplier candidates.')
    parser.add_argument('--gap-slippage-factors', type=str, default='0.0,0.01,0.02,0.03', help='Comma-separated gap_slippage_factor candidates.')
    parser.add_argument('--config', type=str, default=None, help='Optional config YAML path.')
    parser.add_argument('--output-dir', type=str, default='outputs/execution_calibration', help='Directory for calibration artifacts.')
    args = parser.parse_args()

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    config = load_config(args.config)
    raw = load_full_pit_data(args.pit_csv)

    strict_mode, min_coverage, critical_columns, blocking_columns, column_min_coverage = _resolve_data_quality_settings(
        config,
        strict_cli=args.strict_data,
        min_coverage_cli=args.min_coverage,
    )
    quality_summary = evaluate_data_quality_gate(
        raw,
        strict=strict_mode,
        critical_columns=critical_columns,
        blocking_columns=blocking_columns,
        default_min_coverage=min_coverage,
        column_min_coverage=column_min_coverage,
    )
    # The summary is written before the gate is enforced so it is available
    # for inspection even when the run aborts below.
    with (output_dir / 'data_quality_summary.json').open('w', encoding='utf-8') as fh:
        json.dump(quality_summary, fh, ensure_ascii=False, indent=2)
    if quality_summary['blocking']:
        failed_items = quality_summary.get('errors') or quality_summary['breaches']
        breached = ', '.join(item['column'] for item in failed_items)
        raise ValueError(f'Data quality gate failed in strict mode. Breached columns: {breached}')

    config.setdefault('_runtime', {})['strict_feature_gate'] = strict_mode

    multipliers = _parse_float_list(args.cost_multipliers, label='cost-multipliers')
    gap_factors = _parse_float_list(args.gap_slippage_factors, label='gap-slippage-factors')

    rows: list[dict[str, Any]] = []
    for multiplier in multipliers:
        for gap_factor in gap_factors:
            # Each candidate gets its own merged copy; ``config`` itself is
            # not modified by the merge.
            candidate_config = _deep_merge_dict(
                config,
                {
                    'trading': {
                        'extreme_day_cost_multiplier': float(multiplier),
                        'gap_slippage_factor': float(gap_factor),
                    }
                },
            )
            _, _, metrics = run_strategy_bundle(raw, candidate_config)
            score = _calibration_score(metrics)
            rows.append(
                {
                    'extreme_day_cost_multiplier': float(multiplier),
                    'gap_slippage_factor': float(gap_factor),
                    'calibration_score': float(score),
                    'utility_total_score': float(metrics.get('utility_total_score', 0.0)),
                    'annual_return': float(metrics.get('annual_return', 0.0)),
                    'sharpe': float(metrics.get('sharpe', 0.0)),
                    'max_drawdown': float(metrics.get('max_drawdown', 0.0)),
                    'tracking_diff_mean': float(metrics.get('tracking_diff_mean', 0.0)),
                    'tracking_diff_abs_mean': float(metrics.get('tracking_diff_abs_mean', 0.0)),
                    'tracking_error_20_p95': float(metrics.get('tracking_error_20_p95', 0.0)),
                }
            )

    # A stable sort keeps tied candidates in grid order. The default
    # quicksort is not stable, which would make the recommended pair
    # arbitrary whenever several candidates score the same.
    grid = (
        pd.DataFrame(rows)
        .sort_values(by='calibration_score', ascending=False, kind='mergesort')
        .reset_index(drop=True)
    )
    grid.to_csv(output_dir / 'execution_calibration_grid.csv', index=False)

    best = grid.iloc[0].to_dict()
    recommendation = {
        'input': {
            'pit_path': str(args.pit_csv),
            'row_count': int(len(raw)),
            # NOTE(review): assumes the index holds timestamp-like values
            # that expose ``.date()``; confirm against load_full_pit_data.
            'date_start': raw.index.min().date().isoformat() if len(raw) else None,
            'date_end': raw.index.max().date().isoformat() if len(raw) else None,
        },
        # Human-readable copy of the formula; keep in sync with
        # ``_calibration_score`` when the weights change.
        'score_formula': '0.60*utility_total_score + 0.25*annual_return + 0.15*upside_capture - 0.50*max_drawdown - 2.0*max(0, tracking_error_20_p95 - 0.003) - 1.0*max(0, tracking_diff_abs_mean - 0.001)',
        'search_space': {
            'cost_multipliers': multipliers,
            'gap_slippage_factors': gap_factors,
            'combination_count': int(len(grid)),
        },
        'recommended': {
            'extreme_day_cost_multiplier': float(best['extreme_day_cost_multiplier']),
            'gap_slippage_factor': float(best['gap_slippage_factor']),
            'calibration_score': float(best['calibration_score']),
        },
        'top_candidates': grid.head(5).to_dict(orient='records'),
    }
    with (output_dir / 'execution_calibration_recommendation.json').open('w', encoding='utf-8') as fh:
        json.dump(recommendation, fh, ensure_ascii=False, indent=2)
# Run the calibration only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == '__main__':
    main()
|