"""Audit strategy ``aux_signal`` events against real trades and workbook rows.

Reads ``dragon_strategy_events.csv`` and ``dragon_workbook_layers.csv`` from the
directory that contains this script, then writes ``dragon_aux_signal_audit.csv``
and ``dragon_aux_signal_review.md`` into the same directory.
"""
from __future__ import annotations

from bisect import bisect_left
from pathlib import Path

import pandas as pd

# Column order of the audit CSV written by ``main``.
_AUDIT_COLUMNS = [
    "date",
    "side",
    "reason",
    "cluster",
    "matched_workbook_aux",
    "prev_real_buy_date",
    "days_from_prev_real_buy",
    "prev_real_sell_date",
    "days_from_prev_real_sell",
    "a1",
    "b1",
    "c1",
    "kdj_buy",
    "kdj_sell",
    "ql_buy",
    "ql_sell",
    "workbook_signal_reason",
    "workbook_note",
]


def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
    """Read ``base_dir / name`` as CSV, decoding it as ``utf-8-sig``."""
    return pd.read_csv(base_dir / name, encoding="utf-8-sig")


def _flag(value: object) -> bool:
    """Return the truthiness of a signal cell, treating missing values as False.

    ``bool(float("nan"))`` is ``True``, so a blank cell would otherwise be read
    as a triggered signal.
    """
    if pd.isna(value):
        return False
    return bool(value)


def _annotate_buy_cluster(row: pd.Series) -> str:
    """Return the cluster label for one BUY-side aux row.

    Reads ``a1``, ``b1``, ``c1`` and ``ql_buy`` from ``row``. Rules are checked
    in order and the first match wins; ``"other_hold_reconfirm"`` is the
    fallback.
    """
    a1 = float(row["a1"])
    b1 = float(row["b1"])
    c1 = float(row["c1"])
    ql_buy = _flag(row["ql_buy"])

    if ql_buy and a1 > 0.03 and b1 > 0.15:
        return "strong_dual_gold_reconfirm"
    if b1 > 0.24 and c1 > 45:
        return "early_strength_reconfirm"
    if a1 > 0.07 and c1 > 90:
        return "super_hot_trend_reconfirm"
    return "other_hold_reconfirm"


def _annotate_sell_cluster(row: pd.Series) -> str:
    """Return the cluster label for one SELL-side aux row.

    Reads ``reason``, ``a1``, ``b1``, ``c1``, ``days_from_prev_real_sell``,
    ``ql_sell`` and ``kdj_sell`` from ``row``. Rules are checked in order and
    the first match wins; ``"other_bearish_followthrough"`` is the fallback.
    """
    reason = str(row["reason"])
    a1 = float(row["a1"])
    b1 = float(row["b1"])
    c1 = float(row["c1"])
    days_from_prev_real_sell = row["days_from_prev_real_sell"]
    ql_sell = _flag(row["ql_sell"])
    kdj_sell = _flag(row["kdj_sell"])
    any_sell_signal = ql_sell or kdj_sell

    if reason.endswith("state_crash_followthrough"):
        return "crash_followthrough"
    if pd.notna(days_from_prev_real_sell) and 0 < float(days_from_prev_real_sell) <= 10:
        return "post_exit_confirmation"
    if c1 > 80 and any_sell_signal:
        return "high_zone_warning"
    if b1 < -0.12 and any_sell_signal:
        return "strong_break_warning"
    if ql_sell and c1 < 20 and a1 < -0.02:
        return "deep_oversold_followthrough"
    return "other_bearish_followthrough"


def _build_real_trade_lookup(strategy_events: pd.DataFrame, side: str) -> pd.DataFrame:
    """Return the ``real_trade`` rows for ``side``, sorted by parsed date.

    The result has the columns ``dt`` (parsed from ``date``), ``date``,
    ``reason`` and ``c1``.
    """
    is_real = strategy_events["layer"] == "real_trade"
    is_side = strategy_events["side"] == side
    real = strategy_events[is_real & is_side].copy()
    real["dt"] = pd.to_datetime(real["date"])
    real = real.sort_values("dt")
    return real[["dt", "date", "reason", "c1"]]


def _sorted_valid_dates(real: pd.DataFrame) -> list:
    """Return the non-missing ``dt`` values of ``real`` in ascending order."""
    return sorted(real["dt"].dropna().tolist())


def _previous_event(dates: list, dt: object) -> tuple[str | None, int | None]:
    """Find the latest entry of the ascending list ``dates`` strictly before ``dt``.

    Returns ``(iso_date, days_elapsed)``, or ``(None, None)`` when ``dt`` is
    missing or nothing in ``dates`` precedes it.
    """
    if pd.isna(dt):
        return None, None
    # bisect_left yields the first index whose value is >= dt, so the element
    # just before it is the latest one strictly earlier than dt.
    idx = bisect_left(dates, dt)
    if idx == 0:
        return None, None
    prev = dates[idx - 1]
    return prev.date().isoformat(), (dt - prev).days


