from __future__ import annotations from pathlib import Path import pandas as pd BUY_REASON_STATE = { "deep_oversold_rebound_buy": "low_oversold_regime", "oversold_recovery_buy": "low_oversold_regime", "oversold_reversal_after_ql_buy": "rebound_after_sell_regime", "post_sell_rebound_buy": "rebound_after_sell_regime", "post_washout_kdj_reentry_buy": "rebound_after_sell_regime", "predictive_error_reentry_buy": "rebound_after_sell_regime", "hot_exit_reentry_buy": "rebound_after_sell_regime", "early_crash_probe_buy": "crash_probe_regime", "dual_gold_resonance_buy": "low_oversold_regime", "glued_buy": "mid_regime", "non_glued_positive_expansion_buy": "high_regime", } BUY_REASON_QUALIFICATION = { "deep_oversold_rebound_buy": "oversold_reversal_entry", "oversold_recovery_buy": "oversold_reversal_entry", "oversold_reversal_after_ql_buy": "oversold_reversal_entry", "post_sell_rebound_buy": "rebound_reentry", "post_washout_kdj_reentry_buy": "workbook_special_restart", "predictive_error_reentry_buy": "bridge_reentry", "hot_exit_reentry_buy": "bridge_reentry", "early_crash_probe_buy": "crash_probe_entry", "dual_gold_resonance_buy": "dual_gold_entry", "glued_buy": "glued_base_entry", "non_glued_positive_expansion_buy": "momentum_expansion_entry", } SELL_REASON_MANAGEMENT = { "crash_protection_exit": "predictive_risk_exit", "predictive_b1_break_exit": "predictive_risk_exit", "prewarning_reduction_exit": "prewarning_exit", "high_regime_momentum_break": "prewarning_exit", "high_regime_confirmed_exit:kdj_sell": "confirmed_trend_exit", "ql_high_zone_take_profit": "high_regime_take_profit", "ql_mid_zone_take_profit": "high_regime_take_profit", "medium_hot_take_profit": "high_regime_take_profit", "high_zone_post_ql_fade_exit": "ql_followthrough_exit", "post_ql_decay_exit": "ql_followthrough_exit", "post_dual_sell_decay_exit": "ql_followthrough_exit", "knife_take_profit_1": "first_take_profit", "knife_take_profit_2_glued": "first_take_profit", "knife_take_profit_2_wait_ql_s": "first_take_profit", "early_positive_take_profit": "first_take_profit", "oversold_rebound_take_profit": "first_take_profit", "glued_exit:kdj_sell": "confirmed_trend_exit", "small_positive_a1_declining:ql_sell": "confirmed_trend_exit", "negative_a1_no_b1_recovery:kdj_sell": "negative_a1_exit", "negative_a1_no_b1_recovery:ql_sell": "negative_a1_exit", "negative_a1_b1_not_strong:kdj_sell": "negative_a1_exit", "low_zone_dual_gold_exit:kdj_sell": "negative_a1_exit", "hard_exit:kdj_sell": "hard_risk_exit", "hard_exit:ql_sell": "hard_risk_exit", "early_failed_rebound_exit": "predictive_risk_exit", } def _infer_state_layer(buy_reason: str, buy_c1: float) -> str: normalized_reason = buy_reason.split(":", 1)[0] state = BUY_REASON_STATE.get(normalized_reason) if state == "mid_regime": if buy_c1 < 20: return "low_oversold_regime" if buy_c1 >= 70: return "high_regime" return state or "mid_regime" def _infer_entry_layer(buy_reason: str) -> str: normalized_reason = buy_reason.split(":", 1)[0] return BUY_REASON_QUALIFICATION.get(normalized_reason, "base_entry") def _infer_management_layer(sell_reason: str) -> str: return SELL_REASON_MANAGEMENT.get(sell_reason, "default_exit_management") def _infer_aux_context(hold_aux_buy: pd.DataFrame, post_exit_aux_sell: pd.DataFrame) -> tuple[str, str]: if hold_aux_buy.empty and post_exit_aux_sell.empty: return "no_aux_signal", "" if not hold_aux_buy.empty and not post_exit_aux_sell.empty: aux_layer = "aux_buy_and_aux_sell" elif not hold_aux_buy.empty: aux_layer = "aux_buy_only" else: aux_layer = "aux_sell_only" detail_frames = [] if not hold_aux_buy.empty: detail_frames.append(hold_aux_buy) if not post_exit_aux_sell.empty: detail_frames.append(post_exit_aux_sell) aux_detail_df = pd.concat(detail_frames, ignore_index=True).sort_values("dt") detail = " | ".join( f"{row['date']}:{row['side']}:{row['reason']}" for _, row in aux_detail_df.iterrows() ) return aux_layer, detail def _build_taxonomy_markdown(base_dir: Path, path_df: pd.DataFrame) -> None: lines = [ "# Dragon Rule Taxonomy", "", "## Layer 1 Market State", "- `high_regime`: entries born in hot / high-C1 continuation or late-trend expansion states.", "- `mid_regime`: classic glued or middle-zone continuation entries.", "- `low_oversold_regime`: deep oversold, dual-gold reversal, low-C1 rebound, oversold recovery entries.", "- `rebound_after_sell_regime`: reentries after a prior sell, predictive error recovery, post-washout restart.", "- `crash_probe_regime`: early crash probe entries that intentionally test panic states.", "", "## Layer 2 Entry Qualification", ] qualification_map = ( path_df[["buy_reason", "entry_qualification_layer"]] .drop_duplicates() .sort_values(["entry_qualification_layer", "buy_reason"]) ) for _, row in qualification_map.iterrows(): lines.append(f"- `{row['buy_reason']}` -> `{row['entry_qualification_layer']}`") lines.extend(["", "## Layer 3 Position Management"]) management_map = ( path_df[["sell_reason", "position_management_layer"]] .drop_duplicates() .sort_values(["position_management_layer", "sell_reason"]) ) for _, row in management_map.iterrows(): lines.append(f"- `{row['sell_reason']}` -> `{row['position_management_layer']}`") lines.extend( [ "", "## Layer 4 Auxiliary Signal Context", "- `no_aux_signal`: no auxiliary confirmation inside the holding window.", "- `aux_buy_only`: only holding-period bullish confirmation appeared.", "- `aux_sell_only`: only post-exit or in-trade bearish confirmation appeared.", "- `aux_buy_and_aux_sell`: both auxiliary bullish and bearish signals appeared within the trade path.", "", "## Path Summary", ] ) path_summary = ( path_df.groupby( [ "market_state_layer", "entry_qualification_layer", "position_management_layer", "aux_context_layer", ] ) .size() .reset_index(name="count") .sort_values("count", ascending=False) ) for _, row in path_summary.head(30).iterrows(): lines.append( f"- `{row['market_state_layer']}` -> `{row['entry_qualification_layer']}` -> " f"`{row['position_management_layer']}` -> `{row['aux_context_layer']}`: `{int(row['count'])}`" ) (base_dir / "dragon_rule_taxonomy.md").write_text("\n".join(lines) + "\n", encoding="utf-8") def main() -> None: base_dir = Path(__file__).resolve().parent trades = pd.read_csv(base_dir / "dragon_strategy_trades.csv", encoding="utf-8-sig") events = pd.read_csv(base_dir / "dragon_strategy_events.csv", encoding="utf-8-sig") workbook = pd.read_csv(base_dir / "dragon_workbook_layers.csv", encoding="utf-8-sig") trades["buy_dt"] = pd.to_datetime(trades["buy_date"]) trades["sell_dt"] = pd.to_datetime(trades["sell_date"]) events["dt"] = pd.to_datetime(events["date"]) trades["next_buy_dt"] = trades["buy_dt"].shift(-1) trades["next_buy_date"] = trades["buy_date"].shift(-1) buy_events = ( events[(events["layer"] == "real_trade") & (events["side"] == "BUY")] .rename(columns={"date": "buy_date", "close": "buy_close", "a1": "buy_a1", "b1": "buy_b1", "c1": "buy_c1"}) [["buy_date", "buy_close", "buy_a1", "buy_b1", "buy_c1", "kdj_buy", "ql_buy"]] .copy() ) sell_events = ( events[(events["layer"] == "real_trade") & (events["side"] == "SELL")] .rename(columns={"date": "sell_date", "close": "sell_close", "a1": "sell_a1", "b1": "sell_b1", "c1": "sell_c1"}) [["sell_date", "sell_close", "sell_a1", "sell_b1", "sell_c1", "kdj_sell", "ql_sell"]] .copy() ) path_df = trades.merge(buy_events, on="buy_date", how="left").merge(sell_events, on="sell_date", how="left") path_df["market_state_layer"] = path_df.apply(lambda row: _infer_state_layer(str(row["buy_reason"]), float(row["buy_c1"])), axis=1) path_df["entry_qualification_layer"] = path_df["buy_reason"].map(_infer_entry_layer) path_df["position_management_layer"] = path_df["sell_reason"].map(_infer_management_layer) workbook_real_buy = set(workbook[(workbook["layer"] == "real_trade") & (workbook["side"] == "BUY")]["date"]) workbook_real_sell = set(workbook[(workbook["layer"] == "real_trade") & (workbook["side"] == "SELL")]["date"]) path_df["buy_aligned_with_workbook"] = path_df["buy_date"].isin(workbook_real_buy) path_df["sell_aligned_with_workbook"] = path_df["sell_date"].isin(workbook_real_sell) aux_layers: list[str] = [] aux_details: list[str] = [] aux_counts: list[int] = [] hold_aux_buy_counts: list[int] = [] hold_aux_buy_details: list[str] = [] post_exit_aux_sell_counts: list[int] = [] post_exit_aux_sell_details: list[str] = [] for _, trade in path_df.iterrows(): hold_aux_buy = events[ (events["layer"] == "aux_signal") & (events["side"] == "BUY") & (events["dt"] >= trade["buy_dt"]) & (events["dt"] <= trade["sell_dt"]) ].sort_values("dt") post_exit_mask = ( (events["layer"] == "aux_signal") & (events["side"] == "SELL") & (events["dt"] > trade["sell_dt"]) ) if pd.notna(trade["next_buy_dt"]): post_exit_mask = post_exit_mask & (events["dt"] < trade["next_buy_dt"]) post_exit_aux_sell = events[post_exit_mask].sort_values("dt") aux_layer, aux_detail = _infer_aux_context(hold_aux_buy, post_exit_aux_sell) aux_layers.append(aux_layer) aux_details.append(aux_detail) aux_counts.append(len(hold_aux_buy) + len(post_exit_aux_sell)) hold_aux_buy_counts.append(len(hold_aux_buy)) hold_aux_buy_details.append( " | ".join(f"{row['date']}:BUY:{row['reason']}" for _, row in hold_aux_buy.iterrows()) ) post_exit_aux_sell_counts.append(len(post_exit_aux_sell)) post_exit_aux_sell_details.append( " | ".join(f"{row['date']}:SELL:{row['reason']}" for _, row in post_exit_aux_sell.iterrows()) ) path_df["aux_context_layer"] = aux_layers path_df["aux_signal_count"] = aux_counts path_df["aux_signal_detail"] = aux_details path_df["hold_aux_buy_count"] = hold_aux_buy_counts path_df["hold_aux_buy_detail"] = hold_aux_buy_details path_df["post_exit_aux_sell_count"] = post_exit_aux_sell_counts path_df["post_exit_aux_sell_detail"] = post_exit_aux_sell_details path_df["layer_path"] = path_df.apply( lambda row: " > ".join( [ row["market_state_layer"], row["entry_qualification_layer"], row["position_management_layer"], row["aux_context_layer"], ] ), axis=1, ) output_cols = [ "buy_date", "sell_date", "holding_days", "return_pct", "buy_reason", "sell_reason", "buy_a1", "buy_b1", "buy_c1", "sell_a1", "sell_b1", "sell_c1", "market_state_layer", "entry_qualification_layer", "position_management_layer", "aux_context_layer", "aux_signal_count", "hold_aux_buy_count", "hold_aux_buy_detail", "post_exit_aux_sell_count", "post_exit_aux_sell_detail", "aux_signal_detail", "buy_aligned_with_workbook", "sell_aligned_with_workbook", "next_buy_date", "layer_path", ] path_df[output_cols].to_csv(base_dir / "dragon_trade_path_trace.csv", index=False, encoding="utf-8-sig") _build_taxonomy_markdown(base_dir, path_df) if __name__ == "__main__": main()