"""Dragon indicator engine: fetch daily bars and derive indicator/signal columns."""
from __future__ import annotations

import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd
# Locate the repository-level "dragon" directory relative to this file and put
# it on sys.path so the two imports below can be resolved.
# NOTE(review): presumably MyTT and data_fetcher_v2 live in that directory --
# confirm against the repository layout.
REPO_ROOT = Path(__file__).resolve().parents[3]
ROOT_DRAGON_DIR = REPO_ROOT / "dragon"

# Fail early with an explicit message instead of an opaque import error later.
# is_dir() (rather than exists()) also rejects a stray regular file of that name.
if not ROOT_DRAGON_DIR.is_dir():
    raise ModuleNotFoundError(f"Expected dragon dependency directory at {ROOT_DRAGON_DIR}")

# Append only once so repeated imports/reloads do not grow sys.path.
if str(ROOT_DRAGON_DIR) not in sys.path:
    sys.path.append(str(ROOT_DRAGON_DIR))

import MyTT  # noqa: E402
from data_fetcher_v2 import DataFetcherV2  # noqa: E402
@dataclass
class DragonIndicatorConfig:
    """Settings that select which instrument and date range to load."""

    # Instrument code forwarded to the data fetcher as ``symbol``.
    symbol: str = "399673"
    # First date to request, formatted as YYYY-MM-DD.
    start_date: str = "2015-01-01"
    # Last date to request (YYYY-MM-DD); None means "today" at fetch time.
    end_date: Optional[str] = None
def _cross_up(left: np.ndarray, right: np.ndarray) -> np.ndarray:
    """Flag the positions where ``left`` crosses above ``right``.

    Position ``i`` is True when ``left[i] > right[i]`` and
    ``left[i - 1] <= right[i - 1]``. Position 0 is always False because it has
    no predecessor. Comparisons involving NaN evaluate to False.

    Args:
        left: 1-D sequence of values (array or array-like).
        right: 1-D sequence of the same length as ``left``.

    Returns:
        Boolean array of the same length as the inputs; empty for empty input.
    """
    left = np.asarray(left)
    right = np.asarray(right)
    above_now = left > right
    crossed = np.zeros(above_now.shape, dtype=bool)
    # Slicing pairs each element with its predecessor directly, so there is no
    # wrap-around to undo and zero/one-element inputs need no special casing.
    if above_now.size > 1:
        crossed[1:] = above_now[1:] & (left[:-1] <= right[:-1])
    return crossed
