| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193 |
- from __future__ import annotations
- from dataclasses import dataclass
- from datetime import date, datetime
- from pathlib import Path
- from typing import Iterable, Optional
- from openpyxl import load_workbook
# Unicode code points of the one-character side markers; _normalize_side
# compares them against ord() of single-character cell values.
BUY_CODEPOINT = 0x4E70  # U+4E70, the character "买" (mapped to "BUY")
SELL_CODEPOINT = 0x5356  # U+5356, the character "卖" (mapped to "SELL")
@dataclass(frozen=True)
class WorkbookEvent:
    """Immutable record of one dated BUY/SELL row read from the worksheet.

    Instances are built by ``DragonWorkbook.load_events``; the column
    numbers in the field comments are the zero-based worksheet columns that
    loader reads each value from.
    """

    # Column 0: calendar date taken from the cell's datetime value.
    date: date
    # Column 1: numeric value, or None when blank / an error string / not
    # convertible to float (presumably the index close -- confirm with sheet).
    index_close: Optional[float]
    # Column 2: same normalisation as index_close.
    prev_index: Optional[float]
    # Column 3: normalised side, always "BUY" or "SELL".
    side: str
    # Column 4: optional float.
    capital: Optional[float]
    # Column 5: optional float.
    pnl: Optional[float]
    # Column 6: cell value, stripped when it is a string.
    kdj: Optional[str]
    # Column 7: cell value, stripped when it is a string.
    ql: Optional[str]
    # Column 8: stripped text, or "" when the cell is not a string.
    note: str
@dataclass(frozen=True)
class SplitEvent:
    """Immutable workbook event annotated with its trade layer.

    Produced by ``DragonWorkbook.split_layers``: every field other than
    ``layer`` and ``signal_reason`` is copied unchanged from the
    ``WorkbookEvent`` it was derived from.
    """

    # Calendar date of the underlying event.
    date: date
    # "BUY" or "SELL", copied from the underlying event.
    side: str
    # "real_trade" for position-changing events, otherwise "aux_signal".
    layer: str
    # One of "entry", "exit", "bearish_signal_after_exit",
    # "bullish_signal_while_holding" or "unclassified".
    signal_reason: str
    # Remaining fields mirror the source event one-to-one.
    index_close: Optional[float]
    prev_index: Optional[float]
    capital: Optional[float]
    pnl: Optional[float]
    kdj: Optional[str]
    ql: Optional[str]
    note: str
- def _normalize_optional_float(value: object) -> Optional[float]:
- if value is None:
- return None
- if isinstance(value, str):
- stripped = value.strip()
- if not stripped or stripped.startswith("#"):
- return None
- try:
- return float(value)
- except (TypeError, ValueError):
- return None
def _normalize_side(value: object) -> Optional[str]:
    """Map a raw cell value to ``"BUY"`` / ``"SELL"``, or ``None``.

    Only strings are recognised. After stripping whitespace, a single
    character is matched by code point against ``BUY_CODEPOINT`` and
    ``SELL_CODEPOINT``; otherwise the text must equal one of the literal
    markers ("买" / "BUY" or "卖" / "SELL"). Matching is case-sensitive.
    """
    if not isinstance(value, str):
        # Covers None as well as numbers, dates and any other cell type.
        return None

    token = value.strip()

    # Single characters are checked numerically first.
    if len(token) == 1:
        char_code = ord(token)
        if char_code == BUY_CODEPOINT:
            return "BUY"
        if char_code == SELL_CODEPOINT:
            return "SELL"

    if token in ("买", "BUY"):
        return "BUY"
    if token in ("卖", "SELL"):
        return "SELL"
    return None
