# pit_builder.py
  1. from __future__ import annotations
  2. from pathlib import Path
  3. from typing import Any, Iterable, Mapping, Sequence
  4. import pandas as pd
  5. from .io import (
  6. DEFAULT_MIN_COVERAGE,
  7. evaluate_data_quality_gate,
  8. load_market_data,
  9. load_point_in_time_panel,
  10. merge_point_in_time_sidecar,
  11. )
  12. def build_pit_dataset(
  13. market_path: str | Path,
  14. *,
  15. sidecar_paths: Sequence[str | Path] | None = None,
  16. strict: bool = False,
  17. critical_columns: Iterable[str] | None = None,
  18. blocking_columns: Iterable[str] | None = None,
  19. default_min_coverage: float = DEFAULT_MIN_COVERAGE,
  20. column_min_coverage: Mapping[str, float] | None = None,
  21. ) -> tuple[pd.DataFrame, dict[str, Any]]:
  22. market_path = str(market_path)
  23. ordered_sidecars = [str(path) for path in (sidecar_paths or [])]
  24. pit = load_market_data(market_path)
  25. for sidecar_path in ordered_sidecars:
  26. sidecar = load_point_in_time_panel(sidecar_path)
  27. pit = merge_point_in_time_sidecar(pit, sidecar)
  28. quality = evaluate_data_quality_gate(
  29. pit,
  30. strict=strict,
  31. critical_columns=critical_columns,
  32. blocking_columns=blocking_columns,
  33. default_min_coverage=default_min_coverage,
  34. column_min_coverage=column_min_coverage,
  35. )
  36. quality['sources'] = {
  37. 'market_path': market_path,
  38. 'sidecar_paths': ordered_sidecars,
  39. 'sidecar_count': int(len(ordered_sidecars)),
  40. 'merged_row_count': int(len(pit)),
  41. }
  42. quality['pit_columns'] = sorted([str(col) for col in pit.columns])
  43. return pit, quality