from __future__ import annotations from pathlib import Path from typing import Optional import pandas as pd from dragon_branch_configs import alpha_first_glued_selective_veto_config, alpha_first_selective_veto_config from dragon_strategy import DragonRuleEngine def _load_csv(base_dir: Path, name: str) -> pd.DataFrame: return pd.read_csv(base_dir / name, encoding="utf-8-sig") def _load_indicator_snapshot(base_dir: Path) -> pd.DataFrame: df = _load_csv(base_dir, "dragon_indicator_snapshot.csv") df["date"] = pd.to_datetime(df["date"]) return df.sort_values("date").reset_index(drop=True) def _profit_factor(series: pd.Series) -> float: gross_profit = series[series > 0].sum() gross_loss = -series[series < 0].sum() if gross_loss == 0: return float("inf") if gross_profit > 0 else 0.0 return float(gross_profit / gross_loss) def _pct(value: Optional[float]) -> str: if value is None or pd.isna(value): return "n/a" if value == float("inf"): return "inf" return f"{float(value):.2%}" def _fmt_num(value: Optional[float]) -> str: if value is None or pd.isna(value): return "n/a" if value == float("inf"): return "inf" return f"{float(value):.2f}" def _holding_bucket(days: int) -> str: if days <= 5: return "00-05d" if days <= 10: return "06-10d" if days <= 20: return "11-20d" if days <= 40: return "21-40d" return "41d+" def _build_trade_quality(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame: trades = trades.copy() trades["buy_dt"] = pd.to_datetime(trades["buy_date"]) trades["sell_dt"] = pd.to_datetime(trades["sell_date"]) trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket) pos_lookup = {dt.date().isoformat(): idx for idx, dt in enumerate(indicators["date"])} buy_a1: list[float] = [] buy_b1: list[float] = [] buy_c1: list[float] = [] sell_a1: list[float] = [] sell_b1: list[float] = [] sell_c1: list[float] = [] mfe_list: list[float] = [] mae_list: list[float] = [] exit_followthrough_list: list[float] = [] exit_rebound_list: list[float] = [] entry_forward_5d_list: list[float] = [] indicator_by_date = indicators.set_index(indicators["date"].dt.date) for _, trade in trades.iterrows(): buy_date = pd.Timestamp(trade["buy_date"]).date() sell_date = pd.Timestamp(trade["sell_date"]).date() entry_price = float(trade["buy_price"]) exit_price = float(trade["sell_price"]) buy_row = indicator_by_date.loc[buy_date] sell_row = indicator_by_date.loc[sell_date] buy_a1.append(float(buy_row["a1"])) buy_b1.append(float(buy_row["b1"])) buy_c1.append(float(buy_row["c1"])) sell_a1.append(float(sell_row["a1"])) sell_b1.append(float(sell_row["b1"])) sell_c1.append(float(sell_row["c1"])) window = indicators[(indicators["date"] >= pd.Timestamp(trade["buy_date"])) & (indicators["date"] <= pd.Timestamp(trade["sell_date"]))] max_high = float(window["high"].max()) min_low = float(window["low"].min()) mfe_list.append(max_high / entry_price - 1.0) mae_list.append(min_low / entry_price - 1.0) sell_idx = pos_lookup[trade["sell_date"]] future = indicators.iloc[sell_idx + 1 : sell_idx + 6] if future.empty: exit_followthrough_list.append(float("nan")) exit_rebound_list.append(float("nan")) else: exit_followthrough_list.append(float(future["low"].min()) / exit_price - 1.0) exit_rebound_list.append(float(future["high"].max()) / exit_price - 1.0) buy_idx = pos_lookup[trade["buy_date"]] entry_future = indicators.iloc[buy_idx + 1 : buy_idx + 6] if entry_future.empty: entry_forward_5d_list.append(float("nan")) else: entry_forward_5d_list.append(float(entry_future["close"].iloc[-1]) / entry_price - 1.0) trades["buy_a1"] = buy_a1 trades["buy_b1"] = buy_b1 trades["buy_c1"] = buy_c1 trades["sell_a1"] = sell_a1 trades["sell_b1"] = sell_b1 trades["sell_c1"] = sell_c1 trades["mfe_pct"] = mfe_list trades["mae_pct"] = mae_list trades["exit_followthrough_5d_pct"] = exit_followthrough_list trades["exit_rebound_5d_pct"] = exit_rebound_list trades["entry_forward_5d_pct"] = entry_forward_5d_list return trades def _run_branch(indicators: pd.DataFrame, config) -> pd.DataFrame: indicator_indexed = indicators.set_index("date", drop=False) engine = DragonRuleEngine(config=config) _, trades = engine.run(indicator_indexed) trades = trades.copy() return _build_trade_quality(trades, indicators) def _trade_key(df: pd.DataFrame) -> set[tuple[str, str, str, str]]: return set(zip(df["buy_date"], df["sell_date"], df["buy_reason"], df["sell_reason"])) def _veto_bucket(c1: float, b1: float) -> str: if c1 >= 40 and b1 >= 0.10: return "hot_positive_b1" if 23 <= c1 < 28 and b1 <= 0.02: return "low_weak_range" return "other" def _recommendation(row: pd.Series) -> tuple[str, str]: ret = float(row["return_pct"]) holding = int(row["holding_days"]) mfe = float(row["mfe_pct"]) replacement_ret = row.get("replacement_return_pct") if ret > 0: if ret <= 0.01 and holding <= 15: if pd.notna(replacement_ret) and float(replacement_ret) <= ret: return "OBSERVE_VETO", "原交易仅微利,但当前替代路径没有更强,建议进一步细化 hot 过滤而不是直接全盘保留。" return "OBSERVE_VETO", "原交易为微利短单,暂不应视为强 alpha,先保留为观察样本。" return "OVER_VETO", "原交易本身为明显盈利单,当前过滤过度。" if holding <= 10 and ret < 0 and float(row["exit_followthrough_5d_pct"]) <= 0: return "KEEP_VETO", "短持仓亏损且卖出后继续走弱,属于应优先清理的噪音单。" if ret < 0 and mfe <= 0.02: return "KEEP_VETO", "持仓期间几乎没有有效盈利空间,删除逻辑合理。" return "KEEP_VETO", "总体为负收益短单,保留 veto 更符合当前 alpha-first 目标。" def _summary_line(group: pd.DataFrame) -> str: if group.empty: return "n/a" return ( f"trades `{len(group)}`, win_rate `{_pct(float((group['return_pct'] > 0).mean()))}`, " f"avg_return `{_pct(float(group['return_pct'].mean()))}`, avg_mfe `{_pct(float(group['mfe_pct'].mean()))}`, " f"avg_mae `{_pct(float(group['mae_pct'].mean()))}`, avg_holding `{float(group['holding_days'].mean()):.2f}`" ) def main() -> None: base_dir = Path(__file__).resolve().parent indicators = _load_indicator_snapshot(base_dir) workbook_events = _load_csv(base_dir, "true_trade_events.csv") alpha_trades = _run_branch(indicators, alpha_first_selective_veto_config()) glued_trades = _run_branch(indicators, alpha_first_glued_selective_veto_config()) workbook_buy = set(workbook_events[(workbook_events["layer"] == "real_trade") & (workbook_events["side"] == "BUY")]["date"]) workbook_sell = set(workbook_events[(workbook_events["layer"] == "real_trade") & (workbook_events["side"] == "SELL")]["date"]) alpha_set = _trade_key(alpha_trades) glued_set = _trade_key(glued_trades) removed = pd.DataFrame( sorted(alpha_set - glued_set), columns=["buy_date", "sell_date", "buy_reason", "sell_reason"], ) rows: list[dict[str, object]] = [] for _, removed_row in removed.iterrows(): match = alpha_trades[ (alpha_trades["buy_date"] == removed_row["buy_date"]) & (alpha_trades["sell_date"] == removed_row["sell_date"]) & (alpha_trades["buy_reason"] == removed_row["buy_reason"]) & (alpha_trades["sell_reason"] == removed_row["sell_reason"]) ] if match.empty: continue trade = match.iloc[0] sell_dt = pd.Timestamp(trade["sell_date"]) replacement = glued_trades[ (pd.to_datetime(glued_trades["buy_date"]) > sell_dt) & (pd.to_datetime(glued_trades["buy_date"]) <= sell_dt + pd.Timedelta(days=10)) ].sort_values("buy_date") replacement_row = replacement.iloc[0] if not replacement.empty else None rec, rec_reason = _recommendation( pd.Series( { **trade.to_dict(), "replacement_return_pct": None if replacement_row is None else float(replacement_row["return_pct"]), } ) ) rows.append( { "buy_date": trade["buy_date"], "sell_date": trade["sell_date"], "buy_reason": trade["buy_reason"], "sell_reason": trade["sell_reason"], "holding_bucket": trade["holding_bucket"], "holding_days": int(trade["holding_days"]), "return_pct": float(trade["return_pct"]), "mfe_pct": float(trade["mfe_pct"]), "mae_pct": float(trade["mae_pct"]), "entry_forward_5d_pct": float(trade["entry_forward_5d_pct"]), "exit_followthrough_5d_pct": float(trade["exit_followthrough_5d_pct"]), "exit_rebound_5d_pct": float(trade["exit_rebound_5d_pct"]), "buy_a1": float(trade["buy_a1"]), "buy_b1": float(trade["buy_b1"]), "buy_c1": float(trade["buy_c1"]), "sell_a1": float(trade["sell_a1"]), "sell_b1": float(trade["sell_b1"]), "sell_c1": float(trade["sell_c1"]), "veto_bucket": _veto_bucket(float(trade["buy_c1"]), float(trade["buy_b1"])), "buy_aligned_with_workbook": trade["buy_date"] in workbook_buy, "sell_aligned_with_workbook": trade["sell_date"] in workbook_sell, "replacement_buy_date": "" if replacement_row is None else str(replacement_row["buy_date"]), "replacement_sell_date": "" if replacement_row is None else str(replacement_row["sell_date"]), "replacement_buy_reason": "" if replacement_row is None else str(replacement_row["buy_reason"]), "replacement_sell_reason": "" if replacement_row is None else str(replacement_row["sell_reason"]), "replacement_return_pct": float("nan") if replacement_row is None else float(replacement_row["return_pct"]), "replacement_gap_days": float("nan") if replacement_row is None else int((pd.Timestamp(replacement_row["buy_date"]) - sell_dt).days), "recommendation": rec, "recommendation_reason": rec_reason, } ) attribution = pd.DataFrame(rows).sort_values(["veto_bucket", "buy_date"]).reset_index(drop=True) attribution.to_csv(base_dir / "dragon_glued_veto_attribution.csv", index=False, encoding="utf-8-sig") bucket_summary = ( attribution.groupby("veto_bucket", dropna=False) .agg( trades=("buy_date", "count"), win_rate=("return_pct", lambda s: float((s > 0).mean())), avg_return=("return_pct", "mean"), avg_mfe=("mfe_pct", "mean"), avg_mae=("mae_pct", "mean"), avg_holding_days=("holding_days", "mean"), keep_veto_count=("recommendation", lambda s: int((s == "KEEP_VETO").sum())), observe_veto_count=("recommendation", lambda s: int((s == "OBSERVE_VETO").sum())), over_veto_count=("recommendation", lambda s: int((s == "OVER_VETO").sum())), ) .reset_index() .sort_values(["trades", "avg_return"], ascending=[False, True]) ) bucket_summary.to_csv(base_dir / "dragon_glued_veto_bucket_summary.csv", index=False, encoding="utf-8-sig") lines = [ "# Dragon Glued Veto Attribution Review", "", "## Snapshot", f"- removed trades vs current alpha-first: `{len(attribution)}`", f"- total avg_return of removed set: `{_pct(float(attribution['return_pct'].mean()))}`", f"- total win_rate of removed set: `{_pct(float((attribution['return_pct'] > 0).mean()))}`", f"- removed-set profit_factor: `{_fmt_num(_profit_factor(attribution['return_pct']))}`", "", "## Bucket Summary", ] for _, row in bucket_summary.iterrows(): lines.append( f"- `{row['veto_bucket']}`: trades `{int(row['trades'])}`, win_rate `{_pct(float(row['win_rate']))}`, " f"avg_return `{_pct(float(row['avg_return']))}`, avg_mfe `{_pct(float(row['avg_mfe']))}`, " f"avg_mae `{_pct(float(row['avg_mae']))}`, avg_holding `{float(row['avg_holding_days']):.2f}`, " f"KEEP/OBSERVE/OVER = `{int(row['keep_veto_count'])}/{int(row['observe_veto_count'])}/{int(row['over_veto_count'])}`" ) lines.extend( [ "", "## Quant Judgment", f"- `low_weak_range`: {_summary_line(attribution[attribution['veto_bucket'] == 'low_weak_range'])}.", f"- `hot_positive_b1`: {_summary_line(attribution[attribution['veto_bucket'] == 'hot_positive_b1'])}.", "- `low_weak_range` is now a clean promotion candidate: all removed trades are short, losing, and there is no positive sample in this bucket.", "- `hot_positive_b1` is directionally correct but not fully clean: most removed trades are weak, but one micro-profit sample remains and should be used as the first refinement target.", "- Immediate next research action: keep the low bucket intact, and narrow the hot bucket rather than rolling back the whole glued veto branch.", "", "## Detailed Cards", ] ) for _, row in attribution.iterrows(): replacement = "none" if isinstance(row["replacement_buy_date"], str) and row["replacement_buy_date"]: replacement = ( f"{row['replacement_buy_date']} -> {row['replacement_sell_date']} / " f"{row['replacement_buy_reason']} -> {row['replacement_sell_reason']} / " f"{_pct(row['replacement_return_pct'])}" ) lines.extend( [ f"### {row['buy_date']} -> {row['sell_date']}", f"- Bucket: `{row['veto_bucket']}`", f"- Trade: `{row['buy_reason']} -> {row['sell_reason']}` | `{int(row['holding_days'])}` days | return `{_pct(row['return_pct'])}`", f"- MFE / MAE: `{_pct(row['mfe_pct'])}` / `{_pct(row['mae_pct'])}`", f"- Entry 5d / Exit followthrough 5d: `{_pct(row['entry_forward_5d_pct'])}` / `{_pct(row['exit_followthrough_5d_pct'])}`", f"- Entry indicators: `a1={float(row['buy_a1']):.4f}` `b1={float(row['buy_b1']):.4f}` `c1={float(row['buy_c1']):.2f}`", f"- Workbook aligned: buy `{bool(row['buy_aligned_with_workbook'])}` / sell `{bool(row['sell_aligned_with_workbook'])}`", f"- Candidate replacement within 10d after exit: `{replacement}`", f"- Recommendation: `{row['recommendation']}` | {row['recommendation_reason']}", "", ] ) (base_dir / "dragon_glued_veto_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8") if __name__ == "__main__": main()