| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- from __future__ import annotations
- from pathlib import Path
- import pandas as pd
def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
    """Read the CSV file ``name`` found directly under ``base_dir``.

    The file is decoded with the ``utf-8-sig`` codec, so a leading UTF-8
    byte-order mark does not end up in the first column name.

    Args:
        base_dir: Directory that contains the file.
        name: File name relative to ``base_dir``.

    Returns:
        The parsed contents as a DataFrame.
    """
    csv_path = base_dir / name
    return pd.read_csv(csv_path, encoding="utf-8-sig")
def _build_real_trade_lookup(strategy_events: pd.DataFrame, side: str) -> pd.DataFrame:
    """Return the ``real_trade`` events for one side, ordered by date.

    Args:
        strategy_events: Event table with at least the columns ``layer``,
            ``side``, ``date``, ``reason`` and ``c1``.
        side: Value of the ``side`` column to keep (``main`` passes ``"BUY"``
            and ``"SELL"``).

    Returns:
        A copy of the matching rows with a parsed ``dt`` column added, sorted
        ascending by ``dt`` and reduced to ``dt``, ``date``, ``reason``, ``c1``.
    """
    is_real_trade = strategy_events["layer"] == "real_trade"
    is_requested_side = strategy_events["side"] == side
    trades = strategy_events.loc[is_real_trade & is_requested_side].copy()
    trades["dt"] = pd.to_datetime(trades["date"])
    ordered = trades.sort_values("dt")
    return ordered.loc[:, ["dt", "date", "reason", "c1"]]
def _coerce_signal_flag(value: object) -> bool:
    """Interpret a CSV-sourced flag cell as a boolean.

    Plain ``bool(value)`` is wrong for two inputs: NaN is truthy, and any
    non-empty string (including ``"False"``) is truthy. Missing values are
    treated as ``False`` and strings are parsed case-insensitively.
    """
    if isinstance(value, str):
        return value.strip().lower() in {"true", "1", "yes", "y", "t"}
    if pd.isna(value):
        return False
    return bool(value)


def _annotate_sell_cluster(row: pd.Series) -> str:
    """Classify one aux SELL row into a named cluster.

    The rules are evaluated in order and the first match wins:

    1. ``reason`` ends with ``state_crash_followthrough`` -> ``crash_followthrough``
    2. ``0 < days_from_prev_real_sell <= 10`` -> ``post_exit_confirmation``
    3. ``c1 > 80`` with either sell flag -> ``high_zone_warning``
    4. ``b1 < -0.12`` with either sell flag -> ``strong_break_warning``
    5. ``ql_sell`` with ``c1 < 20`` and ``a1 < -0.02`` -> ``deep_oversold_followthrough``
    6. otherwise -> ``other_bearish_followthrough``

    Args:
        row: A row providing ``reason``, ``a1``, ``b1``, ``c1``,
            ``days_from_prev_real_sell``, ``ql_sell`` and ``kdj_sell``.

    Returns:
        The cluster label.
    """
    reason = str(row["reason"])
    a1 = float(row["a1"])
    b1 = float(row["b1"])
    c1 = float(row["c1"])
    days_from_prev_real_sell = row["days_from_prev_real_sell"]
    # Missing or textual flags must not count as an active signal.
    ql_sell = _coerce_signal_flag(row["ql_sell"])
    kdj_sell = _coerce_signal_flag(row["kdj_sell"])
    any_sell_flag = ql_sell or kdj_sell

    if reason.endswith("state_crash_followthrough"):
        return "crash_followthrough"
    if pd.notna(days_from_prev_real_sell) and 0 < float(days_from_prev_real_sell) <= 10:
        return "post_exit_confirmation"
    if c1 > 80 and any_sell_flag:
        return "high_zone_warning"
    if b1 < -0.12 and any_sell_flag:
        return "strong_break_warning"
    if ql_sell and c1 < 20 and a1 < -0.02:
        return "deep_oversold_followthrough"
    return "other_bearish_followthrough"
