| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- from __future__ import annotations
- from pathlib import Path
- import pandas as pd
# Layer 1 lookup: base buy reason (the text before any ":" suffix) -> market
# state label.  Consumed by _infer_state_layer, which may further refine the
# "mid_regime" label using the C1 value at entry.
BUY_REASON_STATE = {
    # Oversold entries.
    "deep_oversold_rebound_buy": "low_oversold_regime",
    "oversold_recovery_buy": "low_oversold_regime",
    # Re-entries that follow an earlier exit.
    "oversold_reversal_after_ql_buy": "rebound_after_sell_regime",
    "post_sell_rebound_buy": "rebound_after_sell_regime",
    "post_washout_kdj_reentry_buy": "rebound_after_sell_regime",
    "predictive_error_reentry_buy": "rebound_after_sell_regime",
    "hot_exit_reentry_buy": "rebound_after_sell_regime",
    # Crash probes.
    "early_crash_probe_buy": "crash_probe_regime",
    # Remaining reasons.
    "dual_gold_resonance_buy": "low_oversold_regime",
    "glued_buy": "mid_regime",
    "non_glued_positive_expansion_buy": "high_regime",
}
# Layer 2 lookup: base buy reason (the text before any ":" suffix) -> entry
# qualification label.  Reasons absent from this table are labelled
# "base_entry" by _infer_entry_layer.
BUY_REASON_QUALIFICATION = {
    # Oversold reversals.
    "deep_oversold_rebound_buy": "oversold_reversal_entry",
    "oversold_recovery_buy": "oversold_reversal_entry",
    "oversold_reversal_after_ql_buy": "oversold_reversal_entry",
    # Re-entries.
    "post_sell_rebound_buy": "rebound_reentry",
    "post_washout_kdj_reentry_buy": "workbook_special_restart",
    "predictive_error_reentry_buy": "bridge_reentry",
    "hot_exit_reentry_buy": "bridge_reentry",
    # One-to-one labels.
    "early_crash_probe_buy": "crash_probe_entry",
    "dual_gold_resonance_buy": "dual_gold_entry",
    "glued_buy": "glued_base_entry",
    "non_glued_positive_expansion_buy": "momentum_expansion_entry",
}
# Layer 3 lookup: full sell reason -> position management label.  Keys are
# matched exactly, including any ":kdj_sell" / ":ql_sell" suffix; reasons
# absent from this table are labelled "default_exit_management" by
# _infer_management_layer.
SELL_REASON_MANAGEMENT = {
    # Predictive / pre-warning exits.
    "crash_protection_exit": "predictive_risk_exit",
    "predictive_b1_break_exit": "predictive_risk_exit",
    "prewarning_reduction_exit": "prewarning_exit",
    "high_regime_momentum_break": "prewarning_exit",
    "high_regime_confirmed_exit:kdj_sell": "confirmed_trend_exit",
    # Take-profit exits in the high regime.
    "ql_high_zone_take_profit": "high_regime_take_profit",
    "ql_mid_zone_take_profit": "high_regime_take_profit",
    "medium_hot_take_profit": "high_regime_take_profit",
    # Follow-through after a QL / dual sell.
    "high_zone_post_ql_fade_exit": "ql_followthrough_exit",
    "post_ql_decay_exit": "ql_followthrough_exit",
    "post_dual_sell_decay_exit": "ql_followthrough_exit",
    # First take-profit family.
    "knife_take_profit_1": "first_take_profit",
    "knife_take_profit_2_glued": "first_take_profit",
    "knife_take_profit_2_wait_ql_s": "first_take_profit",
    "early_positive_take_profit": "first_take_profit",
    "oversold_rebound_take_profit": "first_take_profit",
    # Signal-confirmed exits.
    "glued_exit:kdj_sell": "confirmed_trend_exit",
    "small_positive_a1_declining:ql_sell": "confirmed_trend_exit",
    "negative_a1_no_b1_recovery:kdj_sell": "negative_a1_exit",
    "negative_a1_no_b1_recovery:ql_sell": "negative_a1_exit",
    "negative_a1_b1_not_strong:kdj_sell": "negative_a1_exit",
    "low_zone_dual_gold_exit:kdj_sell": "negative_a1_exit",
    # Hard exits.
    "hard_exit:kdj_sell": "hard_risk_exit",
    "hard_exit:ql_sell": "hard_risk_exit",
    "early_failed_rebound_exit": "predictive_risk_exit",
}
def _infer_state_layer(buy_reason: str, buy_c1: float) -> str:
    """Return the market-state label (layer 1) for one entry.

    Only the part of ``buy_reason`` before the first ``":"`` is looked up in
    ``BUY_REASON_STATE``.  A reason with no table entry is reported as
    ``"mid_regime"`` without consulting ``buy_c1``.  A reason the table maps to
    ``"mid_regime"`` is re-labelled ``"low_oversold_regime"`` when
    ``buy_c1 < 20`` and ``"high_regime"`` when ``buy_c1 >= 70``.
    """
    base_reason, _, _ = buy_reason.partition(":")
    regime = BUY_REASON_STATE.get(base_reason)
    if not regime:
        return "mid_regime"
    if regime != "mid_regime":
        return regime
    # Table says "mid": let the C1 reading at entry push it to an extreme.
    # A NaN C1 fails both comparisons and therefore stays "mid_regime".
    if buy_c1 < 20:
        return "low_oversold_regime"
    if buy_c1 >= 70:
        return "high_regime"
    return regime
