"""Indicator engine built on helpers from the repository's ``dragon`` directory.

Importing this module has side effects: it locates ``<repo root>/dragon``
(three directory levels above this file's directory), raises
``ModuleNotFoundError`` if that directory is missing, and appends it to
``sys.path`` so that ``MyTT`` and ``data_fetcher_v2`` can be imported.
"""
from __future__ import annotations

import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

import numpy as np
import pandas as pd

# ``MyTT`` and ``data_fetcher_v2`` are loaded from the ``dragon`` directory
# rather than from an installed package, so that directory has to be on
# ``sys.path`` before the two imports below run.
REPO_ROOT = Path(__file__).resolve().parents[3]
ROOT_DRAGON_DIR = REPO_ROOT / "dragon"
if not ROOT_DRAGON_DIR.exists():
    raise ModuleNotFoundError(f"Expected dragon dependency directory at {ROOT_DRAGON_DIR}")
if str(ROOT_DRAGON_DIR) not in sys.path:
    sys.path.append(str(ROOT_DRAGON_DIR))

import MyTT  # noqa: E402
from data_fetcher_v2 import DataFetcherV2  # noqa: E402


@dataclass
class DragonIndicatorConfig:
    """Settings forwarded to the data fetcher by ``DragonIndicatorEngine``."""

    # Instrument code passed as ``symbol`` to the fetcher.
    symbol: str = "399673"
    # First date requested, formatted as YYYY-MM-DD.
    start_date: str = "2015-01-01"
    # Last date requested; ``None`` means "today" at fetch time.
    end_date: Optional[str] = None


def _cross_up(left: np.ndarray, right: np.ndarray) -> np.ndarray:
    """Flag positions where ``left`` moves from at-or-below to above ``right``.

    Element ``i`` (for ``i >= 1``) is True when ``left[i] > right[i]`` and
    ``left[i - 1] <= right[i - 1]``. Element 0 is always False because it has
    no predecessor. Empty input yields an empty boolean array.

    Args:
        left: 1-D numeric array.
        right: 1-D numeric array of the same length as ``left``.

    Returns:
        Boolean array with the same length as the inputs.
    """
    left = np.asarray(left)
    right = np.asarray(right)
    above_now = left > right
    crossed = np.zeros(above_now.shape, dtype=bool)
    # Comparing shifted slices avoids the wrap-around of ``np.roll`` and, unlike
    # an explicit ``crossed[0] = False``, is also valid for zero-length input.
    crossed[1:] = above_now[1:] & (left[:-1] <= right[:-1])
    return crossed


def _range_position_pct(
    close: np.ndarray,
    high: np.ndarray,
    low: np.ndarray,
    window: int,
) -> np.ndarray:
    """Return ``(close - LLV(low, w)) / (HHV(high, w) - LLV(low, w)) * 100``.

    Positions where the denominator is zero, or where the ratio is NaN or
    infinite, are reported as 0.0.
    """
    lowest = np.asarray(MyTT.LLV(low, window), dtype=float)
    highest = np.asarray(MyTT.HHV(high, window), dtype=float)
    span = highest - lowest
    ratio = np.zeros_like(close, dtype=float)
    # Guarded division: a zero-width window would otherwise emit divide
    # warnings and could produce inf.
    np.divide(close - lowest, span, out=ratio, where=span != 0)
    return np.nan_to_num(ratio * 100.0, nan=0.0, posinf=0.0, neginf=0.0)


