"""Audit strategy auxiliary SELL signals against real-trade cycles.

Reads ``dragon_strategy_events.csv`` and ``dragon_workbook_layers.csv`` from
the directory containing this script, labels every ``aux_signal`` SELL event
with a cluster name and the real SELL / real BUY dates that surround it, and
writes three files next to the inputs:

* ``dragon_aux_sell_cycle_audit.csv``   - one row per aux SELL signal
* ``dragon_aux_sell_cycle_summary.csv`` - one row per sell cycle
* ``dragon_aux_sell_cycle_review.md``   - Markdown review of repeated cycles
"""

from __future__ import annotations

from bisect import bisect_left, bisect_right
from pathlib import Path

import pandas as pd

# Text spellings that must not count as a raised flag when a CSV column was
# loaded as strings instead of booleans.
_FALSY_TEXT = frozenset({"", "0", "f", "false", "n", "no", "nan", "none"})

_POST_EXIT = "post_exit_confirmation"
_DATE_SEPARATOR = " | "


def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
    """Read ``base_dir / name`` as CSV, decoding with ``utf-8-sig``."""
    return pd.read_csv(base_dir / name, encoding="utf-8-sig")


def _build_real_trade_lookup(strategy_events: pd.DataFrame, side: str) -> pd.DataFrame:
    """Return the ``real_trade`` events for ``side``, ordered by parsed date.

    Args:
        strategy_events: Event table with at least the columns ``layer``,
            ``side``, ``date``, ``reason`` and ``c1``.
        side: Value of the ``side`` column to keep (``main`` passes ``"BUY"``
            and ``"SELL"``).

    Returns:
        A frame with columns ``dt`` (``date`` parsed by ``pd.to_datetime``),
        ``date``, ``reason`` and ``c1``, sorted ascending by ``dt``.
    """
    wanted = (strategy_events["layer"] == "real_trade") & (strategy_events["side"] == side)
    real = strategy_events[wanted].copy()
    real["dt"] = pd.to_datetime(real["date"])
    real = real.sort_values("dt")
    return real[["dt", "date", "reason", "c1"]]


def _as_flag(value: object) -> bool:
    """Interpret a CSV cell as a boolean flag.

    Plain ``bool()`` is wrong for data loaded from CSV: ``bool(float("nan"))``
    and ``bool("False")`` are both ``True``. Missing values and the usual
    textual "false" spellings are therefore mapped to ``False``; anything
    else keeps its ordinary truthiness.
    """
    if isinstance(value, str):
        return value.strip().lower() not in _FALSY_TEXT
    if pd.isna(value):
        return False
    return bool(value)


def _annotate_sell_cluster(row: pd.Series) -> str:
    """Classify one aux SELL row into a cluster label.

    The rules are checked in order and the first match wins:

    1. ``reason`` ends with ``state_crash_followthrough``
       -> ``crash_followthrough``
    2. 1 to 10 days since the previous real SELL -> ``post_exit_confirmation``
    3. ``c1 > 80`` with a ``ql_sell`` or ``kdj_sell`` flag
       -> ``high_zone_warning``
    4. ``b1 < -0.12`` with a ``ql_sell`` or ``kdj_sell`` flag
       -> ``strong_break_warning``
    5. ``ql_sell`` with ``c1 < 20`` and ``a1 < -0.02``
       -> ``deep_oversold_followthrough``
    6. otherwise -> ``other_bearish_followthrough``

    Args:
        row: Mapping with the keys ``reason``, ``a1``, ``b1``, ``c1``,
            ``days_from_prev_real_sell``, ``ql_sell`` and ``kdj_sell``.

    Returns:
        The cluster label as a string.

    Raises:
        ValueError / TypeError: If ``a1``, ``b1`` or ``c1`` cannot be
            converted with ``float()``.
    """
    reason = str(row["reason"])
    a1 = float(row["a1"])
    b1 = float(row["b1"])
    c1 = float(row["c1"])
    days_from_prev_real_sell = row["days_from_prev_real_sell"]
    # Missing flags (NaN/None) count as "not raised"; see _as_flag.
    ql_sell = _as_flag(row["ql_sell"])
    kdj_sell = _as_flag(row["kdj_sell"])
    any_sell_flag = ql_sell or kdj_sell

    if reason.endswith("state_crash_followthrough"):
        return "crash_followthrough"
    if pd.notna(days_from_prev_real_sell) and 0 < float(days_from_prev_real_sell) <= 10:
        return _POST_EXIT
    if c1 > 80 and any_sell_flag:
        return "high_zone_warning"
    if b1 < -0.12 and any_sell_flag:
        return "strong_break_warning"
    if ql_sell and c1 < 20 and a1 < -0.02:
        return "deep_oversold_followthrough"
    return "other_bearish_followthrough"