def _infer_entry_layer(buy_reason: str) -> str:
    """Return the entry-qualification label (layer 2) for a buy reason.

    The text before the first ``":"`` is looked up in
    ``BUY_REASON_QUALIFICATION``; reasons without an entry yield
    ``"base_entry"``.
    """
    base_reason = buy_reason.partition(":")[0]
    if base_reason in BUY_REASON_QUALIFICATION:
        return BUY_REASON_QUALIFICATION[base_reason]
    return "base_entry"
def _infer_management_layer(sell_reason: str) -> str:
    """Return the position-management label (layer 3) for a sell reason.

    ``sell_reason`` is matched verbatim against ``SELL_REASON_MANAGEMENT``
    (no suffix stripping); anything not listed yields
    ``"default_exit_management"``.
    """
    if sell_reason in SELL_REASON_MANAGEMENT:
        return SELL_REASON_MANAGEMENT[sell_reason]
    return "default_exit_management"
def _infer_aux_context(hold_aux_buy: pd.DataFrame, post_exit_aux_sell: pd.DataFrame) -> tuple[str, str]:
    """Classify the auxiliary signals around one trade (layer 4).

    Args:
        hold_aux_buy: auxiliary BUY rows selected for the trade; must expose
            ``dt``, ``date``, ``side`` and ``reason`` columns when non-empty.
        post_exit_aux_sell: auxiliary SELL rows selected for the trade, with
            the same columns.

    Returns:
        ``(label, detail)``.  ``label`` is ``"no_aux_signal"``,
        ``"aux_buy_only"``, ``"aux_sell_only"`` or ``"aux_buy_and_aux_sell"``
        depending on which frames are non-empty.  ``detail`` joins
        ``date:side:reason`` for every row of the non-empty frames, ordered by
        ``dt``, with ``" | "``; it is ``""`` when both frames are empty.
    """
    has_buy = not hold_aux_buy.empty
    has_sell = not post_exit_aux_sell.empty
    if not (has_buy or has_sell):
        return "no_aux_signal", ""

    label = {
        (True, True): "aux_buy_and_aux_sell",
        (True, False): "aux_buy_only",
        (False, True): "aux_sell_only",
    }[(has_buy, has_sell)]

    # Only non-empty frames are concatenated, buy rows first, then sorted by dt.
    present = [
        frame
        for frame, keep in ((hold_aux_buy, has_buy), (post_exit_aux_sell, has_sell))
        if keep
    ]
    ordered = pd.concat(present, ignore_index=True).sort_values("dt")
    pieces = [
        f"{date}:{side}:{reason}"
        for date, side, reason in zip(ordered["date"], ordered["side"], ordered["reason"])
    ]
    return label, " | ".join(pieces)
