from __future__ import annotations from dataclasses import dataclass from pathlib import Path import pandas as pd from dragon_branch_configs import ( alpha_first_glued_followthrough_probe_config, alpha_first_glued_refined_hot_cap_config, ) from dragon_indicators import DragonIndicatorConfig, DragonIndicatorEngine from dragon_shared import END_DATE, START_DATE, format_pct as _format_pct from dragon_strategy import DragonRuleEngine from dragon_strategy_config import StrategyConfig REENTRY_REASON_PREFIX = "glued_followthrough_reentry_buy:confirmed_" OUTPUT_CANDIDATES = "dragon_followthrough_profit_loop_candidates.csv" OUTPUT_CANDIDATE_SUMMARY = "dragon_followthrough_profit_loop_candidate_summary.csv" OUTPUT_REENTRIES = "dragon_followthrough_profit_loop_reentries.csv" OUTPUT_REENTRY_SUMMARY = "dragon_followthrough_profit_loop_reentry_summary.csv" OUTPUT_REVIEW = "dragon_followthrough_profit_loop_review.md" @dataclass(frozen=True) class ProbeDefinition: name: str label: str allowed_subtype: str config: StrategyConfig def build_followthrough_probe_configs() -> dict[str, ProbeDefinition]: base = alpha_first_glued_refined_hot_cap_config() return { "probe_mid_zone": ProbeDefinition( name="probe_mid_zone", label="pending only for mid_zone_very_weak_b1", allowed_subtype="mid_zone_very_weak_b1", config=alpha_first_glued_followthrough_probe_config(), ), "probe_high_zone": ProbeDefinition( name="probe_high_zone", label="pending only for high_zone_weak_b1", allowed_subtype="high_zone_weak_b1", config=base.with_updates( glued_followthrough_pending_enabled=True, glued_followthrough_allow_mid_zone_very_weak_b1=False, glued_followthrough_allow_high_zone_weak_b1=True, glued_followthrough_allow_ql_rebound_weak_followthrough=False, ), ), "probe_ql_rebound": ProbeDefinition( name="probe_ql_rebound", label="pending only for ql_rebound_weak_followthrough", allowed_subtype="ql_rebound_weak_followthrough", config=base.with_updates( glued_followthrough_pending_enabled=True, glued_followthrough_allow_mid_zone_very_weak_b1=False, glued_followthrough_allow_high_zone_weak_b1=False, glued_followthrough_allow_ql_rebound_weak_followthrough=True, ), ), } def prepare_indicator_history(df: pd.DataFrame) -> pd.DataFrame: history = df.copy() if "date" in history.columns and "date" in list(history.index.names): history = history.reset_index(drop=True) if "date" not in history.columns: history = history.reset_index() if "date" not in history.columns: history = history.rename(columns={history.columns[0]: "date"}) history["date"] = pd.to_datetime(history["date"]) history = history.sort_values("date").reset_index(drop=True) for column in ["open", "high", "low", "close", "a1", "b1", "c1"]: if column in history.columns: history[column] = history[column].astype(float) for column in ["kdj_buy", "kdj_sell", "ql_buy", "ql_sell"]: if column in history.columns: history[column] = history[column].astype(bool) history["ma20"] = history["close"].rolling(20).mean() history["ma60"] = history["close"].rolling(60).mean() history["regime"] = history.apply(_classify_regime, axis=1) return history.set_index("date", drop=False) def collect_glued_block_candidates( df: pd.DataFrame, config: StrategyConfig, *, branch_name: str = "base_rc1", ) -> pd.DataFrame: indexed = prepare_indicator_history(df) blocked, _ = _collect_branch_diagnostics(indexed, config, branch_name) return blocked def collect_followthrough_reentries( df: pd.DataFrame, config: StrategyConfig, *, branch_name: str = "probe", ) -> pd.DataFrame: indexed = prepare_indicator_history(df) _, reentries = _collect_branch_diagnostics(indexed, config, branch_name) return reentries def _classify_regime(row: pd.Series) -> str: close = float(row["close"]) ma20 = float(row["ma20"]) if pd.notna(row["ma20"]) else float("nan") ma60 = float(row["ma60"]) if pd.notna(row["ma60"]) else float("nan") if pd.isna(ma20) or pd.isna(ma60): return "unknown" if close >= ma20 >= ma60: return "uptrend" if close <= ma20 <= ma60: return "downtrend" return "range" def _in_release_window(value: str) -> bool: return START_DATE <= value <= END_DATE def _collect_branch_diagnostics( indexed: pd.DataFrame, config: StrategyConfig, branch_name: str, ) -> tuple[pd.DataFrame, pd.DataFrame]: engine = DragonRuleEngine(config=config) blocked_rows: list[dict[str, object]] = [] reentry_rows: list[dict[str, object]] = [] prev_row = None for _, row in indexed.iterrows(): engine._record_cross_counters(row) engine._update_position_counters(row) engine._update_pending_states(row) just_bought = False pending_snapshot = { "origin_date": ( engine.context.pending_glued_followthrough_origin_date.isoformat() if engine.context.pending_glued_followthrough_origin_date else "" ), "subtype": engine.context.pending_glued_followthrough_subtype, "bars_waited": int(engine.context.pending_glued_followthrough_bars_waited), "signal_close": float(engine.context.pending_glued_followthrough_signal_close or 0.0), "signal_a1": float(engine.context.pending_glued_followthrough_a1 or 0.0), "signal_b1": float(engine.context.pending_glued_followthrough_b1 or 0.0), "signal_c1": float(engine.context.pending_glued_followthrough_c1 or 0.0), } if (not engine.context.in_position) or bool(row["kdj_buy"] or row["ql_buy"]): action, reason = engine._buy_decision(row, prev_row) if action == "BLOCK" and reason == "buy_block_glued_high_weak_rebound": subtype = engine._glued_high_weak_rebound_subtype(row) blocked_rows.append( { "branch": branch_name, "signal_date": row.name.date().isoformat(), "subtype": subtype, "signal_close": float(row["close"]), "signal_a1": float(row["a1"]), "signal_b1": float(row["b1"]), "signal_c1": float(row["c1"]), "signal_kdj_buy": bool(row["kdj_buy"]), "signal_ql_buy": bool(row["ql_buy"]), "in_release_window": _in_release_window(row.name.date().isoformat()), } ) elif action == "BUY": if reason.startswith(REENTRY_REASON_PREFIX): reentry_rows.append( { "branch": branch_name, "buy_date": row.name.date().isoformat(), "buy_reason": reason, "origin_date": pending_snapshot["origin_date"], "subtype": pending_snapshot["subtype"], "bars_waited": pending_snapshot["bars_waited"], "signal_close": pending_snapshot["signal_close"], "signal_a1": pending_snapshot["signal_a1"], "signal_b1": pending_snapshot["signal_b1"], "signal_c1": pending_snapshot["signal_c1"], "buy_close": float(row["close"]), "buy_a1": float(row["a1"]), "buy_b1": float(row["b1"]), "buy_c1": float(row["c1"]), "buy_kdj_buy": bool(row["kdj_buy"]), "buy_ql_buy": bool(row["ql_buy"]), "b1_repair": float(row["b1"]) - pending_snapshot["signal_b1"], "in_release_window": _in_release_window(row.name.date().isoformat()), } ) engine._post_real_buy(row, reason) just_bought = True elif action == "AUX_BUY": engine._post_aux_buy(row) state_aux_sell_candidate = (not engine.context.in_position) and engine._should_emit_state_aux_sell(row) if not just_bought and (engine.context.in_position or bool(row["kdj_sell"] or row["ql_sell"]) or state_aux_sell_candidate): if engine.context.in_position and bool(row["kdj_sell"] or row["ql_sell"]): engine.context.sell_signal_count += 1 if bool(row["kdj_sell"]): engine.context.kdj_sell_signal_count += 1 if bool(row["ql_sell"]): engine.context.ql_sell_signal_count += 1 if float(row["b1"]) < 0: engine.context.b1_negative_sell_count += 1 action, reason = engine._sell_decision(row, prev_row) if action == "SELL": engine.context.first_exit_checked = True engine._post_real_sell(row, reason) elif action == "AUX_SELL": if engine.context.in_position: engine.context.first_exit_checked = True engine._post_aux_sell(row, reason) prev_row = row blocked_df = pd.DataFrame(blocked_rows) reentry_df = pd.DataFrame(reentry_rows) return blocked_df, reentry_df def _load_full_history() -> tuple[pd.DataFrame, dict[str, object]]: indicator_engine = DragonIndicatorEngine(DragonIndicatorConfig(start_date="2015-01-01", end_date=None)) raw = indicator_engine.fetch_daily_data(include_intraday_snapshot=False) prepared = indicator_engine.compute(raw.reset_index(drop=False).rename(columns={"index": "date"})) history = prepare_indicator_history(prepared) latest_bar = history["date"].max().date().isoformat() meta = { "data_source": "fetch_daily_data+compute", "latest_bar": latest_bar, "row_count": int(len(history)), } return history, meta def _enrich_block_candidates( blocked: pd.DataFrame, history: pd.DataFrame, config: StrategyConfig, ) -> pd.DataFrame: if blocked.empty: return blocked.copy() date_to_pos = {row.date().isoformat(): idx for idx, row in enumerate(history["date"])} enriched_rows: list[dict[str, object]] = [] for row in blocked.itertuples(index=False): pos = date_to_pos[str(row.signal_date)] signal_row = history.iloc[pos] next3 = history.iloc[pos + 1 : pos + 4].copy() next10 = history.iloc[pos + 1 : pos + 11].copy() next20 = history.iloc[pos + 1 : pos + 21].copy() next3_sell_cross = (next3["kdj_sell"] | next3["ql_sell"]) if not next3.empty else pd.Series(dtype=bool) confirm_like_bar = _first_confirm_like_bar( next3=next3, signal_close=float(row.signal_close), signal_b1=float(row.signal_b1), config=config, ) record = { "branch": row.branch, "signal_date": row.signal_date, "subtype": row.subtype, "signal_close": float(row.signal_close), "signal_a1": float(row.signal_a1), "signal_b1": float(row.signal_b1), "signal_c1": float(row.signal_c1), "signal_kdj_buy": bool(row.signal_kdj_buy), "signal_ql_buy": bool(row.signal_ql_buy), "regime": str(signal_row["regime"]), "ma20": float(signal_row["ma20"]) if pd.notna(signal_row["ma20"]) else float("nan"), "ma60": float(signal_row["ma60"]) if pd.notna(signal_row["ma60"]) else float("nan"), "in_release_window": bool(row.in_release_window), "bars_available_3": int(len(next3)), "bars_available_10": int(len(next10)), "bars_available_20": int(len(next20)), "next3_any_ql_buy": bool(next3["ql_buy"].any()) if not next3.empty else False, "next3_ql_buy_count": int(next3["ql_buy"].sum()) if not next3.empty else 0, "next3_any_sell_cross": bool(next3_sell_cross.any()) if not next3.empty else False, "next3_sell_cross_count": int(next3_sell_cross.sum()) if not next3.empty else 0, "confirm_like_within_3": confirm_like_bar > 0, "confirm_like_bar": int(confirm_like_bar) if confirm_like_bar > 0 else 0, "next3_max_close_return": _window_max_return(next3, float(row.signal_close)), "next3_min_close_return": _window_min_return(next3, float(row.signal_close)), "next10_close_return": _window_close_return(next10, float(row.signal_close), 10), "next20_close_return": _window_close_return(next20, float(row.signal_close), 20), "next20_max_close_return": _window_max_return(next20, float(row.signal_close)), "next20_min_close_return": _window_min_return(next20, float(row.signal_close)), "next20_max_close_date": _window_extreme_date(next20, "close", "max"), } enriched_rows.append(record) return pd.DataFrame(enriched_rows) def _first_confirm_like_bar(next3: pd.DataFrame, signal_close: float, signal_b1: float, config: StrategyConfig) -> int: for offset, (_, row) in enumerate(next3.iterrows(), start=1): if bool(row["kdj_sell"]) or bool(row["ql_sell"]): continue if config.glued_followthrough_require_ql_buy_reconfirm and not bool(row["ql_buy"]): continue if config.glued_followthrough_require_close_break_signal_close and float(row["close"]) <= signal_close: continue if config.glued_followthrough_require_b1_repair and (float(row["b1"]) - signal_b1) < config.glued_followthrough_b1_repair_min: continue return offset return 0 def _window_close_return(window: pd.DataFrame, base_price: float, required_bars: int) -> float: if len(window) < required_bars: return float("nan") return float(window.iloc[required_bars - 1]["close"]) / base_price - 1.0 def _window_max_return(window: pd.DataFrame, base_price: float) -> float: if window.empty: return float("nan") return float(window["close"].max()) / base_price - 1.0 def _window_min_return(window: pd.DataFrame, base_price: float) -> float: if window.empty: return float("nan") return float(window["close"].min()) / base_price - 1.0 def _window_extreme_date(window: pd.DataFrame, column: str, mode: str) -> str: if window.empty: return "" idx = window[column].idxmax() if mode == "max" else window[column].idxmin() return pd.Timestamp(idx).date().isoformat() def _collect_reentry_trade_details( history: pd.DataFrame, probes: dict[str, ProbeDefinition], ) -> pd.DataFrame: indexed = history.set_index("date", drop=False) rows: list[pd.DataFrame] = [] for probe in probes.values(): mapped = collect_followthrough_reentries(indexed, probe.config, branch_name=probe.name) _, trades = DragonRuleEngine(config=probe.config).run(indexed) trades = trades[trades["buy_reason"].astype(str).str.startswith(REENTRY_REASON_PREFIX)].copy() if trades.empty: rows.append( pd.DataFrame( columns=[ "branch", "buy_date", "buy_reason", "origin_date", "subtype", "bars_waited", "signal_close", "signal_a1", "signal_b1", "signal_c1", "buy_close", "buy_a1", "buy_b1", "buy_c1", "buy_kdj_buy", "buy_ql_buy", "b1_repair", "in_release_window", "buy_price", "sell_date", "sell_price", "sell_reason", "holding_days", "return_pct", ] ) ) continue trades.insert(0, "branch", probe.name) merged = mapped.merge( trades[ [ "branch", "buy_date", "buy_price", "buy_reason", "sell_date", "sell_price", "sell_reason", "holding_days", "return_pct", ] ], on=["branch", "buy_date", "buy_reason"], how="left", validate="one_to_one", ) rows.append(merged) details = pd.concat(rows, ignore_index=True) if rows else pd.DataFrame() if details.empty: return details return _enrich_reentry_trade_details(details, history) def _enrich_reentry_trade_details(details: pd.DataFrame, history: pd.DataFrame) -> pd.DataFrame: if details.empty: return details.copy() if "date" in history.columns and "date" in list(history.index.names): history = history.reset_index(drop=True) history = history.sort_values("date").reset_index(drop=True) date_to_pos = {row.date().isoformat(): idx for idx, row in enumerate(history["date"])} open_available = "open" in history.columns and history["open"].notna().any() enriched_rows: list[dict[str, object]] = [] for row in details.itertuples(index=False): buy_pos = date_to_pos[str(row.buy_date)] sell_pos = date_to_pos[str(row.sell_date)] buy_row = history.iloc[buy_pos] buy_next = history.iloc[buy_pos + 1] if (buy_pos + 1) < len(history) else None sell_next = history.iloc[sell_pos + 1] if (sell_pos + 1) < len(history) else None post_exit_5 = history.iloc[sell_pos + 1 : sell_pos + 6] post_exit_10 = history.iloc[sell_pos + 1 : sell_pos + 11] entry_10 = history.iloc[buy_pos + 1 : buy_pos + 11] entry_20 = history.iloc[buy_pos + 1 : buy_pos + 21] next_open_return = float("nan") if open_available and buy_next is not None and sell_next is not None: next_open_return = float(sell_next["open"]) / float(buy_next["open"]) - 1.0 enriched_rows.append( { "branch": row.branch, "subtype": row.subtype, "origin_date": row.origin_date, "origin_regime": str(history.iloc[date_to_pos[str(row.origin_date)]]["regime"]) if str(row.origin_date) in date_to_pos else "", "buy_date": row.buy_date, "buy_regime": str(buy_row["regime"]), "buy_reason": row.buy_reason, "bars_waited": int(row.bars_waited), "signal_close": float(row.signal_close), "signal_a1": float(row.signal_a1), "signal_b1": float(row.signal_b1), "signal_c1": float(row.signal_c1), "buy_price": float(row.buy_price), "buy_close": float(row.buy_close), "buy_a1": float(row.buy_a1), "buy_b1": float(row.buy_b1), "buy_c1": float(row.buy_c1), "b1_repair": float(row.b1_repair), "sell_date": row.sell_date, "sell_price": float(row.sell_price), "sell_reason": row.sell_reason, "holding_days": int(row.holding_days), "return_pct": float(row.return_pct), "next_open_return_pct": next_open_return, "next_open_minus_same_close_pct": next_open_return - float(row.return_pct) if pd.notna(next_open_return) else float("nan"), "entry_max_close_return_10b": _window_max_return(entry_10, float(row.buy_price)), "entry_max_close_return_20b": _window_max_return(entry_20, float(row.buy_price)), "post_exit_max_close_return_5b": _window_max_return(post_exit_5, float(row.sell_price)), "post_exit_max_close_return_10b": _window_max_return(post_exit_10, float(row.sell_price)), "post_exit_min_close_return_5b": _window_min_return(post_exit_5, float(row.sell_price)), "in_release_window": bool(row.in_release_window and _in_release_window(str(row.sell_date))), } ) return pd.DataFrame(enriched_rows) def _summarize_candidates(candidates: pd.DataFrame, reentries: pd.DataFrame) -> pd.DataFrame: rows: list[dict[str, object]] = [] subtype_order = { "mid_zone_very_weak_b1": 0, "high_zone_weak_b1": 1, "ql_rebound_weak_followthrough": 2, } for subtype, group in candidates.groupby("subtype"): full_group = group.copy() complete_3 = group[group["bars_available_3"] >= 3].copy() complete_10 = group[group["bars_available_10"] >= 10].copy() complete_20 = group[group["bars_available_20"] >= 20].copy() subtype_reentries = reentries[reentries["subtype"] == subtype].copy() rows.append( { "subtype": subtype, "sort_key": subtype_order.get(subtype, 99), "blocked_count_full": int(len(full_group)), "blocked_count_release": int(full_group["in_release_window"].sum()), "shadow_confirmed_trade_count": int(len(subtype_reentries)), "complete_3bar_count": int(len(complete_3)), "complete_10bar_count": int(len(complete_10)), "complete_20bar_count": int(len(complete_20)), "uptrend_count": int((full_group["regime"] == "uptrend").sum()), "range_count": int((full_group["regime"] == "range").sum()), "downtrend_count": int((full_group["regime"] == "downtrend").sum()), "confirm_like_3bar_rate": float(complete_3["confirm_like_within_3"].mean()) if not complete_3.empty else float("nan"), "next3_ql_reconfirm_rate": float(complete_3["next3_any_ql_buy"].mean()) if not complete_3.empty else float("nan"), "next3_sell_cross_rate": float(complete_3["next3_any_sell_cross"].mean()) if not complete_3.empty else float("nan"), "avg_next3_max_close_return": float(complete_3["next3_max_close_return"].mean()) if not complete_3.empty else float("nan"), "avg_next10_close_return": float(complete_10["next10_close_return"].mean()) if not complete_10.empty else float("nan"), "avg_next20_close_return": float(complete_20["next20_close_return"].mean()) if not complete_20.empty else float("nan"), "avg_next20_max_close_return": float(complete_20["next20_max_close_return"].mean()) if not complete_20.empty else float("nan"), } ) summary = pd.DataFrame(rows) if summary.empty: return summary return summary.sort_values(["sort_key", "subtype"]).drop(columns=["sort_key"]).reset_index(drop=True) def _summarize_reentries(reentries: pd.DataFrame, probes: dict[str, ProbeDefinition]) -> pd.DataFrame: rows: list[dict[str, object]] = [] for probe in probes.values(): group = reentries[reentries["branch"] == probe.name].copy() rows.append( { "branch": probe.name, "allowed_subtype": probe.allowed_subtype, "trades": int(len(group)), "release_window_trades": int(group["in_release_window"].sum()) if not group.empty else 0, "avg_bars_waited": float(group["bars_waited"].mean()) if not group.empty else float("nan"), "same_close_win_rate": float((group["return_pct"] > 0).mean()) if not group.empty else float("nan"), "same_close_avg_return": float(group["return_pct"].mean()) if not group.empty else float("nan"), "next_open_win_rate": float((group["next_open_return_pct"] > 0).mean()) if not group.empty else float("nan"), "next_open_avg_return": float(group["next_open_return_pct"].mean()) if not group.empty else float("nan"), "avg_next_open_minus_same_close": float(group["next_open_minus_same_close_pct"].mean()) if not group.empty else float("nan"), "avg_holding_days": float(group["holding_days"].mean()) if not group.empty else float("nan"), "avg_entry_max_close_return_10b": float(group["entry_max_close_return_10b"].mean()) if not group.empty else float("nan"), "avg_entry_max_close_return_20b": float(group["entry_max_close_return_20b"].mean()) if not group.empty else float("nan"), "avg_post_exit_max_close_return_5b": float(group["post_exit_max_close_return_5b"].mean()) if not group.empty else float("nan"), "avg_post_exit_max_close_return_10b": float(group["post_exit_max_close_return_10b"].mean()) if not group.empty else float("nan"), "exit_reason_distribution": _value_counts_string(group["sell_reason"]) if not group.empty else "", } ) return pd.DataFrame(rows) def _value_counts_string(series: pd.Series) -> str: if series.empty: return "" counts = series.astype(str).value_counts() return " | ".join(f"{idx}:{int(val)}" for idx, val in counts.items()) def _latest_case_review(candidates: pd.DataFrame, latest_signal_date: str) -> list[str]: case = candidates[candidates["signal_date"] == latest_signal_date].copy() if case.empty: return [] row = case.iloc[0] return [ "## Latest Block Case", f"- signal_date `{row['signal_date']}` | subtype `{row['subtype']}` | regime `{row['regime']}`", f"- observed bars after block `{int(row['bars_available_3'])}/3` | ql_reconfirm_count `{int(row['next3_ql_buy_count'])}` | sell_cross_count `{int(row['next3_sell_cross_count'])}`", f"- max_close_return so far `{_format_pct(float(row['next3_max_close_return']))}` | confirm_like_within_3 `{bool(row['confirm_like_within_3'])}`", "", ] def _build_review_markdown( *, meta: dict[str, object], candidates: pd.DataFrame, candidate_summary: pd.DataFrame, reentries: pd.DataFrame, reentry_summary: pd.DataFrame, ) -> str: lines = [ "# Dragon Followthrough Profit Loop Review", "", "## Scope", "- objective: inspect whether false-veto glued weak rebounds deserve a fast trend reentry path", f"- data_source: `{meta['data_source']}`", f"- latest_bar: `{meta['latest_bar']}`", f"- row_count: `{int(meta['row_count'])}`", "", "## Subtype Readout", ] for row in candidate_summary.itertuples(index=False): lines.append( f"- `{row.subtype}` | blocked_full `{int(row.blocked_count_full)}` | shadow_confirmed `{int(row.shadow_confirmed_trade_count)}` | " f"confirm_like_3bar `{_format_pct(float(row.confirm_like_3bar_rate))}` | " f"next3_sell_cross_rate `{_format_pct(float(row.next3_sell_cross_rate))}` | " f"avg_next20_max `{_format_pct(float(row.avg_next20_max_close_return))}`" ) lines.extend(["", "## Probe Readout"]) for row in reentry_summary.itertuples(index=False): lines.append( f"- `{row.branch}` -> `{row.allowed_subtype}` | trades `{int(row.trades)}` | " f"same_close_avg `{_format_pct(float(row.same_close_avg_return))}` | " f"next_open_avg `{_format_pct(float(row.next_open_avg_return))}` | " f"post_exit_max_10b `{_format_pct(float(row.avg_post_exit_max_close_return_10b))}` | " f"exit_reasons `{row.exit_reason_distribution or 'none'}`" ) latest_signal_date = "" if not candidates.empty: latest_signal_date = str(candidates["signal_date"].max()) lines.extend([""] + _latest_case_review(candidates, latest_signal_date)) mid_probe = reentry_summary[reentry_summary["branch"] == "probe_mid_zone"] high_probe = reentry_summary[reentry_summary["branch"] == "probe_high_zone"] ql_probe = reentry_summary[reentry_summary["branch"] == "probe_ql_rebound"] lines.extend( [ "## Judgment", "- `mid_zone_very_weak_b1` is the only subtype that still shows a non-zero delayed-reentry path; it stays the only practical followthrough research lane.", "- `high_zone_weak_b1` should not be promoted. Enabling pending there only created losing followthrough trades in this replay.", "- `ql_rebound_weak_followthrough` remains a hard-block family. The probe branch still produced zero confirmed reentries there.", ] ) if not mid_probe.empty: row = mid_probe.iloc[0] lines.append( f"- execution timing remains a real question only for the mid-zone path: same_close `{_format_pct(float(row['same_close_avg_return']))}` vs " f"next_open `{_format_pct(float(row['next_open_avg_return']))}`." ) lines.append( "- entry-specific exit treatment is worth a narrow check only if the mid-zone path keeps showing higher post-entry upside than realized return." ) if not high_probe.empty: row = high_probe.iloc[0] lines.append( f"- high-zone probe result is already weak enough to stop here: trades `{int(row['trades'])}`, same_close_avg `{_format_pct(float(row['same_close_avg_return']))}`." ) if not ql_probe.empty: row = ql_probe.iloc[0] lines.append( f"- ql rebound probe stays off the table for promotion: confirmed trades `{int(row['trades'])}`." ) if not reentries.empty: lines.extend(["", "## Trade Detail Highlights"]) for row in reentries.itertuples(index=False): lines.append( f"- `{row.branch}` | origin `{row.origin_date}` -> buy `{row.buy_date}` -> sell `{row.sell_date}` | " f"same_close `{_format_pct(float(row.return_pct))}` | next_open `{_format_pct(float(row.next_open_return_pct))}` | " f"sell `{row.sell_reason}` | post_exit_max_10b `{_format_pct(float(row.post_exit_max_close_return_10b))}`" ) return "\n".join(lines) + "\n" def main() -> None: base_dir = Path(__file__).resolve().parent history, meta = _load_full_history() base_config = alpha_first_glued_refined_hot_cap_config() probes = build_followthrough_probe_configs() normalized_history = history.reset_index(drop=True) if "date" in history.columns and "date" in list(history.index.names) else history.copy() candidates = collect_glued_block_candidates(history, base_config, branch_name="base_rc1") candidates = _enrich_block_candidates(candidates, normalized_history, base_config) reentries = _collect_reentry_trade_details(history, probes) candidate_summary = _summarize_candidates(candidates, reentries) reentry_summary = _summarize_reentries(reentries, probes) review = _build_review_markdown( meta=meta, candidates=candidates, candidate_summary=candidate_summary, reentries=reentries, reentry_summary=reentry_summary, ) candidates.to_csv(base_dir / OUTPUT_CANDIDATES, index=False, encoding="utf-8-sig") candidate_summary.to_csv(base_dir / OUTPUT_CANDIDATE_SUMMARY, index=False, encoding="utf-8-sig") reentries.to_csv(base_dir / OUTPUT_REENTRIES, index=False, encoding="utf-8-sig") reentry_summary.to_csv(base_dir / OUTPUT_REENTRY_SUMMARY, index=False, encoding="utf-8-sig") (base_dir / OUTPUT_REVIEW).write_text(review, encoding="utf-8") if __name__ == "__main__": main()