| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279 |
- from __future__ import annotations
- from pathlib import Path
- from typing import Optional
- import pandas as pd
def _load_csv(path: Path) -> pd.DataFrame:
    """Read the CSV file at ``path`` into a DataFrame.

    The file is decoded as ``utf-8-sig``, so a leading byte-order mark (if
    present) is stripped instead of leaking into the first column name.
    """
    frame = pd.read_csv(path, encoding="utf-8-sig")
    return frame
def _pct(value: Optional[float]) -> str:
    """Format ``value`` as a percentage with two decimals.

    Returns the placeholder ``"n/a"`` when ``value`` is ``None`` or is
    considered missing by ``pd.isna`` (for example NaN).
    """
    is_missing = value is None or pd.isna(value)
    return "n/a" if is_missing else format(float(value), ".2%")
def _regime_label(c1: float) -> str:
    """Map the ``c1`` indicator value onto a named regime bucket.

    Each bucket is bounded above by an exclusive threshold; the first bucket
    whose threshold exceeds ``c1`` wins. Values of 80 or more are ``"hot"``.
    A NaN input compares false against every threshold and therefore also
    ends up as ``"hot"``.
    """
    # (exclusive upper bound, label), in ascending order.
    buckets = (
        (12, "deep_oversold"),
        (20, "oversold"),
        (35, "rebound_low"),
        (60, "rebound_mid"),
        (80, "high_mid"),
    )
    return next((label for upper, label in buckets if c1 < upper), "hot")
def _trade_chain_type(entry_aligned: bool, exit_aligned: bool) -> str:
    """Classify a trade by whether its entry and exit are aligned.

    Args:
        entry_aligned: Truthy when the trade's entry is an aligned event.
        exit_aligned: Truthy when the trade's exit is an aligned event.

    Returns:
        One of ``"isolated_extra_trade"``, ``"bridge_to_aligned_sell"``,
        ``"premature_exit_of_aligned_trade"`` or ``"aligned_trade"``.
    """
    # Keyed by (entry aligned?, exit aligned?).
    chain_by_alignment = {
        (False, False): "isolated_extra_trade",
        (False, True): "bridge_to_aligned_sell",
        (True, False): "premature_exit_of_aligned_trade",
        (True, True): "aligned_trade",
    }
    return chain_by_alignment[(bool(entry_aligned), bool(exit_aligned))]
def _event_context(events: pd.DataFrame, center_date: str, window_days: int = 10) -> str:
    """Summarise the events that fall within a window around ``center_date``.

    Args:
        events: Frame with a ``date`` column; ``layer``, ``side`` and ``note``
            are used when present. Dates are compared as-is against ISO
            ``YYYY-MM-DD`` strings (assumes the column holds ISO date
            strings — TODO confirm against the input files).
        center_date: Date at the centre of the window.
        window_days: Calendar days included on each side of the centre.

    Returns:
        ``"<date> <side>(<layer>)[:<note>]"`` entries in date order joined by
        ``" | "``, or an empty string when no event falls inside the window.
    """
    centre = pd.Timestamp(center_date)
    half_width = pd.Timedelta(days=window_days)
    first_day = (centre - half_width).date().isoformat()
    last_day = (centre + half_width).date().isoformat()

    # Inclusive on both ends.
    nearby = events[events["date"].between(first_day, last_day)]
    if nearby.empty:
        return ""

    def _describe(event: pd.Series) -> str:
        note = event.get("note", "")
        # Only non-empty string notes are shown; missing values are skipped.
        suffix = f":{note}" if isinstance(note, str) and note else ""
        return f"{event['date']} {event.get('side', '')}({event.get('layer', '')}){suffix}"

    return " | ".join(_describe(event) for _, event in nearby.sort_values("date").iterrows())