def _attach_cycle_context(
    aux_sell: pd.DataFrame, real_buy: pd.DataFrame, real_sell: pd.DataFrame
) -> pd.DataFrame:
    """Add the surrounding real-trade dates to every aux SELL row.

    For each row the latest real SELL strictly before the signal date and the
    earliest real BUY strictly after it are looked up.

    Args:
        aux_sell: Frame with a ``date`` column; it is not modified.
        real_buy: Frame with a datetime ``dt`` column of real BUY dates.
        real_sell: Frame with a datetime ``dt`` column of real SELL dates.

    Returns:
        A copy of ``aux_sell`` with the added columns ``dt``,
        ``prev_real_sell_date``, ``days_from_prev_real_sell``,
        ``next_real_buy_date``, ``days_to_next_real_buy`` and ``cycle_id``.
        Dates are ISO strings, or ``None`` when no such trade exists.
        ``cycle_id`` is ``"<prev> -> <next>"`` when both dates exist and
        ``"unbounded_cycle"`` otherwise.
    """
    aux_sell = aux_sell.copy()
    aux_sell["dt"] = pd.to_datetime(aux_sell["date"])

    # Sort locally (and drop NaT) so the binary search below is valid no
    # matter how the caller ordered the lookup frames.
    sell_dates = real_sell["dt"].dropna().sort_values().tolist()
    buy_dates = real_buy["dt"].dropna().sort_values().tolist()

    prev_real_sell_dates: list[str | None] = []
    days_from_prev_real_sell: list[int | None] = []
    next_real_buy_dates: list[str | None] = []
    days_to_next_real_buy: list[int | None] = []

    for dt in aux_sell["dt"]:
        prev_sell = None
        next_buy = None
        if pd.notna(dt):
            # bisect_left: everything left of the position is strictly < dt.
            sell_pos = bisect_left(sell_dates, dt)
            if sell_pos > 0:
                prev_sell = sell_dates[sell_pos - 1]
            # bisect_right: everything from the position on is strictly > dt.
            buy_pos = bisect_right(buy_dates, dt)
            if buy_pos < len(buy_dates):
                next_buy = buy_dates[buy_pos]

        if prev_sell is None:
            prev_real_sell_dates.append(None)
            days_from_prev_real_sell.append(None)
        else:
            prev_real_sell_dates.append(prev_sell.date().isoformat())
            days_from_prev_real_sell.append((dt - prev_sell).days)

        if next_buy is None:
            next_real_buy_dates.append(None)
            days_to_next_real_buy.append(None)
        else:
            next_real_buy_dates.append(next_buy.date().isoformat())
            days_to_next_real_buy.append((next_buy - dt).days)

    aux_sell["prev_real_sell_date"] = prev_real_sell_dates
    aux_sell["days_from_prev_real_sell"] = days_from_prev_real_sell
    aux_sell["next_real_buy_date"] = next_real_buy_dates
    aux_sell["days_to_next_real_buy"] = days_to_next_real_buy
    aux_sell["cycle_id"] = [
        f"{prev_date} -> {next_date}"
        if prev_date is not None and next_date is not None
        else "unbounded_cycle"
        for prev_date, next_date in zip(prev_real_sell_dates, next_real_buy_dates)
    ]
    return aux_sell


