| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- from __future__ import annotations
- from pathlib import Path
- from typing import Optional
- import pandas as pd
- from dragon_branch_configs import alpha_first_glued_refined_hot_cap_config, alpha_first_selective_veto_config
- from dragon_strategy import DragonRuleEngine
- START_DATE = "2016-01-01"  # inclusive lower bound that _run_branch applies to both buy_date and sell_date
- END_DATE = "2025-12-31"  # inclusive upper bound that _run_branch applies to both buy_date and sell_date
def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
    """Read the CSV file ``name`` found in ``base_dir`` into a DataFrame.

    The file is decoded as ``utf-8-sig``, so a leading byte-order mark (if
    any) does not end up in the first column name.
    """
    csv_path = base_dir / name
    frame = pd.read_csv(csv_path, encoding="utf-8-sig")
    return frame
def _load_indicator_snapshot(base_dir: Path) -> pd.DataFrame:
    """Load ``dragon_indicator_snapshot.csv`` from ``base_dir`` in date order.

    The ``date`` column is parsed with ``pd.to_datetime``; the rows are then
    sorted by that column and given a fresh 0..n-1 index.
    """
    snapshot = _load_csv(base_dir, "dragon_indicator_snapshot.csv")
    parsed = snapshot.assign(date=lambda frame: pd.to_datetime(frame["date"]))
    ordered = parsed.sort_values("date")
    return ordered.reset_index(drop=True)
def _profit_factor(series: pd.Series) -> float:
    """Return the sum of positive values divided by the magnitude of the sum of negative values.

    When there are no losses the result is ``inf`` if there is any profit,
    otherwise ``0.0``.
    """
    winners = series[series > 0]
    losers = series[series < 0]
    total_won = winners.sum()
    total_lost = -losers.sum()
    if total_lost != 0:
        return float(total_won / total_lost)
    # No losses at all: the ratio is undefined, so report inf or 0.0 instead.
    if total_won > 0:
        return float("inf")
    return 0.0
def _pct(value: Optional[float]) -> str:
    """Format ``value`` as a percentage with two decimals.

    ``None`` and NaN render as ``"n/a"``; positive infinity renders as
    ``"inf"``.
    """
    is_missing = value is None or pd.isna(value)
    if is_missing:
        return "n/a"
    return "inf" if value == float("inf") else f"{float(value):.2%}"
def _holding_bucket(days: int) -> str:
    """Map a holding period in days to its bucket label.

    Upper bounds are inclusive: 5, 10, 20 and 40 days close their buckets;
    anything longer falls into ``"41d+"``.
    """
    upper_bounds = (
        (5, "00-05d"),
        (10, "06-10d"),
        (20, "11-20d"),
        (40, "21-40d"),
    )
    for limit, label in upper_bounds:
        if days <= limit:
            return label
    return "41d+"