def _calc_mfe_mae(indicators: pd.DataFrame, buy_date: str, sell_date: str, entry_price: float) -> tuple[Optional[float], Optional[float]]:
    """Return the best and worst price excursions while a trade was held.

    Args:
        indicators: Frame with ``date``, ``high`` and ``low`` columns.
        buy_date: First date of the holding window (inclusive).
        sell_date: Last date of the holding window (inclusive).
        entry_price: Price that both excursions are measured against.

    Returns:
        ``(mfe, mae)``: the highest ``high`` and the lowest ``low`` inside the
        window, each expressed as a fractional change from ``entry_price``.
        ``(None, None)`` when the window holds no rows or the entry price is
        missing.
    """
    dates = indicators["date"]
    held = indicators[(dates >= buy_date) & (dates <= sell_date)]
    price_missing = entry_price is None or pd.isna(entry_price)
    if held.empty or price_missing:
        return None, None

    peak = held["high"].max()
    trough = held["low"].min()
    return float(peak / entry_price - 1), float(trough / entry_price - 1)
def _calc_forward_backward_returns(indicators: pd.DataFrame, event_date: str, days: int = 5) -> tuple[Optional[float], Optional[float]]:
    """Return the close-to-close returns over ``days`` rows before and after an event.

    Rows are addressed by position, so the result does not depend on the
    frame's index labels. The frame is expected to be ordered by date so that
    a row offset corresponds to a step in time.

    Args:
        indicators: Frame with ``date`` and ``close`` columns.
        event_date: Value looked up in the ``date`` column; the first
            matching row is used.
        days: Number of rows to step backwards and forwards.

    Returns:
        ``(pre, post)`` where ``pre`` is ``close[event] / close[event - days] - 1``
        and ``post`` is ``close[event + days] / close[event] - 1``. Either
        element is ``None`` when the required row lies outside the frame or
        the denominator close is zero. Both are ``None`` when ``event_date``
        is not present.
    """
    hits = (indicators["date"] == event_date).to_numpy().nonzero()[0]
    if hits.size == 0:
        return None, None

    pos = int(hits[0])
    closes = indicators["close"]
    close_now = float(closes.iloc[pos])

    pre: Optional[float] = None
    post: Optional[float] = None

    if pos - days >= 0:
        close_prev = float(closes.iloc[pos - days])
        # A zero base price has no defined return; report it as missing
        # instead of raising ZeroDivisionError.
        if close_prev != 0:
            pre = close_now / close_prev - 1

    if pos + days < len(closes):
        close_next = float(closes.iloc[pos + days])
        if close_now != 0:
            post = close_next / close_now - 1

    return pre, post
def _recommendation(
    chain_type: str,
    return_pct: float,
    mfe_pct: Optional[float],
    holding_days: int,
) -> tuple[str, str]:
    """Turn a trade's chain type and outcome into a review recommendation.

    Args:
        chain_type: Chain classification of the trade.
        return_pct: Fractional return of the trade.
        mfe_pct: Best excursion of the trade, or ``None`` when unavailable.
        holding_days: Number of days the trade was held.

    Returns:
        ``(label, reason)`` where ``label`` is one of ``DELETE_CANDIDATE``,
        ``KEEP_BRIDGE``, ``KEEP_ALPHA``, ``OBSERVE_BRIDGE`` or ``OBSERVE`` and
        ``reason`` is the explanatory sentence shown in the report.
    """
    if chain_type == "premature_exit_of_aligned_trade":
        # An extra sell that cut an aligned position short must earn more
        # than 2% to avoid being a delete candidate.
        return (
            ("DELETE_CANDIDATE", "额外卖点提前截断了已对齐持仓,且收益补偿不足。")
            if return_pct <= 0.02
            else ("OBSERVE", "额外卖点发生在已对齐持仓内,但收益贡献不差,需要和风险控制一起评估。")
        )

    if chain_type == "bridge_to_aligned_sell":
        # An extra buy that feeds an aligned sell is kept when the bridge
        # itself returned more than 1%.
        return (
            ("KEEP_BRIDGE", "额外买点承接到了后续对齐卖点,且桥接段本身有正收益。")
            if return_pct > 0.01
            else ("OBSERVE_BRIDGE", "额外买点承接了后续对齐卖点,但桥接段收益一般,需要替代方案后再删。")
        )

    if chain_type == "isolated_extra_trade":
        if return_pct > 0.05 and mfe_pct is not None and mfe_pct > 0.08:
            verdict = ("KEEP_ALPHA", "虽然是额外交易,但收益明显,可能代表工作簿未显式记录的顺势 alpha。")
        elif return_pct <= 0.02 and holding_days <= 15:
            verdict = ("DELETE_CANDIDATE", "额外交易自成闭环,且收益/持有质量偏弱,优先删除。")
        else:
            verdict = ("OBSERVE", "额外交易自成闭环,但收益质量不算差,先保留观察。")
        return verdict

    # Any other chain type (including fully aligned trades) needs a human look.
    return "OBSERVE", "需要结合上下文进一步人工判断。"
