| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- from __future__ import annotations
- from pathlib import Path
- import pandas as pd
- def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
- return pd.read_csv(base_dir / name, encoding="utf-8-sig")
- def _profit_factor(series: pd.Series) -> float:
- gross_profit = series[series > 0].sum()
- gross_loss = -series[series < 0].sum()
- if gross_loss == 0:
- return float("inf") if gross_profit > 0 else 0.0
- return float(gross_profit / gross_loss)
- def _holding_bucket(days: int) -> str:
- if days <= 5:
- return "00-05d"
- if days <= 10:
- return "06-10d"
- if days <= 20:
- return "11-20d"
- if days <= 40:
- return "21-40d"
- return "41d+"
- def _summarize(df: pd.DataFrame, group_cols: list[str]) -> pd.DataFrame:
- if group_cols:
- grouped = df.groupby(group_cols, dropna=False)
- else:
- grouped = df.groupby(lambda _: "ALL")
- rows: list[dict[str, object]] = []
- for key, group in grouped:
- if not isinstance(key, tuple):
- key = (key,)
- row = {col: val for col, val in zip(group_cols or ["scope"], key)}
- row["trades"] = int(len(group))
- row["win_rate"] = float((group["return_pct"] > 0).mean())
- row["avg_return"] = float(group["return_pct"].mean())
- row["median_return"] = float(group["return_pct"].median())
- row["profit_factor"] = _profit_factor(group["return_pct"])
- row["expectancy"] = float(group["return_pct"].mean())
- row["avg_holding_days"] = float(group["holding_days"].mean())
- row["avg_mfe_pct"] = float(group["mfe_pct"].mean())
- row["avg_mae_pct"] = float(group["mae_pct"].mean())
- row["avg_giveback_from_peak_pct"] = float(group["giveback_from_peak_pct"].mean())
- row["avg_exit_followthrough_5d_pct"] = float(group["exit_followthrough_5d_pct"].mean())
- row["avg_exit_rebound_5d_pct"] = float(group["exit_rebound_5d_pct"].mean())
- rows.append(row)
- return pd.DataFrame(rows)
- def _format_pct(value: float) -> str:
- if pd.isna(value):
- return "NA"
- if value == float("inf"):
- return "inf"
- return f"{value:.2%}"
- def _safe_value(df: pd.DataFrame, col: str) -> str:
- if df.empty:
- return "NA"
- value = df.iloc[0][col]
- if isinstance(value, float):
- return _format_pct(value) if "rate" in col or "return" in col or "pct" in col else f"{value:.2f}"
- return str(value)
def _build_trade_quality(trades: pd.DataFrame, path_trace: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame:
    """Enrich each trade with excursion and post-exit metrics plus path-trace layers.

    The inputs are copied and never mutated.

    Args:
        trades: One row per trade with ``buy_date``, ``sell_date``,
            ``buy_price``, ``sell_price`` and ``holding_days``.
        path_trace: Per-trade layer information keyed by ``buy_date`` and
            ``sell_date``; must contain every column in ``merge_cols`` below.
        indicators: One row per date with ``date``, ``high`` and ``low``.

    Returns:
        ``trades`` with these added columns, left-merged with the selected
        ``path_trace`` columns on ``buy_date`` and ``sell_date``:

        * ``buy_dt`` / ``sell_dt`` / ``sell_year`` / ``holding_bucket``
        * ``mfe_pct``: highest ``high`` between buy and sell date
          (inclusive) relative to the buy price.
        * ``mae_pct``: lowest ``low`` over the same window relative to the
          buy price.
        * ``giveback_from_peak_pct``: sell price relative to that highest
          ``high``.
        * ``exit_followthrough_5d_pct`` / ``exit_rebound_5d_pct``: lowest
          ``low`` / highest ``high`` of up to five indicator rows following
          the sell date, relative to the sell price. NaN when the sell date
          is not an indicator date or no later rows exist.
    """
    trades = trades.copy()
    path_trace = path_trace.copy()
    indicators = indicators.copy()

    trades["buy_dt"] = pd.to_datetime(trades["buy_date"])
    trades["sell_dt"] = pd.to_datetime(trades["sell_date"])
    trades["sell_year"] = trades["sell_dt"].dt.year
    trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket)

    indicators["dt"] = pd.to_datetime(indicators["date"])
    indicators = indicators.sort_values("dt").reset_index(drop=True)
    # ISO date -> row position in the date-sorted indicator frame.
    pos_lookup = {dt.date().isoformat(): idx for idx, dt in enumerate(indicators["dt"])}

    nan = float("nan")
    mfe_list: list[float] = []
    mae_list: list[float] = []
    giveback_list: list[float] = []
    exit_followthrough_list: list[float] = []
    exit_rebound_list: list[float] = []

    for _, trade in trades.iterrows():
        entry_price = float(trade["buy_price"])
        exit_price = float(trade["sell_price"])
        sell_dt = trade["sell_dt"]

        # Indicator rows while the position was open (both ends inclusive).
        in_trade = (indicators["dt"] >= trade["buy_dt"]) & (indicators["dt"] <= sell_dt)
        window = indicators[in_trade]
        max_high = float(window["high"].max())
        min_low = float(window["low"].min())
        mfe_list.append(max_high / entry_price - 1.0)
        mae_list.append(min_low / entry_price - 1.0)
        giveback_list.append(exit_price / max_high - 1.0)

        # Look the sell date up via the parsed timestamp rather than the raw
        # CSV text: the lookup keys are ISO dates, so a raw string in any
        # other format (or with a time part) would never match.
        if pd.isna(sell_dt):
            sell_idx = None
        else:
            sell_idx = pos_lookup.get(sell_dt.date().isoformat())

        # Up to five indicator rows strictly after the sell date.
        future = None if sell_idx is None else indicators.iloc[sell_idx + 1 : sell_idx + 6]
        if future is None or future.empty:
            exit_followthrough_list.append(nan)
            exit_rebound_list.append(nan)
            continue
        exit_followthrough_list.append(float(future["low"].min()) / exit_price - 1.0)
        exit_rebound_list.append(float(future["high"].max()) / exit_price - 1.0)

    trades["mfe_pct"] = mfe_list
    trades["mae_pct"] = mae_list
    trades["giveback_from_peak_pct"] = giveback_list
    trades["exit_followthrough_5d_pct"] = exit_followthrough_list
    trades["exit_rebound_5d_pct"] = exit_rebound_list

    merge_cols = [
        "buy_date",
        "sell_date",
        "market_state_layer",
        "entry_qualification_layer",
        "position_management_layer",
        "aux_context_layer",
        "aux_signal_count",
        "hold_aux_buy_count",
        "post_exit_aux_sell_count",
        "next_buy_date",
        "layer_path",
    ]
    # Left merge: trades without a matching path-trace row are kept.
    return trades.merge(path_trace[merge_cols], on=["buy_date", "sell_date"], how="left")
