from __future__ import annotations from pathlib import Path import pandas as pd from dragon_branch_configs import ( alpha_first_glued_followthrough_mid_exit_probe_config, alpha_first_glued_followthrough_probe_config, alpha_first_glued_refined_hot_cap_config, ) from dragon_execution_common import apply_execution_model, summary from dragon_indicators import DragonIndicatorConfig, DragonIndicatorEngine from dragon_shared import END_DATE, START_DATE, format_num as _format_num, format_pct as _format_pct from dragon_strategy import DragonRuleEngine REENTRY_REASON_PREFIX = "glued_followthrough_reentry_buy:confirmed_" OUTPUT_BRANCH_SUMMARY = "dragon_followthrough_mid_exit_branch_summary.csv" OUTPUT_TRADE_DETAILS = "dragon_followthrough_mid_exit_trade_details.csv" OUTPUT_TRADE_DIFF = "dragon_followthrough_mid_exit_trade_diff.csv" OUTPUT_REVIEW = "dragon_followthrough_mid_exit_review.md" def _load_history() -> pd.DataFrame: engine = DragonIndicatorEngine(DragonIndicatorConfig(start_date="2015-01-01", end_date=None)) raw = engine.fetch_daily_data(include_intraday_snapshot=False) indicators = engine.compute(raw.reset_index(drop=False).rename(columns={"index": "date"})) indicators["date"] = pd.to_datetime(indicators["date"]) return indicators.sort_values("date").reset_index(drop=True) def _release_window_trades(trades: pd.DataFrame) -> pd.DataFrame: return trades[ (trades["buy_date"] >= START_DATE) & (trades["buy_date"] <= END_DATE) & (trades["sell_date"] >= START_DATE) & (trades["sell_date"] <= END_DATE) ].copy() def _add_execution_prices(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame: trades = trades.copy() lookup = indicators.set_index(indicators["date"].dt.date) next_by_date = { indicators.iloc[idx]["date"].date().isoformat(): indicators.iloc[idx + 1] for idx in range(len(indicators) - 1) } same_entry: list[float] = [] same_exit: list[float] = [] next_open_entry: list[float] = [] next_open_exit: list[float] = [] for _, trade in trades.iterrows(): buy_row = lookup.loc[pd.Timestamp(trade["buy_date"]).date()] sell_row = lookup.loc[pd.Timestamp(trade["sell_date"]).date()] buy_next = next_by_date.get(trade["buy_date"]) sell_next = next_by_date.get(trade["sell_date"]) same_entry.append(float(buy_row["close"])) same_exit.append(float(sell_row["close"])) next_open_entry.append(float("nan") if buy_next is None else float(buy_next["open"])) next_open_exit.append(float("nan") if sell_next is None else float(sell_next["open"])) trades["exec_same_close_entry"] = same_entry trades["exec_same_close_exit"] = same_exit trades["exec_next_open_entry"] = next_open_entry trades["exec_next_open_exit"] = next_open_exit return trades def _run_branch(indicators: pd.DataFrame, name: str, config) -> tuple[pd.DataFrame, pd.DataFrame]: indexed = indicators.set_index("date", drop=False) _, trades = DragonRuleEngine(config=config).run(indexed) trades = _release_window_trades(trades) trades.insert(0, "branch", name) return indexed, trades def _branch_summary(indicators: pd.DataFrame, name: str, trades: pd.DataFrame) -> list[dict[str, object]]: exec_trades = _add_execution_prices(trades, indicators) same = summary(name, apply_execution_model(exec_trades, "same_close", 0.0)) nxt = summary(name, apply_execution_model(exec_trades, "next_open", 0.0)) return [ { "branch": name, "execution_model": "same_close", "trades": int(same["trades"]), "win_rate": float(same["win_rate"]), "avg_return": float(same["avg_return"]), "profit_factor": float(same["profit_factor"]), "compounded_return": float(same["compounded_return"]), "max_drawdown": float(same["max_drawdown"]), }, { "branch": name, "execution_model": "next_open", "trades": int(nxt["trades"]), "win_rate": float(nxt["win_rate"]), "avg_return": float(nxt["avg_return"]), "profit_factor": float(nxt["profit_factor"]), "compounded_return": float(nxt["compounded_return"]), "max_drawdown": float(nxt["max_drawdown"]), }, ] def _reentry_trade_details(indicators: pd.DataFrame, branch: str, trades: pd.DataFrame) -> pd.DataFrame: details = trades[trades["buy_reason"].astype(str).str.startswith(REENTRY_REASON_PREFIX)].copy() if details.empty: return pd.DataFrame( columns=[ "branch", "buy_date", "buy_reason", "sell_date", "sell_reason", "holding_days", "return_pct", "next_open_return_pct", "post_exit_max_close_return_10b", "post_exit_min_close_return_10b", ] ) indicators = indicators.sort_values("date").reset_index(drop=True) date_to_pos = {row.date().isoformat(): idx for idx, row in enumerate(indicators["date"])} next_by_date = { indicators.iloc[idx]["date"].date().isoformat(): indicators.iloc[idx + 1] for idx in range(len(indicators) - 1) } rows: list[dict[str, object]] = [] for _, trade in details.iterrows(): sell_pos = date_to_pos[trade["sell_date"]] post10 = indicators.iloc[sell_pos + 1 : sell_pos + 11].copy() buy_next = next_by_date.get(trade["buy_date"]) sell_next = next_by_date.get(trade["sell_date"]) next_open_return = float("nan") if buy_next is not None and sell_next is not None: next_open_return = float(sell_next["open"]) / float(buy_next["open"]) - 1.0 rows.append( { "branch": branch, "buy_date": trade["buy_date"], "buy_reason": trade["buy_reason"], "sell_date": trade["sell_date"], "sell_reason": trade["sell_reason"], "holding_days": int(trade["holding_days"]), "return_pct": float(trade["return_pct"]), "next_open_return_pct": next_open_return, "post_exit_max_close_return_10b": float(post10["close"].max()) / float(trade["sell_price"]) - 1.0 if not post10.empty else float("nan"), "post_exit_min_close_return_10b": float(post10["close"].min()) / float(trade["sell_price"]) - 1.0 if not post10.empty else float("nan"), } ) return pd.DataFrame(rows) def _trade_diff(mid_probe: pd.DataFrame, mid_exit_probe: pd.DataFrame) -> pd.DataFrame: key_cols = ["buy_date", "sell_date", "buy_reason", "sell_reason"] left = { (row.buy_date, row.sell_date, row.buy_reason, row.sell_reason) for row in mid_probe.itertuples() } right = { (row.buy_date, row.sell_date, row.buy_reason, row.sell_reason) for row in mid_exit_probe.itertuples() } rows: list[dict[str, object]] = [] for _, trade in mid_probe.iterrows(): key = tuple(trade[col] for col in key_cols) if key not in right: rows.append({"status": "removed_vs_mid_probe", **trade.to_dict()}) for _, trade in mid_exit_probe.iterrows(): key = tuple(trade[col] for col in key_cols) if key not in left: rows.append({"status": "added_vs_mid_probe", **trade.to_dict()}) return pd.DataFrame(rows) def _build_review(summary_df: pd.DataFrame, detail_df: pd.DataFrame, diff_df: pd.DataFrame) -> str: same = summary_df[summary_df["execution_model"] == "same_close"].set_index("branch") nxt = summary_df[summary_df["execution_model"] == "next_open"].set_index("branch") mid_probe_key = "alpha_first_glued_followthrough_probe" exit_probe_key = "alpha_first_glued_followthrough_mid_exit_probe" base_key = "alpha_first_glued_refined_hot_cap" lines = [ "# Dragon Followthrough Mid Exit Review", "", "## Scope", "- focus: shadow-only exit treatment for `mid_zone_very_weak_b1` followthrough reentry", "- objective: test whether the current `knife_take_profit_2_glued` path is clipping a valid repaired trend too early", "", "## Branch Summary", f"- base same_close avg `{_format_pct(float(same.loc[base_key, 'avg_return']))}` | PF `{_format_num(float(same.loc[base_key, 'profit_factor']))}` | comp `{_format_pct(float(same.loc[base_key, 'compounded_return']))}`", f"- mid_probe same_close avg `{_format_pct(float(same.loc[mid_probe_key, 'avg_return']))}` | PF `{_format_num(float(same.loc[mid_probe_key, 'profit_factor']))}` | comp `{_format_pct(float(same.loc[mid_probe_key, 'compounded_return']))}`", f"- mid_exit_probe same_close avg `{_format_pct(float(same.loc[exit_probe_key, 'avg_return']))}` | PF `{_format_num(float(same.loc[exit_probe_key, 'profit_factor']))}` | comp `{_format_pct(float(same.loc[exit_probe_key, 'compounded_return']))}`", f"- base next_open avg `{_format_pct(float(nxt.loc[base_key, 'avg_return']))}` | PF `{_format_num(float(nxt.loc[base_key, 'profit_factor']))}` | comp `{_format_pct(float(nxt.loc[base_key, 'compounded_return']))}`", f"- mid_probe next_open avg `{_format_pct(float(nxt.loc[mid_probe_key, 'avg_return']))}` | PF `{_format_num(float(nxt.loc[mid_probe_key, 'profit_factor']))}` | comp `{_format_pct(float(nxt.loc[mid_probe_key, 'compounded_return']))}`", f"- mid_exit_probe next_open avg `{_format_pct(float(nxt.loc[exit_probe_key, 'avg_return']))}` | PF `{_format_num(float(nxt.loc[exit_probe_key, 'profit_factor']))}` | comp `{_format_pct(float(nxt.loc[exit_probe_key, 'compounded_return']))}`", "", "## Reentry Trade Detail", ] for row in detail_df.itertuples(index=False): lines.append( f"- `{row.branch}` | `{row.buy_date}` -> `{row.sell_date}` | same_close `{_format_pct(float(row.return_pct))}` | " f"next_open `{_format_pct(float(row.next_open_return_pct))}` | sell `{row.sell_reason}` | " f"post_exit_max_10b `{_format_pct(float(row.post_exit_max_close_return_10b))}`" ) lines.extend(["", "## Trade Path Diff Vs Mid Probe"]) for row in diff_df.itertuples(index=False): lines.append( f"- `{row.status}` | `{row.buy_date}` -> `{row.sell_date}` | `{row.buy_reason}` -> `{row.sell_reason}` | return `{_format_pct(float(row.return_pct))}`" ) lines.extend( [ "", "## Judgment", "- The narrow mid-exit probe is materially better than the original mid probe on the only confirmed followthrough sample.", "- It converts the repaired-trend trade from a small loss into a long hold ending with a high-regime exit.", "- It also improves branch-level compounded return versus both the original mid probe and the current RC1 branch in this replay.", "- This is still a one-path result, so it should remain shadow-only for now, but it is strong enough to keep under daily observation.", ] ) return "\n".join(lines) + "\n" def main() -> None: base_dir = Path(__file__).resolve().parent indicators = _load_history() branches = [ ("alpha_first_glued_refined_hot_cap", alpha_first_glued_refined_hot_cap_config()), ("alpha_first_glued_followthrough_probe", alpha_first_glued_followthrough_probe_config()), ("alpha_first_glued_followthrough_mid_exit_probe", alpha_first_glued_followthrough_mid_exit_probe_config()), ] summary_rows: list[dict[str, object]] = [] detail_rows: list[pd.DataFrame] = [] branch_trades: dict[str, pd.DataFrame] = {} for name, config in branches: _, trades = _run_branch(indicators, name, config) branch_trades[name] = trades summary_rows.extend(_branch_summary(indicators, name, trades)) detail_rows.append(_reentry_trade_details(indicators, name, trades)) summary_df = pd.DataFrame(summary_rows) detail_df = pd.concat(detail_rows, ignore_index=True) diff_df = _trade_diff( branch_trades["alpha_first_glued_followthrough_probe"], branch_trades["alpha_first_glued_followthrough_mid_exit_probe"], ) review = _build_review(summary_df, detail_df, diff_df) summary_df.to_csv(base_dir / OUTPUT_BRANCH_SUMMARY, index=False, encoding="utf-8-sig") detail_df.to_csv(base_dir / OUTPUT_TRADE_DETAILS, index=False, encoding="utf-8-sig") diff_df.to_csv(base_dir / OUTPUT_TRADE_DIFF, index=False, encoding="utf-8-sig") (base_dir / OUTPUT_REVIEW).write_text(review, encoding="utf-8") if __name__ == "__main__": main()