def _build_taxonomy_markdown(base_dir: Path, path_df: pd.DataFrame) -> None:
    """Write ``dragon_rule_taxonomy.md`` into ``base_dir``.

    The document has fixed descriptions for layers 1 and 4, the distinct
    ``buy_reason`` / ``sell_reason`` -> layer pairs found in ``path_df`` for
    layers 2 and 3, and a "Path Summary" listing at most the 30 most frequent
    combinations of the four layer columns with their row counts.

    Args:
        base_dir: directory that receives the markdown file (UTF-8).
        path_df: per-trade frame with ``buy_reason``, ``sell_reason``,
            ``market_state_layer``, ``entry_qualification_layer``,
            ``position_management_layer`` and ``aux_context_layer`` columns.
    """

    def mapping_bullets(reason_col: str, layer_col: str) -> list[str]:
        # Distinct (reason, layer) pairs, ordered by layer and then by reason.
        pairs = path_df[[reason_col, layer_col]].drop_duplicates()
        pairs = pairs.sort_values([layer_col, reason_col])
        return [
            f"- `{reason}` -> `{layer}`"
            for reason, layer in zip(pairs[reason_col], pairs[layer_col])
        ]

    layer_columns = [
        "market_state_layer",
        "entry_qualification_layer",
        "position_management_layer",
        "aux_context_layer",
    ]
    path_counts = path_df.groupby(layer_columns).size().reset_index(name="count")
    top_paths = path_counts.sort_values("count", ascending=False).head(30)
    summary_bullets = [
        f"- `{state}` -> `{entry}` -> `{management}` -> `{aux}`: `{int(count)}`"
        for state, entry, management, aux, count in top_paths.itertuples(index=False, name=None)
    ]

    document = [
        "# Dragon Rule Taxonomy",
        "",
        "## Layer 1 Market State",
        "- `high_regime`: entries born in hot / high-C1 continuation or late-trend expansion states.",
        "- `mid_regime`: classic glued or middle-zone continuation entries.",
        "- `low_oversold_regime`: deep oversold, dual-gold reversal, low-C1 rebound, oversold recovery entries.",
        "- `rebound_after_sell_regime`: reentries after a prior sell, predictive error recovery, post-washout restart.",
        "- `crash_probe_regime`: early crash probe entries that intentionally test panic states.",
        "",
        "## Layer 2 Entry Qualification",
        *mapping_bullets("buy_reason", "entry_qualification_layer"),
        "",
        "## Layer 3 Position Management",
        *mapping_bullets("sell_reason", "position_management_layer"),
        "",
        "## Layer 4 Auxiliary Signal Context",
        "- `no_aux_signal`: no auxiliary confirmation inside the holding window.",
        "- `aux_buy_only`: only holding-period bullish confirmation appeared.",
        "- `aux_sell_only`: only post-exit or in-trade bearish confirmation appeared.",
        "- `aux_buy_and_aux_sell`: both auxiliary bullish and bearish signals appeared within the trade path.",
        "",
        "## Path Summary",
        *summary_bullets,
    ]
    target = base_dir / "dragon_rule_taxonomy.md"
    target.write_text("\n".join(document) + "\n", encoding="utf-8")
