# dragon_threshold_perturbation.py (8.9 KB)

  1. from __future__ import annotations
  2. from pathlib import Path
  3. import pandas as pd
  4. from dragon_indicators import DragonIndicatorConfig, DragonIndicatorEngine
  5. from dragon_strategy import DragonRuleEngine
  6. from dragon_strategy_config import StrategyConfig
  7. from dragon_workbook import DragonWorkbook
  8. def _find_workbook(base_dir: Path) -> Path:
  9. matches = sorted(base_dir.glob("*.xlsx"))
  10. if not matches:
  11. raise FileNotFoundError(f"No workbook found in {base_dir}")
  12. return matches[0]
  13. def _load_workbook_events(workbook_path: Path) -> pd.DataFrame:
  14. workbook = DragonWorkbook(workbook_path)
  15. return pd.DataFrame(
  16. [
  17. {
  18. "date": event.date.isoformat(),
  19. "side": event.side,
  20. "layer": event.layer,
  21. }
  22. for event in workbook.split_layers()
  23. ]
  24. )
  25. def _event_overlap(workbook_events: pd.DataFrame, strategy_events: pd.DataFrame, side: str, layer: str) -> tuple[int, int, int]:
  26. wb = set(workbook_events[(workbook_events["side"] == side) & (workbook_events["layer"] == layer)]["date"])
  27. st = set(strategy_events[(strategy_events["side"] == side) & (strategy_events["layer"] == layer)]["date"])
  28. hit = wb & st
  29. return len(hit), len(wb - st), len(st - wb)
  30. def _profit_factor(series: pd.Series) -> float:
  31. gross_profit = series[series > 0].sum()
  32. gross_loss = -series[series < 0].sum()
  33. if gross_loss == 0:
  34. return float("inf") if gross_profit > 0 else 0.0
  35. return float(gross_profit / gross_loss)
  36. def _run_experiment(
  37. label: str,
  38. parameter: str,
  39. variant: str,
  40. config: StrategyConfig,
  41. workbook_events: pd.DataFrame,
  42. indicator_df: pd.DataFrame,
  43. first_workbook_date: str,
  44. last_workbook_date: str,
  45. ) -> dict[str, object]:
  46. strategy = DragonRuleEngine(config=config)
  47. events, trades = strategy.run(indicator_df)
  48. events = events[(events["date"] >= first_workbook_date) & (events["date"] <= last_workbook_date)].copy()
  49. trades = trades[
  50. (trades["buy_date"] >= first_workbook_date)
  51. & (trades["buy_date"] <= last_workbook_date)
  52. & (trades["sell_date"] >= first_workbook_date)
  53. & (trades["sell_date"] <= last_workbook_date)
  54. ].copy()
  55. real_buy_overlap, real_buy_missing, real_buy_extra = _event_overlap(workbook_events, events, "BUY", "real_trade")
  56. real_sell_overlap, real_sell_missing, real_sell_extra = _event_overlap(workbook_events, events, "SELL", "real_trade")
  57. aux_sell_overlap, aux_sell_missing, aux_sell_extra = _event_overlap(workbook_events, events, "SELL", "aux_signal")
  58. return {
  59. "experiment": label,
  60. "parameter": parameter,
  61. "variant": variant,
  62. "value": getattr(config, parameter) if parameter != "baseline" else "baseline",
  63. "trades": int(len(trades)),
  64. "win_rate": float((trades["return_pct"] > 0).mean()) if not trades.empty else float("nan"),
  65. "avg_return": float(trades["return_pct"].mean()) if not trades.empty else float("nan"),
  66. "profit_factor": _profit_factor(trades["return_pct"]) if not trades.empty else float("nan"),
  67. "real_buy_overlap": int(real_buy_overlap),
  68. "real_buy_missing": int(real_buy_missing),
  69. "real_buy_extra": int(real_buy_extra),
  70. "real_sell_overlap": int(real_sell_overlap),
  71. "real_sell_missing": int(real_sell_missing),
  72. "real_sell_extra": int(real_sell_extra),
  73. "aux_sell_overlap": int(aux_sell_overlap),
  74. "aux_sell_missing": int(aux_sell_missing),
  75. "aux_sell_extra": int(aux_sell_extra),
  76. }
  77. def main() -> None:
  78. base_dir = Path(__file__).resolve().parent
  79. workbook_path = _find_workbook(base_dir)
  80. workbook_events = _load_workbook_events(workbook_path)
  81. first_workbook_date = pd.to_datetime(workbook_events["date"]).min().date().isoformat()
  82. last_workbook_date = pd.to_datetime(workbook_events["date"]).max().date().isoformat()
  83. engine = DragonIndicatorEngine(DragonIndicatorConfig(start_date="2015-01-01", end_date="2026-01-31"))
  84. indicator_df = engine.compute(engine.fetch_daily_data())
  85. baseline = StrategyConfig()
  86. perturbations = {
  87. "post_exit_confirmation_window_days": [8, 10, 12],
  88. "aux_sell_high_zone_kdj_only_block_c1": [83.0, 85.0, 87.0],
  89. "glued_high_weak_rebound_high_c1": [66.0, 68.0, 70.0],
  90. "glued_high_weak_rebound_high_b1": [-0.10, -0.08, -0.06],
  91. "deep_oversold_entry_c1_max": [15.0, 16.0, 17.0],
  92. "deep_oversold_entry_b1_min": [-0.12, -0.10, -0.08],
  93. "predictive_b1_break_short_b1_max": [-0.15, -0.13, -0.11],
  94. "predictive_b1_break_long_b1_max": [-0.14, -0.12, -0.10],
  95. }
  96. rows = [
  97. _run_experiment(
  98. "baseline",
  99. "baseline",
  100. "baseline",
  101. baseline,
  102. workbook_events,
  103. indicator_df,
  104. first_workbook_date,
  105. last_workbook_date,
  106. )
  107. ]
  108. for parameter, values in perturbations.items():
  109. for idx, value in enumerate(values):
  110. variant = "lower" if idx == 0 else "baseline" if idx == 1 else "upper"
  111. config = baseline.with_updates(**{parameter: value})
  112. rows.append(
  113. _run_experiment(
  114. f"{parameter}:{variant}",
  115. parameter,
  116. variant,
  117. config,
  118. workbook_events,
  119. indicator_df,
  120. first_workbook_date,
  121. last_workbook_date,
  122. )
  123. )
  124. result_df = pd.DataFrame(rows)
  125. baseline_row = result_df[result_df["experiment"] == "baseline"].iloc[0]
  126. for col in ["trades", "win_rate", "avg_return", "profit_factor", "real_buy_overlap", "real_sell_overlap", "aux_sell_overlap"]:
  127. result_df[f"delta_{col}"] = result_df[col] - baseline_row[col]
  128. result_df.to_csv(base_dir / "dragon_threshold_perturbation.csv", index=False, encoding="utf-8-sig")
  129. summary_rows: list[dict[str, object]] = []
  130. for parameter in perturbations:
  131. subset = result_df[result_df["parameter"] == parameter].copy()
  132. summary_rows.append(
  133. {
  134. "parameter": parameter,
  135. "baseline_value": subset[subset["variant"] == "baseline"].iloc[0]["value"],
  136. "real_buy_overlap_min": int(subset["real_buy_overlap"].min()),
  137. "real_sell_overlap_min": int(subset["real_sell_overlap"].min()),
  138. "avg_return_range": float(subset["avg_return"].max() - subset["avg_return"].min()),
  139. "profit_factor_range": float(subset["profit_factor"].max() - subset["profit_factor"].min()),
  140. "aux_sell_overlap_range": int(subset["aux_sell_overlap"].max() - subset["aux_sell_overlap"].min()),
  141. "stable_real_alignment": bool(
  142. (subset["real_buy_overlap"] == baseline_row["real_buy_overlap"]).all()
  143. and (subset["real_sell_overlap"] == baseline_row["real_sell_overlap"]).all()
  144. ),
  145. }
  146. )
  147. summary_df = pd.DataFrame(summary_rows).sort_values(["stable_real_alignment", "avg_return_range"], ascending=[True, False])
  148. summary_df.to_csv(base_dir / "dragon_threshold_sensitivity_summary.csv", index=False, encoding="utf-8-sig")
  149. lines = [
  150. "# Dragon Threshold Perturbation",
  151. "",
  152. "## Baseline",
  153. f"- trades: `{int(baseline_row['trades'])}`",
  154. f"- avg_return: `{baseline_row['avg_return']:.2%}`",
  155. f"- profit_factor: `{baseline_row['profit_factor']:.2f}`",
  156. f"- real BUY overlap: `{int(baseline_row['real_buy_overlap'])}`",
  157. f"- real SELL overlap: `{int(baseline_row['real_sell_overlap'])}`",
  158. "",
  159. "## Sensitivity Summary",
  160. ]
  161. for _, row in summary_df.iterrows():
  162. lines.append(
  163. f"- `{row['parameter']}`: stable_real_alignment `{bool(row['stable_real_alignment'])}`, "
  164. f"avg_return_range `{row['avg_return_range']:.2%}`, profit_factor_range `{row['profit_factor_range']:.2f}`, "
  165. f"aux_sell_overlap_range `{int(row['aux_sell_overlap_range'])}`"
  166. )
  167. fragile = summary_df[~summary_df["stable_real_alignment"]]
  168. lines.extend(["", "## Fragile Parameters"])
  169. if fragile.empty:
  170. lines.append("- None in this first perturbation pack.")
  171. else:
  172. for _, row in fragile.iterrows():
  173. lines.append(
  174. f"- `{row['parameter']}`: minimum real BUY overlap `{int(row['real_buy_overlap_min'])}`, minimum real SELL overlap `{int(row['real_sell_overlap_min'])}`"
  175. )
  176. robust = summary_df[summary_df["stable_real_alignment"]].sort_values("avg_return_range").head(6)
  177. lines.extend(["", "## Relatively Robust Parameters"])
  178. for _, row in robust.iterrows():
  179. lines.append(
  180. f"- `{row['parameter']}`: avg_return_range `{row['avg_return_range']:.2%}`, profit_factor_range `{row['profit_factor_range']:.2f}`"
  181. )
  182. (base_dir / "dragon_threshold_perturbation.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
  183. if __name__ == "__main__":
  184. main()