| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694 |
- from __future__ import annotations
- from dataclasses import dataclass
- from pathlib import Path
- import pandas as pd
- from dragon_branch_configs import (
- alpha_first_glued_followthrough_probe_config,
- alpha_first_glued_refined_hot_cap_config,
- )
- from dragon_indicators import DragonIndicatorConfig, DragonIndicatorEngine
- from dragon_shared import END_DATE, START_DATE, format_pct as _format_pct
- from dragon_strategy import DragonRuleEngine
- from dragon_strategy_config import StrategyConfig
# Buy reasons that start with this prefix are treated as confirmed
# glued-followthrough reentries when trades and replay decisions are filtered.
REENTRY_REASON_PREFIX = "glued_followthrough_reentry_buy:confirmed_"

# File names of the artefacts main() writes into this script's directory.
OUTPUT_CANDIDATES = "dragon_followthrough_profit_loop_candidates.csv"
OUTPUT_CANDIDATE_SUMMARY = "dragon_followthrough_profit_loop_candidate_summary.csv"
OUTPUT_REENTRIES = "dragon_followthrough_profit_loop_reentries.csv"
OUTPUT_REENTRY_SUMMARY = "dragon_followthrough_profit_loop_reentry_summary.csv"
OUTPUT_REVIEW = "dragon_followthrough_profit_loop_review.md"
@dataclass(frozen=True)
class ProbeDefinition:
    """Immutable description of one followthrough probe branch.

    Attributes:
        name: Branch identifier; also used as the key in the probe mapping
            and as the ``branch`` value in the diagnostic frames.
        label: Human-readable description of what the probe enables.
        allowed_subtype: The single glued-block subtype this probe lets
            through to the pending/followthrough path.
        config: Strategy configuration replayed for this probe.
    """

    name: str
    label: str
    allowed_subtype: str
    config: StrategyConfig
def build_followthrough_probe_configs() -> dict[str, ProbeDefinition]:
    """Build the three single-subtype followthrough probes, keyed by name.

    The mid-zone probe uses the dedicated followthrough probe config factory.
    The other two start from the refined hot-cap base config and enable the
    pending path for exactly one subtype each.

    Returns:
        Mapping of probe name to its ``ProbeDefinition``, in the order
        mid-zone, high-zone, ql-rebound.
    """
    refined_base = alpha_first_glued_refined_hot_cap_config()
    definitions = (
        ProbeDefinition(
            name="probe_mid_zone",
            label="pending only for mid_zone_very_weak_b1",
            allowed_subtype="mid_zone_very_weak_b1",
            config=alpha_first_glued_followthrough_probe_config(),
        ),
        ProbeDefinition(
            name="probe_high_zone",
            label="pending only for high_zone_weak_b1",
            allowed_subtype="high_zone_weak_b1",
            config=refined_base.with_updates(
                glued_followthrough_pending_enabled=True,
                glued_followthrough_allow_mid_zone_very_weak_b1=False,
                glued_followthrough_allow_high_zone_weak_b1=True,
                glued_followthrough_allow_ql_rebound_weak_followthrough=False,
            ),
        ),
        ProbeDefinition(
            name="probe_ql_rebound",
            label="pending only for ql_rebound_weak_followthrough",
            allowed_subtype="ql_rebound_weak_followthrough",
            config=refined_base.with_updates(
                glued_followthrough_pending_enabled=True,
                glued_followthrough_allow_mid_zone_very_weak_b1=False,
                glued_followthrough_allow_high_zone_weak_b1=False,
                glued_followthrough_allow_ql_rebound_weak_followthrough=True,
            ),
        ),
    )
    return {definition.name: definition for definition in definitions}
def prepare_indicator_history(df: pd.DataFrame) -> pd.DataFrame:
    """Normalise an indicator frame for replay and diagnostics.

    Works on a copy of ``df``. The result is sorted by date, has numeric
    price/indicator columns cast to float and signal columns cast to bool,
    gains ``ma20``/``ma60`` rolling means of ``close`` plus a ``regime``
    label, and is indexed by ``date`` while keeping ``date`` as a column.

    Args:
        df: Indicator history. The date may live in a ``date`` column, in the
            index, or (as a last resort) in the first column after the index
            has been reset. A ``close`` column is required.

    Returns:
        A new frame indexed by ``date`` (``date`` column retained).
    """
    history = df.copy()
    # Avoid a duplicate "date" column when it is both the index and a column.
    if "date" in history.columns and "date" in list(history.index.names):
        history = history.reset_index(drop=True)
    if "date" not in history.columns:
        history = history.reset_index()
    if "date" not in history.columns:
        # An unnamed index resets to some other column name; treat it as the date.
        history = history.rename(columns={history.columns[0]: "date"})
    history["date"] = pd.to_datetime(history["date"])
    history = history.sort_values("date").reset_index(drop=True)

    for column in ("open", "high", "low", "close", "a1", "b1", "c1"):
        if column in history.columns:
            history[column] = history[column].astype(float)
    for column in ("kdj_buy", "kdj_sell", "ql_buy", "ql_sell"):
        if column in history.columns:
            flags = history[column]
            # NaN is truthy under astype(bool); a missing flag must read as
            # "no signal", not as a spurious buy/sell cross.
            history[column] = flags.notna() & flags.astype(bool)

    history["ma20"] = history["close"].rolling(20).mean()
    history["ma60"] = history["close"].rolling(60).mean()
    if history.empty:
        # Row-wise apply on an empty frame returns a DataFrame, which cannot
        # be assigned to a single column.
        history["regime"] = pd.Series(dtype=object)
    else:
        history["regime"] = history.apply(_classify_regime, axis=1)
    return history.set_index("date", drop=False)
