| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- from __future__ import annotations
- from pathlib import Path
- from typing import Any, Iterable, Mapping, Sequence
- import pandas as pd
- from .io import (
- DEFAULT_MIN_COVERAGE,
- evaluate_data_quality_gate,
- load_market_data,
- load_point_in_time_panel,
- merge_point_in_time_sidecar,
- )
def build_pit_dataset(
    market_path: str | Path,
    *,
    sidecar_paths: Sequence[str | Path] | None = None,
    strict: bool = False,
    critical_columns: Iterable[str] | None = None,
    blocking_columns: Iterable[str] | None = None,
    default_min_coverage: float = DEFAULT_MIN_COVERAGE,
    column_min_coverage: Mapping[str, float] | None = None,
) -> tuple[pd.DataFrame, dict[str, Any]]:
    """Load market data, merge sidecar panels into it, and run the quality gate.

    The market file is loaded with ``load_market_data``. Each sidecar is then
    loaded with ``load_point_in_time_panel`` and folded into the running frame
    with ``merge_point_in_time_sidecar``, in the order the paths were given.
    The merged frame is finally passed to ``evaluate_data_quality_gate``.

    Args:
        market_path: Location of the base market data; converted to ``str``
            before being handed to the loader.
        sidecar_paths: Sidecar locations, merged in iteration order. A single
            ``str`` or ``Path`` is accepted and treated as a one-element
            list. ``None`` or an empty value means no sidecars.
        strict: Forwarded unchanged to ``evaluate_data_quality_gate``.
        critical_columns: Forwarded unchanged to the quality gate.
        blocking_columns: Forwarded unchanged to the quality gate.
        default_min_coverage: Forwarded unchanged to the quality gate.
        column_min_coverage: Forwarded unchanged to the quality gate.

    Returns:
        A ``(pit, quality)`` pair. ``pit`` is the merged frame. ``quality`` is
        the object returned by the quality gate with two keys added here:
        ``'sources'`` (input paths, sidecar count, merged row count) and
        ``'pit_columns'`` (sorted, stringified column labels of ``pit``).

    Note:
        Exceptions raised by the loaders, the merge helper, or the quality
        gate are not caught here and propagate to the caller.
    """
    market_path = str(market_path)

    ordered_sidecars: list[str]
    if not sidecar_paths:
        ordered_sidecars = []
    elif isinstance(sidecar_paths, (str, Path)):
        # A bare str is itself a Sequence[str]; iterating it would yield one
        # "path" per character, and a bare Path is not iterable at all.
        ordered_sidecars = [str(sidecar_paths)]
    else:
        ordered_sidecars = [str(path) for path in sidecar_paths]

    pit = load_market_data(market_path)
    for sidecar_path in ordered_sidecars:
        sidecar = load_point_in_time_panel(sidecar_path)
        pit = merge_point_in_time_sidecar(pit, sidecar)

    quality = evaluate_data_quality_gate(
        pit,
        strict=strict,
        critical_columns=critical_columns,
        blocking_columns=blocking_columns,
        default_min_coverage=default_min_coverage,
        column_min_coverage=column_min_coverage,
    )

    # Provenance is written onto the gate's own result object (mutated in
    # place), so callers get quality metrics and input lineage together.
    quality['sources'] = {
        'market_path': market_path,
        'sidecar_paths': ordered_sidecars,
        'sidecar_count': len(ordered_sidecars),
        'merged_row_count': len(pit),
    }
    quality['pit_columns'] = sorted(str(col) for col in pit.columns)
    return pit, quality
|