def _build_trade_quality(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``trades`` enriched with entry indicators and excursion metrics.

    Added columns:

    * ``holding_bucket`` - bucket label derived from ``holding_days``.
    * ``buy_a1`` / ``buy_b1`` / ``buy_c1`` - the ``a1`` / ``b1`` / ``c1``
      indicator values on the buy date.
    * ``mfe_pct`` / ``mae_pct`` - highest ``high`` / lowest ``low`` between the
      buy and sell dates (both inclusive), relative to ``buy_price``.
    * ``entry_forward_5d_pct`` - ``close`` of the last of the (up to) five
      indicator rows after the buy row, relative to ``buy_price``.
    * ``exit_followthrough_5d_pct`` - lowest ``low`` over the (up to) five
      indicator rows after the sell row, relative to ``sell_price``.

    The two forward-looking columns are NaN when no later indicator rows exist.

    Args:
        trades: one row per trade with ``buy_date``, ``sell_date``,
            ``buy_price``, ``sell_price`` and ``holding_days`` columns. The
            date values may be anything ``pd.Timestamp`` accepts.
        indicators: indicator rows with a datetime ``date`` column plus
            ``a1``, ``b1``, ``c1``, ``high``, ``low`` and ``close``. The
            positional look-ahead slices assume the rows are in ascending date
            order with one row per date (``_load_indicator_snapshot`` sorts
            them; uniqueness is not checked here).

    Raises:
        KeyError: if a trade's buy or sell date has no indicator row.
    """
    trades = trades.copy()
    trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket)

    dates = indicators["date"]
    # Both lookups are keyed by calendar date so they agree with each other and
    # accept strings, dates and Timestamps alike. Keying the positional lookup
    # by ISO strings made it fail for anything but canonical "YYYY-MM-DD" text.
    pos_lookup = {dt.date(): idx for idx, dt in enumerate(dates)}
    indicator_by_date = indicators.set_index(dates.dt.date)

    metric_columns = (
        "buy_a1",
        "buy_b1",
        "buy_c1",
        "mfe_pct",
        "mae_pct",
        "entry_forward_5d_pct",
        "exit_followthrough_5d_pct",
    )
    metrics: dict[str, list[float]] = {column: [] for column in metric_columns}
    nan = float("nan")

    for buy_raw, sell_raw, buy_price, sell_price in zip(
        trades["buy_date"], trades["sell_date"], trades["buy_price"], trades["sell_price"]
    ):
        buy_ts = pd.Timestamp(buy_raw)
        sell_ts = pd.Timestamp(sell_raw)
        entry_price = float(buy_price)
        exit_price = float(sell_price)

        buy_row = indicator_by_date.loc[buy_ts.date()]
        metrics["buy_a1"].append(float(buy_row["a1"]))
        metrics["buy_b1"].append(float(buy_row["b1"]))
        metrics["buy_c1"].append(float(buy_row["c1"]))

        buy_idx = pos_lookup[buy_ts.date()]
        sell_idx = pos_lookup[sell_ts.date()]

        # Excursions are measured over the holding window, endpoints included.
        window = indicators[(dates >= buy_ts) & (dates <= sell_ts)]
        metrics["mfe_pct"].append(float(window["high"].max()) / entry_price - 1.0)
        metrics["mae_pct"].append(float(window["low"].min()) / entry_price - 1.0)

        # Look-ahead slices cover at most five rows and may be shorter, or
        # empty, near the end of the indicator history.
        buy_future = indicators.iloc[buy_idx + 1 : buy_idx + 6]
        sell_future = indicators.iloc[sell_idx + 1 : sell_idx + 6]
        metrics["entry_forward_5d_pct"].append(
            nan if buy_future.empty else float(buy_future["close"].iloc[-1]) / entry_price - 1.0
        )
        metrics["exit_followthrough_5d_pct"].append(
            nan if sell_future.empty else float(sell_future["low"].min()) / exit_price - 1.0
        )

    for column in metric_columns:
        trades[column] = metrics[column]
    return trades
def _run_branch(indicators: pd.DataFrame, config) -> pd.DataFrame:
    """Run ``DragonRuleEngine`` with ``config`` and return its in-window trades with quality metrics.

    The engine receives ``indicators`` indexed by ``date`` (the column is
    kept). Only trades whose buy date and sell date both lie within
    ``START_DATE``..``END_DATE`` (inclusive) are kept, and those are passed
    through ``_build_trade_quality``.
    """
    indexed = indicators.set_index("date", drop=False)
    engine = DragonRuleEngine(config=config)
    # Only the second element of the engine result (the trade table) is used.
    _, all_trades = engine.run(indexed)

    bought_in_window = (all_trades["buy_date"] >= START_DATE) & (all_trades["buy_date"] <= END_DATE)
    sold_in_window = (all_trades["sell_date"] >= START_DATE) & (all_trades["sell_date"] <= END_DATE)
    in_window = all_trades[bought_in_window & sold_in_window].copy()
    return _build_trade_quality(in_window.copy(), indicators)
def _trade_key(df: pd.DataFrame) -> set[tuple[str, str, str, str]]:
    """Return the set of ``(buy_date, sell_date, buy_reason, sell_reason)`` tuples found in ``df``."""
    key_columns = ("buy_date", "sell_date", "buy_reason", "sell_reason")
    per_column = [df[column] for column in key_columns]
    return {key for key in zip(*per_column)}
def _veto_bucket(c1: float, b1: float) -> str:
    """Classify an entry by its ``c1`` / ``b1`` indicator values.

    Returns ``"low_weak_range"`` for ``23 <= c1 < 28`` with ``b1 <= 0.02``,
    ``"hot_positive_b1_cap75"`` for ``40 <= c1 < 75`` with ``b1 >= 0.10``,
    and ``"other"`` for everything else.
    """
    in_low_range = 23 <= c1 < 28
    if in_low_range and b1 <= 0.02:
        return "low_weak_range"
    in_hot_range = 40 <= c1 < 75
    if in_hot_range and b1 >= 0.10:
        return "hot_positive_b1_cap75"
    return "other"
def _recommendation(row: pd.Series) -> tuple[str, str]:
    """Return a ``(label, reason)`` verdict on whether removing a trade was justified.

    ``row`` must provide ``return_pct``, ``mfe_pct`` and ``holding_days``.
    ``exit_followthrough_5d_pct`` is read only for losing trades held at most
    10 days, and ``replacement_return_pct`` is optional.

    Winners are ``OVER_REMOVAL`` unless they gained at most 1% and a known
    replacement did no better, in which case they are ``OBSERVE_REMOVAL``.
    Everything else is ``KEEP_REMOVAL``, with the reason chosen by the first
    matching loser rule.
    """
    ret = float(row["return_pct"])
    mfe = float(row["mfe_pct"])
    holding = int(row["holding_days"])
    replacement_ret = row.get("replacement_return_pct")

    if ret > 0:
        micro_winner = ret <= 0.01
        if micro_winner and pd.notna(replacement_ret) and float(replacement_ret) <= ret:
            return "OBSERVE_REMOVAL", "Removed trade is only a micro-winner and the replacement path is not stronger."
        return "OVER_REMOVAL", "Removed trade keeps meaningful alpha and should not be discarded without a better replacement."

    if ret < 0:
        # The follow-through value is read only once the holding check passes.
        if holding <= 10 and float(row["exit_followthrough_5d_pct"]) <= 0:
            return "KEEP_REMOVAL", "Removed trade is a short loser and price still weakens after exit."
        if mfe <= 0.02:
            return "KEEP_REMOVAL", "Removed trade never developed enough profit room to defend its inclusion."

    return "KEEP_REMOVAL", "Removed trade remains a weak short-holding sample under the alpha-first objective."
def _first_replacement_within_10d(
    refined_trades: pd.DataFrame, refined_buy_dt: pd.Series, sell_dt: pd.Timestamp
) -> Optional[pd.Series]:
    """Return the first refined trade (ordered by ``buy_date``) bought after ``sell_dt`` and within 10 calendar days of it, or ``None``."""
    in_window = (refined_buy_dt > sell_dt) & (refined_buy_dt <= sell_dt + pd.Timedelta(days=10))
    candidates = refined_trades[in_window].sort_values("buy_date")
    return None if candidates.empty else candidates.iloc[0]


def _removed_trade_record(
    trade: pd.Series,
    replacement_row: Optional[pd.Series],
    sell_dt: pd.Timestamp,
    workbook_buy: set,
    workbook_sell: set,
) -> dict[str, object]:
    """Build one attribution row for a removed trade, including its replacement (if any) and recommendation."""
    has_replacement = replacement_row is not None
    recommendation, reason = _recommendation(
        pd.Series(
            {
                **trade.to_dict(),
                "replacement_return_pct": float(replacement_row["return_pct"]) if has_replacement else None,
            }
        )
    )
    return {
        "buy_date": trade["buy_date"],
        "sell_date": trade["sell_date"],
        "buy_reason": trade["buy_reason"],
        "sell_reason": trade["sell_reason"],
        "veto_bucket": _veto_bucket(float(trade["buy_c1"]), float(trade["buy_b1"])),
        "holding_bucket": trade["holding_bucket"],
        "holding_days": int(trade["holding_days"]),
        "return_pct": float(trade["return_pct"]),
        "mfe_pct": float(trade["mfe_pct"]),
        "mae_pct": float(trade["mae_pct"]),
        "entry_forward_5d_pct": float(trade["entry_forward_5d_pct"]),
        "exit_followthrough_5d_pct": float(trade["exit_followthrough_5d_pct"]),
        "buy_a1": float(trade["buy_a1"]),
        "buy_b1": float(trade["buy_b1"]),
        "buy_c1": float(trade["buy_c1"]),
        "buy_aligned_with_workbook": trade["buy_date"] in workbook_buy,
        "sell_aligned_with_workbook": trade["sell_date"] in workbook_sell,
        "replacement_buy_date": str(replacement_row["buy_date"]) if has_replacement else "",
        "replacement_sell_date": str(replacement_row["sell_date"]) if has_replacement else "",
        "replacement_buy_reason": str(replacement_row["buy_reason"]) if has_replacement else "",
        "replacement_sell_reason": str(replacement_row["sell_reason"]) if has_replacement else "",
        "replacement_return_pct": float(replacement_row["return_pct"]) if has_replacement else float("nan"),
        "replacement_gap_days": int((pd.Timestamp(replacement_row["buy_date"]) - sell_dt).days)
        if has_replacement
        else float("nan"),
        "recommendation": recommendation,
        "recommendation_reason": reason,
    }


def _review_lines(attribution: pd.DataFrame) -> list[str]:
    """Render the markdown review (snapshot, recommendation mix, bucket view, per-trade cards) as a list of lines."""
    pf = _profit_factor(attribution["return_pct"])
    pf_text = "inf" if pf == float("inf") else f"{pf:.2f}"
    lines = [
        "# Dragon Glued Refined Removed-Trade Review",
        "",
        "## Snapshot",
        f"- removed trades vs current alpha-first: `{len(attribution)}`",
        f"- avg_return of removed set: `{_pct(float(attribution['return_pct'].mean()))}`",
        f"- win_rate of removed set: `{_pct(float((attribution['return_pct'] > 0).mean()))}`",
        f"- profit_factor of removed set: `{pf_text}`",
        "",
        "## Recommendation Mix",
        f"- KEEP_REMOVAL: `{int((attribution['recommendation'] == 'KEEP_REMOVAL').sum())}`",
        f"- OBSERVE_REMOVAL: `{int((attribution['recommendation'] == 'OBSERVE_REMOVAL').sum())}`",
        f"- OVER_REMOVAL: `{int((attribution['recommendation'] == 'OVER_REMOVAL').sum())}`",
        "",
        "## Bucket View",
    ]
    for bucket, group in attribution.groupby("veto_bucket", dropna=False):
        lines.append(
            f"- `{bucket}`: trades `{len(group)}`, avg_return `{_pct(float(group['return_pct'].mean()))}`, "
            f"win_rate `{_pct(float((group['return_pct'] > 0).mean()))}`, avg_mfe `{_pct(float(group['mfe_pct'].mean()))}`, "
            f"avg_mae `{_pct(float(group['mae_pct'].mean()))}`"
        )
    lines.extend(
        [
            "",
            "## Quant Judgment",
            "- The refined branch mostly removes weak short-holding glued trades rather than medium-quality alpha trades.",
            "- If this review remains dominated by KEEP_REMOVAL and contains no meaningful OVER_REMOVAL bucket, the branch is structurally explainable rather than a black-box overfit.",
            "",
            "## Detailed Cards",
        ]
    )
    for _, row in attribution.iterrows():
        replacement = "none"
        if isinstance(row["replacement_buy_date"], str) and row["replacement_buy_date"]:
            replacement = (
                f"{row['replacement_buy_date']} -> {row['replacement_sell_date']} / "
                f"{row['replacement_buy_reason']} -> {row['replacement_sell_reason']} / "
                f"{_pct(row['replacement_return_pct'])}"
            )
        lines.extend(
            [
                f"### {row['buy_date']} -> {row['sell_date']}",
                f"- Bucket: `{row['veto_bucket']}` | holding `{row['holding_bucket']}`",
                f"- Trade: `{row['buy_reason']} -> {row['sell_reason']}` | return `{_pct(row['return_pct'])}` | holding `{int(row['holding_days'])}` days",
                f"- MFE / MAE: `{_pct(row['mfe_pct'])}` / `{_pct(row['mae_pct'])}`",
                f"- Entry 5d / Exit followthrough 5d: `{_pct(row['entry_forward_5d_pct'])}` / `{_pct(row['exit_followthrough_5d_pct'])}`",
                f"- Entry indicators: `a1={float(row['buy_a1']):.4f}` `b1={float(row['buy_b1']):.4f}` `c1={float(row['buy_c1']):.2f}`",
                f"- Workbook aligned: buy `{bool(row['buy_aligned_with_workbook'])}` / sell `{bool(row['sell_aligned_with_workbook'])}`",
                f"- Replacement path within 10d: `{replacement}`",
                f"- Recommendation: `{row['recommendation']}` | {row['recommendation_reason']}",
                "",
            ]
        )
    return lines


def main() -> None:
    """Attribute the trades removed by the glued-refined branch and write the CSV and markdown review.

    Reads ``dragon_indicator_snapshot.csv`` and ``true_trade_events.csv`` from
    this script's directory, runs the alpha-first and glued-refined
    configurations, and reviews every trade present in the former but absent
    from the latter. Writes ``dragon_glued_refined_removed_trade_attribution.csv``
    and ``dragon_glued_refined_removed_trade_review.md`` next to the script.
    """
    base_dir = Path(__file__).resolve().parent
    indicators = _load_indicator_snapshot(base_dir)
    workbook_events = _load_csv(base_dir, "true_trade_events.csv")
    alpha_trades = _run_branch(indicators, alpha_first_selective_veto_config())
    refined_trades = _run_branch(indicators, alpha_first_glued_refined_hot_cap_config())

    real_trades = workbook_events[workbook_events["layer"] == "real_trade"]
    workbook_buy = set(real_trades[real_trades["side"] == "BUY"]["date"])
    workbook_sell = set(real_trades[real_trades["side"] == "SELL"]["date"])

    # Position of the first alpha trade for each key, so a removed trade can be
    # fetched directly instead of re-scanning the whole frame per key.
    key_columns = ("buy_date", "sell_date", "buy_reason", "sell_reason")
    first_position: dict[tuple, int] = {}
    for position, key in enumerate(zip(*(alpha_trades[column] for column in key_columns))):
        first_position.setdefault(key, position)
    removed_keys = sorted(_trade_key(alpha_trades) - _trade_key(refined_trades))

    # Parsed once: this conversion does not depend on the trade being examined.
    refined_buy_dt = pd.to_datetime(refined_trades["buy_date"])

    rows: list[dict[str, object]] = []
    for key in removed_keys:
        trade = alpha_trades.iloc[first_position[key]]
        sell_dt = pd.Timestamp(trade["sell_date"])
        replacement_row = _first_replacement_within_10d(refined_trades, refined_buy_dt, sell_dt)
        rows.append(_removed_trade_record(trade, replacement_row, sell_dt, workbook_buy, workbook_sell))

    # Explicit columns keep the frame well-formed when nothing was removed;
    # without them an empty ``rows`` yields a column-less frame and the sort
    # below raises KeyError.
    attribution_columns = [
        "buy_date",
        "sell_date",
        "buy_reason",
        "sell_reason",
        "veto_bucket",
        "holding_bucket",
        "holding_days",
        "return_pct",
        "mfe_pct",
        "mae_pct",
        "entry_forward_5d_pct",
        "exit_followthrough_5d_pct",
        "buy_a1",
        "buy_b1",
        "buy_c1",
        "buy_aligned_with_workbook",
        "sell_aligned_with_workbook",
        "replacement_buy_date",
        "replacement_sell_date",
        "replacement_buy_reason",
        "replacement_sell_reason",
        "replacement_return_pct",
        "replacement_gap_days",
        "recommendation",
        "recommendation_reason",
    ]
    attribution = (
        pd.DataFrame(rows, columns=attribution_columns)
        .sort_values(["veto_bucket", "buy_date"])
        .reset_index(drop=True)
    )
    attribution.to_csv(
        base_dir / "dragon_glued_refined_removed_trade_attribution.csv", index=False, encoding="utf-8-sig"
    )

    lines = _review_lines(attribution)
    (base_dir / "dragon_glued_refined_removed_trade_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8")


if __name__ == "__main__":
    main()
|