def collect_glued_block_candidates(
    df: pd.DataFrame,
    config: StrategyConfig,
    *,
    branch_name: str = "base_rc1",
) -> pd.DataFrame:
    """Replay ``df`` under ``config`` and return the glued-block buy vetoes.

    Args:
        df: Indicator history; it is normalised with
            ``prepare_indicator_history`` before the replay.
        config: Strategy configuration to replay.
        branch_name: Label stored in the ``branch`` column of each row.

    Returns:
        The "blocked" half of ``_collect_branch_diagnostics``.
    """
    prepared = prepare_indicator_history(df)
    diagnostics = _collect_branch_diagnostics(prepared, config, branch_name)
    return diagnostics[0]
def collect_followthrough_reentries(
    df: pd.DataFrame,
    config: StrategyConfig,
    *,
    branch_name: str = "probe",
) -> pd.DataFrame:
    """Replay ``df`` under ``config`` and return the followthrough reentry buys.

    Args:
        df: Indicator history; it is normalised with
            ``prepare_indicator_history`` before the replay.
        config: Strategy configuration to replay.
        branch_name: Label stored in the ``branch`` column of each row.

    Returns:
        The "reentries" half of ``_collect_branch_diagnostics``.
    """
    prepared = prepare_indicator_history(df)
    diagnostics = _collect_branch_diagnostics(prepared, config, branch_name)
    return diagnostics[1]
def _classify_regime(row: pd.Series) -> str:
    """Label one bar by where its close sits relative to MA20 and MA60.

    Returns:
        ``"unknown"`` when either moving average is missing,
        ``"uptrend"`` when close >= ma20 >= ma60,
        ``"downtrend"`` when close <= ma20 <= ma60,
        ``"range"`` otherwise. Uptrend wins when all three values are equal.
    """
    close = float(row["close"])
    averages = {}
    for key in ("ma20", "ma60"):
        raw = row[key]
        averages[key] = float(raw) if pd.notna(raw) else float("nan")
    short_ma, long_ma = averages["ma20"], averages["ma60"]

    if pd.isna(short_ma) or pd.isna(long_ma):
        return "unknown"
    stacked_up = close >= short_ma and short_ma >= long_ma
    if stacked_up:
        return "uptrend"
    stacked_down = close <= short_ma and short_ma <= long_ma
    return "downtrend" if stacked_down else "range"
def _in_release_window(value: str) -> bool:
    """Return whether ``value`` lies within [START_DATE, END_DATE], inclusive.

    The comparison is a plain ordering comparison against the imported
    bounds. NOTE(review): callers pass ISO ``YYYY-MM-DD`` strings, so this
    presumably relies on the bounds being ISO strings too; confirm in
    ``dragon_shared``.
    """
    on_or_after_start = START_DATE <= value
    return on_or_after_start and value <= END_DATE