def _attach_cycle_context(aux_sell: pd.DataFrame, real_buy: pd.DataFrame, real_sell: pd.DataFrame) -> pd.DataFrame:
    """Annotate each aux SELL row with its surrounding real-trade cycle.

    For every row the closest real SELL strictly before it and the closest
    real BUY strictly after it are located. The input frame is not modified.

    Args:
        aux_sell: Rows to annotate; must provide a ``date`` column.
        real_buy: Real BUY trades with a datetime ``dt`` column.
        real_sell: Real SELL trades with a datetime ``dt`` column.

    Returns:
        A copy of ``aux_sell`` with these columns added: ``dt``,
        ``prev_real_sell_date``, ``days_from_prev_real_sell``,
        ``next_real_buy_date``, ``days_to_next_real_buy`` and ``cycle_id``.
        ``cycle_id`` is ``"<prev sell> -> <next buy>"`` when both neighbours
        exist and ``"unbounded_cycle"`` otherwise.
    """
    from bisect import bisect_left, bisect_right

    annotated = aux_sell.copy()
    annotated["dt"] = pd.to_datetime(annotated["date"])

    # Sort explicitly (and drop NaT) so the nearest-neighbour lookup does not
    # depend on the caller having pre-sorted the lookup frames.
    sell_dates = real_sell["dt"].dropna().sort_values().tolist()
    buy_dates = real_buy["dt"].dropna().sort_values().tolist()

    prev_real_sell_dates: list[str | None] = []
    days_from_prev_real_sell: list[int | None] = []
    next_real_buy_dates: list[str | None] = []
    days_to_next_real_buy: list[int | None] = []

    for dt in annotated["dt"]:
        prev_sell = None
        next_buy = None
        if pd.notna(dt):
            # bisect_left counts the sells strictly earlier than dt.
            sell_pos = bisect_left(sell_dates, dt)
            if sell_pos > 0:
                prev_sell = sell_dates[sell_pos - 1]
            # bisect_right points at the first buy strictly later than dt.
            buy_pos = bisect_right(buy_dates, dt)
            if buy_pos < len(buy_dates):
                next_buy = buy_dates[buy_pos]

        if prev_sell is None:
            prev_real_sell_dates.append(None)
            days_from_prev_real_sell.append(None)
        else:
            prev_real_sell_dates.append(prev_sell.date().isoformat())
            days_from_prev_real_sell.append((dt - prev_sell).days)

        if next_buy is None:
            next_real_buy_dates.append(None)
            days_to_next_real_buy.append(None)
        else:
            next_real_buy_dates.append(next_buy.date().isoformat())
            days_to_next_real_buy.append((next_buy - dt).days)

    annotated["prev_real_sell_date"] = prev_real_sell_dates
    annotated["days_from_prev_real_sell"] = days_from_prev_real_sell
    annotated["next_real_buy_date"] = next_real_buy_dates
    annotated["days_to_next_real_buy"] = days_to_next_real_buy
    # Built from the lists rather than a row-wise apply so an empty frame
    # still yields a (zero-length) column.
    annotated["cycle_id"] = [
        f"{prev_date} -> {next_date}"
        if prev_date is not None and next_date is not None
        else "unbounded_cycle"
        for prev_date, next_date in zip(prev_real_sell_dates, next_real_buy_dates)
    ]
    return annotated
def _summarize_cycles(aux_sell: pd.DataFrame) -> pd.DataFrame:
    """Aggregate annotated aux SELL rows into one summary row per cycle."""
    is_post_exit = aux_sell["cluster"] == "post_exit_confirmation"
    is_matched = aux_sell["matched_workbook_aux"].astype(bool)
    # Helper columns let every per-cycle figure come from the same groupby.
    work = aux_sell.assign(
        _post_exit=is_post_exit,
        _post_exit_matched=is_post_exit & is_matched,
        _matched_date=aux_sell["date"].where(is_matched),
        _unmatched_post_exit_date=aux_sell["date"].where(is_post_exit & ~is_matched),
    )

    def _join_present(values: pd.Series) -> str:
        return " | ".join(values.dropna())

    cycle_summary = (
        # dropna=False keeps "unbounded_cycle" rows, whose previous-sell or
        # next-buy key is missing; the default would silently discard them.
        work.groupby(["cycle_id", "prev_real_sell_date", "next_real_buy_date"], dropna=False)
        .agg(
            aux_sell_count=("date", "count"),
            matched_count=("matched_workbook_aux", "sum"),
            post_exit_confirmation_count=("_post_exit", "sum"),
            post_exit_confirmation_matched=("_post_exit_matched", "sum"),
            unique_clusters=("cluster", lambda s: " | ".join(sorted(set(s)))),
            cycle_dates=("date", lambda s: " | ".join(s)),
            matched_dates=("_matched_date", _join_present),
            unmatched_post_exit_dates=("_unmatched_post_exit_date", _join_present),
        )
        .reset_index()
    )
    # Candidate: repeated post-exit confirmations, none anchored in the workbook.
    cycle_summary["compression_candidate"] = (cycle_summary["post_exit_confirmation_count"] >= 2) & (
        cycle_summary["post_exit_confirmation_matched"] == 0
    )
    return cycle_summary


