| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- from __future__ import annotations
- import pandas as pd
- from dragon_shared import END_DATE, START_DATE, evaluation_years, profit_factor
def apply_execution_model(trades: pd.DataFrame, model: str, cost_bps_side: float) -> pd.DataFrame:
    """Price each trade under one execution model and compute its net return.

    Args:
        trades: Trade table containing ``exec_<model>_entry`` and
            ``exec_<model>_exit`` columns. The input frame is not modified.
        model: Execution model name used to select the price columns.
        cost_bps_side: Cost charged on each side of the trade, in basis points.

    Returns:
        A new frame with ``entry_exec_price``, ``exit_exec_price``,
        ``return_pct``, ``execution_model`` and ``cost_bps_side`` columns.
        Rows missing either execution price are dropped.
    """
    side_cost = cost_bps_side / 10000.0

    priced = trades.assign(
        entry_exec_price=trades[f"exec_{model}_entry"].astype(float),
        exit_exec_price=trades[f"exec_{model}_exit"].astype(float),
    )
    priced = priced.dropna(subset=["entry_exec_price", "exit_exec_price"]).copy()

    # Costs reduce the exit proceeds and inflate the entry outlay.
    net_exit = priced["exit_exec_price"] * (1.0 - side_cost)
    gross_entry = priced["entry_exec_price"] * (1.0 + side_cost)

    return priced.assign(
        return_pct=net_exit / gross_entry - 1.0,
        execution_model=model,
        cost_bps_side=cost_bps_side,
    )
- def _max_drawdown(returns: pd.Series) -> tuple[float, int]:
- equity = (1.0 + returns.astype(float)).cumprod()
- peak = equity.cummax()
- dd = equity / peak - 1.0
- max_dd = float(dd.min()) if not dd.empty else float("nan")
- duration = 0
- max_duration = 0
- for value in dd:
- if value < 0:
- duration += 1
- max_duration = max(max_duration, duration)
- else:
- duration = 0
- return max_dd, max_duration
def summary(branch: str, trades: pd.DataFrame) -> dict[str, object]:
    """Build the headline performance row for one branch.

    Args:
        branch: Label stored under the ``"branch"`` key.
        trades: Trades with ``return_pct``, ``execution_model`` and
            ``cost_bps_side`` columns.

    Returns:
        A dict of performance statistics. For an empty frame the counts are
        0, the execution model is ``""`` and every ratio is ``nan``.
    """
    nan = float("nan")
    if trades.empty:
        return {
            "branch": branch,
            "execution_model": "",
            "cost_bps_side": nan,
            "trades": 0,
            "win_rate": nan,
            "avg_return": nan,
            "profit_factor": nan,
            "compounded_return": nan,
            "cagr": nan,
            "max_drawdown": nan,
            "drawdown_duration_trades": 0,
        }

    returns = trades["return_pct"].astype(float)
    compounded = float((1.0 + returns).prod() - 1.0)
    years = evaluation_years(START_DATE, END_DATE)

    # CAGR is only defined while some capital remains (total return > -100%).
    if pd.notna(compounded) and compounded > -1.0:
        cagr = float((1.0 + compounded) ** (1.0 / years) - 1.0)
    else:
        cagr = nan

    max_dd, dd_duration = _max_drawdown(returns)
    first_trade = trades.iloc[0]

    row: dict[str, object] = {}
    row["branch"] = branch
    row["execution_model"] = str(first_trade["execution_model"])
    row["cost_bps_side"] = float(first_trade["cost_bps_side"])
    row["trades"] = int(len(trades))
    row["win_rate"] = float((returns > 0).mean())
    row["avg_return"] = float(returns.mean())
    row["profit_factor"] = profit_factor(returns)
    row["compounded_return"] = compounded
    row["cagr"] = cagr
    row["max_drawdown"] = max_dd
    row["drawdown_duration_trades"] = dd_duration
    return row
- def _loss_streak(flags: pd.Series) -> int:
- best = 0
- cur = 0
- for flag in flags.astype(bool):
- if flag:
- cur += 1
- best = max(best, cur)
- else:
- cur = 0
- return best
- def _worst_rolling_sum(series: pd.Series, window: int) -> float:
- if len(series) < window:
- return float(series.sum()) if not series.empty else float("nan")
- return float(series.rolling(window).sum().min())
def risk_cluster(branch: str, trades: pd.DataFrame) -> dict[str, object]:
    """Build the loss-clustering and tail-risk row for one branch.

    Args:
        branch: Label stored under the ``"branch"`` key.
        trades: Trades with ``return_pct``, ``holding_days``,
            ``entry_family``, ``execution_model`` and ``cost_bps_side``
            columns.

    Returns:
        A dict of risk statistics. For an empty frame the counts are 0, the
        string fields are ``""``, ``worst_loss_family_sum`` is 0.0 and the
        remaining values are ``nan``.
    """
    nan = float("nan")
    if trades.empty:
        return {
            "branch": branch,
            "execution_model": "",
            "cost_bps_side": nan,
            "max_loss_streak": 0,
            "worst_3trade_sum": nan,
            "worst_5trade_sum": nan,
            "worst_10trade_sum": nan,
            "avg_losing_trade": nan,
            "tail_20pct_avg": nan,
            "max_drawdown": nan,
            "drawdown_duration_trades": 0,
            "short_loss_share": nan,
            "worst_loss_family": "",
            "worst_loss_family_sum": 0.0,
        }

    returns = trades["return_pct"].astype(float).reset_index(drop=True)
    losing_returns = returns[returns < 0]
    max_dd, dd_duration = _max_drawdown(returns)

    # Loss magnitudes are expressed as positive numbers for the share ratio.
    total_loss = -losing_returns.sum()
    is_loss = trades["return_pct"] < 0
    held_briefly = trades["holding_days"] <= 10
    brief_loss = -trades.loc[held_briefly & is_loss, "return_pct"].sum()

    # Ascending sort puts the family with the largest summed loss first.
    per_family = trades[is_loss].groupby("entry_family")["return_pct"].sum().sort_values()
    if per_family.empty:
        worst_family = ""
        worst_family_loss = 0.0
    else:
        worst_family = str(per_family.index[0])
        worst_family_loss = float(per_family.iloc[0])

    if losing_returns.empty:
        avg_loss = nan
    else:
        avg_loss = float(losing_returns.mean())

    if returns.empty:
        tail_avg = nan
    else:
        # Average of the worst 20% of trades, using at least one trade.
        tail_count = max(1, int(len(returns) * 0.2))
        tail_avg = float(returns.nsmallest(tail_count).mean())

    if total_loss > 0:
        brief_share = float(brief_loss / total_loss)
    else:
        brief_share = nan

    return {
        "branch": branch,
        "execution_model": str(trades["execution_model"].iloc[0]),
        "cost_bps_side": float(trades["cost_bps_side"].iloc[0]),
        "max_loss_streak": _loss_streak(returns < 0),
        "worst_3trade_sum": _worst_rolling_sum(returns, 3),
        "worst_5trade_sum": _worst_rolling_sum(returns, 5),
        "worst_10trade_sum": _worst_rolling_sum(returns, 10),
        "avg_losing_trade": avg_loss,
        "tail_20pct_avg": tail_avg,
        "max_drawdown": max_dd,
        "drawdown_duration_trades": dd_duration,
        "short_loss_share": brief_share,
        "worst_loss_family": worst_family,
        "worst_loss_family_sum": worst_family_loss,
    }
|