class DragonIndicatorEngine:
    """Fetches daily bars for the configured symbol and derives indicator columns."""

    def __init__(self, config: Optional[DragonIndicatorConfig] = None):
        """Create the engine.

        Args:
            config: Fetch settings; a default ``DragonIndicatorConfig`` is
                used when omitted.
        """
        self.config = config if config is not None else DragonIndicatorConfig()
        self.fetcher = DataFetcherV2()
        # Metadata describing the most recent ``fetch_daily_data`` call.
        self.last_fetch_meta: dict[str, object] = {}

    def fetch_daily_data(self, include_intraday_snapshot: bool = False) -> pd.DataFrame:
        """Fetch daily data for the configured symbol, sorted by index.

        As a side effect, ``self.last_fetch_meta`` is replaced with three
        entries taken from the fetched frame's ``attrs``:
        ``intraday_snapshot_appended``, ``intraday_snapshot_timestamp`` and
        ``historical_latest_bar_date``. The same entries are copied onto the
        returned frame's ``attrs``.

        Args:
            include_intraday_snapshot: When True, use the fetcher's
                ``fetch_index_data_with_latest_snapshot_v2``; otherwise use
                ``fetch_index_data_v2``.

        Returns:
            A sorted copy of the fetched frame.

        Raises:
            RuntimeError: If the fetcher returns nothing or an empty frame.
        """
        # NOTE(review): "today" comes from the local, timezone-naive clock.
        end_date = self.config.end_date or datetime.now().strftime("%Y-%m-%d")

        if include_intraday_snapshot:
            fetch = self.fetcher.fetch_index_data_with_latest_snapshot_v2
        else:
            fetch = self.fetcher.fetch_index_data_v2
        df = fetch(
            symbol=self.config.symbol,
            start_date=self.config.start_date,
            end_date=end_date,
        )

        if df is None or df.empty:
            raise RuntimeError(f"Failed to fetch daily data for {self.config.symbol}")

        result = df.sort_index().copy()

        if "historical_latest_bar_date" in df.attrs:
            latest_bar_date = df.attrs["historical_latest_bar_date"]
        else:
            # Fallback is taken from the *sorted* frame so that it really is
            # the latest index entry even if the fetcher returned rows out of
            # order. Assumes index entries expose ``.date()`` (datetime-like).
            latest_bar_date = result.index[-1].date().isoformat()

        self.last_fetch_meta = {
            "intraday_snapshot_appended": bool(df.attrs.get("intraday_snapshot_appended", False)),
            "intraday_snapshot_timestamp": df.attrs.get("intraday_snapshot_timestamp"),
            "historical_latest_bar_date": latest_bar_date,
        }
        result.attrs.update(self.last_fetch_meta)
        return result

    def compute(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``df`` with indicator columns appended.

        ``df`` must provide ``open``, ``high``, ``low`` and ``close`` columns
        convertible to float. An empty frame is returned as an unmodified
        copy (no indicator columns are added). The input is never mutated.

        Added columns:
            h1_5, h2_5: ``MyTT.EMA(close, 8)`` and ``MyTT.EMA(h1_5, 20)``.
            a1: ``(h1_5 - h2_5)`` divided by their mean (0 where the mean is 0).
            y0, y1: ``MyTT.SMA`` (3, 1) smoothing, applied once and twice, of
                the 7-bar range position of ``close``.
            y2, y3: ``MyTT.SMA`` (5, 1) then (10, 1) smoothing of the 38-bar
                range position of ``close``.
            b1, c1: ``(y2 - y3) / 100`` and ``(y2 + y3) / 2``.
            kdj_buy, kdj_sell: ``y0`` crossing above ``y1`` and vice versa.
            ql_xopen: mean of the previous bar's open and close (the current
                close where no previous bar exists).
            ql_upper, ql_lower: ``MyTT.MA(close, 5)`` plus/minus half of
                ``MyTT.MA(xhigh - xlow, 8)``.
            ql_buy, ql_sell: ``close`` crossing above ``ql_upper`` and
                ``ql_lower`` crossing above ``close``.

        NaN values produced by the MyTT helpers are replaced with 0.0.
        """
        if df.empty:
            return df.copy()

        result = df.copy()
        close = result["close"].to_numpy(dtype=float)
        high = result["high"].to_numpy(dtype=float)
        low = result["low"].to_numpy(dtype=float)
        open_ = result["open"].to_numpy(dtype=float)

        h1_5 = np.nan_to_num(MyTT.EMA(close, 8), nan=0.0)
        h2_5 = np.nan_to_num(MyTT.EMA(h1_5, 20), nan=0.0)

        rsv = _range_position_pct(close, high, low, 7)
        y0 = np.nan_to_num(MyTT.SMA(rsv, 3, 1), nan=0.0)
        y1 = np.nan_to_num(MyTT.SMA(y0, 3, 1), nan=0.0)

        rsv1 = _range_position_pct(close, high, low, 38)
        y2 = np.nan_to_num(MyTT.SMA(rsv1, 5, 1), nan=0.0)
        y3 = np.nan_to_num(MyTT.SMA(y2, 10, 1), nan=0.0)

        avg_h = (h1_5 + h2_5) / 2.0
        a1 = np.divide(h1_5 - h2_5, avg_h, out=np.zeros_like(avg_h), where=avg_h != 0)
        b1 = (y2 - y3) / 100.0
        c1 = (y2 + y3) / 2.0

        prev_mid = (MyTT.REF(open_, 1) + MyTT.REF(close, 1)) / 2.0
        # Where there is no previous bar the midpoint is NaN; use the bar's own
        # close there.
        xopen = np.where(np.isnan(prev_mid), close, prev_mid)
        xclose = close
        xhigh = np.maximum(high, xopen)
        xlow = np.minimum(low, xopen)

        ql_volatility = np.nan_to_num(MyTT.MA(xhigh - xlow, 8), nan=0.0)
        ql_mid = np.nan_to_num(MyTT.MA(xclose, 5), nan=0.0)
        ql_upper = ql_mid + ql_volatility / 2.0
        ql_lower = ql_mid - ql_volatility / 2.0

        kdj_buy = _cross_up(y0, y1)
        kdj_sell = _cross_up(y1, y0)
        ql_buy = _cross_up(xclose, ql_upper)
        ql_sell = _cross_up(ql_lower, xclose)

        # Column insertion order is part of the output and is kept unchanged.
        result["h1_5"] = h1_5
        result["h2_5"] = h2_5
        result["a1"] = a1
        result["y0"] = y0
        result["y1"] = y1
        result["kdj_buy"] = kdj_buy
        result["kdj_sell"] = kdj_sell
        result["y2"] = y2
        result["y3"] = y3
        result["b1"] = b1
        result["c1"] = c1
        result["ql_xopen"] = xopen
        result["ql_upper"] = ql_upper
        result["ql_lower"] = ql_lower
        result["ql_buy"] = ql_buy
        result["ql_sell"] = ql_sell
        return result