def _render_review(aux_sell: pd.DataFrame, cycle_summary: pd.DataFrame) -> list[str]:
    """Build the lines of the Markdown review from the audit tables."""
    lines = [
        "# Dragon Aux Sell Cycle Review",
        "",
        f"- Strategy aux SELL signals: `{len(aux_sell)}`",
        f"- Distinct sell cycles: `{cycle_summary['cycle_id'].nunique()}`",
        f"- Cycles with repeated `post_exit_confirmation` >= 2: `{int((cycle_summary['post_exit_confirmation_count'] >= 2).sum())}`",
        f"- Conservative compression candidates (repeated post-exit confirmations with zero workbook anchor): `{int(cycle_summary['compression_candidate'].sum())}`",
        "",
        "## Repeated Post-Exit Confirmation Cycles",
    ]
    repeated = cycle_summary[cycle_summary["post_exit_confirmation_count"] >= 2].sort_values(
        ["compression_candidate", "post_exit_confirmation_count", "aux_sell_count"],
        ascending=[False, False, False],
    )
    for _, row in repeated.iterrows():
        lines.append(
            f"- `{row['cycle_id']}` | aux SELL `{int(row['aux_sell_count'])}` | "
            f"`post_exit_confirmation={int(row['post_exit_confirmation_count'])}` | "
            f"matched `{int(row['matched_count'])}` | "
            f"candidate `{bool(row['compression_candidate'])}`"
        )
        if row["matched_dates"]:
            lines.append(f" matched dates: `{row['matched_dates']}`")
        if row["unmatched_post_exit_dates"]:
            lines.append(f" unmatched post-exit dates: `{row['unmatched_post_exit_dates']}`")

    lines.extend(["", "## Candidate Cycles To Review First"])
    candidates = repeated[repeated["compression_candidate"]].head(12)
    if candidates.empty:
        lines.append("- None. Current repeated cycles all contain workbook anchors or are singletons.")
    else:
        for _, row in candidates.iterrows():
            lines.append(
                f"- `{row['cycle_id']}` -> unmatched repeated post-exit dates `{row['unmatched_post_exit_dates']}`"
            )

    lines.extend(["", "## Protected Cycles With Workbook Anchors"])
    protected = repeated[~repeated["compression_candidate"]].head(12)
    if protected.empty:
        lines.append("- None.")
    else:
        for _, row in protected.iterrows():
            lines.append(
                f"- `{row['cycle_id']}` -> matched dates `{row['matched_dates']}`; unmatched post-exit dates `{row['unmatched_post_exit_dates']}`"
            )
    return lines


def main() -> None:
    """Audit aux SELL signals per sell cycle and write the report files.

    Reads ``dragon_strategy_events.csv`` and ``dragon_workbook_layers.csv``
    from the directory containing this script and writes, to the same
    directory:

    * ``dragon_aux_sell_cycle_audit.csv`` - one row per strategy aux SELL
    * ``dragon_aux_sell_cycle_summary.csv`` - one row per cycle
    * ``dragon_aux_sell_cycle_review.md`` - Markdown review
    """
    base_dir = Path(__file__).resolve().parent
    strategy_events = _load_csv(base_dir, "dragon_strategy_events.csv")
    workbook_layers = _load_csv(base_dir, "dragon_workbook_layers.csv")

    real_buy = _build_real_trade_lookup(strategy_events, "BUY")
    real_sell = _build_real_trade_lookup(strategy_events, "SELL")

    strategy_aux_mask = (strategy_events["layer"] == "aux_signal") & (strategy_events["side"] == "SELL")
    aux_sell = strategy_events[strategy_aux_mask].copy()
    aux_sell = aux_sell.sort_values("date").reset_index(drop=True)
    aux_sell = _attach_cycle_context(aux_sell, real_buy, real_sell)
    # A list comprehension (rather than a row-wise apply) still produces a
    # plain column when there are no rows.
    aux_sell["cluster"] = [_annotate_sell_cluster(row) for _, row in aux_sell.iterrows()]

    workbook_aux_mask = (workbook_layers["layer"] == "aux_signal") & (workbook_layers["side"] == "SELL")
    workbook_aux_sell = workbook_layers.loc[workbook_aux_mask, ["date", "signal_reason", "note"]].rename(
        columns={"signal_reason": "workbook_signal_reason", "note": "workbook_note"}
    )
    # Keep one workbook row per date (the first) so the left merge cannot
    # multiply strategy rows and inflate the per-cycle counts.
    workbook_aux_sell = workbook_aux_sell.drop_duplicates(subset="date")
    aux_sell = aux_sell.merge(workbook_aux_sell, on="date", how="left")
    aux_sell["matched_workbook_aux"] = aux_sell["workbook_signal_reason"].notna()

    cycle_summary = _summarize_cycles(aux_sell)

    detail_cols = [
        "date",
        "reason",
        "cluster",
        "matched_workbook_aux",
        "prev_real_sell_date",
        "days_from_prev_real_sell",
        "next_real_buy_date",
        "days_to_next_real_buy",
        "cycle_id",
        "a1",
        "b1",
        "c1",
        "kdj_sell",
        "ql_sell",
        "workbook_signal_reason",
        "workbook_note",
    ]
    aux_sell[detail_cols].to_csv(base_dir / "dragon_aux_sell_cycle_audit.csv", index=False, encoding="utf-8-sig")
    cycle_summary.sort_values(
        ["post_exit_confirmation_count", "aux_sell_count", "cycle_id"], ascending=[False, False, True]
    ).to_csv(base_dir / "dragon_aux_sell_cycle_summary.csv", index=False, encoding="utf-8-sig")

    lines = _render_review(aux_sell, cycle_summary)
    (base_dir / "dragon_aux_sell_cycle_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|