from __future__ import annotations

import json
import tempfile
import unittest
from collections import deque
from datetime import date
from pathlib import Path

import pandas as pd

from src.data.metadata import MetadataStore
from src.data.pipeline import DataPipeline
from src.data.providers.base import IndexPriceProvider
from src.data.storage import InMemoryDataLake
from src.data.transform import build_clean_frame, build_features_frame

# Columns of a raw price frame as produced by make_price_frame; also used for
# the empty frame FakeProvider returns once an instrument's queue runs dry.
PRICE_COLUMNS = ["trade_date", "open", "close", "high", "low", "volume", "amount"]


class FakeProvider(IndexPriceProvider):
    """Provider stub that replays canned price frames per instrument key.

    Every call to ``fetch_price_history`` is recorded in ``calls`` and hands
    back the next queued frame for the requested instrument.
    """

    name = "fake_provider"

    def __init__(self, frames: dict[str, list[pd.DataFrame]]) -> None:
        # Copy into our own per-instrument deques: the caller's dict/lists are
        # left untouched, and taking the next frame from the front is O(1).
        self.frames: dict[str, deque[pd.DataFrame]] = {
            key: deque(queue) for key, queue in frames.items()
        }
        self.calls: list[tuple[str, date, date]] = []

    def fetch_price_history(self, instrument, start_date, end_date) -> pd.DataFrame:
        """Return the next queued frame for ``instrument.key``.

        When no frames remain (or none were ever provided for that key), an
        empty frame with ``PRICE_COLUMNS`` is returned instead.
        """
        self.calls.append((instrument.key, start_date, end_date))
        queue = self.frames.setdefault(instrument.key, deque())
        if not queue:
            return pd.DataFrame(columns=PRICE_COLUMNS)
        # Return a copy so the frame object the test built is never mutated
        # by downstream processing.
        return queue.popleft().copy()


def make_price_frame(start: str, closes: list[float]) -> pd.DataFrame:
    """Build a synthetic price frame with one row per value in ``closes``.

    Rows fall on consecutive calendar days beginning at ``start``. ``open``
    equals ``close``, ``high``/``low`` are ``close`` plus/minus 1, and
    ``volume``/``amount`` increase by one per row.
    """
    count = len(closes)
    return pd.DataFrame(
        {
            "trade_date": pd.date_range(start=start, periods=count, freq="D"),
            "open": closes,
            "close": closes,
            "high": [value + 1 for value in closes],
            "low": [value - 1 for value in closes],
            "volume": [1000 + idx for idx in range(count)],
            "amount": [100000 + idx for idx in range(count)],
        }
    )


