| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- from __future__ import annotations
- from pathlib import Path
- from typing import Optional
- import pandas as pd
- from dragon_branch_configs import alpha_first_glued_selective_veto_config, alpha_first_selective_veto_config
- from dragon_strategy import DragonRuleEngine
- def _load_csv(base_dir: Path, name: str) -> pd.DataFrame:
- return pd.read_csv(base_dir / name, encoding="utf-8-sig")
- def _load_indicator_snapshot(base_dir: Path) -> pd.DataFrame:
- df = _load_csv(base_dir, "dragon_indicator_snapshot.csv")
- df["date"] = pd.to_datetime(df["date"])
- return df.sort_values("date").reset_index(drop=True)
- def _profit_factor(series: pd.Series) -> float:
- gross_profit = series[series > 0].sum()
- gross_loss = -series[series < 0].sum()
- if gross_loss == 0:
- return float("inf") if gross_profit > 0 else 0.0
- return float(gross_profit / gross_loss)
- def _pct(value: Optional[float]) -> str:
- if value is None or pd.isna(value):
- return "n/a"
- if value == float("inf"):
- return "inf"
- return f"{float(value):.2%}"
- def _fmt_num(value: Optional[float]) -> str:
- if value is None or pd.isna(value):
- return "n/a"
- if value == float("inf"):
- return "inf"
- return f"{float(value):.2f}"
- def _holding_bucket(days: int) -> str:
- if days <= 5:
- return "00-05d"
- if days <= 10:
- return "06-10d"
- if days <= 20:
- return "11-20d"
- if days <= 40:
- return "21-40d"
- return "41d+"
- def _build_trade_quality(trades: pd.DataFrame, indicators: pd.DataFrame) -> pd.DataFrame:
- trades = trades.copy()
- trades["buy_dt"] = pd.to_datetime(trades["buy_date"])
- trades["sell_dt"] = pd.to_datetime(trades["sell_date"])
- trades["holding_bucket"] = trades["holding_days"].astype(int).map(_holding_bucket)
- pos_lookup = {dt.date().isoformat(): idx for idx, dt in enumerate(indicators["date"])}
- buy_a1: list[float] = []
- buy_b1: list[float] = []
- buy_c1: list[float] = []
- sell_a1: list[float] = []
- sell_b1: list[float] = []
- sell_c1: list[float] = []
- mfe_list: list[float] = []
- mae_list: list[float] = []
- exit_followthrough_list: list[float] = []
- exit_rebound_list: list[float] = []
- entry_forward_5d_list: list[float] = []
- indicator_by_date = indicators.set_index(indicators["date"].dt.date)
- for _, trade in trades.iterrows():
- buy_date = pd.Timestamp(trade["buy_date"]).date()
- sell_date = pd.Timestamp(trade["sell_date"]).date()
- entry_price = float(trade["buy_price"])
- exit_price = float(trade["sell_price"])
- buy_row = indicator_by_date.loc[buy_date]
- sell_row = indicator_by_date.loc[sell_date]
- buy_a1.append(float(buy_row["a1"]))
- buy_b1.append(float(buy_row["b1"]))
- buy_c1.append(float(buy_row["c1"]))
- sell_a1.append(float(sell_row["a1"]))
- sell_b1.append(float(sell_row["b1"]))
- sell_c1.append(float(sell_row["c1"]))
- window = indicators[(indicators["date"] >= pd.Timestamp(trade["buy_date"])) & (indicators["date"] <= pd.Timestamp(trade["sell_date"]))]
- max_high = float(window["high"].max())
- min_low = float(window["low"].min())
- mfe_list.append(max_high / entry_price - 1.0)
- mae_list.append(min_low / entry_price - 1.0)
- sell_idx = pos_lookup[trade["sell_date"]]
- future = indicators.iloc[sell_idx + 1 : sell_idx + 6]
- if future.empty:
- exit_followthrough_list.append(float("nan"))
- exit_rebound_list.append(float("nan"))
- else:
- exit_followthrough_list.append(float(future["low"].min()) / exit_price - 1.0)
- exit_rebound_list.append(float(future["high"].max()) / exit_price - 1.0)
- buy_idx = pos_lookup[trade["buy_date"]]
- entry_future = indicators.iloc[buy_idx + 1 : buy_idx + 6]
- if entry_future.empty:
- entry_forward_5d_list.append(float("nan"))
- else:
- entry_forward_5d_list.append(float(entry_future["close"].iloc[-1]) / entry_price - 1.0)
- trades["buy_a1"] = buy_a1
- trades["buy_b1"] = buy_b1
- trades["buy_c1"] = buy_c1
- trades["sell_a1"] = sell_a1
- trades["sell_b1"] = sell_b1
- trades["sell_c1"] = sell_c1
- trades["mfe_pct"] = mfe_list
- trades["mae_pct"] = mae_list
- trades["exit_followthrough_5d_pct"] = exit_followthrough_list
- trades["exit_rebound_5d_pct"] = exit_rebound_list
- trades["entry_forward_5d_pct"] = entry_forward_5d_list
- return trades
- def _run_branch(indicators: pd.DataFrame, config) -> pd.DataFrame:
- indicator_indexed = indicators.set_index("date", drop=False)
- engine = DragonRuleEngine(config=config)
- _, trades = engine.run(indicator_indexed)
- trades = trades.copy()
- return _build_trade_quality(trades, indicators)
- def _trade_key(df: pd.DataFrame) -> set[tuple[str, str, str, str]]:
- return set(zip(df["buy_date"], df["sell_date"], df["buy_reason"], df["sell_reason"]))
- def _veto_bucket(c1: float, b1: float) -> str:
- if c1 >= 40 and b1 >= 0.10:
- return "hot_positive_b1"
- if 23 <= c1 < 28 and b1 <= 0.02:
- return "low_weak_range"
- return "other"
- def _recommendation(row: pd.Series) -> tuple[str, str]:
- ret = float(row["return_pct"])
- holding = int(row["holding_days"])
- mfe = float(row["mfe_pct"])
- replacement_ret = row.get("replacement_return_pct")
- if ret > 0:
- if ret <= 0.01 and holding <= 15:
- if pd.notna(replacement_ret) and float(replacement_ret) <= ret:
- return "OBSERVE_VETO", "原交易仅微利,但当前替代路径没有更强,建议进一步细化 hot 过滤而不是直接全盘保留。"
- return "OBSERVE_VETO", "原交易为微利短单,暂不应视为强 alpha,先保留为观察样本。"
- return "OVER_VETO", "原交易本身为明显盈利单,当前过滤过度。"
- if holding <= 10 and ret < 0 and float(row["exit_followthrough_5d_pct"]) <= 0:
- return "KEEP_VETO", "短持仓亏损且卖出后继续走弱,属于应优先清理的噪音单。"
- if ret < 0 and mfe <= 0.02:
- return "KEEP_VETO", "持仓期间几乎没有有效盈利空间,删除逻辑合理。"
- return "KEEP_VETO", "总体为负收益短单,保留 veto 更符合当前 alpha-first 目标。"
- def _summary_line(group: pd.DataFrame) -> str:
- if group.empty:
- return "n/a"
- return (
- f"trades `{len(group)}`, win_rate `{_pct(float((group['return_pct'] > 0).mean()))}`, "
- f"avg_return `{_pct(float(group['return_pct'].mean()))}`, avg_mfe `{_pct(float(group['mfe_pct'].mean()))}`, "
- f"avg_mae `{_pct(float(group['mae_pct'].mean()))}`, avg_holding `{float(group['holding_days'].mean()):.2f}`"
- )
- def main() -> None:
- base_dir = Path(__file__).resolve().parent
- indicators = _load_indicator_snapshot(base_dir)
- workbook_events = _load_csv(base_dir, "true_trade_events.csv")
- alpha_trades = _run_branch(indicators, alpha_first_selective_veto_config())
- glued_trades = _run_branch(indicators, alpha_first_glued_selective_veto_config())
- workbook_buy = set(workbook_events[(workbook_events["layer"] == "real_trade") & (workbook_events["side"] == "BUY")]["date"])
- workbook_sell = set(workbook_events[(workbook_events["layer"] == "real_trade") & (workbook_events["side"] == "SELL")]["date"])
- alpha_set = _trade_key(alpha_trades)
- glued_set = _trade_key(glued_trades)
- removed = pd.DataFrame(
- sorted(alpha_set - glued_set),
- columns=["buy_date", "sell_date", "buy_reason", "sell_reason"],
- )
- rows: list[dict[str, object]] = []
- for _, removed_row in removed.iterrows():
- match = alpha_trades[
- (alpha_trades["buy_date"] == removed_row["buy_date"])
- & (alpha_trades["sell_date"] == removed_row["sell_date"])
- & (alpha_trades["buy_reason"] == removed_row["buy_reason"])
- & (alpha_trades["sell_reason"] == removed_row["sell_reason"])
- ]
- if match.empty:
- continue
- trade = match.iloc[0]
- sell_dt = pd.Timestamp(trade["sell_date"])
- replacement = glued_trades[
- (pd.to_datetime(glued_trades["buy_date"]) > sell_dt)
- & (pd.to_datetime(glued_trades["buy_date"]) <= sell_dt + pd.Timedelta(days=10))
- ].sort_values("buy_date")
- replacement_row = replacement.iloc[0] if not replacement.empty else None
- rec, rec_reason = _recommendation(
- pd.Series(
- {
- **trade.to_dict(),
- "replacement_return_pct": None if replacement_row is None else float(replacement_row["return_pct"]),
- }
- )
- )
- rows.append(
- {
- "buy_date": trade["buy_date"],
- "sell_date": trade["sell_date"],
- "buy_reason": trade["buy_reason"],
- "sell_reason": trade["sell_reason"],
- "holding_bucket": trade["holding_bucket"],
- "holding_days": int(trade["holding_days"]),
- "return_pct": float(trade["return_pct"]),
- "mfe_pct": float(trade["mfe_pct"]),
- "mae_pct": float(trade["mae_pct"]),
- "entry_forward_5d_pct": float(trade["entry_forward_5d_pct"]),
- "exit_followthrough_5d_pct": float(trade["exit_followthrough_5d_pct"]),
- "exit_rebound_5d_pct": float(trade["exit_rebound_5d_pct"]),
- "buy_a1": float(trade["buy_a1"]),
- "buy_b1": float(trade["buy_b1"]),
- "buy_c1": float(trade["buy_c1"]),
- "sell_a1": float(trade["sell_a1"]),
- "sell_b1": float(trade["sell_b1"]),
- "sell_c1": float(trade["sell_c1"]),
- "veto_bucket": _veto_bucket(float(trade["buy_c1"]), float(trade["buy_b1"])),
- "buy_aligned_with_workbook": trade["buy_date"] in workbook_buy,
- "sell_aligned_with_workbook": trade["sell_date"] in workbook_sell,
- "replacement_buy_date": "" if replacement_row is None else str(replacement_row["buy_date"]),
- "replacement_sell_date": "" if replacement_row is None else str(replacement_row["sell_date"]),
- "replacement_buy_reason": "" if replacement_row is None else str(replacement_row["buy_reason"]),
- "replacement_sell_reason": "" if replacement_row is None else str(replacement_row["sell_reason"]),
- "replacement_return_pct": float("nan") if replacement_row is None else float(replacement_row["return_pct"]),
- "replacement_gap_days": float("nan")
- if replacement_row is None
- else int((pd.Timestamp(replacement_row["buy_date"]) - sell_dt).days),
- "recommendation": rec,
- "recommendation_reason": rec_reason,
- }
- )
- attribution = pd.DataFrame(rows).sort_values(["veto_bucket", "buy_date"]).reset_index(drop=True)
- attribution.to_csv(base_dir / "dragon_glued_veto_attribution.csv", index=False, encoding="utf-8-sig")
- bucket_summary = (
- attribution.groupby("veto_bucket", dropna=False)
- .agg(
- trades=("buy_date", "count"),
- win_rate=("return_pct", lambda s: float((s > 0).mean())),
- avg_return=("return_pct", "mean"),
- avg_mfe=("mfe_pct", "mean"),
- avg_mae=("mae_pct", "mean"),
- avg_holding_days=("holding_days", "mean"),
- keep_veto_count=("recommendation", lambda s: int((s == "KEEP_VETO").sum())),
- observe_veto_count=("recommendation", lambda s: int((s == "OBSERVE_VETO").sum())),
- over_veto_count=("recommendation", lambda s: int((s == "OVER_VETO").sum())),
- )
- .reset_index()
- .sort_values(["trades", "avg_return"], ascending=[False, True])
- )
- bucket_summary.to_csv(base_dir / "dragon_glued_veto_bucket_summary.csv", index=False, encoding="utf-8-sig")
- lines = [
- "# Dragon Glued Veto Attribution Review",
- "",
- "## Snapshot",
- f"- removed trades vs current alpha-first: `{len(attribution)}`",
- f"- total avg_return of removed set: `{_pct(float(attribution['return_pct'].mean()))}`",
- f"- total win_rate of removed set: `{_pct(float((attribution['return_pct'] > 0).mean()))}`",
- f"- removed-set profit_factor: `{_fmt_num(_profit_factor(attribution['return_pct']))}`",
- "",
- "## Bucket Summary",
- ]
- for _, row in bucket_summary.iterrows():
- lines.append(
- f"- `{row['veto_bucket']}`: trades `{int(row['trades'])}`, win_rate `{_pct(float(row['win_rate']))}`, "
- f"avg_return `{_pct(float(row['avg_return']))}`, avg_mfe `{_pct(float(row['avg_mfe']))}`, "
- f"avg_mae `{_pct(float(row['avg_mae']))}`, avg_holding `{float(row['avg_holding_days']):.2f}`, "
- f"KEEP/OBSERVE/OVER = `{int(row['keep_veto_count'])}/{int(row['observe_veto_count'])}/{int(row['over_veto_count'])}`"
- )
- lines.extend(
- [
- "",
- "## Quant Judgment",
- f"- `low_weak_range`: {_summary_line(attribution[attribution['veto_bucket'] == 'low_weak_range'])}.",
- f"- `hot_positive_b1`: {_summary_line(attribution[attribution['veto_bucket'] == 'hot_positive_b1'])}.",
- "- `low_weak_range` is now a clean promotion candidate: all removed trades are short, losing, and there is no positive sample in this bucket.",
- "- `hot_positive_b1` is directionally correct but not fully clean: most removed trades are weak, but one micro-profit sample remains and should be used as the first refinement target.",
- "- Immediate next research action: keep the low bucket intact, and narrow the hot bucket rather than rolling back the whole glued veto branch.",
- "",
- "## Detailed Cards",
- ]
- )
- for _, row in attribution.iterrows():
- replacement = "none"
- if isinstance(row["replacement_buy_date"], str) and row["replacement_buy_date"]:
- replacement = (
- f"{row['replacement_buy_date']} -> {row['replacement_sell_date']} / "
- f"{row['replacement_buy_reason']} -> {row['replacement_sell_reason']} / "
- f"{_pct(row['replacement_return_pct'])}"
- )
- lines.extend(
- [
- f"### {row['buy_date']} -> {row['sell_date']}",
- f"- Bucket: `{row['veto_bucket']}`",
- f"- Trade: `{row['buy_reason']} -> {row['sell_reason']}` | `{int(row['holding_days'])}` days | return `{_pct(row['return_pct'])}`",
- f"- MFE / MAE: `{_pct(row['mfe_pct'])}` / `{_pct(row['mae_pct'])}`",
- f"- Entry 5d / Exit followthrough 5d: `{_pct(row['entry_forward_5d_pct'])}` / `{_pct(row['exit_followthrough_5d_pct'])}`",
- f"- Entry indicators: `a1={float(row['buy_a1']):.4f}` `b1={float(row['buy_b1']):.4f}` `c1={float(row['buy_c1']):.2f}`",
- f"- Workbook aligned: buy `{bool(row['buy_aligned_with_workbook'])}` / sell `{bool(row['sell_aligned_with_workbook'])}`",
- f"- Candidate replacement within 10d after exit: `{replacement}`",
- f"- Recommendation: `{row['recommendation']}` | {row['recommendation_reason']}",
- "",
- ]
- )
- (base_dir / "dragon_glued_veto_review.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
- if __name__ == "__main__":
- main()
|