# Walk-forward stability audit for the "dragon" strategy trade log.
#
# Reads ``dragon_strategy_trades.csv`` from the directory containing this
# script and writes three CSV summaries plus a Markdown report next to it.
# No parameters are fitted here: the trade list is only sliced by sell year.
from __future__ import annotations

from pathlib import Path

import pandas as pd

# Keys produced by ``_segment_stats``, in output order.
_STAT_KEYS = (
    "trades",
    "win_rate",
    "avg_return",
    "median_return",
    "profit_factor",
    "compounded_return",
    "max_drawdown",
)

# Explicit schemas so that empty results still carry their columns.
_WALK_FORWARD_COLUMNS = [
    "scheme",
    "train_start_year",
    "train_end_year",
    "test_year",
    *[f"train_{key}" for key in _STAT_KEYS],
    *[f"test_{key}" for key in _STAT_KEYS],
]

_FAMILY_YEAR_COLUMNS = [
    "entry_family",
    "sell_year",
    "trades",
    "win_rate",
    "avg_return",
    "profit_factor",
    "compounded_return",
]

_FAMILY_SUMMARY_COLUMNS = [
    "entry_family",
    "years_active",
    "total_trades",
    "positive_years",
    "negative_years",
    "avg_yearly_avg_return",
    "min_yearly_avg_return",
    "max_yearly_avg_return",
]


def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
    """Read ``base_dir / name`` as CSV, tolerating a UTF-8 byte-order mark."""
    return pd.read_csv(base_dir / name, encoding="utf-8-sig")


def _profit_factor(series: pd.Series) -> float:
    """Return gross profit divided by gross loss for a series of returns.

    When there are no losses the result is ``inf`` if there is any profit
    and ``0.0`` otherwise (which includes an empty series).
    """
    gross_profit = series[series > 0].sum()
    gross_loss = -series[series < 0].sum()
    if gross_loss == 0:
        return float("inf") if gross_profit > 0 else 0.0
    return float(gross_profit / gross_loss)


def _max_drawdown(series: pd.Series) -> float:
    """Return the deepest peak-to-trough decline of the compounded equity curve.

    The curve starts from an implicit capital of 1.0 and compounds the
    returns in the order given, so the caller is responsible for passing
    them chronologically. The result is ``<= 0``; ``nan`` for an empty series.
    """
    if series.empty:
        return float("nan")
    equity = (1.0 + series).cumprod()
    # The starting capital (1.0) is itself a peak. Without the clip, losses
    # that occur before the first new high would be measured against the
    # already-reduced equity and reported as no drawdown at all.
    running_max = equity.cummax().clip(lower=1.0)
    drawdown = equity / running_max - 1.0
    return float(drawdown.min())


def _segment_stats(df: pd.DataFrame) -> dict[str, float | int]:
    """Summarise the ``return_pct`` column of one slice of trades.

    Returns a dict keyed by ``_STAT_KEYS``. For an empty slice ``trades`` is
    0 and every other statistic is ``nan``.
    """
    if df.empty:
        return {
            "trades": 0,
            "win_rate": float("nan"),
            "avg_return": float("nan"),
            "median_return": float("nan"),
            "profit_factor": float("nan"),
            "compounded_return": float("nan"),
            "max_drawdown": float("nan"),
        }
    returns = df["return_pct"].astype(float)
    return {
        "trades": int(len(df)),
        "win_rate": float((returns > 0).mean()),
        "avg_return": float(returns.mean()),
        "median_return": float(returns.median()),
        "profit_factor": _profit_factor(returns),
        "compounded_return": float((1.0 + returns).prod() - 1.0),
        "max_drawdown": _max_drawdown(returns),
    }


def _format_value(value: float, spec: str) -> str:
    """Format ``value`` with ``spec``; missing -> ``NA``, infinities -> ``inf``/``-inf``."""
    if pd.isna(value):
        return "NA"
    if value == float("inf"):
        return "inf"
    if value == float("-inf"):
        # Without this branch the percent spec would render "-inf%".
        return "-inf"
    return format(value, spec)


def _format_pct(value: float) -> str:
    """Render a fraction as a percentage with two decimals (``0.1234`` -> ``12.34%``)."""
    return _format_value(value, ".2%")


def _format_num(value: float) -> str:
    """Render a number with two decimals."""
    return _format_value(value, ".2f")