class PipelineTests(unittest.TestCase):
    def setUp(self) -> None:
        """Create a temp repo root holding a one-instrument (sse50) config."""
        self.temp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(self.temp_dir.cleanup)
        self.root = Path(self.temp_dir.name)
        (self.root / "configs").mkdir(parents=True, exist_ok=True)
        (self.root / "data" / "meta").mkdir(parents=True, exist_ok=True)
        self.config_path = self.root / "configs" / "instruments.yaml"
        self.config_path.write_text(
            "\n".join(
                [
                    "instruments:",
                    "  sse50:",
                    "    name: 上证50",
                    "    index_code: \"000016\"",
                    "    provider_symbol: sh000016",
                    "    exchange: SSE",
                    "    price_type: price_index",
                    "    bootstrap_start: \"2003-12-31\"",
                ]
            ),
            encoding="utf-8",
        )

    def _make_pipeline(
        self, provider: FakeProvider
    ) -> tuple[DataPipeline, InMemoryDataLake, MetadataStore]:
        """Wire a DataPipeline over the temp repo using an in-memory data lake.

        Returns the pipeline together with its data lake and metadata store so
        tests can inspect stored layers and the manifest.
        """
        datalake = InMemoryDataLake(self.root / "memory")
        metadata = MetadataStore(self.root / "data" / "meta")
        pipeline = DataPipeline(
            repo_root=self.root,
            config_path=self.config_path,
            data_root=self.root / "data",
            provider=provider,
            datalake=datalake,
            metadata_store=metadata,
        )
        return pipeline, datalake, metadata

    def test_features_do_not_change_when_future_rows_are_appended(self) -> None:
        """Appending a later row must not alter features of earlier rows."""
        clean_a = pd.DataFrame(
            {
                "instrument": ["sse50"] * 25,
                "instrument_name": ["上证50"] * 25,
                "index_code": ["000016"] * 25,
                "provider": ["fake"] * 25,
                "price_type": ["price_index"] * 25,
                "trade_date": pd.date_range("2020-01-01", periods=25, freq="D"),
                "open": range(1, 26),
                "high": range(2, 27),
                "low": range(0, 25),
                "close": range(1, 26),
                "prev_close": [None] + list(range(1, 25)),
                "change_amount": [None] + [1] * 24,
                "daily_return": [None] + [1.0 / value for value in range(1, 25)],
                "volume": [100] * 25,
                "amount": [1000] * 25,
            }
        )
        features_a = build_features_frame(clean_a)
        # Built after features_a (as before) so any effect build_features_frame
        # might have on clean_a is reflected identically in the extended frame.
        # The row continues the +1 per day trend: close 25 -> 26, return 1/25.
        next_row = clean_a.tail(1).assign(
            trade_date=pd.Timestamp("2020-01-26"),
            close=26,
            open=26,
            high=27,
            low=25,
            prev_close=25,
            change_amount=1,
            daily_return=0.04,
        )
        features_b = build_features_frame(
            pd.concat([clean_a, next_row], ignore_index=True)
        )
        pd.testing.assert_frame_equal(
            features_a.iloc[:25].reset_index(drop=True),
            features_b.iloc[:25].reset_index(drop=True),
        )

    def test_bootstrap_then_incremental_update_merges_raw_and_updates_manifest(self) -> None:
        """Bootstrap then update should merge raw rows and refresh the manifest."""
        provider = FakeProvider(
            {
                "sse50": [
                    make_price_frame("2020-01-01", [10, 11, 12]),
                    make_price_frame("2020-01-04", [13, 14]),
                ]
            }
        )
        pipeline, datalake, metadata = self._make_pipeline(provider)

        bootstrap = pipeline.bootstrap_all(today=date(2020, 1, 3))
        self.assertEqual(bootstrap["sse50"]["raw"]["rows"], 3)

        update = pipeline.update_since_last(today=date(2020, 1, 5))
        self.assertEqual(update["sse50"]["raw"]["rows"], 5)

        raw_frame = datalake.read_layer("raw", "sse50")
        self.assertEqual(len(raw_frame.index), 5)

        manifest = metadata.load_manifest()
        self.assertEqual(manifest["instruments"]["sse50"]["actual_start"], "2020-01-01")
        self.assertEqual(
            manifest["instruments"]["sse50"]["layers"]["features"]["end_date"],
            "2020-01-05",
        )

        # Six log entries in total across bootstrap + update; the most recent
        # one must be the features-layer write of the update run.
        fetch_log_path = self.root / "data" / "meta" / "fetch_log.jsonl"
        fetch_log_lines = fetch_log_path.read_text(encoding="utf-8").strip().splitlines()
        self.assertEqual(len(fetch_log_lines), 6)
        latest_payload = json.loads(fetch_log_lines[-1])
        self.assertEqual(latest_payload["layer"], "features")
        self.assertEqual(latest_payload["operation"], "update")

    def test_repair_features_uses_local_clean_only(self) -> None:
        """Repairing features rebuilds from stored clean data without refetching."""
        provider = FakeProvider({"sse50": [make_price_frame("2020-01-01", [10, 11, 12, 13, 14])]})
        pipeline, datalake, _ = self._make_pipeline(provider)
        pipeline.bootstrap_all(today=date(2020, 1, 5))
        # Corrupt the features layer so the repair has something to fix.
        datalake.write_layer("features", "sse50", pd.DataFrame({"broken": [1]}))

        repaired = pipeline.repair("sse50", "features")

        repaired_frame = datalake.read_layer("features", "sse50")
        self.assertIn("ret_1d", repaired_frame.columns)
        self.assertEqual(repaired["features"]["rows"], 5)
        # Only the bootstrap fetch happened; the repair never hit the provider.
        self.assertEqual(len(provider.calls), 1)

    def test_repair_clean_rebuilds_features_as_downstream_dependency(self) -> None:
        """Repairing clean must also rebuild the dependent features layer."""
        provider = FakeProvider({"sse50": [make_price_frame("2020-01-01", [10, 11, 12, 13, 14])]})
        pipeline, datalake, _ = self._make_pipeline(provider)
        pipeline.bootstrap_all(today=date(2020, 1, 5))
        # Corrupt the features layer; repairing clean should restore it too.
        datalake.write_layer("features", "sse50", pd.DataFrame({"broken": [1]}))

        repaired = pipeline.repair("sse50", "clean")

        repaired_frame = datalake.read_layer("features", "sse50")
        self.assertIn("ma_5", repaired_frame.columns)
        self.assertEqual(repaired["clean"]["rows"], 5)
        self.assertEqual(repaired["features"]["rows"], 5)

    def test_clean_layer_computes_prev_close_and_daily_return(self) -> None:
        """Clean layer derives prev_close and daily_return from consecutive closes."""
        raw_frame = make_price_frame("2020-01-01", [10, 12, 18])
        raw_frame["instrument"] = "sse50"
        raw_frame["instrument_name"] = "上证50"
        raw_frame["index_code"] = "000016"
        raw_frame["provider"] = "fake"
        raw_frame = raw_frame[
            [
                "instrument",
                "instrument_name",
                "index_code",
                "provider",
                "trade_date",
                "open",
                "high",
                "low",
                "close",
                "volume",
                "amount",
            ]
        ]

        class InstrumentStub:
            # Minimal stand-in exposing only the attribute this test supplies.
            price_type = "price_index"

        clean = build_clean_frame(raw_frame, InstrumentStub())
        # First row has no predecessor; then 10 -> 12 is +20%, 12 -> 18 is +50%.
        self.assertTrue(pd.isna(clean.loc[0, "prev_close"]))
        self.assertAlmostEqual(clean.loc[1, "daily_return"], 0.2)
        self.assertAlmostEqual(clean.loc[2, "daily_return"], 0.5)


if __name__ == "__main__":
    unittest.main()