def _summarize_cycles(aux_sell: pd.DataFrame) -> pd.DataFrame:
    """Aggregate annotated aux SELL rows into one row per sell cycle.

    Args:
        aux_sell: Frame with the columns ``date`` (strings), ``cycle_id``,
            ``prev_real_sell_date``, ``next_real_buy_date``, ``cluster`` and
            the boolean ``matched_workbook_aux``.

    Returns:
        A frame keyed by ``cycle_id`` / ``prev_real_sell_date`` /
        ``next_real_buy_date`` with counts, joined date lists and the boolean
        ``compression_candidate`` (at least two ``post_exit_confirmation``
        rows, none of them matched in the workbook).
    """
    is_post_exit = aux_sell["cluster"] == _POST_EXIT
    is_matched = aux_sell["matched_workbook_aux"]
    # Precompute the per-row flags so the aggregation is a plain sum instead
    # of a lambda that reaches back into the outer frame.
    flagged = aux_sell.assign(
        _is_post_exit=is_post_exit,
        _post_exit_matched=is_matched & is_post_exit,
    )

    # NOTE(review): groupby drops groups whose key contains a missing value,
    # so rows without a previous real SELL or a next real BUY (cycle_id
    # "unbounded_cycle") never appear in the summary. Behaviour kept as-is;
    # confirm this exclusion is intended.
    cycle_summary = (
        flagged.groupby(["cycle_id", "prev_real_sell_date", "next_real_buy_date"])
        .agg(
            aux_sell_count=("date", "count"),
            matched_count=("matched_workbook_aux", "sum"),
            post_exit_confirmation_count=("_is_post_exit", "sum"),
            post_exit_confirmation_matched=("_post_exit_matched", "sum"),
            unique_clusters=("cluster", lambda s: _DATE_SEPARATOR.join(sorted(set(s)))),
            cycle_dates=("date", lambda s: _DATE_SEPARATOR.join(s)),
        )
        .reset_index()
    )

    # Collect the date lists with a single pass over the rows, in row order.
    matched_by_cycle: dict[str, str] = {}
    unmatched_post_exit_by_cycle: dict[str, str] = {}
    for cycle_id, cycle_rows in aux_sell.groupby("cycle_id", sort=False):
        matched_mask = cycle_rows["matched_workbook_aux"]
        post_exit_mask = cycle_rows["cluster"] == _POST_EXIT
        matched_by_cycle[cycle_id] = _DATE_SEPARATOR.join(
            cycle_rows.loc[matched_mask, "date"].tolist()
        )
        unmatched_post_exit_by_cycle[cycle_id] = _DATE_SEPARATOR.join(
            cycle_rows.loc[post_exit_mask & ~matched_mask, "date"].tolist()
        )

    cycle_ids = cycle_summary["cycle_id"].tolist()
    cycle_summary["matched_dates"] = [matched_by_cycle[cid] for cid in cycle_ids]
    cycle_summary["unmatched_post_exit_dates"] = [
        unmatched_post_exit_by_cycle[cid] for cid in cycle_ids
    ]
    cycle_summary["compression_candidate"] = (
        cycle_summary["post_exit_confirmation_count"] >= 2
    ) & (cycle_summary["post_exit_confirmation_matched"] == 0)
    return cycle_summary


def _build_review_lines(aux_sell: pd.DataFrame, cycle_summary: pd.DataFrame) -> list[str]:
    """Render the Markdown review as a list of lines (without newlines).

    Args:
        aux_sell: The annotated aux SELL rows (only its length is used).
        cycle_summary: Output of ``_summarize_cycles``.

    Returns:
        The report lines: headline counts, every cycle with two or more
        ``post_exit_confirmation`` rows, then up to 12 candidate cycles and
        up to 12 protected (non-candidate) cycles.
    """
    repeated_mask = cycle_summary["post_exit_confirmation_count"] >= 2
    lines = [
        "# Dragon Aux Sell Cycle Review",
        "",
        f"- Strategy aux SELL signals: `{len(aux_sell)}`",
        f"- Distinct sell cycles: `{cycle_summary['cycle_id'].nunique()}`",
        f"- Cycles with repeated `post_exit_confirmation` >= 2: `{int(repeated_mask.sum())}`",
        f"- Conservative compression candidates (repeated post-exit confirmations with zero workbook anchor): `{int(cycle_summary['compression_candidate'].sum())}`",
        "",
        "## Repeated Post-Exit Confirmation Cycles",
    ]

    repeated = cycle_summary[repeated_mask].sort_values(
        ["compression_candidate", "post_exit_confirmation_count", "aux_sell_count"],
        ascending=[False, False, False],
    )
    for _, row in repeated.iterrows():
        lines.append(
            f"- `{row['cycle_id']}` | aux SELL `{int(row['aux_sell_count'])}` | "
            f"`post_exit_confirmation={int(row['post_exit_confirmation_count'])}` | "
            f"matched `{int(row['matched_count'])}` | "
            f"candidate `{bool(row['compression_candidate'])}`"
        )
        if row["matched_dates"]:
            lines.append(f" matched dates: `{row['matched_dates']}`")
        if row["unmatched_post_exit_dates"]:
            lines.append(f" unmatched post-exit dates: `{row['unmatched_post_exit_dates']}`")

    lines.extend(["", "## Candidate Cycles To Review First"])
    candidates = repeated[repeated["compression_candidate"]].head(12)
    if candidates.empty:
        lines.append("- None. Current repeated cycles all contain workbook anchors or are singletons.")
    else:
        for _, row in candidates.iterrows():
            lines.append(
                f"- `{row['cycle_id']}` -> unmatched repeated post-exit dates `{row['unmatched_post_exit_dates']}`"
            )

    lines.extend(["", "## Protected Cycles With Workbook Anchors"])
    protected = repeated[~repeated["compression_candidate"]].head(12)
    if protected.empty:
        lines.append("- None.")
    else:
        for _, row in protected.iterrows():
            lines.append(
                f"- `{row['cycle_id']}` -> matched dates `{row['matched_dates']}`; unmatched post-exit dates `{row['unmatched_post_exit_dates']}`"
            )
    return lines


