# dragon_walk_forward_validation.py (9.9 KB)

  1. from __future__ import annotations
  2. from pathlib import Path
  3. import pandas as pd
  4. def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
  5. return pd.read_csv(base_dir / name, encoding="utf-8-sig")
  6. def _profit_factor(series: pd.Series) -> float:
  7. gross_profit = series[series > 0].sum()
  8. gross_loss = -series[series < 0].sum()
  9. if gross_loss == 0:
  10. return float("inf") if gross_profit > 0 else 0.0
  11. return float(gross_profit / gross_loss)
  12. def _max_drawdown(series: pd.Series) -> float:
  13. if series.empty:
  14. return float("nan")
  15. equity = (1.0 + series).cumprod()
  16. running_max = equity.cummax()
  17. drawdown = equity / running_max - 1.0
  18. return float(drawdown.min())
  19. def _segment_stats(df: pd.DataFrame) -> dict[str, float | int]:
  20. if df.empty:
  21. return {
  22. "trades": 0,
  23. "win_rate": float("nan"),
  24. "avg_return": float("nan"),
  25. "median_return": float("nan"),
  26. "profit_factor": float("nan"),
  27. "compounded_return": float("nan"),
  28. "max_drawdown": float("nan"),
  29. }
  30. returns = df["return_pct"].astype(float)
  31. return {
  32. "trades": int(len(df)),
  33. "win_rate": float((returns > 0).mean()),
  34. "avg_return": float(returns.mean()),
  35. "median_return": float(returns.median()),
  36. "profit_factor": _profit_factor(returns),
  37. "compounded_return": float((1.0 + returns).prod() - 1.0),
  38. "max_drawdown": _max_drawdown(returns),
  39. }
  40. def _format_pct(value: float) -> str:
  41. if pd.isna(value):
  42. return "NA"
  43. if value == float("inf"):
  44. return "inf"
  45. return f"{value:.2%}"
  46. def _format_num(value: float) -> str:
  47. if pd.isna(value):
  48. return "NA"
  49. if value == float("inf"):
  50. return "inf"
  51. return f"{value:.2f}"
  52. def _build_walk_forward(trades: pd.DataFrame) -> pd.DataFrame:
  53. years = sorted(int(year) for year in trades["sell_year"].unique())
  54. rows: list[dict[str, object]] = []
  55. for idx, test_year in enumerate(years):
  56. if idx >= 1:
  57. train_years = years[:idx]
  58. train_df = trades[trades["sell_year"].isin(train_years)]
  59. test_df = trades[trades["sell_year"] == test_year]
  60. train_stats = _segment_stats(train_df)
  61. test_stats = _segment_stats(test_df)
  62. rows.append(
  63. {
  64. "scheme": "anchored_expanding",
  65. "train_start_year": train_years[0],
  66. "train_end_year": train_years[-1],
  67. "test_year": test_year,
  68. **{f"train_{k}": v for k, v in train_stats.items()},
  69. **{f"test_{k}": v for k, v in test_stats.items()},
  70. }
  71. )
  72. if idx >= 3:
  73. train_years = years[idx - 3 : idx]
  74. train_df = trades[trades["sell_year"].isin(train_years)]
  75. test_df = trades[trades["sell_year"] == test_year]
  76. train_stats = _segment_stats(train_df)
  77. test_stats = _segment_stats(test_df)
  78. rows.append(
  79. {
  80. "scheme": "rolling_3y",
  81. "train_start_year": train_years[0],
  82. "train_end_year": train_years[-1],
  83. "test_year": test_year,
  84. **{f"train_{k}": v for k, v in train_stats.items()},
  85. **{f"test_{k}": v for k, v in test_stats.items()},
  86. }
  87. )
  88. return pd.DataFrame(rows)
  89. def _build_family_stability(trades: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
  90. df = trades.copy()
  91. df["entry_family"] = df["buy_reason"].astype(str).str.split(":").str[0]
  92. family_year = (
  93. df.groupby(["entry_family", "sell_year"], dropna=False)
  94. .apply(
  95. lambda g: pd.Series(
  96. {
  97. "trades": int(len(g)),
  98. "win_rate": float((g["return_pct"] > 0).mean()),
  99. "avg_return": float(g["return_pct"].mean()),
  100. "profit_factor": _profit_factor(g["return_pct"]),
  101. "compounded_return": float((1.0 + g["return_pct"]).prod() - 1.0),
  102. }
  103. )
  104. )
  105. .reset_index()
  106. )
  107. eligible_families = (
  108. df.groupby("entry_family")
  109. .size()
  110. .reset_index(name="total_trades")
  111. .query("total_trades >= 3")["entry_family"]
  112. .tolist()
  113. )
  114. family_year = family_year[family_year["entry_family"].isin(eligible_families)].copy()
  115. family_summary = (
  116. family_year.groupby("entry_family", dropna=False)
  117. .apply(
  118. lambda g: pd.Series(
  119. {
  120. "years_active": int(len(g)),
  121. "total_trades": int(g["trades"].sum()),
  122. "positive_years": int((g["avg_return"] > 0).sum()),
  123. "negative_years": int((g["avg_return"] < 0).sum()),
  124. "avg_yearly_avg_return": float(g["avg_return"].mean()),
  125. "min_yearly_avg_return": float(g["avg_return"].min()),
  126. "max_yearly_avg_return": float(g["avg_return"].max()),
  127. }
  128. )
  129. )
  130. .reset_index()
  131. .sort_values(["avg_yearly_avg_return", "total_trades"], ascending=[False, False])
  132. )
  133. return family_year, family_summary
  134. def main() -> None:
  135. base_dir = Path(__file__).resolve().parent
  136. trades = _load_csv(base_dir, "dragon_strategy_trades.csv").copy()
  137. trades["sell_dt"] = pd.to_datetime(trades["sell_date"])
  138. trades["sell_year"] = trades["sell_dt"].dt.year.astype(int)
  139. walk_forward = _build_walk_forward(trades)
  140. family_year, family_summary = _build_family_stability(trades)
  141. walk_forward.to_csv(base_dir / "dragon_walk_forward_summary.csv", index=False, encoding="utf-8-sig")
  142. family_year.to_csv(base_dir / "dragon_walk_forward_family_year.csv", index=False, encoding="utf-8-sig")
  143. family_summary.to_csv(base_dir / "dragon_walk_forward_family_stability.csv", index=False, encoding="utf-8-sig")
  144. anchored = walk_forward[walk_forward["scheme"] == "anchored_expanding"].copy()
  145. rolling = walk_forward[walk_forward["scheme"] == "rolling_3y"].copy()
  146. lines = [
  147. "# Dragon Walk-Forward Validation",
  148. "",
  149. "- Method: fixed current baseline rules, no refit, evaluate temporal stability by yearly out-of-sample slices.",
  150. "- Goal: verify whether the workbook-preserving baseline still behaves coherently outside any single full-sample summary.",
  151. "",
  152. "## Anchored Expanding Windows",
  153. ]
  154. for _, row in anchored.iterrows():
  155. lines.append(
  156. f"- train `{int(row['train_start_year'])}-{int(row['train_end_year'])}` -> test `{int(row['test_year'])}`: "
  157. f"test trades `{int(row['test_trades'])}`, test avg_return `{_format_pct(float(row['test_avg_return']))}`, "
  158. f"test profit_factor `{_format_num(float(row['test_profit_factor']))}`, "
  159. f"test compounded_return `{_format_pct(float(row['test_compounded_return']))}`, "
  160. f"test max_drawdown `{_format_pct(float(row['test_max_drawdown']))}`"
  161. )
  162. lines.extend(["", "## Rolling 3Y Windows"])
  163. for _, row in rolling.iterrows():
  164. lines.append(
  165. f"- train `{int(row['train_start_year'])}-{int(row['train_end_year'])}` -> test `{int(row['test_year'])}`: "
  166. f"test trades `{int(row['test_trades'])}`, test avg_return `{_format_pct(float(row['test_avg_return']))}`, "
  167. f"test profit_factor `{_format_num(float(row['test_profit_factor']))}`, "
  168. f"test compounded_return `{_format_pct(float(row['test_compounded_return']))}`, "
  169. f"test max_drawdown `{_format_pct(float(row['test_max_drawdown']))}`"
  170. )
  171. lines.extend(["", "## Entry-Family Stability"])
  172. for _, row in family_summary.head(8).iterrows():
  173. lines.append(
  174. f"- `{row['entry_family']}`: years_active `{int(row['years_active'])}`, total_trades `{int(row['total_trades'])}`, "
  175. f"positive_years `{int(row['positive_years'])}`, negative_years `{int(row['negative_years'])}`, "
  176. f"avg_yearly_avg_return `{_format_pct(float(row['avg_yearly_avg_return']))}`, "
  177. f"min_yearly_avg_return `{_format_pct(float(row['min_yearly_avg_return']))}`"
  178. )
  179. weakest = family_summary.sort_values(["avg_yearly_avg_return", "min_yearly_avg_return"]).head(5)
  180. lines.extend(["", "## Weak Entry-Family Stability"])
  181. for _, row in weakest.iterrows():
  182. lines.append(
  183. f"- `{row['entry_family']}`: years_active `{int(row['years_active'])}`, total_trades `{int(row['total_trades'])}`, "
  184. f"positive_years `{int(row['positive_years'])}`, negative_years `{int(row['negative_years'])}`, "
  185. f"avg_yearly_avg_return `{_format_pct(float(row['avg_yearly_avg_return']))}`, "
  186. f"min_yearly_avg_return `{_format_pct(float(row['min_yearly_avg_return']))}`"
  187. )
  188. positive_anchored = int((anchored["test_avg_return"] > 0).sum()) if not anchored.empty else 0
  189. negative_anchored = int((anchored["test_avg_return"] < 0).sum()) if not anchored.empty else 0
  190. positive_rolling = int((rolling["test_avg_return"] > 0).sum()) if not rolling.empty else 0
  191. negative_rolling = int((rolling["test_avg_return"] < 0).sum()) if not rolling.empty else 0
  192. lines.extend(
  193. [
  194. "",
  195. "## Quant Judgment",
  196. f"- Anchored walk-forward windows: positive years `{positive_anchored}`, negative years `{negative_anchored}`.",
  197. f"- Rolling 3Y windows: positive years `{positive_rolling}`, negative years `{negative_rolling}`.",
  198. "- This is a stability audit, not a parameter-search walk-forward. The strategy was held fixed throughout.",
  199. "- Families with repeated negative yearly averages are research candidates; families with broad multi-year persistence are baseline keepers.",
  200. ]
  201. )
  202. (base_dir / "dragon_walk_forward_report.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
  203. if __name__ == "__main__":
  204. main()