def _build_rule_stability(df: pd.DataFrame) -> pd.DataFrame:
    """Measure how headline metrics change when one rule's trades are removed.

    For every distinct ``buy_reason`` (reported as ``entry_rule``) and
    ``sell_reason`` (reported as ``exit_rule``), the trades carrying that
    value are dropped and win rate, average return and profit factor are
    recomputed on the remainder and compared with the full-set baseline.

    Args:
        df: Trade-level frame with ``return_pct``, ``buy_reason`` and
            ``sell_reason`` columns.

    Returns:
        One row per (rule type, rule name) holding the removed and remaining
        trade counts, the baseline metrics, the remaining metrics (NaN when
        no trades remain) and their deltas (remaining minus baseline).
    """
    all_returns = df["return_pct"]
    base_trades = int(len(df))
    base_win_rate = float((all_returns > 0).mean())
    base_avg_return = float(all_returns.mean())
    base_profit_factor = _profit_factor(all_returns)

    nan = float("nan")
    rule_sources = (("entry_rule", "buy_reason"), ("exit_rule", "sell_reason"))
    records: list[dict[str, object]] = []

    for rule_type, col in rule_sources:
        for rule_name, removed in df.groupby(col):
            kept = df[df[col] != rule_name]
            if kept.empty:
                # Removing this rule leaves nothing to measure.
                kept_win_rate = kept_avg_return = kept_profit_factor = nan
            else:
                kept_returns = kept["return_pct"]
                kept_win_rate = float((kept_returns > 0).mean())
                kept_avg_return = float(kept_returns.mean())
                kept_profit_factor = _profit_factor(kept_returns)

            records.append(
                {
                    "rule_type": rule_type,
                    "rule_name": rule_name,
                    "removed_trades": int(len(removed)),
                    "remaining_trades": int(len(kept)),
                    "baseline_trades": base_trades,
                    "baseline_win_rate": base_win_rate,
                    "baseline_avg_return": base_avg_return,
                    "baseline_profit_factor": base_profit_factor,
                    "remaining_win_rate": kept_win_rate,
                    "remaining_avg_return": kept_avg_return,
                    "remaining_profit_factor": kept_profit_factor,
                    "delta_win_rate": kept_win_rate - base_win_rate,
                    "delta_avg_return": kept_avg_return - base_avg_return,
                    "delta_profit_factor": kept_profit_factor - base_profit_factor,
                }
            )

    return pd.DataFrame(records)
