| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- from __future__ import annotations
- import json
- import tempfile
- import unittest
- from datetime import date
- from pathlib import Path
- import pandas as pd
- from src.data.metadata import MetadataStore
- from src.data.pipeline import DataPipeline
- from src.data.providers.base import IndexPriceProvider
- from src.data.storage import InMemoryDataLake
- from src.data.transform import build_clean_frame, build_features_frame
class FakeProvider(IndexPriceProvider):
    """Scripted provider that replays pre-built frames instead of fetching data.

    ``frames`` maps an instrument key to a list of frames. Each call to
    :meth:`fetch_price_history` consumes the first remaining frame for the
    requested instrument, and every call is appended to ``calls`` so tests
    can assert how often (and with which date range) the provider was hit.
    """

    name = "fake_provider"

    def __init__(self, frames: dict[str, list[pd.DataFrame]]) -> None:
        # (instrument key, start_date, end_date) for each fetch, in call order.
        self.calls: list[tuple[str, date, date]] = []
        # Kept by reference: the caller's lists are consumed as frames are served.
        self.frames = frames

    def fetch_price_history(self, instrument, start_date, end_date) -> pd.DataFrame:
        """Return a copy of the next queued frame for ``instrument``.

        When nothing is queued (or the instrument was never registered, in
        which case an empty queue is created for it), an empty frame with the
        standard price columns is returned instead.
        """
        key = instrument.key
        self.calls.append((key, start_date, end_date))
        pending = self.frames.setdefault(key, [])
        if pending:
            # Copy so later mutation by the caller cannot alter the served frame.
            return pending.pop(0).copy()
        return pd.DataFrame(
            columns=["trade_date", "open", "close", "high", "low", "volume", "amount"]
        )
def make_price_frame(start: str, closes: list[float]) -> pd.DataFrame:
    """Build a synthetic daily price frame with one row per entry in ``closes``.

    Args:
        start: First trade date; following rows advance one calendar day each.
        closes: Close prices. ``open`` mirrors ``close``; ``high`` and ``low``
            sit one unit above and below it.

    Returns:
        A frame with columns ``trade_date``, ``open``, ``close``, ``high``,
        ``low``, ``volume`` and ``amount``. ``volume`` counts up from 1000 and
        ``amount`` from 100000, one step per row.
    """
    row_count = len(closes)
    columns = {
        "trade_date": pd.date_range(start=start, periods=row_count, freq="D"),
        "open": closes,
        "close": closes,
        "high": [price + 1 for price in closes],
        "low": [price - 1 for price in closes],
        "volume": list(range(1000, 1000 + row_count)),
        "amount": list(range(100000, 100000 + row_count)),
    }
    return pd.DataFrame(columns)