def _impact_text(chain_type: str, side: str, buy_date: str, sell_date: str, paired_aligned: bool) -> str:
    """Describe what deleting the residual event would affect.

    Args:
        chain_type: Chain classification of the trade the event belongs to.
        side: ``"BUY"`` or ``"SELL"`` side of the residual event.
        buy_date: Entry date of the trade.
        sell_date: Exit date of the trade.
        paired_aligned: Truthy when the opposite leg of the trade is aligned.

    Returns:
        The report sentence for the first rule that matches, checked in this
        order: BUY on a bridge trade, SELL on a premature exit, isolated
        trade, aligned counterpart, and finally a generic fallback.
    """
    role = (side, chain_type)
    if role == ("BUY", "bridge_to_aligned_sell"):
        return f"若删除,最直接风险是丢失后续对齐卖点 {sell_date} 的持仓承接。"
    if role == ("SELL", "premature_exit_of_aligned_trade"):
        return "若删除,理论上更接近工作簿原始持仓路径,可继续观察后续卖点是否自然保留。"
    if chain_type == "isolated_extra_trade":
        return f"该点与配对交易构成局部闭环,删除通常应连同 {buy_date}->{sell_date} 一并评估。"
    return (
        "该点与对齐事件相连,删除需检查下游状态转移。"
        if paired_aligned
        else "该点对下游对齐影响有限,更偏向局部收益质量问题。"
    )