class DragonWorkbook:
    """Read trade events and the annual summary from an Excel workbook.

    Only the first worksheet is consulted. Each loader parses the file once
    and caches its result on the instance; later calls return the cached
    list object.
    """

    # Event rows are indexed through column 8, summary rows through column 4.
    _EVENT_WIDTH = 9
    _SUMMARY_WIDTH = 5

    def __init__(self, workbook_path: Path):
        """Store the workbook location; nothing is read until a loader runs."""
        self.workbook_path = Path(workbook_path)
        self._events: Optional[list[WorkbookEvent]] = None
        self._annual_summary: Optional[list[dict[str, Optional[float]]]] = None

    @staticmethod
    def _pad_row(row: Iterable[object], width: int) -> tuple[object, ...]:
        """Return *row* as a tuple right-padded with ``None`` to *width* cells.

        Read-only worksheets may yield rows shorter than the columns indexed
        by the loaders (possibly empty tuples); padding turns the missing
        cells into ``None`` instead of letting indexing raise ``IndexError``.
        """
        cells = tuple(row)
        return cells + (None,) * (width - len(cells))

    @staticmethod
    def _strip_if_str(value: object) -> object:
        """Strip surrounding whitespace from strings; pass other values through."""
        return value.strip() if isinstance(value, str) else value

    def _open_workbook(self):
        """Open the workbook read-only with cached formula values.

        The caller is responsible for calling ``close()`` on the result.
        """
        return load_workbook(self.workbook_path, data_only=True, read_only=True)

    def load_events(self) -> list[WorkbookEvent]:
        """Parse the BUY/SELL rows of the first worksheet.

        Rows are read starting at worksheet row 4. A row is kept only when
        column 0 holds a ``datetime`` and column 3 normalises to a side;
        every other row is skipped silently.

        Returns:
            The events in worksheet order (cached after the first call).
        """
        if self._events is not None:
            return self._events

        events: list[WorkbookEvent] = []
        wb = self._open_workbook()
        try:
            ws = wb.worksheets[0]
            for raw_row in ws.iter_rows(min_row=4, values_only=True):
                row = self._pad_row(raw_row, self._EVENT_WIDTH)
                raw_date = row[0]
                if not isinstance(raw_date, datetime):
                    continue
                side = _normalize_side(row[3])
                if side is None:
                    continue
                events.append(
                    WorkbookEvent(
                        date=raw_date.date(),
                        index_close=_normalize_optional_float(row[1]),
                        prev_index=_normalize_optional_float(row[2]),
                        side=side,
                        capital=_normalize_optional_float(row[4]),
                        pnl=_normalize_optional_float(row[5]),
                        kdj=self._strip_if_str(row[6]),
                        ql=self._strip_if_str(row[7]),
                        note=(row[8].strip() if isinstance(row[8], str) else ""),
                    )
                )
        finally:
            # Read-only workbooks keep the file handle open until closed.
            wb.close()

        self._events = events
        return events

    def load_annual_summary(self) -> list[dict[str, Optional[float]]]:
        """Collect the per-year summary rows of the first worksheet.

        A row qualifies when column 0 is an ``int`` between 2000 and 2100
        inclusive and column 4 is numeric.

        Returns:
            One dict per qualifying row with keys ``"year"``,
            ``"index_return"`` (column 2) and ``"strategy_return"``
            (column 4); cached after the first call.
        """
        if self._annual_summary is not None:
            return self._annual_summary

        annual: list[dict[str, Optional[float]]] = []
        wb = self._open_workbook()
        try:
            ws = wb.worksheets[0]
            for raw_row in ws.iter_rows(values_only=True):
                row = self._pad_row(raw_row, self._SUMMARY_WIDTH)
                year = row[0]
                if not isinstance(year, int) or not 2000 <= year <= 2100:
                    continue
                if not isinstance(row[4], (int, float)):
                    continue
                annual.append(
                    {
                        "year": year,
                        "index_return": _normalize_optional_float(row[2]),
                        "strategy_return": _normalize_optional_float(row[4]),
                    }
                )
        finally:
            # Read-only workbooks keep the file handle open until closed.
            wb.close()

        self._annual_summary = annual
        return annual

    def split_layers(self) -> list[SplitEvent]:
        """Label every event as a position-changing trade or an extra signal.

        A two-state machine ("flat" / "long") walks the events in order:
        a BUY while flat is the ``entry`` and a SELL while long is the
        ``exit`` (both ``real_trade``); a SELL while flat or a BUY while long
        leaves the state unchanged and is tagged ``aux_signal``.

        Returns:
            One ``SplitEvent`` per loaded event, in the same order.
        """
        events = self.load_events()
        state = "flat"
        split: list[SplitEvent] = []
        for event in events:
            # Defaults apply to anything the branches below do not match.
            layer = "aux_signal"
            reason = "unclassified"
            if state == "flat" and event.side == "BUY":
                layer = "real_trade"
                reason = "entry"
                state = "long"
            elif state == "long" and event.side == "SELL":
                layer = "real_trade"
                reason = "exit"
                state = "flat"
            elif state == "flat" and event.side == "SELL":
                # Also covers a SELL seen before any entry.
                reason = "bearish_signal_after_exit"
            elif state == "long" and event.side == "BUY":
                reason = "bullish_signal_while_holding"
            split.append(
                SplitEvent(
                    date=event.date,
                    side=event.side,
                    layer=layer,
                    signal_reason=reason,
                    index_close=event.index_close,
                    prev_index=event.prev_index,
                    capital=event.capital,
                    pnl=event.pnl,
                    kdj=event.kdj,
                    ql=event.ql,
                    note=event.note,
                )
            )
        return split

    def iter_real_trade_pairs(self) -> Iterable[dict[str, object]]:
        """Yield one dict per completed entry/exit pair of real trades.

        Each dict carries the ISO dates, index values and notes of the BUY
        and SELL events, ``"holding_days"`` between them, and the SELL
        event's ``pnl`` / ``capital`` under ``"return_pct"`` /
        ``"ending_capital"``. An entry without a later exit yields nothing.
        """
        open_buy: Optional[SplitEvent] = None
        for event in self.split_layers():
            if event.layer != "real_trade":
                continue
            if event.side == "BUY":
                open_buy = event
                continue
            if open_buy is None:
                # A SELL with no pending BUY cannot form a pair.
                continue
            yield {
                "buy_date": open_buy.date.isoformat(),
                "buy_index": open_buy.index_close,
                "buy_note": open_buy.note,
                "sell_date": event.date.isoformat(),
                "sell_index": event.index_close,
                "sell_note": event.note,
                "holding_days": (event.date - open_buy.date).days,
                "return_pct": event.pnl,
                "ending_capital": event.capital,
            }
            open_buy = None
|