dragon_workbook.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. from __future__ import annotations
  2. from dataclasses import dataclass
  3. from datetime import date, datetime
  4. from pathlib import Path
  5. from typing import Iterable, Optional
  6. from openpyxl import load_workbook
  7. BUY_CODEPOINT = 0x4E70
  8. SELL_CODEPOINT = 0x5356
  9. @dataclass(frozen=True)
  10. class WorkbookEvent:
  11. date: date
  12. index_close: Optional[float]
  13. prev_index: Optional[float]
  14. side: str
  15. capital: Optional[float]
  16. pnl: Optional[float]
  17. kdj: Optional[str]
  18. ql: Optional[str]
  19. note: str
  20. @dataclass(frozen=True)
  21. class SplitEvent:
  22. date: date
  23. side: str
  24. layer: str
  25. signal_reason: str
  26. index_close: Optional[float]
  27. prev_index: Optional[float]
  28. capital: Optional[float]
  29. pnl: Optional[float]
  30. kdj: Optional[str]
  31. ql: Optional[str]
  32. note: str
  33. def _normalize_optional_float(value: object) -> Optional[float]:
  34. if value is None:
  35. return None
  36. if isinstance(value, str):
  37. stripped = value.strip()
  38. if not stripped or stripped.startswith("#"):
  39. return None
  40. try:
  41. return float(value)
  42. except (TypeError, ValueError):
  43. return None
  44. def _normalize_side(value: object) -> Optional[str]:
  45. if value is None:
  46. return None
  47. if isinstance(value, str):
  48. stripped = value.strip()
  49. if not stripped:
  50. return None
  51. if len(stripped) == 1:
  52. codepoint = ord(stripped)
  53. if codepoint == BUY_CODEPOINT:
  54. return "BUY"
  55. if codepoint == SELL_CODEPOINT:
  56. return "SELL"
  57. if stripped in {"买", "BUY"}:
  58. return "BUY"
  59. if stripped in {"卖", "SELL"}:
  60. return "SELL"
  61. return None
  62. class DragonWorkbook:
  63. def __init__(self, workbook_path: Path):
  64. self.workbook_path = Path(workbook_path)
  65. self._events: Optional[list[WorkbookEvent]] = None
  66. self._annual_summary: Optional[list[dict[str, Optional[float]]]] = None
  67. def load_events(self) -> list[WorkbookEvent]:
  68. if self._events is not None:
  69. return self._events
  70. wb = load_workbook(self.workbook_path, data_only=True, read_only=True)
  71. ws = wb.worksheets[0]
  72. events: list[WorkbookEvent] = []
  73. for row in ws.iter_rows(min_row=4, values_only=True):
  74. raw_date = row[0]
  75. if not isinstance(raw_date, datetime):
  76. continue
  77. side = _normalize_side(row[3])
  78. if side is None:
  79. continue
  80. events.append(
  81. WorkbookEvent(
  82. date=raw_date.date(),
  83. index_close=_normalize_optional_float(row[1]),
  84. prev_index=_normalize_optional_float(row[2]),
  85. side=side,
  86. capital=_normalize_optional_float(row[4]),
  87. pnl=_normalize_optional_float(row[5]),
  88. kdj=(row[6].strip() if isinstance(row[6], str) else row[6]),
  89. ql=(row[7].strip() if isinstance(row[7], str) else row[7]),
  90. note=(row[8].strip() if isinstance(row[8], str) else ""),
  91. )
  92. )
  93. self._events = events
  94. return events
  95. def load_annual_summary(self) -> list[dict[str, Optional[float]]]:
  96. if self._annual_summary is not None:
  97. return self._annual_summary
  98. wb = load_workbook(self.workbook_path, data_only=True, read_only=True)
  99. ws = wb.worksheets[0]
  100. annual: list[dict[str, Optional[float]]] = []
  101. for row in ws.iter_rows(values_only=True):
  102. year = row[0]
  103. if not isinstance(year, int) or year < 2000 or year > 2100:
  104. continue
  105. if not isinstance(row[4], (int, float)):
  106. continue
  107. annual.append(
  108. {
  109. "year": year,
  110. "index_return": _normalize_optional_float(row[2]),
  111. "strategy_return": _normalize_optional_float(row[4]),
  112. }
  113. )
  114. self._annual_summary = annual
  115. return annual
  116. def split_layers(self) -> list[SplitEvent]:
  117. events = self.load_events()
  118. state = "flat"
  119. split: list[SplitEvent] = []
  120. for event in events:
  121. layer = "aux_signal"
  122. reason = "unclassified"
  123. if state == "flat" and event.side == "BUY":
  124. layer = "real_trade"
  125. reason = "entry"
  126. state = "long"
  127. elif state == "long" and event.side == "SELL":
  128. layer = "real_trade"
  129. reason = "exit"
  130. state = "flat"
  131. elif state == "flat" and event.side == "SELL":
  132. reason = "bearish_signal_after_exit"
  133. elif state == "long" and event.side == "BUY":
  134. reason = "bullish_signal_while_holding"
  135. split.append(
  136. SplitEvent(
  137. date=event.date,
  138. side=event.side,
  139. layer=layer,
  140. signal_reason=reason,
  141. index_close=event.index_close,
  142. prev_index=event.prev_index,
  143. capital=event.capital,
  144. pnl=event.pnl,
  145. kdj=event.kdj,
  146. ql=event.ql,
  147. note=event.note,
  148. )
  149. )
  150. return split
  151. def iter_real_trade_pairs(self) -> Iterable[dict[str, object]]:
  152. open_buy: Optional[SplitEvent] = None
  153. for event in self.split_layers():
  154. if event.layer != "real_trade":
  155. continue
  156. if event.side == "BUY":
  157. open_buy = event
  158. continue
  159. if open_buy is None:
  160. continue
  161. yield {
  162. "buy_date": open_buy.date.isoformat(),
  163. "buy_index": open_buy.index_close,
  164. "buy_note": open_buy.note,
  165. "sell_date": event.date.isoformat(),
  166. "sell_index": event.index_close,
  167. "sell_note": event.note,
  168. "holding_days": (event.date - open_buy.date).days,
  169. "return_pct": event.pnl,
  170. "ending_capital": event.capital,
  171. }
  172. open_buy = None