def _walk_forward_row(
    trades: pd.DataFrame,
    scheme: str,
    train_years: list[int],
    test_year: int,
) -> dict[str, object]:
    """Build one walk-forward record: train-window stats next to test-year stats."""
    train_stats = _segment_stats(trades[trades["sell_year"].isin(train_years)])
    test_stats = _segment_stats(trades[trades["sell_year"] == test_year])
    return {
        "scheme": scheme,
        "train_start_year": train_years[0],
        "train_end_year": train_years[-1],
        "test_year": test_year,
        **{f"train_{key}": value for key, value in train_stats.items()},
        **{f"test_{key}": value for key, value in test_stats.items()},
    }


def _build_walk_forward(trades: pd.DataFrame) -> pd.DataFrame:
    """Build anchored-expanding and rolling-3-year walk-forward summaries.

    ``trades`` needs ``sell_year`` and ``return_pct`` columns. Every year
    after the first observed one gets an ``anchored_expanding`` row trained
    on all earlier observed years; every year with at least three earlier
    observed years also gets a ``rolling_3y`` row trained on the three
    immediately preceding *observed* years (gaps in the calendar are not
    filled).

    The returned frame always has the ``_WALK_FORWARD_COLUMNS`` schema, even
    when there are too few years to form a single window.
    """
    years = sorted(int(year) for year in trades["sell_year"].unique())
    rows: list[dict[str, object]] = []
    for idx, test_year in enumerate(years):
        if idx >= 1:
            rows.append(
                _walk_forward_row(trades, "anchored_expanding", years[:idx], test_year)
            )
        if idx >= 3:
            rows.append(
                _walk_forward_row(trades, "rolling_3y", years[idx - 3 : idx], test_year)
            )
    # Explicit columns keep the schema when ``rows`` is empty; a column-less
    # frame would make the ``scheme`` lookup in ``main`` raise KeyError.
    return pd.DataFrame(rows, columns=_WALK_FORWARD_COLUMNS)


