dragon_execution_common.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. from __future__ import annotations
  2. import pandas as pd
  3. from dragon_shared import END_DATE, START_DATE, evaluation_years, profit_factor
  4. def apply_execution_model(trades: pd.DataFrame, model: str, cost_bps_side: float) -> pd.DataFrame:
  5. trades = trades.copy()
  6. entry_col = f"exec_{model}_entry"
  7. exit_col = f"exec_{model}_exit"
  8. cost = cost_bps_side / 10000.0
  9. trades["entry_exec_price"] = trades[entry_col].astype(float)
  10. trades["exit_exec_price"] = trades[exit_col].astype(float)
  11. trades = trades.dropna(subset=["entry_exec_price", "exit_exec_price"]).copy()
  12. trades["return_pct"] = (trades["exit_exec_price"] * (1.0 - cost)) / (trades["entry_exec_price"] * (1.0 + cost)) - 1.0
  13. trades["execution_model"] = model
  14. trades["cost_bps_side"] = cost_bps_side
  15. return trades
  16. def _max_drawdown(returns: pd.Series) -> tuple[float, int]:
  17. equity = (1.0 + returns.astype(float)).cumprod()
  18. peak = equity.cummax()
  19. dd = equity / peak - 1.0
  20. max_dd = float(dd.min()) if not dd.empty else float("nan")
  21. duration = 0
  22. max_duration = 0
  23. for value in dd:
  24. if value < 0:
  25. duration += 1
  26. max_duration = max(max_duration, duration)
  27. else:
  28. duration = 0
  29. return max_dd, max_duration
  30. def summary(branch: str, trades: pd.DataFrame) -> dict[str, object]:
  31. if trades.empty:
  32. return {
  33. "branch": branch,
  34. "execution_model": "",
  35. "cost_bps_side": float("nan"),
  36. "trades": 0,
  37. "win_rate": float("nan"),
  38. "avg_return": float("nan"),
  39. "profit_factor": float("nan"),
  40. "compounded_return": float("nan"),
  41. "cagr": float("nan"),
  42. "max_drawdown": float("nan"),
  43. "drawdown_duration_trades": 0,
  44. }
  45. returns = trades["return_pct"].astype(float)
  46. compounded = float((1.0 + returns).prod() - 1.0)
  47. years = evaluation_years(START_DATE, END_DATE)
  48. cagr = float((1.0 + compounded) ** (1.0 / years) - 1.0) if pd.notna(compounded) and compounded > -1.0 else float("nan")
  49. max_dd, dd_duration = _max_drawdown(returns)
  50. return {
  51. "branch": branch,
  52. "execution_model": str(trades["execution_model"].iloc[0]),
  53. "cost_bps_side": float(trades["cost_bps_side"].iloc[0]),
  54. "trades": int(len(trades)),
  55. "win_rate": float((returns > 0).mean()),
  56. "avg_return": float(returns.mean()),
  57. "profit_factor": profit_factor(returns),
  58. "compounded_return": compounded,
  59. "cagr": cagr,
  60. "max_drawdown": max_dd,
  61. "drawdown_duration_trades": dd_duration,
  62. }
  63. def _loss_streak(flags: pd.Series) -> int:
  64. best = 0
  65. cur = 0
  66. for flag in flags.astype(bool):
  67. if flag:
  68. cur += 1
  69. best = max(best, cur)
  70. else:
  71. cur = 0
  72. return best
  73. def _worst_rolling_sum(series: pd.Series, window: int) -> float:
  74. if len(series) < window:
  75. return float(series.sum()) if not series.empty else float("nan")
  76. return float(series.rolling(window).sum().min())
  77. def risk_cluster(branch: str, trades: pd.DataFrame) -> dict[str, object]:
  78. if trades.empty:
  79. return {
  80. "branch": branch,
  81. "execution_model": "",
  82. "cost_bps_side": float("nan"),
  83. "max_loss_streak": 0,
  84. "worst_3trade_sum": float("nan"),
  85. "worst_5trade_sum": float("nan"),
  86. "worst_10trade_sum": float("nan"),
  87. "avg_losing_trade": float("nan"),
  88. "tail_20pct_avg": float("nan"),
  89. "max_drawdown": float("nan"),
  90. "drawdown_duration_trades": 0,
  91. "short_loss_share": float("nan"),
  92. "worst_loss_family": "",
  93. "worst_loss_family_sum": 0.0,
  94. }
  95. returns = trades["return_pct"].astype(float).reset_index(drop=True)
  96. losses = returns[returns < 0]
  97. max_dd, dd_duration = _max_drawdown(returns)
  98. abs_losses = -losses.sum()
  99. short_losses = -trades.loc[(trades["holding_days"] <= 10) & (trades["return_pct"] < 0), "return_pct"].sum()
  100. family_losses = (
  101. trades[trades["return_pct"] < 0]
  102. .groupby("entry_family")["return_pct"]
  103. .sum()
  104. .sort_values()
  105. )
  106. worst_family = "" if family_losses.empty else str(family_losses.index[0])
  107. worst_family_loss = 0.0 if family_losses.empty else float(family_losses.iloc[0])
  108. return {
  109. "branch": branch,
  110. "execution_model": str(trades["execution_model"].iloc[0]),
  111. "cost_bps_side": float(trades["cost_bps_side"].iloc[0]),
  112. "max_loss_streak": _loss_streak(returns < 0),
  113. "worst_3trade_sum": _worst_rolling_sum(returns, 3),
  114. "worst_5trade_sum": _worst_rolling_sum(returns, 5),
  115. "worst_10trade_sum": _worst_rolling_sum(returns, 10),
  116. "avg_losing_trade": float(losses.mean()) if not losses.empty else float("nan"),
  117. "tail_20pct_avg": float(returns.nsmallest(max(1, int(len(returns) * 0.2))).mean()) if not returns.empty else float("nan"),
  118. "max_drawdown": max_dd,
  119. "drawdown_duration_trades": dd_duration,
  120. "short_loss_share": float(short_losses / abs_losses) if abs_losses > 0 else float("nan"),
  121. "worst_loss_family": worst_family,
  122. "worst_loss_family_sum": worst_family_loss,
  123. }