def main() -> None:
    """Build the residual-trade attribution CSV and the Markdown review.

    Reads five CSV files that sit next to this script
    (``dragon_event_gaps.csv``, ``dragon_strategy_trades.csv``,
    ``dragon_strategy_events.csv``, ``dragon_workbook_layers.csv`` and
    ``dragon_indicator_snapshot.csv``), attributes every
    ``extra_in_strategy`` / ``real_trade`` gap to the strategy trade it
    belongs to, and writes ``dragon_residual_trade_attribution.csv`` and
    ``dragon_residual_trade_review.md`` into the same directory.

    Residual events with no matching strategy trade are skipped. When no
    residual can be attributed at all, both outputs are still written
    (header-only CSV, zero counts in the report).
    """
    base_dir = Path(__file__).resolve().parent
    gaps = _load_csv(base_dir / "dragon_event_gaps.csv")
    trades = _load_csv(base_dir / "dragon_strategy_trades.csv")
    strategy_events = _load_csv(base_dir / "dragon_strategy_events.csv")
    workbook_layers = _load_csv(base_dir / "dragon_workbook_layers.csv")
    indicators = _load_csv(base_dir / "dragon_indicator_snapshot.csv")
    # Sorted by date and re-indexed so that row offsets in the 5-day return
    # helper correspond to steps through consecutive snapshot rows.
    indicators = indicators.sort_values("date").reset_index(drop=True)

    # Dates on which the workbook recorded a real buy / sell; a strategy
    # trade leg is "aligned" when its date is in the matching set.
    real_trades = workbook_layers[workbook_layers["layer"] == "real_trade"]
    workbook_real_buy = set(real_trades[real_trades["side"] == "BUY"]["date"])
    workbook_real_sell = set(real_trades[real_trades["side"] == "SELL"]["date"])

    residuals = gaps[
        (gaps["diagnostic_type"] == "extra_in_strategy")
        & (gaps["target_layer"] == "real_trade")
    ].copy()
    residuals["side"] = residuals["target_side"]
    residuals["rule"] = residuals["source_reason"]
    residuals = residuals[["date", "side", "rule", "a1", "b1", "c1"]].sort_values(["date", "side"])

    # Output schema, in column order. Passing it to the DataFrame constructor
    # keeps the columns present even when no row is produced, so the sort and
    # the report below do not fail with KeyError on an empty run.
    columns = [
        "date",
        "side",
        "rule",
        "regime",
        "buy_date",
        "buy_reason",
        "sell_date",
        "sell_reason",
        "holding_days",
        "return_pct",
        "mfe_pct",
        "mae_pct",
        "event_a1",
        "event_b1",
        "event_c1",
        "pre_5d_return",
        "post_5d_return",
        "entry_is_workbook_real",
        "exit_is_workbook_real",
        "chain_type",
        "paired_aligned_event_date",
        "delete_impact",
        "recommendation",
        "recommendation_reason",
        "workbook_context",
        "strategy_context",
    ]

    rows: list[dict[str, object]] = []
    for _, residual in residuals.iterrows():
        side = residual["side"]
        event_date = residual["date"]

        # A BUY residual is the entry of its trade, anything else the exit.
        date_column = "buy_date" if side == "BUY" else "sell_date"
        match = trades[trades[date_column] == event_date]
        if match.empty:
            # No strategy trade starts/ends on this date; nothing to attribute.
            continue
        trade = match.iloc[0]

        buy_date = str(trade["buy_date"])
        sell_date = str(trade["sell_date"])
        buy_reason = str(trade["buy_reason"])
        sell_reason = str(trade["sell_reason"])
        holding_days = int(trade["holding_days"])
        return_pct = float(trade["return_pct"])
        buy_price = float(trade["buy_price"])

        entry_aligned = buy_date in workbook_real_buy
        exit_aligned = sell_date in workbook_real_sell
        chain_type = _trade_chain_type(entry_aligned, exit_aligned)
        # The "paired" leg is the opposite end of the trade from the residual.
        paired_aligned = exit_aligned if side == "BUY" else entry_aligned
        regime = _regime_label(float(residual["c1"]))
        mfe_pct, mae_pct = _calc_mfe_mae(indicators, buy_date, sell_date, buy_price)
        pre_5d_return, post_5d_return = _calc_forward_backward_returns(indicators, event_date, days=5)
        recommendation, recommendation_reason = _recommendation(
            chain_type=chain_type,
            return_pct=return_pct,
            mfe_pct=mfe_pct,
            holding_days=holding_days,
        )
        impact_text = _impact_text(
            chain_type=chain_type,
            side=side,
            buy_date=buy_date,
            sell_date=sell_date,
            paired_aligned=paired_aligned,
        )
        workbook_context = _event_context(workbook_layers, event_date, window_days=10)
        strategy_context = _event_context(strategy_events, event_date, window_days=10)

        # Date of the aligned counterpart leg, or "" when there is none.
        if side == "BUY" and exit_aligned:
            paired_aligned_event_date = sell_date
        elif side == "SELL" and entry_aligned:
            paired_aligned_event_date = buy_date
        else:
            paired_aligned_event_date = ""

        rows.append(
            {
                "date": event_date,
                "side": side,
                "rule": residual["rule"],
                "regime": regime,
                "buy_date": buy_date,
                "buy_reason": buy_reason,
                "sell_date": sell_date,
                "sell_reason": sell_reason,
                "holding_days": holding_days,
                "return_pct": return_pct,
                "mfe_pct": mfe_pct,
                "mae_pct": mae_pct,
                "event_a1": float(residual["a1"]),
                "event_b1": float(residual["b1"]),
                "event_c1": float(residual["c1"]),
                "pre_5d_return": pre_5d_return,
                "post_5d_return": post_5d_return,
                "entry_is_workbook_real": entry_aligned,
                "exit_is_workbook_real": exit_aligned,
                "chain_type": chain_type,
                "paired_aligned_event_date": paired_aligned_event_date,
                "delete_impact": impact_text,
                "recommendation": recommendation,
                "recommendation_reason": recommendation_reason,
                "workbook_context": workbook_context,
                "strategy_context": strategy_context,
            }
        )

    attribution = (
        pd.DataFrame(rows, columns=columns)
        .sort_values(["recommendation", "date"])
        .reset_index(drop=True)
    )
    attribution.to_csv(base_dir / "dragon_residual_trade_attribution.csv", index=False, encoding="utf-8-sig")

    lines = [
        "# Dragon Residual Trade Review",
        "",
        "## Snapshot",
        f"- Residual real-trade rows reviewed: `{len(attribution)}`",
        f"- DELETE_CANDIDATE: `{int((attribution['recommendation'] == 'DELETE_CANDIDATE').sum())}`",
        f"- KEEP_BRIDGE / KEEP_ALPHA: `{int(attribution['recommendation'].isin(['KEEP_BRIDGE', 'KEEP_ALPHA']).sum())}`",
        f"- OBSERVE / OBSERVE_BRIDGE: `{int(attribution['recommendation'].isin(['OBSERVE', 'OBSERVE_BRIDGE']).sum())}`",
        "",
        "## Recommendation Summary",
    ]

    # One summary section per recommendation label, in a fixed display order;
    # labels with no rows are omitted.
    for label in ["DELETE_CANDIDATE", "KEEP_BRIDGE", "KEEP_ALPHA", "OBSERVE_BRIDGE", "OBSERVE"]:
        subset = attribution[attribution["recommendation"] == label]
        if subset.empty:
            continue
        lines.append(f"### {label}")
        for _, row in subset.iterrows():
            lines.append(
                f"- `{row['date']}` `{row['side']}` `{row['rule']}` | trade `{row['buy_date']} -> {row['sell_date']}` | "
                f"ret `{_pct(row['return_pct'])}` mfe `{_pct(row['mfe_pct'])}` mae `{_pct(row['mae_pct'])}` | "
                f"{row['recommendation_reason']}"
            )
        lines.append("")

    # Detailed cards are listed chronologically rather than by recommendation.
    lines.extend(["## Detailed Cards", ""])
    for _, row in attribution.sort_values("date").iterrows():
        lines.extend(
            [
                f"### {row['date']} {row['side']} {row['rule']}",
                f"- Regime: `{row['regime']}`",
                f"- Trade: `{row['buy_date']} -> {row['sell_date']}` | buy `{row['buy_reason']}` | sell `{row['sell_reason']}`",
                f"- Holding / Return: `{int(row['holding_days'])}` days / `{_pct(row['return_pct'])}`",
                f"- MFE / MAE: `{_pct(row['mfe_pct'])}` / `{_pct(row['mae_pct'])}`",
                f"- Event indicators: `a1={row['event_a1']:.4f}` `b1={row['event_b1']:.4f}` `c1={row['event_c1']:.2f}`",
                f"- Pre/Post 5d return: `{_pct(row['pre_5d_return'])}` / `{_pct(row['post_5d_return'])}`",
                f"- Chain type: `{row['chain_type']}` | entry aligned `{bool(row['entry_is_workbook_real'])}` | exit aligned `{bool(row['exit_is_workbook_real'])}`",
                f"- Delete impact: {row['delete_impact']}",
                f"- Recommendation: `{row['recommendation']}` | {row['recommendation_reason']}",
                f"- Workbook context: {row['workbook_context'] or 'n/a'}",
                f"- Strategy context: {row['strategy_context'] or 'n/a'}",
                "",
            ]
        )

    (base_dir / "dragon_residual_trade_review.md").write_text("\n".join(lines), encoding="utf-8")
# Script entry point: build the review only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|