def _attach_nearest_real_context(
    aux: pd.DataFrame, real_buy: pd.DataFrame, real_sell: pd.DataFrame
) -> pd.DataFrame:
    """Annotate each aux row with the most recent earlier real BUY and SELL.

    Works on a copy of ``aux`` and adds ``dt`` (parsed from ``date``),
    ``prev_real_buy_date``, ``days_from_prev_real_buy``,
    ``prev_real_sell_date`` and ``days_from_prev_real_sell``. Only trades dated
    strictly before the aux row count; rows with no such trade get ``None``.
    ``real_buy`` / ``real_sell`` need a ``dt`` column and may be in any order.
    """
    aux = aux.copy()
    aux["dt"] = pd.to_datetime(aux["date"])

    # Sort once up front so each lookup is a binary search rather than a scan.
    buy_dates = _sorted_valid_dates(real_buy)
    sell_dates = _sorted_valid_dates(real_sell)

    buy_context = [_previous_event(buy_dates, dt) for dt in aux["dt"]]
    sell_context = [_previous_event(sell_dates, dt) for dt in aux["dt"]]

    aux["prev_real_buy_date"] = [date for date, _ in buy_context]
    aux["days_from_prev_real_buy"] = [days for _, days in buy_context]
    aux["prev_real_sell_date"] = [date for date, _ in sell_context]
    aux["days_from_prev_real_sell"] = [days for _, days in sell_context]
    return aux


def _build_audit_frame(
    strategy_events: pd.DataFrame, workbook_layers: pd.DataFrame
) -> pd.DataFrame:
    """Build the per-aux-signal audit table from the two input tables.

    Selects the ``aux_signal`` rows of ``strategy_events``, attaches the
    previous real-trade context, left-joins the workbook ``aux_signal`` rows on
    ``date`` and ``side``, and adds ``matched_workbook_aux`` and ``cluster``.
    """
    aux = strategy_events[strategy_events["layer"] == "aux_signal"].copy()
    aux = aux.sort_values(["side", "date"]).reset_index(drop=True)

    workbook_aux = workbook_layers.loc[
        workbook_layers["layer"] == "aux_signal",
        ["date", "side", "signal_reason", "note"],
    ].copy()
    workbook_aux = workbook_aux.rename(
        columns={"signal_reason": "workbook_signal_reason", "note": "workbook_note"}
    )
    # A repeated (date, side) pair in the workbook would fan out the left join
    # and inflate every count derived from the audit frame; keep the first.
    workbook_aux = workbook_aux.drop_duplicates(subset=["date", "side"], keep="first")

    real_buy = _build_real_trade_lookup(strategy_events, "BUY")
    real_sell = _build_real_trade_lookup(strategy_events, "SELL")

    aux = _attach_nearest_real_context(aux, real_buy, real_sell)
    aux = aux.merge(workbook_aux, on=["date", "side"], how="left")
    aux["matched_workbook_aux"] = aux["workbook_signal_reason"].notna()

    # Create the column up front so it exists even when a side has no rows, and
    # skip empty selections: row-wise apply on an empty frame does not return
    # a Series of labels.
    aux["cluster"] = pd.Series(index=aux.index, dtype="object")
    annotators = (
        (aux["side"] == "BUY", _annotate_buy_cluster),
        (aux["side"] == "SELL", _annotate_sell_cluster),
    )
    for mask, annotate in annotators:
        if mask.any():
            aux.loc[mask, "cluster"] = aux.loc[mask].apply(annotate, axis=1)
    return aux


def _render_review_lines(aux: pd.DataFrame) -> list[str]:
    """Render the Markdown review of the audit frame as a list of lines."""
    buy_mask = aux["side"] == "BUY"
    sell_mask = aux["side"] == "SELL"

    lines = [
        "# Dragon Aux Signal Audit",
        "",
        f"- Strategy aux signals: `{len(aux)}`",
        f"- Aux BUY: `{int(buy_mask.sum())}`",
        f"- Aux SELL: `{int(sell_mask.sum())}`",
        f"- Workbook overlap: `{int(aux['matched_workbook_aux'].sum())}`",
        "",
        "## Cluster Summary",
    ]

    summary = (
        aux.groupby(["side", "cluster", "matched_workbook_aux"])
        .size()
        .reset_index(name="count")
        .sort_values(["side", "cluster", "matched_workbook_aux"])
    )
    for _, row in summary.iterrows():
        lines.append(
            f"- {row['side']} / {row['cluster']} / matched `{bool(row['matched_workbook_aux'])}`: `{int(row['count'])}`"
        )

    lines.extend(["", "## Workbook-Matched Aux Rows"])
    matched_rows = aux[aux["matched_workbook_aux"]]
    for _, row in matched_rows.iterrows():
        lines.append(
            f"- {row['date']} {row['side']} `{row['reason']}` -> `{row['cluster']}` | a1 `{row['a1']:.4f}` b1 `{row['b1']:.4f}` c1 `{row['c1']:.2f}`"
        )

    lines.extend(["", "## Top Unmatched Buckets"])
    unmatched_summary = (
        aux[~aux["matched_workbook_aux"]]
        .groupby(["side", "cluster"])
        .size()
        .reset_index(name="count")
        .sort_values("count", ascending=False)
        .head(12)
    )
    for _, row in unmatched_summary.iterrows():
        lines.append(f"- {row['side']} / {row['cluster']}: `{int(row['count'])}`")
    return lines


def main() -> None:
    """Load the input CSVs next to this script and write the audit CSV and review."""
    base_dir = Path(__file__).resolve().parent
    strategy_events = _load_csv(base_dir, "dragon_strategy_events.csv")
    workbook_layers = _load_csv(base_dir, "dragon_workbook_layers.csv")

    aux = _build_audit_frame(strategy_events, workbook_layers)

    aux[_AUDIT_COLUMNS].to_csv(
        base_dir / "dragon_aux_signal_audit.csv", index=False, encoding="utf-8-sig"
    )
    lines = _render_review_lines(aux)
    (base_dir / "dragon_aux_signal_review.md").write_text(
        "\n".join(lines) + "\n", encoding="utf-8"
    )


if __name__ == "__main__":
    main()