def main() -> None:
    """Build the per-trade layer trace CSV and the taxonomy markdown.

    Reads ``dragon_strategy_trades.csv``, ``dragon_strategy_events.csv`` and
    ``dragon_workbook_layers.csv`` from the directory containing this script,
    labels every trade with its four layers (market state, entry
    qualification, position management, auxiliary context) and writes
    ``dragon_trade_path_trace.csv`` plus ``dragon_rule_taxonomy.md`` into the
    same directory.
    """
    base_dir = Path(__file__).resolve().parent
    trades = pd.read_csv(base_dir / "dragon_strategy_trades.csv", encoding="utf-8-sig")
    events = pd.read_csv(base_dir / "dragon_strategy_events.csv", encoding="utf-8-sig")
    workbook = pd.read_csv(base_dir / "dragon_workbook_layers.csv", encoding="utf-8-sig")

    trades["buy_dt"] = pd.to_datetime(trades["buy_date"])
    trades["sell_dt"] = pd.to_datetime(trades["sell_date"])
    events["dt"] = pd.to_datetime(events["date"])
    # The following row's entry bounds this trade's post-exit window.
    # NOTE(review): this relies on the trades file being in chronological
    # order; the rows are not re-sorted here - confirm against the producer.
    trades["next_buy_dt"] = trades["buy_dt"].shift(-1)
    trades["next_buy_date"] = trades["buy_date"].shift(-1)

    # Indicator snapshots recorded on the real BUY / SELL events, renamed so
    # they can be joined onto the trades by date.
    buy_events = (
        events[(events["layer"] == "real_trade") & (events["side"] == "BUY")]
        .rename(columns={"date": "buy_date", "close": "buy_close", "a1": "buy_a1", "b1": "buy_b1", "c1": "buy_c1"})
        [["buy_date", "buy_close", "buy_a1", "buy_b1", "buy_c1", "kdj_buy", "ql_buy"]]
        .copy()
    )
    sell_events = (
        events[(events["layer"] == "real_trade") & (events["side"] == "SELL")]
        .rename(columns={"date": "sell_date", "close": "sell_close", "a1": "sell_a1", "b1": "sell_b1", "c1": "sell_c1"})
        [["sell_date", "sell_close", "sell_a1", "sell_b1", "sell_c1", "kdj_sell", "ql_sell"]]
        .copy()
    )
    path_df = trades.merge(buy_events, on="buy_date", how="left").merge(sell_events, on="sell_date", how="left")

    path_df["market_state_layer"] = path_df.apply(
        lambda row: _infer_state_layer(str(row["buy_reason"]), float(row["buy_c1"])),
        axis=1,
    )
    # Coerce to str exactly as the state layer does: a missing buy_reason is
    # read as NaN (a float), and passing it straight to _infer_entry_layer
    # would raise AttributeError on ``.split``.
    path_df["entry_qualification_layer"] = path_df["buy_reason"].map(
        lambda reason: _infer_entry_layer(str(reason))
    )
    path_df["position_management_layer"] = path_df["sell_reason"].map(_infer_management_layer)

    workbook_real_buy = set(workbook[(workbook["layer"] == "real_trade") & (workbook["side"] == "BUY")]["date"])
    workbook_real_sell = set(workbook[(workbook["layer"] == "real_trade") & (workbook["side"] == "SELL")]["date"])
    path_df["buy_aligned_with_workbook"] = path_df["buy_date"].isin(workbook_real_buy)
    path_df["sell_aligned_with_workbook"] = path_df["sell_date"].isin(workbook_real_sell)

    # The layer/side selection does not depend on the trade, so do it once
    # instead of re-scanning the whole events frame on every iteration.
    is_aux_signal = events["layer"] == "aux_signal"
    aux_buy_events = events[is_aux_signal & (events["side"] == "BUY")]
    aux_sell_events = events[is_aux_signal & (events["side"] == "SELL")]

    aux_layers: list[str] = []
    aux_details: list[str] = []
    aux_counts: list[int] = []
    hold_aux_buy_counts: list[int] = []
    hold_aux_buy_details: list[str] = []
    post_exit_aux_sell_counts: list[int] = []
    post_exit_aux_sell_details: list[str] = []
    for _, trade in path_df.iterrows():
        # Auxiliary BUY signals while the position was held (both ends inclusive).
        hold_aux_buy = aux_buy_events[
            (aux_buy_events["dt"] >= trade["buy_dt"])
            & (aux_buy_events["dt"] <= trade["sell_dt"])
        ].sort_values("dt")

        # Auxiliary SELL signals strictly after the exit and, when there is a
        # following trade, strictly before its entry.
        post_exit_mask = aux_sell_events["dt"] > trade["sell_dt"]
        if pd.notna(trade["next_buy_dt"]):
            post_exit_mask = post_exit_mask & (aux_sell_events["dt"] < trade["next_buy_dt"])
        post_exit_aux_sell = aux_sell_events[post_exit_mask].sort_values("dt")

        aux_layer, aux_detail = _infer_aux_context(hold_aux_buy, post_exit_aux_sell)
        aux_layers.append(aux_layer)
        aux_details.append(aux_detail)
        aux_counts.append(len(hold_aux_buy) + len(post_exit_aux_sell))
        hold_aux_buy_counts.append(len(hold_aux_buy))
        hold_aux_buy_details.append(
            " | ".join(
                f"{date}:BUY:{reason}"
                for date, reason in zip(hold_aux_buy["date"], hold_aux_buy["reason"])
            )
        )
        post_exit_aux_sell_counts.append(len(post_exit_aux_sell))
        post_exit_aux_sell_details.append(
            " | ".join(
                f"{date}:SELL:{reason}"
                for date, reason in zip(post_exit_aux_sell["date"], post_exit_aux_sell["reason"])
            )
        )

    path_df["aux_context_layer"] = aux_layers
    path_df["aux_signal_count"] = aux_counts
    path_df["aux_signal_detail"] = aux_details
    path_df["hold_aux_buy_count"] = hold_aux_buy_counts
    path_df["hold_aux_buy_detail"] = hold_aux_buy_details
    path_df["post_exit_aux_sell_count"] = post_exit_aux_sell_counts
    path_df["post_exit_aux_sell_detail"] = post_exit_aux_sell_details
    # Human-readable "layer1 > layer2 > layer3 > layer4" path per trade.
    path_df["layer_path"] = path_df.apply(
        lambda row: " > ".join(
            [
                row["market_state_layer"],
                row["entry_qualification_layer"],
                row["position_management_layer"],
                row["aux_context_layer"],
            ]
        ),
        axis=1,
    )

    output_cols = [
        "buy_date",
        "sell_date",
        "holding_days",
        "return_pct",
        "buy_reason",
        "sell_reason",
        "buy_a1",
        "buy_b1",
        "buy_c1",
        "sell_a1",
        "sell_b1",
        "sell_c1",
        "market_state_layer",
        "entry_qualification_layer",
        "position_management_layer",
        "aux_context_layer",
        "aux_signal_count",
        "hold_aux_buy_count",
        "hold_aux_buy_detail",
        "post_exit_aux_sell_count",
        "post_exit_aux_sell_detail",
        "aux_signal_detail",
        "buy_aligned_with_workbook",
        "sell_aligned_with_workbook",
        "next_buy_date",
        "layer_path",
    ]
    path_df[output_cols].to_csv(base_dir / "dragon_trade_path_trace.csv", index=False, encoding="utf-8-sig")
    _build_taxonomy_markdown(base_dir, path_df)


if __name__ == "__main__":
    main()
|