def main() -> None:
    """Build the aux SELL audit CSV, the cycle summary CSV and the review.

    All inputs are read from, and all outputs written to, the directory that
    contains this script.
    """
    base_dir = Path(__file__).resolve().parent
    strategy_events = _load_csv(base_dir, "dragon_strategy_events.csv")
    workbook_layers = _load_csv(base_dir, "dragon_workbook_layers.csv")

    real_buy = _build_real_trade_lookup(strategy_events, "BUY")
    real_sell = _build_real_trade_lookup(strategy_events, "SELL")

    is_aux_sell = (strategy_events["layer"] == "aux_signal") & (strategy_events["side"] == "SELL")
    aux_sell = strategy_events[is_aux_sell].copy()
    # NOTE(review): this orders by the raw ``date`` strings, which matches
    # chronological order only for ISO-formatted dates - confirm the format.
    aux_sell = aux_sell.sort_values("date").reset_index(drop=True)
    aux_sell = _attach_cycle_context(aux_sell, real_buy, real_sell)
    # An explicit loop (rather than DataFrame.apply) also works when there
    # are no aux SELL rows at all.
    aux_sell["cluster"] = [_annotate_sell_cluster(row) for _, row in aux_sell.iterrows()]

    is_workbook_aux_sell = (workbook_layers["layer"] == "aux_signal") & (
        workbook_layers["side"] == "SELL"
    )
    workbook_aux_sell = workbook_layers[is_workbook_aux_sell][
        ["date", "signal_reason", "note"]
    ].copy()
    workbook_aux_sell = workbook_aux_sell.rename(
        columns={"signal_reason": "workbook_signal_reason", "note": "workbook_note"}
    )
    aux_sell = aux_sell.merge(workbook_aux_sell, on="date", how="left")
    # A strategy signal counts as matched when the workbook has an aux SELL
    # row with a signal reason on the same date.
    aux_sell["matched_workbook_aux"] = aux_sell["workbook_signal_reason"].notna()

    cycle_summary = _summarize_cycles(aux_sell)

    detail_cols = [
        "date",
        "reason",
        "cluster",
        "matched_workbook_aux",
        "prev_real_sell_date",
        "days_from_prev_real_sell",
        "next_real_buy_date",
        "days_to_next_real_buy",
        "cycle_id",
        "a1",
        "b1",
        "c1",
        "kdj_sell",
        "ql_sell",
        "workbook_signal_reason",
        "workbook_note",
    ]
    aux_sell[detail_cols].to_csv(
        base_dir / "dragon_aux_sell_cycle_audit.csv", index=False, encoding="utf-8-sig"
    )
    cycle_summary.sort_values(
        ["post_exit_confirmation_count", "aux_sell_count", "cycle_id"],
        ascending=[False, False, True],
    ).to_csv(base_dir / "dragon_aux_sell_cycle_summary.csv", index=False, encoding="utf-8-sig")

    lines = _build_review_lines(aux_sell, cycle_summary)
    (base_dir / "dragon_aux_sell_cycle_review.md").write_text(
        "\n".join(lines) + "\n", encoding="utf-8"
    )


if __name__ == "__main__":
    main()