|
|
@@ -0,0 +1,313 @@
|
|
|
+from __future__ import annotations
|
|
|
+
|
|
|
+from datetime import date, timedelta
|
|
|
+from pathlib import Path
|
|
|
+from typing import Any
|
|
|
+
|
|
|
+import pandas as pd
|
|
|
+
|
|
|
+from src.data.config import get_instrument, load_instruments
|
|
|
+from src.data.exceptions import MissingLayerError, ProviderError
|
|
|
+from src.data.metadata import MetadataStore, update_manifest, utc_now_iso
|
|
|
+from src.data.models import Instrument, LayerName
|
|
|
+from src.data.providers import build_provider
|
|
|
+from src.data.providers.base import IndexPriceProvider
|
|
|
+from src.data.storage import DataLake, LocalParquetDataLake
|
|
|
+from src.data.transform import build_clean_frame, build_features_frame, build_raw_frame
|
|
|
+
|
|
|
+
|
|
|
+class DataPipeline:
|
|
|
+ def __init__(
|
|
|
+ self,
|
|
|
+ repo_root: Path,
|
|
|
+ *,
|
|
|
+ config_path: Path | None = None,
|
|
|
+ data_root: Path | None = None,
|
|
|
+ provider: IndexPriceProvider | None = None,
|
|
|
+ datalake: DataLake | None = None,
|
|
|
+ metadata_store: MetadataStore | None = None,
|
|
|
+ ) -> None:
|
|
|
+ self.repo_root = repo_root
|
|
|
+ self.config_path = config_path or repo_root / "configs" / "instruments.yaml"
|
|
|
+ self.data_root = data_root or repo_root / "data"
|
|
|
+ self.instruments = load_instruments(self.config_path)
|
|
|
+ self.provider = provider or build_provider("akshare")
|
|
|
+ self.datalake = datalake or LocalParquetDataLake(self.data_root)
|
|
|
+ self.metadata_store = metadata_store or MetadataStore(self.data_root / "meta")
|
|
|
+ self.datalake.ensure_layout()
|
|
|
+ self.metadata_store.ensure_layout()
|
|
|
+
|
|
|
+ def bootstrap_all(self, today: date | None = None) -> dict[str, Any]:
|
|
|
+ self.datalake.validate_runtime()
|
|
|
+ reference_date = today or date.today()
|
|
|
+ results: dict[str, Any] = {}
|
|
|
+ for instrument in self.instruments.values():
|
|
|
+ results[instrument.key] = self._refresh_from_provider(
|
|
|
+ instrument=instrument,
|
|
|
+ request_start=instrument.bootstrap_start,
|
|
|
+ request_end=reference_date,
|
|
|
+ operation="bootstrap",
|
|
|
+ )
|
|
|
+ return results
|
|
|
+
|
|
|
+ def update_since_last(self, today: date | None = None) -> dict[str, Any]:
|
|
|
+ self.datalake.validate_runtime()
|
|
|
+ reference_date = today or date.today()
|
|
|
+ results: dict[str, Any] = {}
|
|
|
+ for instrument in self.instruments.values():
|
|
|
+ existing_raw = self._read_existing("raw", instrument.key)
|
|
|
+ if existing_raw.empty:
|
|
|
+ request_start = instrument.bootstrap_start
|
|
|
+ else:
|
|
|
+ last_date = pd.to_datetime(existing_raw["trade_date"]).max().date()
|
|
|
+ request_start = last_date + timedelta(days=1)
|
|
|
+ results[instrument.key] = self._refresh_from_provider(
|
|
|
+ instrument=instrument,
|
|
|
+ request_start=request_start,
|
|
|
+ request_end=reference_date,
|
|
|
+ operation="update",
|
|
|
+ )
|
|
|
+ return results
|
|
|
+
|
|
|
+ def backfill(self, instrument_key: str, start_date: date, today: date | None = None) -> dict[str, Any]:
|
|
|
+ self.datalake.validate_runtime()
|
|
|
+ instrument = get_instrument(self.instruments, instrument_key)
|
|
|
+ reference_date = today or date.today()
|
|
|
+ return self._refresh_from_provider(
|
|
|
+ instrument=instrument,
|
|
|
+ request_start=start_date,
|
|
|
+ request_end=reference_date,
|
|
|
+ operation="backfill",
|
|
|
+ )
|
|
|
+
|
|
|
+ def repair(self, instrument_key: str, layer: LayerName) -> dict[str, Any]:
|
|
|
+ self.datalake.validate_runtime()
|
|
|
+ instrument = get_instrument(self.instruments, instrument_key)
|
|
|
+ if layer == "raw":
|
|
|
+ raise MissingLayerError("Raw layer cannot be repaired locally. Use bootstrap/update/backfill.")
|
|
|
+ raw_frame = self._read_existing("raw", instrument_key)
|
|
|
+ if raw_frame.empty:
|
|
|
+ raise MissingLayerError(f"Missing raw layer for {instrument_key}")
|
|
|
+
|
|
|
+ clean_frame = build_clean_frame(raw_frame, instrument)
|
|
|
+ clean_summary = self._write_layer("clean", instrument, clean_frame)
|
|
|
+ features_frame = build_features_frame(clean_frame)
|
|
|
+ feature_summary = self._write_layer("features", instrument, features_frame)
|
|
|
+ layer_summaries = {"clean": clean_summary, "features": feature_summary}
|
|
|
+
|
|
|
+ manifest = self.metadata_store.load_manifest()
|
|
|
+ updated_manifest = update_manifest(
|
|
|
+ manifest,
|
|
|
+ instrument=instrument,
|
|
|
+ provider_name=self.provider.name,
|
|
|
+ operation=f"repair_{layer}",
|
|
|
+ requested_start=instrument.bootstrap_start.isoformat(),
|
|
|
+ actual_start=self._frame_start_date(raw_frame),
|
|
|
+ layer_summaries={
|
|
|
+ "raw": self._existing_layer_summary(instrument, "raw", raw_frame),
|
|
|
+ **self._merge_existing_layer_summaries(instrument, layer_summaries),
|
|
|
+ },
|
|
|
+ )
|
|
|
+ self.metadata_store.save_manifest(updated_manifest)
|
|
|
+ self._append_layer_logs(
|
|
|
+ instrument=instrument,
|
|
|
+ operation="repair",
|
|
|
+ requested_start=clean_summary["start_date"] or instrument.bootstrap_start.isoformat(),
|
|
|
+ requested_end=feature_summary["end_date"] or clean_summary["end_date"],
|
|
|
+ fetched_rows=0,
|
|
|
+ layer_summaries=layer_summaries,
|
|
|
+ trigger_layer=layer,
|
|
|
+ )
|
|
|
+ return layer_summaries
|
|
|
+
|
|
|
+ def status_snapshot(self) -> dict[str, Any]:
|
|
|
+ return self.metadata_store.load_manifest()
|
|
|
+
|
|
|
+ def _refresh_from_provider(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ instrument: Instrument,
|
|
|
+ request_start: date,
|
|
|
+ request_end: date,
|
|
|
+ operation: str,
|
|
|
+ ) -> dict[str, Any]:
|
|
|
+ existing_raw = self._read_existing("raw", instrument.key)
|
|
|
+ if request_start > request_end:
|
|
|
+ layer_summaries = self._materialize_local_layers(instrument, existing_raw)
|
|
|
+ self._persist_metadata(
|
|
|
+ instrument=instrument,
|
|
|
+ operation=operation,
|
|
|
+ requested_start=request_start,
|
|
|
+ requested_end=request_end,
|
|
|
+ fetched_rows=0,
|
|
|
+ layer_summaries=layer_summaries,
|
|
|
+ )
|
|
|
+ return layer_summaries
|
|
|
+
|
|
|
+ fetched = self.provider.fetch_price_history(instrument, request_start, request_end)
|
|
|
+ fetched_raw = build_raw_frame(fetched, instrument, self.provider.name)
|
|
|
+ if fetched_raw.empty and existing_raw.empty:
|
|
|
+ raise ProviderError(
|
|
|
+ f"No data returned for {instrument.key} between "
|
|
|
+ f"{request_start.isoformat()} and {request_end.isoformat()}"
|
|
|
+ )
|
|
|
+
|
|
|
+ merged_raw = merge_frames(existing_raw, fetched_raw)
|
|
|
+ layer_summaries = self._materialize_local_layers(instrument, merged_raw)
|
|
|
+ self._persist_metadata(
|
|
|
+ instrument=instrument,
|
|
|
+ operation=operation,
|
|
|
+ requested_start=request_start,
|
|
|
+ requested_end=request_end,
|
|
|
+ fetched_rows=len(fetched_raw),
|
|
|
+ layer_summaries=layer_summaries,
|
|
|
+ )
|
|
|
+ return layer_summaries
|
|
|
+
|
|
|
+ def _materialize_local_layers(
|
|
|
+ self,
|
|
|
+ instrument: Instrument,
|
|
|
+ raw_frame: pd.DataFrame,
|
|
|
+ ) -> dict[str, dict[str, Any]]:
|
|
|
+ raw_summary = self._write_layer("raw", instrument, raw_frame)
|
|
|
+ clean_frame = build_clean_frame(raw_frame, instrument)
|
|
|
+ clean_summary = self._write_layer("clean", instrument, clean_frame)
|
|
|
+ features_frame = build_features_frame(clean_frame)
|
|
|
+ feature_summary = self._write_layer("features", instrument, features_frame)
|
|
|
+ return {"raw": raw_summary, "clean": clean_summary, "features": feature_summary}
|
|
|
+
|
|
|
+ def _persist_metadata(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ instrument: Instrument,
|
|
|
+ operation: str,
|
|
|
+ requested_start: date,
|
|
|
+ requested_end: date,
|
|
|
+ fetched_rows: int,
|
|
|
+ layer_summaries: dict[str, dict[str, Any]],
|
|
|
+ ) -> None:
|
|
|
+ manifest = self.metadata_store.load_manifest()
|
|
|
+ actual_start = layer_summaries["raw"]["start_date"]
|
|
|
+ updated_manifest = update_manifest(
|
|
|
+ manifest,
|
|
|
+ instrument=instrument,
|
|
|
+ provider_name=self.provider.name,
|
|
|
+ operation=operation,
|
|
|
+ requested_start=requested_start.isoformat(),
|
|
|
+ actual_start=actual_start,
|
|
|
+ layer_summaries=layer_summaries,
|
|
|
+ )
|
|
|
+ self.metadata_store.save_manifest(updated_manifest)
|
|
|
+ self._append_layer_logs(
|
|
|
+ instrument=instrument,
|
|
|
+ operation=operation,
|
|
|
+ requested_start=requested_start.isoformat(),
|
|
|
+ requested_end=requested_end.isoformat(),
|
|
|
+ fetched_rows=fetched_rows,
|
|
|
+ layer_summaries=layer_summaries,
|
|
|
+ trigger_layer="raw",
|
|
|
+ )
|
|
|
+
|
|
|
+ def _write_layer(
|
|
|
+ self,
|
|
|
+ layer: LayerName,
|
|
|
+ instrument: Instrument,
|
|
|
+ frame: pd.DataFrame,
|
|
|
+ ) -> dict[str, Any]:
|
|
|
+ path = self.datalake.write_layer(layer, instrument.key, frame)
|
|
|
+ return summarize_frame(self.repo_root, path, frame)
|
|
|
+
|
|
|
+ def _read_existing(self, layer: LayerName, instrument_key: str) -> pd.DataFrame:
|
|
|
+ if not self.datalake.exists(layer, instrument_key):
|
|
|
+ return pd.DataFrame()
|
|
|
+ return self.datalake.read_layer(layer, instrument_key)
|
|
|
+
|
|
|
+ def _existing_layer_summary(
|
|
|
+ self,
|
|
|
+ instrument: Instrument,
|
|
|
+ layer: LayerName,
|
|
|
+ frame: pd.DataFrame | None = None,
|
|
|
+ ) -> dict[str, Any]:
|
|
|
+ resolved_frame = frame if frame is not None else self._read_existing(layer, instrument.key)
|
|
|
+ return summarize_frame(self.repo_root, self.datalake.layer_path(layer, instrument.key), resolved_frame)
|
|
|
+
|
|
|
+ def _merge_existing_layer_summaries(
|
|
|
+ self,
|
|
|
+ instrument: Instrument,
|
|
|
+ updated_layers: dict[str, dict[str, Any]],
|
|
|
+ ) -> dict[str, dict[str, Any]]:
|
|
|
+ summaries = updated_layers.copy()
|
|
|
+ for layer in ("clean", "features"):
|
|
|
+ if layer not in summaries and self.datalake.exists(layer, instrument.key):
|
|
|
+ summaries[layer] = self._existing_layer_summary(instrument, layer)
|
|
|
+ return summaries
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _frame_start_date(frame: pd.DataFrame) -> str | None:
|
|
|
+ if frame.empty:
|
|
|
+ return None
|
|
|
+ return pd.to_datetime(frame["trade_date"]).min().date().isoformat()
|
|
|
+
|
|
|
+ def _append_layer_logs(
|
|
|
+ self,
|
|
|
+ *,
|
|
|
+ instrument: Instrument,
|
|
|
+ operation: str,
|
|
|
+ requested_start: str,
|
|
|
+ requested_end: str | None,
|
|
|
+ fetched_rows: int,
|
|
|
+ layer_summaries: dict[str, dict[str, Any]],
|
|
|
+ trigger_layer: str,
|
|
|
+ ) -> None:
|
|
|
+ for layer_name, summary in layer_summaries.items():
|
|
|
+ payload = {
|
|
|
+ "timestamp": utc_now_iso(),
|
|
|
+ "instrument": instrument.key,
|
|
|
+ "operation": operation,
|
|
|
+ "layer": layer_name,
|
|
|
+ "requested_start": requested_start,
|
|
|
+ "requested_end": requested_end,
|
|
|
+ "rows_after_merge": summary["rows"],
|
|
|
+ "status": "success",
|
|
|
+ "provider": self.provider.name,
|
|
|
+ "trigger_layer": trigger_layer,
|
|
|
+ }
|
|
|
+ if layer_name == "raw":
|
|
|
+ payload["fetched_rows"] = fetched_rows
|
|
|
+ self.metadata_store.append_fetch_log(payload)
|
|
|
+
|
|
|
+
|
|
|
+def merge_frames(existing: pd.DataFrame, incoming: pd.DataFrame) -> pd.DataFrame:
|
|
|
+ if existing.empty:
|
|
|
+ return incoming.reset_index(drop=True)
|
|
|
+ if incoming.empty:
|
|
|
+ return existing.reset_index(drop=True)
|
|
|
+ merged = pd.concat([existing, incoming], ignore_index=True)
|
|
|
+ merged["trade_date"] = pd.to_datetime(merged["trade_date"], errors="coerce")
|
|
|
+ merged = merged.sort_values("trade_date").drop_duplicates("trade_date", keep="last")
|
|
|
+ return merged.reset_index(drop=True)
|
|
|
+
|
|
|
+
|
|
|
+def summarize_frame(repo_root: Path, path: Path, frame: pd.DataFrame) -> dict[str, Any]:
|
|
|
+ summary = {
|
|
|
+ "path": _path_for_manifest(repo_root, path),
|
|
|
+ "rows": int(len(frame.index)),
|
|
|
+ "updated_at": utc_now_iso(),
|
|
|
+ "start_date": None,
|
|
|
+ "end_date": None,
|
|
|
+ }
|
|
|
+ if not frame.empty and "trade_date" in frame.columns:
|
|
|
+ dates = pd.to_datetime(frame["trade_date"], errors="coerce").dropna()
|
|
|
+ if not dates.empty:
|
|
|
+ summary["start_date"] = dates.min().date().isoformat()
|
|
|
+ summary["end_date"] = dates.max().date().isoformat()
|
|
|
+ if path.exists():
|
|
|
+ summary["file_size_bytes"] = path.stat().st_size
|
|
|
+ return summary
|
|
|
+
|
|
|
+
|
|
|
+def _path_for_manifest(repo_root: Path, path: Path) -> str:
|
|
|
+ try:
|
|
|
+ return str(path.relative_to(repo_root))
|
|
|
+ except ValueError:
|
|
|
+ return str(path)
|