- class PipelineTests(unittest.TestCase):
- def setUp(self) -> None:
- self.temp_dir = tempfile.TemporaryDirectory()
- self.addCleanup(self.temp_dir.cleanup)
- self.root = Path(self.temp_dir.name)
- (self.root / "configs").mkdir(parents=True, exist_ok=True)
- (self.root / "data" / "meta").mkdir(parents=True, exist_ok=True)
- self.config_path = self.root / "configs" / "instruments.yaml"
- self.config_path.write_text(
- "\n".join(
- [
- "instruments:",
- " sse50:",
- " name: 上证50",
- " index_code: \"000016\"",
- " provider_symbol: sh000016",
- " exchange: SSE",
- " price_type: price_index",
- " bootstrap_start: \"2003-12-31\"",
- ]
- ),
- encoding="utf-8",
- )
- def test_features_do_not_change_when_future_rows_are_appended(self) -> None:
- clean_a = pd.DataFrame(
- {
- "instrument": ["sse50"] * 25,
- "instrument_name": ["上证50"] * 25,
- "index_code": ["000016"] * 25,
- "provider": ["fake"] * 25,
- "price_type": ["price_index"] * 25,
- "trade_date": pd.date_range("2020-01-01", periods=25, freq="D"),
- "open": range(1, 26),
- "high": range(2, 27),
- "low": range(0, 25),
- "close": range(1, 26),
- "prev_close": [None] + list(range(1, 25)),
- "change_amount": [None] + [1] * 24,
- "daily_return": [None] + [1.0 / value for value in range(1, 25)],
- "volume": [100] * 25,
- "amount": [1000] * 25,
- }
- )
- features_a = build_features_frame(clean_a)
- features_b = build_features_frame(pd.concat([clean_a, clean_a.tail(1).assign(trade_date=pd.Timestamp("2020-01-26"), close=26, open=26, high=27, low=25, prev_close=25, change_amount=1, daily_return=0.04)], ignore_index=True))
- pd.testing.assert_frame_equal(
- features_a.iloc[:25].reset_index(drop=True),
- features_b.iloc[:25].reset_index(drop=True),
- )
- def test_bootstrap_then_incremental_update_merges_raw_and_updates_manifest(self) -> None:
- provider = FakeProvider(
- {
- "sse50": [
- make_price_frame("2020-01-01", [10, 11, 12]),
- make_price_frame("2020-01-04", [13, 14]),
- ]
- }
- )
- datalake = InMemoryDataLake(self.root / "memory")
- metadata = MetadataStore(self.root / "data" / "meta")
- pipeline = DataPipeline(
- repo_root=self.root,
- config_path=self.config_path,
- data_root=self.root / "data",
- provider=provider,
- datalake=datalake,
- metadata_store=metadata,
- )
- bootstrap = pipeline.bootstrap_all(today=date(2020, 1, 3))
- self.assertEqual(bootstrap["sse50"]["raw"]["rows"], 3)
- update = pipeline.update_since_last(today=date(2020, 1, 5))
- self.assertEqual(update["sse50"]["raw"]["rows"], 5)
- raw_frame = datalake.read_layer("raw", "sse50")
- self.assertEqual(len(raw_frame.index), 5)
- manifest = metadata.load_manifest()
- self.assertEqual(manifest["instruments"]["sse50"]["actual_start"], "2020-01-01")
- self.assertEqual(manifest["instruments"]["sse50"]["layers"]["features"]["end_date"], "2020-01-05")
- fetch_log_lines = (self.root / "data" / "meta" / "fetch_log.jsonl").read_text(encoding="utf-8").strip().splitlines()
- self.assertEqual(len(fetch_log_lines), 6)
- latest_payload = json.loads(fetch_log_lines[-1])
- self.assertEqual(latest_payload["layer"], "features")
- self.assertEqual(latest_payload["operation"], "update")
- def test_repair_features_uses_local_clean_only(self) -> None:
- provider = FakeProvider({"sse50": [make_price_frame("2020-01-01", [10, 11, 12, 13, 14])]})
- datalake = InMemoryDataLake(self.root / "memory")
- metadata = MetadataStore(self.root / "data" / "meta")
- pipeline = DataPipeline(
- repo_root=self.root,
- config_path=self.config_path,
- data_root=self.root / "data",
- provider=provider,
- datalake=datalake,
- metadata_store=metadata,
- )
- pipeline.bootstrap_all(today=date(2020, 1, 5))
- datalake.write_layer("features", "sse50", pd.DataFrame({"broken": [1]}))
- repaired = pipeline.repair("sse50", "features")
- repaired_frame = datalake.read_layer("features", "sse50")
- self.assertIn("ret_1d", repaired_frame.columns)
- self.assertEqual(repaired["features"]["rows"], 5)
- self.assertEqual(len(provider.calls), 1)
- def test_repair_clean_rebuilds_features_as_downstream_dependency(self) -> None:
- provider = FakeProvider({"sse50": [make_price_frame("2020-01-01", [10, 11, 12, 13, 14])]})
- datalake = InMemoryDataLake(self.root / "memory")
- metadata = MetadataStore(self.root / "data" / "meta")
- pipeline = DataPipeline(
- repo_root=self.root,
- config_path=self.config_path,
- data_root=self.root / "data",
- provider=provider,
- datalake=datalake,
- metadata_store=metadata,
- )
- pipeline.bootstrap_all(today=date(2020, 1, 5))
- datalake.write_layer("features", "sse50", pd.DataFrame({"broken": [1]}))
- repaired = pipeline.repair("sse50", "clean")
- repaired_frame = datalake.read_layer("features", "sse50")
- self.assertIn("ma_5", repaired_frame.columns)
- self.assertEqual(repaired["clean"]["rows"], 5)
- self.assertEqual(repaired["features"]["rows"], 5)
- def test_clean_layer_computes_prev_close_and_daily_return(self) -> None:
- raw_frame = make_price_frame("2020-01-01", [10, 12, 18])
- raw_frame["instrument"] = "sse50"
- raw_frame["instrument_name"] = "上证50"
- raw_frame["index_code"] = "000016"
- raw_frame["provider"] = "fake"
- raw_frame = raw_frame[
- ["instrument", "instrument_name", "index_code", "provider", "trade_date", "open", "high", "low", "close", "volume", "amount"]
- ]
- class InstrumentStub:
- price_type = "price_index"
- clean = build_clean_frame(raw_frame, InstrumentStub())
- self.assertTrue(pd.isna(clean.loc[0, "prev_close"]))
- self.assertAlmostEqual(clean.loc[1, "daily_return"], 0.2)
- self.assertAlmostEqual(clean.loc[2, "daily_return"], 0.5)
if __name__ == "__main__":
    # Let the module be executed directly as a script, in addition to being
    # collected by a test runner.
    unittest.main()
|