def _collect_branch_diagnostics(
    indexed: pd.DataFrame,
    config: StrategyConfig,
    branch_name: str,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Replay ``indexed`` bar by bar and record glued-block and reentry events.

    A fresh ``DragonRuleEngine`` is stepped manually through its private
    per-bar hooks so that two kinds of buy-side events can be captured:

    * buy decisions blocked with reason ``buy_block_glued_high_weak_rebound``
    * real buys whose reason starts with ``REENTRY_REASON_PREFIX``

    NOTE(review): the hook sequence presumably mirrors ``DragonRuleEngine.run``,
    which is not visible here. Confirm the two stay in sync, including how the
    sell-side counter updates are nested.

    Args:
        indexed: Indicator history whose row labels are timestamps
            (``row.name.date()`` is called) and which provides the columns
            ``close``, ``a1``, ``b1``, ``c1``, ``kdj_buy``, ``ql_buy``,
            ``kdj_sell`` and ``ql_sell``.
        config: Strategy configuration handed to the engine.
        branch_name: Label written to the ``branch`` column of every row.

    Returns:
        ``(blocked_df, reentry_df)``. A frame built from zero recorded rows
        has no columns at all.
    """
    engine = DragonRuleEngine(config=config)
    blocked_rows: list[dict[str, object]] = []
    reentry_rows: list[dict[str, object]] = []
    prev_row = None
    for _, row in indexed.iterrows():
        # Per-bar bookkeeping hooks run before any decision is taken.
        engine._record_cross_counters(row)
        engine._update_position_counters(row)
        engine._update_pending_states(row)
        just_bought = False
        # Copy the pending-followthrough state before the buy decision so a
        # reentry row can report the originating signal (assumes the buy
        # hooks may reset this state; TODO confirm).
        pending_snapshot = {
            "origin_date": (
                engine.context.pending_glued_followthrough_origin_date.isoformat()
                if engine.context.pending_glued_followthrough_origin_date
                else ""
            ),
            "subtype": engine.context.pending_glued_followthrough_subtype,
            "bars_waited": int(engine.context.pending_glued_followthrough_bars_waited),
            "signal_close": float(engine.context.pending_glued_followthrough_signal_close or 0.0),
            "signal_a1": float(engine.context.pending_glued_followthrough_a1 or 0.0),
            "signal_b1": float(engine.context.pending_glued_followthrough_b1 or 0.0),
            "signal_c1": float(engine.context.pending_glued_followthrough_c1 or 0.0),
        }
        # Buy side: evaluated when flat, or whenever a buy cross fires.
        if (not engine.context.in_position) or bool(row["kdj_buy"] or row["ql_buy"]):
            action, reason = engine._buy_decision(row, prev_row)
            if action == "BLOCK" and reason == "buy_block_glued_high_weak_rebound":
                subtype = engine._glued_high_weak_rebound_subtype(row)
                blocked_rows.append(
                    {
                        "branch": branch_name,
                        "signal_date": row.name.date().isoformat(),
                        "subtype": subtype,
                        "signal_close": float(row["close"]),
                        "signal_a1": float(row["a1"]),
                        "signal_b1": float(row["b1"]),
                        "signal_c1": float(row["c1"]),
                        "signal_kdj_buy": bool(row["kdj_buy"]),
                        "signal_ql_buy": bool(row["ql_buy"]),
                        "in_release_window": _in_release_window(row.name.date().isoformat()),
                    }
                )
            elif action == "BUY":
                # Only followthrough reentries are recorded; every real buy
                # still updates the engine state below.
                if reason.startswith(REENTRY_REASON_PREFIX):
                    reentry_rows.append(
                        {
                            "branch": branch_name,
                            "buy_date": row.name.date().isoformat(),
                            "buy_reason": reason,
                            "origin_date": pending_snapshot["origin_date"],
                            "subtype": pending_snapshot["subtype"],
                            "bars_waited": pending_snapshot["bars_waited"],
                            "signal_close": pending_snapshot["signal_close"],
                            "signal_a1": pending_snapshot["signal_a1"],
                            "signal_b1": pending_snapshot["signal_b1"],
                            "signal_c1": pending_snapshot["signal_c1"],
                            "buy_close": float(row["close"]),
                            "buy_a1": float(row["a1"]),
                            "buy_b1": float(row["b1"]),
                            "buy_c1": float(row["c1"]),
                            "buy_kdj_buy": bool(row["kdj_buy"]),
                            "buy_ql_buy": bool(row["ql_buy"]),
                            # Change in b1 between the blocked signal bar and the buy bar.
                            "b1_repair": float(row["b1"]) - pending_snapshot["signal_b1"],
                            "in_release_window": _in_release_window(row.name.date().isoformat()),
                        }
                    )
                engine._post_real_buy(row, reason)
                just_bought = True
            elif action == "AUX_BUY":
                engine._post_aux_buy(row)
        # Sell side: skipped on the bar of a real buy; otherwise evaluated
        # when in position, on a sell cross, or when the engine flags a
        # state-based auxiliary sell while flat.
        state_aux_sell_candidate = (not engine.context.in_position) and engine._should_emit_state_aux_sell(row)
        if not just_bought and (engine.context.in_position or bool(row["kdj_sell"] or row["ql_sell"]) or state_aux_sell_candidate):
            # Sell-cross counters only advance while a position is open.
            if engine.context.in_position and bool(row["kdj_sell"] or row["ql_sell"]):
                engine.context.sell_signal_count += 1
                if bool(row["kdj_sell"]):
                    engine.context.kdj_sell_signal_count += 1
                if bool(row["ql_sell"]):
                    engine.context.ql_sell_signal_count += 1
                if float(row["b1"]) < 0:
                    engine.context.b1_negative_sell_count += 1
            action, reason = engine._sell_decision(row, prev_row)
            if action == "SELL":
                engine.context.first_exit_checked = True
                engine._post_real_sell(row, reason)
            elif action == "AUX_SELL":
                if engine.context.in_position:
                    engine.context.first_exit_checked = True
                engine._post_aux_sell(row, reason)
        prev_row = row
    blocked_df = pd.DataFrame(blocked_rows)
    reentry_df = pd.DataFrame(reentry_rows)
    return blocked_df, reentry_df
def _load_full_history() -> tuple[pd.DataFrame, dict[str, object]]:
    """Fetch daily data from 2015-01-01 onward, compute indicators and normalise.

    Returns:
        ``(history, meta)`` where ``history`` is the output of
        ``prepare_indicator_history`` and ``meta`` records the data source
        tag, the ISO date of the latest bar and the number of rows.
    """
    indicator_config = DragonIndicatorConfig(start_date="2015-01-01", end_date=None)
    indicator_engine = DragonIndicatorEngine(indicator_config)
    raw = indicator_engine.fetch_daily_data(include_intraday_snapshot=False)
    # Expose the index as a column; an unnamed index resets to "index".
    raw_with_date = raw.reset_index(drop=False).rename(columns={"index": "date"})
    history = prepare_indicator_history(indicator_engine.compute(raw_with_date))
    newest_bar = history["date"].max()
    meta = {
        "data_source": "fetch_daily_data+compute",
        "latest_bar": newest_bar.date().isoformat(),
        "row_count": int(len(history)),
    }
    return history, meta
def _enrich_block_candidates(
    blocked: pd.DataFrame,
    history: pd.DataFrame,
    config: StrategyConfig,
) -> pd.DataFrame:
    """Attach regime and forward-looking outcome columns to blocked signals.

    For every blocked signal the following 3, 10 and 20 bars of ``history``
    are inspected for ql-buy reconfirmations, sell crosses, a confirm-like
    bar and close-to-close returns relative to the signal close.

    Args:
        blocked: Blocked-signal rows with the columns produced by
            ``_collect_branch_diagnostics``.
        history: Indicator history with a ``date`` column; rows are addressed
            by position, so it must already be in chronological order.
        config: Strategy configuration supplying the followthrough
            confirmation requirements.

    Returns:
        A new frame with one enriched row per blocked signal, or a copy of
        ``blocked`` when it is empty.

    Raises:
        KeyError: If a ``signal_date`` does not occur in ``history``.
    """
    if blocked.empty:
        return blocked.copy()

    position_by_date = {
        stamp.date().isoformat(): position
        for position, stamp in enumerate(history["date"])
    }
    records: list[dict[str, object]] = []
    for candidate in blocked.itertuples(index=False):
        anchor_pos = position_by_date[str(candidate.signal_date)]
        anchor = history.iloc[anchor_pos]
        first_after = anchor_pos + 1
        window3 = history.iloc[first_after : first_after + 3].copy()
        window10 = history.iloc[first_after : first_after + 10].copy()
        window20 = history.iloc[first_after : first_after + 20].copy()

        has_window3 = not window3.empty
        if has_window3:
            sell_cross3 = window3["kdj_sell"] | window3["ql_sell"]
        else:
            sell_cross3 = pd.Series(dtype=bool)

        base_close = float(candidate.signal_close)
        base_b1 = float(candidate.signal_b1)
        confirm_offset = _first_confirm_like_bar(
            next3=window3,
            signal_close=base_close,
            signal_b1=base_b1,
            config=config,
        )
        confirmed = confirm_offset > 0
        anchor_ma20 = anchor["ma20"]
        anchor_ma60 = anchor["ma60"]

        records.append(
            {
                "branch": candidate.branch,
                "signal_date": candidate.signal_date,
                "subtype": candidate.subtype,
                "signal_close": base_close,
                "signal_a1": float(candidate.signal_a1),
                "signal_b1": base_b1,
                "signal_c1": float(candidate.signal_c1),
                "signal_kdj_buy": bool(candidate.signal_kdj_buy),
                "signal_ql_buy": bool(candidate.signal_ql_buy),
                "regime": str(anchor["regime"]),
                "ma20": float(anchor_ma20) if pd.notna(anchor_ma20) else float("nan"),
                "ma60": float(anchor_ma60) if pd.notna(anchor_ma60) else float("nan"),
                "in_release_window": bool(candidate.in_release_window),
                "bars_available_3": int(len(window3)),
                "bars_available_10": int(len(window10)),
                "bars_available_20": int(len(window20)),
                "next3_any_ql_buy": bool(window3["ql_buy"].any()) if has_window3 else False,
                "next3_ql_buy_count": int(window3["ql_buy"].sum()) if has_window3 else 0,
                "next3_any_sell_cross": bool(sell_cross3.any()) if has_window3 else False,
                "next3_sell_cross_count": int(sell_cross3.sum()) if has_window3 else 0,
                "confirm_like_within_3": confirmed,
                "confirm_like_bar": int(confirm_offset) if confirmed else 0,
                "next3_max_close_return": _window_max_return(window3, base_close),
                "next3_min_close_return": _window_min_return(window3, base_close),
                "next10_close_return": _window_close_return(window10, base_close, 10),
                "next20_close_return": _window_close_return(window20, base_close, 20),
                "next20_max_close_return": _window_max_return(window20, base_close),
                "next20_min_close_return": _window_min_return(window20, base_close),
                "next20_max_close_date": _window_extreme_date(window20, "close", "max"),
            }
        )
    return pd.DataFrame(records)
def _first_confirm_like_bar(next3: pd.DataFrame, signal_close: float, signal_b1: float, config: StrategyConfig) -> int:
    """Return the 1-based offset of the first bar that would confirm a reentry.

    A bar qualifies when it carries no sell cross and passes every
    requirement that ``config`` switches on: a ql-buy reconfirmation, a close
    strictly above ``signal_close``, and a b1 improvement over ``signal_b1``
    of at least ``config.glued_followthrough_b1_repair_min``.

    Returns:
        The offset (1 for the first row of ``next3``) or 0 when no bar
        qualifies.
    """

    def _qualifies(bar: pd.Series) -> bool:
        # Checks run in a fixed order and stop at the first failure, so a
        # disabled requirement never touches its column.
        if bool(bar["kdj_sell"]) or bool(bar["ql_sell"]):
            return False
        if config.glued_followthrough_require_ql_buy_reconfirm and not bool(bar["ql_buy"]):
            return False
        if config.glued_followthrough_require_close_break_signal_close and float(bar["close"]) <= signal_close:
            return False
        if config.glued_followthrough_require_b1_repair:
            repair = float(bar["b1"]) - signal_b1
            if repair < config.glued_followthrough_b1_repair_min:
                return False
        return True

    offsets = (
        offset
        for offset, (_, bar) in enumerate(next3.iterrows(), start=1)
        if _qualifies(bar)
    )
    return next(offsets, 0)
def _window_close_return(window: pd.DataFrame, base_price: float, required_bars: int) -> float:
    """Return the close-to-base return at bar ``required_bars`` of ``window``.

    Returns NaN when the window holds fewer than ``required_bars`` rows, so
    incomplete forward windows do not produce a misleading return.
    """
    has_enough_bars = len(window) >= required_bars
    if not has_enough_bars:
        return float("nan")
    target_close = window["close"].iloc[required_bars - 1]
    return float(target_close) / base_price - 1.0
def _window_max_return(window: pd.DataFrame, base_price: float) -> float:
    """Return the best close in ``window`` as a return over ``base_price``.

    Returns NaN for an empty window.
    """
    if not window.empty:
        highest_close = window["close"].max()
        return float(highest_close) / base_price - 1.0
    return float("nan")
def _window_min_return(window: pd.DataFrame, base_price: float) -> float:
    """Return the worst close in ``window`` as a return over ``base_price``.

    Returns NaN for an empty window.
    """
    if not window.empty:
        lowest_close = window["close"].min()
        return float(lowest_close) / base_price - 1.0
    return float("nan")
def _window_extreme_date(window: pd.DataFrame, column: str, mode: str) -> str:
    """Return the ISO date of the bar holding the extreme value of ``column``.

    The date is read from the window's ``date`` column when it has one and
    from the index label otherwise. Reading only the index label is wrong
    for frames with a positional index: an integer label passed to
    ``pd.Timestamp`` is interpreted as nanoseconds since the epoch and
    yields ``1970-01-01``.

    Args:
        window: Slice of the indicator history.
        column: Column whose extreme is located.
        mode: ``"max"`` selects the maximum; any other value selects the
            minimum.

    Returns:
        ``YYYY-MM-DD`` of the first bar at the extreme, or ``""`` when the
        window is empty or ``column`` holds no non-missing value.
    """
    if window.empty:
        return ""
    # Work positionally so duplicate or non-date index labels cannot matter.
    values = window[column].reset_index(drop=True)
    if values.isna().all():
        return ""
    position = int(values.idxmax() if mode == "max" else values.idxmin())
    if "date" in window.columns:
        stamp = window["date"].iloc[position]
    else:
        stamp = window.index[position]
    return pd.Timestamp(stamp).date().isoformat()
def _collect_reentry_trade_details(
    history: pd.DataFrame,
    probes: dict[str, ProbeDefinition],
) -> pd.DataFrame:
    """Pair each probe's followthrough reentry buys with their realised trades.

    Every probe is replayed twice: once through
    ``collect_followthrough_reentries`` to capture the originating signal of
    each reentry buy, and once through ``DragonRuleEngine.run`` to obtain the
    trade outcomes. The two are left-joined on branch, buy date and buy
    reason, and the combined rows are enriched against ``history``.

    Args:
        history: Indicator history with a ``date`` column.
        probes: Probe definitions to replay.

    Returns:
        Enriched trade details, or an empty frame when no probe produced a
        followthrough reentry trade.
    """
    indexed = history.set_index("date", drop=False)
    join_keys = ["branch", "buy_date", "buy_reason"]
    trade_columns = [
        "branch",
        "buy_date",
        "buy_price",
        "buy_reason",
        "sell_date",
        "sell_price",
        "sell_reason",
        "holding_days",
        "return_pct",
    ]
    # Schema of the placeholder appended for a probe without reentry trades.
    placeholder_columns = [
        "branch",
        "buy_date",
        "buy_reason",
        "origin_date",
        "subtype",
        "bars_waited",
        "signal_close",
        "signal_a1",
        "signal_b1",
        "signal_c1",
        "buy_close",
        "buy_a1",
        "buy_b1",
        "buy_c1",
        "buy_kdj_buy",
        "buy_ql_buy",
        "b1_repair",
        "in_release_window",
        "buy_price",
        "sell_date",
        "sell_price",
        "sell_reason",
        "holding_days",
        "return_pct",
    ]

    frames: list[pd.DataFrame] = []
    for probe in probes.values():
        mapped = collect_followthrough_reentries(indexed, probe.config, branch_name=probe.name)
        _, all_trades = DragonRuleEngine(config=probe.config).run(indexed)
        is_reentry = all_trades["buy_reason"].astype(str).str.startswith(REENTRY_REASON_PREFIX)
        trades = all_trades[is_reentry].copy()
        if not trades.empty:
            trades.insert(0, "branch", probe.name)
            frame = mapped.merge(
                trades[trade_columns],
                on=join_keys,
                how="left",
                validate="one_to_one",
            )
        else:
            frame = pd.DataFrame(columns=placeholder_columns)
        frames.append(frame)

    details = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    if details.empty:
        return details
    return _enrich_reentry_trade_details(details, history)
def _enrich_reentry_trade_details(details: pd.DataFrame, history: pd.DataFrame) -> pd.DataFrame:
    """Add regime, next-open execution and post-entry/post-exit context to trades.

    Rows without an exit are supported: ``_collect_reentry_trade_details``
    builds ``details`` with a left merge, so a reentry buy with no matching
    trade arrives with missing ``sell_*`` fields. Such rows keep their entry
    metrics, report NaN for every exit-dependent metric and are flagged as
    outside the release window, instead of aborting the whole report.

    Args:
        details: Reentry rows joined with trade outcomes.
        history: Indicator history with ``date`` and ``regime`` columns; it
            is re-sorted by date here, so callers need not pre-sort it.

    Returns:
        A new frame with one enriched row per input row, or a copy of
        ``details`` when it is empty.

    Raises:
        KeyError: If a ``buy_date`` or a non-missing ``sell_date`` does not
            occur in ``history``.
    """
    if details.empty:
        return details.copy()
    if "date" in history.columns and "date" in list(history.index.names):
        history = history.reset_index(drop=True)
    history = history.sort_values("date").reset_index(drop=True)
    date_to_pos = {stamp.date().isoformat(): idx for idx, stamp in enumerate(history["date"])}
    open_available = "open" in history.columns and history["open"].notna().any()
    bar_count = len(history)
    no_bars = history.iloc[0:0]
    nan = float("nan")

    enriched_rows: list[dict[str, object]] = []
    for row in details.itertuples(index=False):
        buy_pos = date_to_pos[str(row.buy_date)]
        # A missing sell_date marks a buy without a matched trade; an
        # unknown but present date is still a hard error.
        has_exit = bool(pd.notna(row.sell_date))
        sell_pos = date_to_pos[str(row.sell_date)] if has_exit else None

        buy_row = history.iloc[buy_pos]
        buy_next = history.iloc[buy_pos + 1] if (buy_pos + 1) < bar_count else None
        sell_next = None
        post_exit_5 = no_bars
        post_exit_10 = no_bars
        if sell_pos is not None:
            if (sell_pos + 1) < bar_count:
                sell_next = history.iloc[sell_pos + 1]
            post_exit_5 = history.iloc[sell_pos + 1 : sell_pos + 6]
            post_exit_10 = history.iloc[sell_pos + 1 : sell_pos + 11]
        entry_10 = history.iloc[buy_pos + 1 : buy_pos + 11]
        entry_20 = history.iloc[buy_pos + 1 : buy_pos + 21]

        # Return of executing both legs at the next bar's open instead of
        # the signal bar's close.
        next_open_return = nan
        if open_available and buy_next is not None and sell_next is not None:
            next_open_return = float(sell_next["open"]) / float(buy_next["open"]) - 1.0

        origin_key = str(row.origin_date)
        origin_regime = str(history.iloc[date_to_pos[origin_key]]["regime"]) if origin_key in date_to_pos else ""
        buy_price = float(row.buy_price)
        sell_price = float(row.sell_price)
        return_pct = float(row.return_pct)
        in_window = bool(row.in_release_window) and has_exit and bool(_in_release_window(str(row.sell_date)))

        enriched_rows.append(
            {
                "branch": row.branch,
                "subtype": row.subtype,
                "origin_date": row.origin_date,
                "origin_regime": origin_regime,
                "buy_date": row.buy_date,
                "buy_regime": str(buy_row["regime"]),
                "buy_reason": row.buy_reason,
                "bars_waited": int(row.bars_waited),
                "signal_close": float(row.signal_close),
                "signal_a1": float(row.signal_a1),
                "signal_b1": float(row.signal_b1),
                "signal_c1": float(row.signal_c1),
                "buy_price": buy_price,
                "buy_close": float(row.buy_close),
                "buy_a1": float(row.buy_a1),
                "buy_b1": float(row.buy_b1),
                "buy_c1": float(row.buy_c1),
                "b1_repair": float(row.b1_repair),
                "sell_date": row.sell_date,
                "sell_price": sell_price,
                "sell_reason": row.sell_reason,
                # int(NaN) raises, so an unmatched row keeps NaN here.
                "holding_days": int(row.holding_days) if pd.notna(row.holding_days) else nan,
                "return_pct": return_pct,
                "next_open_return_pct": next_open_return,
                "next_open_minus_same_close_pct": next_open_return - return_pct if pd.notna(next_open_return) else nan,
                "entry_max_close_return_10b": _window_max_return(entry_10, buy_price),
                "entry_max_close_return_20b": _window_max_return(entry_20, buy_price),
                "post_exit_max_close_return_5b": _window_max_return(post_exit_5, sell_price),
                "post_exit_max_close_return_10b": _window_max_return(post_exit_10, sell_price),
                "post_exit_min_close_return_5b": _window_min_return(post_exit_5, sell_price),
                "in_release_window": in_window,
            }
        )
    return pd.DataFrame(enriched_rows)
def _summarize_candidates(candidates: pd.DataFrame, reentries: pd.DataFrame) -> pd.DataFrame:
    """Aggregate enriched block candidates into one row per subtype.

    Rates and averages only use candidates whose forward window is complete
    (3, 10 or 20 bars), so recent signals do not bias the statistics.

    Args:
        candidates: Output of ``_enrich_block_candidates``. May be a
            column-less empty frame when nothing was blocked.
        reentries: Reentry trade details; only its ``subtype`` column is
            read. A frame without that column counts as "no reentries".

    Returns:
        One row per subtype, ordered mid-zone, high-zone, ql-rebound, then
        any other subtype alphabetically. Empty when there are no candidates.
    """
    # Without candidates there is no "subtype" column to group on.
    if candidates.empty or "subtype" not in candidates.columns:
        return pd.DataFrame()

    subtype_order = {
        "mid_zone_very_weak_b1": 0,
        "high_zone_weak_b1": 1,
        "ql_rebound_weak_followthrough": 2,
    }
    if "subtype" in reentries.columns:
        reentry_subtypes = reentries["subtype"]
    else:
        reentry_subtypes = pd.Series(dtype=object)

    def _mean_or_nan(frame: pd.DataFrame, column: str) -> float:
        return float(frame[column].mean()) if not frame.empty else float("nan")

    rows: list[dict[str, object]] = []
    for subtype, group in candidates.groupby("subtype"):
        complete_3 = group[group["bars_available_3"] >= 3]
        complete_10 = group[group["bars_available_10"] >= 10]
        complete_20 = group[group["bars_available_20"] >= 20]
        regimes = group["regime"]
        rows.append(
            {
                "subtype": subtype,
                "sort_key": subtype_order.get(subtype, 99),
                "blocked_count_full": int(len(group)),
                "blocked_count_release": int(group["in_release_window"].sum()),
                "shadow_confirmed_trade_count": int((reentry_subtypes == subtype).sum()),
                "complete_3bar_count": int(len(complete_3)),
                "complete_10bar_count": int(len(complete_10)),
                "complete_20bar_count": int(len(complete_20)),
                "uptrend_count": int((regimes == "uptrend").sum()),
                "range_count": int((regimes == "range").sum()),
                "downtrend_count": int((regimes == "downtrend").sum()),
                "confirm_like_3bar_rate": _mean_or_nan(complete_3, "confirm_like_within_3"),
                "next3_ql_reconfirm_rate": _mean_or_nan(complete_3, "next3_any_ql_buy"),
                "next3_sell_cross_rate": _mean_or_nan(complete_3, "next3_any_sell_cross"),
                "avg_next3_max_close_return": _mean_or_nan(complete_3, "next3_max_close_return"),
                "avg_next10_close_return": _mean_or_nan(complete_10, "next10_close_return"),
                "avg_next20_close_return": _mean_or_nan(complete_20, "next20_close_return"),
                "avg_next20_max_close_return": _mean_or_nan(complete_20, "next20_max_close_return"),
            }
        )
    summary = pd.DataFrame(rows)
    if summary.empty:
        return summary
    # sort_key only drives the ordering; it is not part of the output.
    return summary.sort_values(["sort_key", "subtype"]).drop(columns=["sort_key"]).reset_index(drop=True)
def _summarize_reentries(reentries: pd.DataFrame, probes: dict[str, ProbeDefinition]) -> pd.DataFrame:
    """Aggregate reentry trade details into one row per probe.

    A probe without trades still gets a row: counts are 0, averages and win
    rates are NaN and the exit-reason distribution is an empty string. The
    metric columns of ``reentries`` are only read for probes that have
    trades.

    Args:
        reentries: Enriched reentry trade details with a ``branch`` column.
        probes: Probe definitions; one output row is produced per entry.

    Returns:
        Summary frame in the iteration order of ``probes``.
    """
    missing = float("nan")

    def _average(frame: pd.DataFrame, column: str) -> float:
        return missing if frame.empty else float(frame[column].mean())

    def _win_rate(frame: pd.DataFrame, column: str) -> float:
        return missing if frame.empty else float((frame[column] > 0).mean())

    summaries: list[dict[str, object]] = []
    for probe in probes.values():
        trades = reentries[reentries["branch"] == probe.name].copy()
        has_trades = not trades.empty
        summaries.append(
            {
                "branch": probe.name,
                "allowed_subtype": probe.allowed_subtype,
                "trades": int(len(trades)),
                "release_window_trades": int(trades["in_release_window"].sum()) if has_trades else 0,
                "avg_bars_waited": _average(trades, "bars_waited"),
                "same_close_win_rate": _win_rate(trades, "return_pct"),
                "same_close_avg_return": _average(trades, "return_pct"),
                "next_open_win_rate": _win_rate(trades, "next_open_return_pct"),
                "next_open_avg_return": _average(trades, "next_open_return_pct"),
                "avg_next_open_minus_same_close": _average(trades, "next_open_minus_same_close_pct"),
                "avg_holding_days": _average(trades, "holding_days"),
                "avg_entry_max_close_return_10b": _average(trades, "entry_max_close_return_10b"),
                "avg_entry_max_close_return_20b": _average(trades, "entry_max_close_return_20b"),
                "avg_post_exit_max_close_return_5b": _average(trades, "post_exit_max_close_return_5b"),
                "avg_post_exit_max_close_return_10b": _average(trades, "post_exit_max_close_return_10b"),
                "exit_reason_distribution": _value_counts_string(trades["sell_reason"]) if has_trades else "",
            }
        )
    return pd.DataFrame(summaries)
def _value_counts_string(series: pd.Series) -> str:
    """Render value frequencies as ``"value:count | value:count"``.

    Values are stringified before counting and listed in the order
    ``value_counts`` yields them (most frequent first). An empty series
    renders as an empty string.
    """
    if series.empty:
        return ""
    parts = []
    for label, count in series.astype(str).value_counts().items():
        parts.append(f"{label}:{int(count)}")
    return " | ".join(parts)
def _latest_case_review(candidates: pd.DataFrame, latest_signal_date: str) -> list[str]:
    """Build the "Latest Block Case" markdown section for one signal date.

    Args:
        candidates: Enriched block candidates.
        latest_signal_date: Signal date to report on; only the first
            matching row is used.

    Returns:
        The section's lines including a trailing blank line, or an empty
        list when no candidate carries that signal date.
    """
    matches = candidates[candidates["signal_date"] == latest_signal_date]
    if matches.empty:
        return []
    case = matches.iloc[0]
    bars_seen = int(case["bars_available_3"])
    ql_reconfirms = int(case["next3_ql_buy_count"])
    sell_crosses = int(case["next3_sell_cross_count"])
    best_return = _format_pct(float(case["next3_max_close_return"]))
    confirmed = bool(case["confirm_like_within_3"])
    return [
        "## Latest Block Case",
        f"- signal_date `{case['signal_date']}` | subtype `{case['subtype']}` | regime `{case['regime']}`",
        f"- observed bars after block `{bars_seen}/3` | ql_reconfirm_count `{ql_reconfirms}` | sell_cross_count `{sell_crosses}`",
        f"- max_close_return so far `{best_return}` | confirm_like_within_3 `{confirmed}`",
        "",
    ]
def _build_review_markdown(
    *,
    meta: dict[str, object],
    candidates: pd.DataFrame,
    candidate_summary: pd.DataFrame,
    reentries: pd.DataFrame,
    reentry_summary: pd.DataFrame,
) -> str:
    """Render the markdown review of the followthrough profit-loop study.

    Sections, in order: scope, per-subtype readout, per-probe readout, the
    latest block case (only when candidates exist), judgment, and trade
    detail highlights (only when reentries exist).

    NOTE(review): the first three judgment bullets are fixed text and do not
    depend on the data passed in; confirm they still match the results
    whenever the replay changes.

    Returns:
        The complete document, newline-terminated.
    """
    lines = [
        "# Dragon Followthrough Profit Loop Review",
        "",
        "## Scope",
        "- objective: inspect whether false-veto glued weak rebounds deserve a fast trend reentry path",
        f"- data_source: `{meta['data_source']}`",
        f"- latest_bar: `{meta['latest_bar']}`",
        f"- row_count: `{int(meta['row_count'])}`",
        "",
        "## Subtype Readout",
    ]
    lines.extend(
        f"- `{item.subtype}` | blocked_full `{int(item.blocked_count_full)}` | shadow_confirmed `{int(item.shadow_confirmed_trade_count)}` | "
        f"confirm_like_3bar `{_format_pct(float(item.confirm_like_3bar_rate))}` | "
        f"next3_sell_cross_rate `{_format_pct(float(item.next3_sell_cross_rate))}` | "
        f"avg_next20_max `{_format_pct(float(item.avg_next20_max_close_return))}`"
        for item in candidate_summary.itertuples(index=False)
    )

    lines += ["", "## Probe Readout"]
    lines.extend(
        f"- `{item.branch}` -> `{item.allowed_subtype}` | trades `{int(item.trades)}` | "
        f"same_close_avg `{_format_pct(float(item.same_close_avg_return))}` | "
        f"next_open_avg `{_format_pct(float(item.next_open_avg_return))}` | "
        f"post_exit_max_10b `{_format_pct(float(item.avg_post_exit_max_close_return_10b))}` | "
        f"exit_reasons `{item.exit_reason_distribution or 'none'}`"
        for item in reentry_summary.itertuples(index=False)
    )

    if not candidates.empty:
        newest_signal = str(candidates["signal_date"].max())
        lines.append("")
        lines.extend(_latest_case_review(candidates, newest_signal))

    branch_column = reentry_summary["branch"]
    mid_probe = reentry_summary[branch_column == "probe_mid_zone"]
    high_probe = reentry_summary[branch_column == "probe_high_zone"]
    ql_probe = reentry_summary[branch_column == "probe_ql_rebound"]

    lines += [
        "## Judgment",
        "- `mid_zone_very_weak_b1` is the only subtype that still shows a non-zero delayed-reentry path; it stays the only practical followthrough research lane.",
        "- `high_zone_weak_b1` should not be promoted. Enabling pending there only created losing followthrough trades in this replay.",
        "- `ql_rebound_weak_followthrough` remains a hard-block family. The probe branch still produced zero confirmed reentries there.",
    ]
    if not mid_probe.empty:
        mid = mid_probe.iloc[0]
        lines.append(
            f"- execution timing remains a real question only for the mid-zone path: same_close `{_format_pct(float(mid['same_close_avg_return']))}` vs "
            f"next_open `{_format_pct(float(mid['next_open_avg_return']))}`."
        )
        lines.append(
            "- entry-specific exit treatment is worth a narrow check only if the mid-zone path keeps showing higher post-entry upside than realized return."
        )
    if not high_probe.empty:
        high = high_probe.iloc[0]
        lines.append(
            f"- high-zone probe result is already weak enough to stop here: trades `{int(high['trades'])}`, same_close_avg `{_format_pct(float(high['same_close_avg_return']))}`."
        )
    if not ql_probe.empty:
        ql = ql_probe.iloc[0]
        lines.append(
            f"- ql rebound probe stays off the table for promotion: confirmed trades `{int(ql['trades'])}`."
        )

    if not reentries.empty:
        lines += ["", "## Trade Detail Highlights"]
        lines.extend(
            f"- `{trade.branch}` | origin `{trade.origin_date}` -> buy `{trade.buy_date}` -> sell `{trade.sell_date}` | "
            f"same_close `{_format_pct(float(trade.return_pct))}` | next_open `{_format_pct(float(trade.next_open_return_pct))}` | "
            f"sell `{trade.sell_reason}` | post_exit_max_10b `{_format_pct(float(trade.post_exit_max_close_return_10b))}`"
            for trade in reentries.itertuples(index=False)
        )
    return "\n".join(lines) + "\n"
def main() -> None:
    """Run the full study and write the CSV and markdown artefacts.

    Loads the indicator history, collects and enriches the base branch's
    glued-block candidates, replays every probe for followthrough reentry
    trades, summarises both, and writes four CSV files plus the markdown
    review into the directory containing this script.
    """
    output_dir = Path(__file__).resolve().parent
    history, meta = _load_full_history()
    base_config = alpha_first_glued_refined_hot_cap_config()
    probes = build_followthrough_probe_configs()

    # Candidate enrichment addresses rows by position, so drop the date
    # index when the date is also available as a column.
    date_is_index_and_column = "date" in history.columns and "date" in list(history.index.names)
    if date_is_index_and_column:
        normalized_history = history.reset_index(drop=True)
    else:
        normalized_history = history.copy()

    blocked = collect_glued_block_candidates(history, base_config, branch_name="base_rc1")
    candidates = _enrich_block_candidates(blocked, normalized_history, base_config)
    reentries = _collect_reentry_trade_details(history, probes)
    candidate_summary = _summarize_candidates(candidates, reentries)
    reentry_summary = _summarize_reentries(reentries, probes)
    review = _build_review_markdown(
        meta=meta,
        candidates=candidates,
        candidate_summary=candidate_summary,
        reentries=reentries,
        reentry_summary=reentry_summary,
    )

    csv_outputs = (
        (OUTPUT_CANDIDATES, candidates),
        (OUTPUT_CANDIDATE_SUMMARY, candidate_summary),
        (OUTPUT_REENTRIES, reentries),
        (OUTPUT_REENTRY_SUMMARY, reentry_summary),
    )
    for filename, frame in csv_outputs:
        frame.to_csv(output_dir / filename, index=False, encoding="utf-8-sig")
    (output_dir / OUTPUT_REVIEW).write_text(review, encoding="utf-8")


if __name__ == "__main__":
    main()
|