| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- from __future__ import annotations
- from pathlib import Path
- import pandas as pd
- def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
- return pd.read_csv(base_dir / name, encoding="utf-8-sig")
- def _annotate_buy_cluster(row: pd.Series) -> str:
- a1 = float(row["a1"])
- b1 = float(row["b1"])
- c1 = float(row["c1"])
- ql_buy = bool(row["ql_buy"])
- if ql_buy and a1 > 0.03 and b1 > 0.15:
- return "strong_dual_gold_reconfirm"
- if b1 > 0.24 and c1 > 45:
- return "early_strength_reconfirm"
- if a1 > 0.07 and c1 > 90:
- return "super_hot_trend_reconfirm"
- return "other_hold_reconfirm"
- def _annotate_sell_cluster(row: pd.Series) -> str:
- reason = str(row["reason"])
- a1 = float(row["a1"])
- b1 = float(row["b1"])
- c1 = float(row["c1"])
- days_from_prev_real_sell = row["days_from_prev_real_sell"]
- ql_sell = bool(row["ql_sell"])
- kdj_sell = bool(row["kdj_sell"])
- if reason.endswith("state_crash_followthrough"):
- return "crash_followthrough"
- if pd.notna(days_from_prev_real_sell) and 0 < float(days_from_prev_real_sell) <= 10:
- return "post_exit_confirmation"
- if c1 > 80 and (ql_sell or kdj_sell):
- return "high_zone_warning"
- if b1 < -0.12 and (ql_sell or kdj_sell):
- return "strong_break_warning"
- if ql_sell and c1 < 20 and a1 < -0.02:
- return "deep_oversold_followthrough"
- return "other_bearish_followthrough"
- def _build_real_trade_lookup(strategy_events: pd.DataFrame, side: str) -> pd.DataFrame:
- real = strategy_events[(strategy_events["layer"] == "real_trade") & (strategy_events["side"] == side)].copy()
- real["dt"] = pd.to_datetime(real["date"])
- real = real.sort_values("dt")
- return real[["dt", "date", "reason", "c1"]]
- def _attach_nearest_real_context(aux: pd.DataFrame, real_buy: pd.DataFrame, real_sell: pd.DataFrame) -> pd.DataFrame:
- aux = aux.copy()
- aux["dt"] = pd.to_datetime(aux["date"])
- prev_buy_date: list[str | None] = []
- days_from_prev_real_buy: list[int | None] = []
- prev_sell_date: list[str | None] = []
- days_from_prev_real_sell: list[int | None] = []
- buy_dates = real_buy["dt"].tolist()
- sell_dates = real_sell["dt"].tolist()
- for dt in aux["dt"]:
- earlier_buys = [x for x in buy_dates if x < dt]
- earlier_sells = [x for x in sell_dates if x < dt]
- if earlier_buys:
- prev_buy = earlier_buys[-1]
- prev_buy_date.append(prev_buy.date().isoformat())
- days_from_prev_real_buy.append((dt - prev_buy).days)
- else:
- prev_buy_date.append(None)
- days_from_prev_real_buy.append(None)
- if earlier_sells:
- prev_sell = earlier_sells[-1]
- prev_sell_date.append(prev_sell.date().isoformat())
- days_from_prev_real_sell.append((dt - prev_sell).days)
- else:
- prev_sell_date.append(None)
- days_from_prev_real_sell.append(None)
- aux["prev_real_buy_date"] = prev_buy_date
- aux["days_from_prev_real_buy"] = days_from_prev_real_buy
- aux["prev_real_sell_date"] = prev_sell_date
- aux["days_from_prev_real_sell"] = days_from_prev_real_sell
- return aux
- def main() -> None:
- base_dir = Path(__file__).resolve().parent
- strategy_events = _load_csv(base_dir, "dragon_strategy_events.csv")
- workbook_layers = _load_csv(base_dir, "dragon_workbook_layers.csv")
- aux = strategy_events[strategy_events["layer"] == "aux_signal"].copy()
- aux = aux.sort_values(["side", "date"]).reset_index(drop=True)
- workbook_aux = workbook_layers[workbook_layers["layer"] == "aux_signal"][
- ["date", "side", "signal_reason", "note"]
- ].copy()
- workbook_aux = workbook_aux.rename(columns={"signal_reason": "workbook_signal_reason", "note": "workbook_note"})
- real_buy = _build_real_trade_lookup(strategy_events, "BUY")
- real_sell = _build_real_trade_lookup(strategy_events, "SELL")
- aux = _attach_nearest_real_context(aux, real_buy, real_sell)
- aux = aux.merge(workbook_aux, on=["date", "side"], how="left")
- aux["matched_workbook_aux"] = aux["workbook_signal_reason"].notna()
- buy_mask = aux["side"] == "BUY"
- sell_mask = aux["side"] == "SELL"
- aux.loc[buy_mask, "cluster"] = aux.loc[buy_mask].apply(_annotate_buy_cluster, axis=1)
- aux.loc[sell_mask, "cluster"] = aux.loc[sell_mask].apply(_annotate_sell_cluster, axis=1)
- audit_cols = [
- "date",
- "side",
- "reason",
- "cluster",
- "matched_workbook_aux",
- "prev_real_buy_date",
- "days_from_prev_real_buy",
- "prev_real_sell_date",
- "days_from_prev_real_sell",
- "a1",
- "b1",
- "c1",
- "kdj_buy",
- "kdj_sell",
- "ql_buy",
- "ql_sell",
- "workbook_signal_reason",
- "workbook_note",
- ]
- aux[audit_cols].to_csv(base_dir / "dragon_aux_signal_audit.csv", index=False, encoding="utf-8-sig")
- lines = [
- "# Dragon Aux Signal Audit",
- "",
- f"- Strategy aux signals: `{len(aux)}`",
- f"- Aux BUY: `{int(buy_mask.sum())}`",
- f"- Aux SELL: `{int(sell_mask.sum())}`",
- f"- Workbook overlap: `{int(aux['matched_workbook_aux'].sum())}`",
- "",
- "## Cluster Summary",
- ]
- summary = (
- aux.groupby(["side", "cluster", "matched_workbook_aux"])
- .size()
- .reset_index(name="count")
- .sort_values(["side", "cluster", "matched_workbook_aux"])
- )
- for _, row in summary.iterrows():
- lines.append(
- f"- {row['side']} / {row['cluster']} / matched `{bool(row['matched_workbook_aux'])}`: `{int(row['count'])}`"
- )
- lines.extend(["", "## Workbook-Matched Aux Rows"])
- matched_rows = aux[aux["matched_workbook_aux"]].copy()
- for _, row in matched_rows.iterrows():
- lines.append(
- f"- {row['date']} {row['side']} `{row['reason']}` -> `{row['cluster']}` | a1 `{row['a1']:.4f}` b1 `{row['b1']:.4f}` c1 `{row['c1']:.2f}`"
- )
- lines.extend(["", "## Top Unmatched Buckets"])
- unmatched_summary = (
- aux[~aux["matched_workbook_aux"]]
- .groupby(["side", "cluster"])
- .size()
- .reset_index(name="count")
- .sort_values("count", ascending=False)
- .head(12)
- )
- for _, row in unmatched_summary.iterrows():
- lines.append(f"- {row['side']} / {row['cluster']}: `{int(row['count'])}`")
- (base_dir / "dragon_aux_signal_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
- if __name__ == "__main__":
- main()
|