class DragonIndicatorEngine:
    """Fetch daily bars for one symbol and derive indicator/signal columns.

    Attributes:
        config: The ``DragonIndicatorConfig`` in effect.
        fetcher: ``DataFetcherV2`` instance used to load price data.
        last_fetch_meta: Metadata captured by the most recent
            ``fetch_daily_data`` call (empty until the first call).
    """

    # Price columns that compute() reads from its input frame.
    _PRICE_COLUMNS = ("open", "high", "low", "close")

    def __init__(self, config: Optional[DragonIndicatorConfig] = None):
        """Create an engine.

        Args:
            config: Settings to use; a default ``DragonIndicatorConfig`` is
                created when omitted.
        """
        self.config = config if config is not None else DragonIndicatorConfig()
        self.fetcher = DataFetcherV2()
        self.last_fetch_meta: dict[str, object] = {}

    def fetch_daily_data(self, include_intraday_snapshot: bool = False) -> pd.DataFrame:
        """Load daily bars for the configured symbol and date range.

        Args:
            include_intraday_snapshot: When True, use the fetcher's
                ``fetch_index_data_with_latest_snapshot_v2`` instead of
                ``fetch_index_data_v2``.

        Returns:
            A copy of the fetched frame sorted by index, with the fetch
            metadata (also stored in ``self.last_fetch_meta``) merged into
            ``DataFrame.attrs``.

        Raises:
            RuntimeError: If the fetcher returns no data.
        """
        end_date = self.config.end_date or datetime.now().strftime("%Y-%m-%d")
        if include_intraday_snapshot:
            fetch = self.fetcher.fetch_index_data_with_latest_snapshot_v2
        else:
            fetch = self.fetcher.fetch_index_data_v2
        df = fetch(
            symbol=self.config.symbol,
            start_date=self.config.start_date,
            end_date=end_date,
        )
        if df is None or df.empty:
            raise RuntimeError(f"Failed to fetch daily data for {self.config.symbol}")

        # Read the fetcher's attrs from the frame it returned, before any
        # derived frame is created.
        source_attrs = dict(df.attrs)
        result = df.sort_index().copy()

        if "historical_latest_bar_date" in source_attrs:
            latest_bar_date = source_attrs["historical_latest_bar_date"]
        else:
            # Take the fallback from the *sorted* frame so it really is the
            # latest bar even when the fetcher returned rows out of order.
            latest_bar_date = pd.Timestamp(result.index[-1]).date().isoformat()

        self.last_fetch_meta = {
            "intraday_snapshot_appended": bool(source_attrs.get("intraday_snapshot_appended", False)),
            "intraday_snapshot_timestamp": source_attrs.get("intraday_snapshot_timestamp"),
            "historical_latest_bar_date": latest_bar_date,
        }
        result.attrs.update(self.last_fetch_meta)
        return result

    @staticmethod
    def _fill_nan(values) -> np.ndarray:
        """Return ``values`` as a float array with NaN replaced by 0.0."""
        return np.nan_to_num(np.asarray(values, dtype=float), nan=0.0)

    @staticmethod
    def _range_position(close: np.ndarray, high: np.ndarray, low: np.ndarray, window: int) -> np.ndarray:
        """Return 100 * (close - LLV(low)) / (HHV(high) - LLV(low)) over ``window``.

        Positions where the range is zero or undefined (e.g. the rolling
        window is not filled yet) yield 0.0 instead of NaN/inf.
        """
        lowest = np.asarray(MyTT.LLV(low, window), dtype=float)
        highest = np.asarray(MyTT.HHV(high, window), dtype=float)
        span = highest - lowest
        usable = np.isfinite(span) & (span != 0)
        ratio = np.divide(close - lowest, span, out=np.zeros_like(span), where=usable)
        return np.nan_to_num(ratio * 100.0, nan=0.0)

    def compute(self, df: pd.DataFrame) -> pd.DataFrame:
        """Append the indicator and signal columns to a copy of ``df``.

        Added columns, in order: ``h1_5`` (EMA of close, 8), ``h2_5`` (EMA of
        ``h1_5``, 20), ``a1`` (their difference divided by their mean),
        ``y0``/``y1`` (SMA(3, 1) chain over the 7-bar range position),
        ``kdj_buy``/``kdj_sell`` (``y0`` crossing above/below ``y1``),
        ``y2``/``y3`` (SMA(5, 1) then SMA(10, 1) over the 38-bar range
        position), ``b1`` ((y2 - y3) / 100), ``c1`` (mean of y2 and y3),
        ``ql_xopen`` (mean of the previous bar's open and close),
        ``ql_upper``/``ql_lower`` (5-bar mean close +/- half the 8-bar mean
        range), ``ql_buy`` (close crossing above ``ql_upper``) and
        ``ql_sell`` (``ql_lower`` crossing above close).

        Args:
            df: Frame with ``open``, ``high``, ``low`` and ``close`` columns.

        Returns:
            A new frame; ``df`` itself is not modified. An empty input is
            returned as a plain copy without the extra columns.

        Raises:
            KeyError: If a required price column is missing.
        """
        if df.empty:
            return df.copy()

        missing = [name for name in self._PRICE_COLUMNS if name not in df.columns]
        if missing:
            raise KeyError(f"compute() requires columns {missing} which are missing from the input frame")

        result = df.copy()
        open_, high, low, close = (
            result[name].to_numpy(dtype=float) for name in self._PRICE_COLUMNS
        )

        # Two-stage exponential smoothing of the close and their relative gap.
        h1_5 = self._fill_nan(MyTT.EMA(close, 8))
        h2_5 = self._fill_nan(MyTT.EMA(h1_5, 20))
        avg_h = (h1_5 + h2_5) / 2.0
        a1 = np.divide(h1_5 - h2_5, avg_h, out=np.zeros_like(avg_h), where=avg_h != 0)

        # Short-window (7) range position, smoothed twice.
        y0 = self._fill_nan(MyTT.SMA(self._range_position(close, high, low, 7), 3, 1))
        y1 = self._fill_nan(MyTT.SMA(y0, 3, 1))

        # Long-window (38) range position, smoothed twice.
        y2 = self._fill_nan(MyTT.SMA(self._range_position(close, high, low, 38), 5, 1))
        y3 = self._fill_nan(MyTT.SMA(y2, 10, 1))
        b1 = (y2 - y3) / 100.0
        c1 = (y2 + y3) / 2.0

        # Synthetic open: midpoint of the previous bar's open and close. The
        # first bar has no predecessor, so it falls back to its own close.
        prev_mid = (
            np.asarray(MyTT.REF(open_, 1), dtype=float)
            + np.asarray(MyTT.REF(close, 1), dtype=float)
        ) / 2.0
        xopen = np.where(np.isnan(prev_mid), close, prev_mid)
        xhigh = np.maximum(high, xopen)
        xlow = np.minimum(low, xopen)

        # Channel around the 5-bar mean close, as wide as the 8-bar mean range.
        ql_volatility = self._fill_nan(MyTT.MA(xhigh - xlow, 8))
        ql_mid = self._fill_nan(MyTT.MA(close, 5))
        ql_upper = ql_mid + ql_volatility / 2.0
        ql_lower = ql_mid - ql_volatility / 2.0

        # Assigned one by one to keep the established column order.
        new_columns = {
            "h1_5": h1_5,
            "h2_5": h2_5,
            "a1": a1,
            "y0": y0,
            "y1": y1,
            "kdj_buy": _cross_up(y0, y1),
            "kdj_sell": _cross_up(y1, y0),
            "y2": y2,
            "y3": y3,
            "b1": b1,
            "c1": c1,
            "ql_xopen": xopen,
            "ql_upper": ql_upper,
            "ql_lower": ql_lower,
            "ql_buy": _cross_up(close, ql_upper),
            "ql_sell": _cross_up(ql_lower, close),
        }
        for column, values in new_columns.items():
            result[column] = values
        return result
|