def _build_family_stability(trades: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Summarise performance per entry family and year, and per family overall.

    The entry family is the part of ``buy_reason`` before the first ``:``.
    Only families with at least three trades in total are kept.

    Returns ``(family_year, family_summary)``: the first has one row per
    (family, sell year); the second has one row per family, sorted by
    average yearly average return and then total trades, both descending.
    Both frames keep their full column schema when no family qualifies.
    """
    df = trades.copy()
    df["entry_family"] = df["buy_reason"].astype(str).str.split(":").str[0]
    # Same float cast that ``_segment_stats`` applies before computing stats.
    df["return_pct"] = df["return_pct"].astype(float)

    family_sizes = df.groupby("entry_family").size()
    eligible_families = set(family_sizes[family_sizes >= 3].index)

    # Rows are assembled explicitly rather than through ``groupby.apply``
    # returning a Series: that pattern coerces the integer counts to float
    # and does not yield these columns when there are no groups.
    year_rows: list[dict[str, object]] = []
    for (family, year), group in df.groupby(["entry_family", "sell_year"], dropna=False):
        if family not in eligible_families:
            continue
        returns = group["return_pct"]
        year_rows.append(
            {
                "entry_family": family,
                "sell_year": year,
                "trades": int(len(group)),
                "win_rate": float((returns > 0).mean()),
                "avg_return": float(returns.mean()),
                "profit_factor": _profit_factor(returns),
                "compounded_return": float((1.0 + returns).prod() - 1.0),
            }
        )
    family_year = pd.DataFrame(year_rows, columns=_FAMILY_YEAR_COLUMNS)

    summary_rows: list[dict[str, object]] = []
    for family, group in family_year.groupby("entry_family", dropna=False):
        yearly_avg = group["avg_return"]
        summary_rows.append(
            {
                "entry_family": family,
                "years_active": int(len(group)),
                "total_trades": int(group["trades"].sum()),
                "positive_years": int((yearly_avg > 0).sum()),
                "negative_years": int((yearly_avg < 0).sum()),
                "avg_yearly_avg_return": float(yearly_avg.mean()),
                "min_yearly_avg_return": float(yearly_avg.min()),
                "max_yearly_avg_return": float(yearly_avg.max()),
            }
        )
    family_summary = pd.DataFrame(
        summary_rows, columns=_FAMILY_SUMMARY_COLUMNS
    ).sort_values(["avg_yearly_avg_return", "total_trades"], ascending=[False, False])
    return family_year, family_summary


def _window_line(row: pd.Series) -> str:
    """Format one walk-forward row as a Markdown bullet."""
    return (
        f"- train `{int(row['train_start_year'])}-{int(row['train_end_year'])}` -> test `{int(row['test_year'])}`: "
        f"test trades `{int(row['test_trades'])}`, test avg_return `{_format_pct(float(row['test_avg_return']))}`, "
        f"test profit_factor `{_format_num(float(row['test_profit_factor']))}`, "
        f"test compounded_return `{_format_pct(float(row['test_compounded_return']))}`, "
        f"test max_drawdown `{_format_pct(float(row['test_max_drawdown']))}`"
    )


def _family_line(row: pd.Series) -> str:
    """Format one family-summary row as a Markdown bullet."""
    return (
        f"- `{row['entry_family']}`: years_active `{int(row['years_active'])}`, total_trades `{int(row['total_trades'])}`, "
        f"positive_years `{int(row['positive_years'])}`, negative_years `{int(row['negative_years'])}`, "
        f"avg_yearly_avg_return `{_format_pct(float(row['avg_yearly_avg_return']))}`, "
        f"min_yearly_avg_return `{_format_pct(float(row['min_yearly_avg_return']))}`"
    )


def _count_signs(frame: pd.DataFrame, column: str) -> tuple[int, int]:
    """Return ``(positive, negative)`` counts of ``frame[column]``; ``(0, 0)`` if empty."""
    if frame.empty:
        return 0, 0
    values = frame[column]
    return int((values > 0).sum()), int((values < 0).sum())


def main() -> None:
    """Run the audit: load trades, build the summaries, write CSVs and the report."""
    base_dir = Path(__file__).resolve().parent
    trades = _load_csv(base_dir, "dragon_strategy_trades.csv")
    trades["sell_dt"] = pd.to_datetime(trades["sell_date"])
    # Max drawdown compounds returns in row order, so put the trades in
    # chronological sell order first. The stable sort keeps the file order
    # for trades closed on the same date and is a no-op for a sorted file.
    trades = trades.sort_values("sell_dt", kind="stable")
    trades["sell_year"] = trades["sell_dt"].dt.year.astype(int)

    walk_forward = _build_walk_forward(trades)
    family_year, family_summary = _build_family_stability(trades)

    walk_forward.to_csv(
        base_dir / "dragon_walk_forward_summary.csv", index=False, encoding="utf-8-sig"
    )
    family_year.to_csv(
        base_dir / "dragon_walk_forward_family_year.csv", index=False, encoding="utf-8-sig"
    )
    family_summary.to_csv(
        base_dir / "dragon_walk_forward_family_stability.csv", index=False, encoding="utf-8-sig"
    )

    anchored = walk_forward[walk_forward["scheme"] == "anchored_expanding"].copy()
    rolling = walk_forward[walk_forward["scheme"] == "rolling_3y"].copy()

    lines = [
        "# Dragon Walk-Forward Validation",
        "",
        "- Method: fixed current baseline rules, no refit, evaluate temporal stability by yearly out-of-sample slices.",
        "- Goal: verify whether the workbook-preserving baseline still behaves coherently outside any single full-sample summary.",
        "",
        "## Anchored Expanding Windows",
    ]
    lines.extend(_window_line(row) for _, row in anchored.iterrows())

    lines.extend(["", "## Rolling 3Y Windows"])
    lines.extend(_window_line(row) for _, row in rolling.iterrows())

    lines.extend(["", "## Entry-Family Stability"])
    lines.extend(_family_line(row) for _, row in family_summary.head(8).iterrows())

    weakest = family_summary.sort_values(
        ["avg_yearly_avg_return", "min_yearly_avg_return"]
    ).head(5)
    lines.extend(["", "## Weak Entry-Family Stability"])
    lines.extend(_family_line(row) for _, row in weakest.iterrows())

    positive_anchored, negative_anchored = _count_signs(anchored, "test_avg_return")
    positive_rolling, negative_rolling = _count_signs(rolling, "test_avg_return")
    lines.extend(
        [
            "",
            "## Quant Judgment",
            f"- Anchored walk-forward windows: positive years `{positive_anchored}`, negative years `{negative_anchored}`.",
            f"- Rolling 3Y windows: positive years `{positive_rolling}`, negative years `{negative_rolling}`.",
            "- This is a stability audit, not a parameter-search walk-forward. The strategy was held fixed throughout.",
            "- Families with repeated negative yearly averages are research candidates; families with broad multi-year persistence are baseline keepers.",
        ]
    )

    (base_dir / "dragon_walk_forward_report.md").write_text(
        "\n".join(lines) + "\n", encoding="utf-8"
    )


if __name__ == "__main__":
    main()