def main() -> None:
    """Build the trade-quality CSVs and the Markdown robustness report.

    All inputs are read from, and all outputs are written to, the directory
    that contains this script. Inputs: ``dragon_strategy_trades.csv``,
    ``dragon_trade_path_trace.csv`` and ``dragon_indicator_snapshot.csv``.
    Outputs: ``dragon_trade_quality.csv``, ``dragon_trade_group_summary.csv``,
    ``dragon_yearly_performance.csv``, ``dragon_rule_contribution_entry.csv``,
    ``dragon_rule_contribution_exit.csv``, ``dragon_rule_stability.csv`` and
    ``dragon_robustness_report.md``.
    """
    base_dir = Path(__file__).resolve().parent
    trades = _load_csv(base_dir, "dragon_strategy_trades.csv")
    path_trace = _load_csv(base_dir, "dragon_trade_path_trace.csv")
    indicators = _load_csv(base_dir, "dragon_indicator_snapshot.csv")
    # Per-trade enriched frame; written out before any summary is computed.
    quality = _build_trade_quality(trades, path_trace, indicators)
    quality.to_csv(base_dir / "dragon_trade_quality.csv", index=False, encoding="utf-8-sig")
    # Summaries of the same trade set under different groupings.
    baseline_summary = _summarize(quality, [])
    holding_summary = _summarize(quality, ["holding_bucket"]).sort_values("holding_bucket")
    yearly_summary = _summarize(quality, ["sell_year"]).sort_values("sell_year")
    state_summary = _summarize(quality, ["market_state_layer"]).sort_values("trades", ascending=False)
    entry_summary = _summarize(quality, ["buy_reason"]).sort_values("trades", ascending=False)
    exit_summary = _summarize(quality, ["sell_reason"]).sort_values("trades", ascending=False)
    path_summary = _summarize(quality, ["market_state_layer", "entry_qualification_layer", "position_management_layer"]).sort_values(
        "trades", ascending=False
    )
    # Two-way sample split on sell year: up to and including 2020 vs. later.
    # The labels are fixed strings and do not adapt to the data's year range.
    split_summary = _summarize(
        quality.assign(sample_split=quality["sell_year"].apply(lambda x: "2016-2020" if x <= 2020 else "2021-2025")),
        ["sample_split"],
    ).sort_values("sample_split")
    stability = _build_rule_stability(quality).sort_values(["rule_type", "delta_avg_return"])
    # Stack every grouped summary into one long table tagged by group_type.
    group_frames = []
    for group_type, df in [
        ("holding_bucket", holding_summary),
        ("sell_year", yearly_summary),
        ("market_state_layer", state_summary),
        ("buy_reason", entry_summary),
        ("sell_reason", exit_summary),
        ("path_core", path_summary),
        ("sample_split", split_summary),
    ]:
        group_frames.append(df.assign(group_type=group_type))
    group_summary = pd.concat(group_frames, ignore_index=True, sort=False)
    group_summary.to_csv(base_dir / "dragon_trade_group_summary.csv", index=False, encoding="utf-8-sig")
    yearly_summary.to_csv(base_dir / "dragon_yearly_performance.csv", index=False, encoding="utf-8-sig")
    entry_summary.assign(rule_type="entry_rule").to_csv(
        base_dir / "dragon_rule_contribution_entry.csv", index=False, encoding="utf-8-sig"
    )
    exit_summary.assign(rule_type="exit_rule").to_csv(
        base_dir / "dragon_rule_contribution_exit.csv", index=False, encoding="utf-8-sig"
    )
    stability.to_csv(base_dir / "dragon_rule_stability.csv", index=False, encoding="utf-8-sig")
    # Rankings only consider rules with at least 3 trades.
    best_entry = entry_summary[entry_summary["trades"] >= 3].sort_values("avg_return", ascending=False).head(3)
    weakest_entry = entry_summary[entry_summary["trades"] >= 3].sort_values("avg_return", ascending=True).head(3)
    # "Best" exits are those with the lowest average 5-day post-exit
    # follow-through value (ascending sort); "weakest" are the highest.
    best_exit = exit_summary[exit_summary["trades"] >= 3].sort_values("avg_exit_followthrough_5d_pct").head(3)
    weakest_exit = exit_summary[exit_summary["trades"] >= 3].sort_values("avg_exit_followthrough_5d_pct", ascending=False).head(3)
    # Removals that lower (worst) or raise (best) the average return the most.
    worst_rule_removal = stability.sort_values("delta_avg_return").head(5)
    best_rule_removal = stability.sort_values("delta_avg_return", ascending=False).head(5)
    # Report lines are accumulated here and joined with newlines at the end.
    lines = [
        "# Dragon Robustness Report",
        "",
        "## Baseline",
        f"- trades: `{int(baseline_summary.iloc[0]['trades'])}`",
        f"- win_rate: `{_format_pct(float(baseline_summary.iloc[0]['win_rate']))}`",
        f"- avg_return: `{_format_pct(float(baseline_summary.iloc[0]['avg_return']))}`",
        f"- median_return: `{_format_pct(float(baseline_summary.iloc[0]['median_return']))}`",
        f"- profit_factor: `{baseline_summary.iloc[0]['profit_factor']:.2f}`",
        f"- avg_mfe: `{_format_pct(float(baseline_summary.iloc[0]['avg_mfe_pct']))}`",
        f"- avg_mae: `{_format_pct(float(baseline_summary.iloc[0]['avg_mae_pct']))}`",
        f"- avg_exit_followthrough_5d: `{_format_pct(float(baseline_summary.iloc[0]['avg_exit_followthrough_5d_pct']))}`",
        "",
        "## Holding-Bucket View",
    ]
    for _, row in holding_summary.iterrows():
        lines.append(
            f"- `{row['holding_bucket']}`: trades `{int(row['trades'])}`, win_rate `{_format_pct(float(row['win_rate']))}`, "
            f"avg_return `{_format_pct(float(row['avg_return']))}`, avg_mfe `{_format_pct(float(row['avg_mfe_pct']))}`, "
            f"avg_mae `{_format_pct(float(row['avg_mae_pct']))}`"
        )
    lines.extend(["", "## Yearly View"])
    for _, row in yearly_summary.iterrows():
        lines.append(
            f"- `{int(row['sell_year'])}`: trades `{int(row['trades'])}`, win_rate `{_format_pct(float(row['win_rate']))}`, "
            f"avg_return `{_format_pct(float(row['avg_return']))}`, profit_factor `{row['profit_factor']:.2f}`"
        )
    lines.extend(["", "## Sample Split"])
    for _, row in split_summary.iterrows():
        lines.append(
            f"- `{row['sample_split']}`: trades `{int(row['trades'])}`, win_rate `{_format_pct(float(row['win_rate']))}`, "
            f"avg_return `{_format_pct(float(row['avg_return']))}`, profit_factor `{row['profit_factor']:.2f}`"
        )
    lines.extend(["", "## Regime View"])
    for _, row in state_summary.iterrows():
        lines.append(
            f"- `{row['market_state_layer']}`: trades `{int(row['trades'])}`, avg_return `{_format_pct(float(row['avg_return']))}`, "
            f"profit_factor `{row['profit_factor']:.2f}`, avg_mae `{_format_pct(float(row['avg_mae_pct']))}`"
        )
    lines.extend(["", "## Best Entry Rules"])
    for _, row in best_entry.iterrows():
        lines.append(
            f"- `{row['buy_reason']}`: trades `{int(row['trades'])}`, avg_return `{_format_pct(float(row['avg_return']))}`, "
            f"win_rate `{_format_pct(float(row['win_rate']))}`, avg_mfe `{_format_pct(float(row['avg_mfe_pct']))}`"
        )
    lines.extend(["", "## Weakest Entry Rules"])
    for _, row in weakest_entry.iterrows():
        lines.append(
            f"- `{row['buy_reason']}`: trades `{int(row['trades'])}`, avg_return `{_format_pct(float(row['avg_return']))}`, "
            f"win_rate `{_format_pct(float(row['win_rate']))}`, avg_mae `{_format_pct(float(row['avg_mae_pct']))}`"
        )
    lines.extend(["", "## Best Exit Rules"])
    for _, row in best_exit.iterrows():
        lines.append(
            f"- `{row['sell_reason']}`: trades `{int(row['trades'])}`, avg_exit_followthrough_5d `{_format_pct(float(row['avg_exit_followthrough_5d_pct']))}`, "
            f"avg_return `{_format_pct(float(row['avg_return']))}`"
        )
    lines.extend(["", "## Weakest Exit Rules"])
    for _, row in weakest_exit.iterrows():
        lines.append(
            f"- `{row['sell_reason']}`: trades `{int(row['trades'])}`, avg_exit_followthrough_5d `{_format_pct(float(row['avg_exit_followthrough_5d_pct']))}`, "
            f"avg_return `{_format_pct(float(row['avg_return']))}`"
        )
    lines.extend(["", "## Realized Contribution Stress Test"])
    lines.append("- Interpretation: this removes realized trades by rule from the current trade set; it is not yet a full re-run stability test.")
    lines.append("- Worst removals for average return:")
    for _, row in worst_rule_removal.iterrows():
        lines.append(
            f"- `{row['rule_type']} / {row['rule_name']}`: removed `{int(row['removed_trades'])}` trades, "
            f"delta_avg_return `{_format_pct(float(row['delta_avg_return']))}`, delta_profit_factor `{row['delta_profit_factor']:.2f}`"
        )
    lines.append("- Best removals for average return:")
    for _, row in best_rule_removal.iterrows():
        lines.append(
            f"- `{row['rule_type']} / {row['rule_name']}`: removed `{int(row['removed_trades'])}` trades, "
            f"delta_avg_return `{_format_pct(float(row['delta_avg_return']))}`, delta_profit_factor `{row['delta_profit_factor']:.2f}`"
        )
    lines.extend(
        [
            "",
            "## Next Stage-3 Gaps",
            "- Threshold perturbation is not yet formalized because the current strategy logic is still hard-coded, not parameterized.",
            "- A true leave-one-rule-out stability test still needs rerun-able switches in `dragon_strategy.py` rather than ex-post trade deletion only.",
        ]
    )
    # The report is plain UTF-8 (no BOM), unlike the CSVs which use utf-8-sig.
    (base_dir / "dragon_robustness_report.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
# Run the report pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
|