selector.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. """
  2. 品种选择器 (Dragon Selector)
  3. 基于三维度评分系统选择最优交易品种:
  4. 1. 趋势强度 (40分): 价格与均线的关系
  5. 2. 生态匹配度 (30分): 品种与当前生态的适配
  6. 3. 波动率调整 (30分): 低波动加分,高波动减分
  7. """
  8. from typing import Dict, List, Optional, Tuple
  9. from dataclasses import dataclass
  10. from datetime import datetime
  11. import pandas as pd
  12. import numpy as np
  13. from core.ecosystem import UnifiedEcosystem, MacroRegime
  14. @dataclass
  15. class SymbolScore:
  16. """品种评分结果"""
  17. symbol: str
  18. trend_score: float # 0-40
  19. ecosystem_score: float # 0-30
  20. volatility_score: float # 0-30
  21. total_score: float # 0-100
  22. # 详细数据
  23. price_vs_20ma: float # 价格相对于20日均线的位置
  24. price_vs_60ma: float # 价格相对于60日均线的位置
  25. ma20_slope: float # 20日均线斜率
  26. ma60_slope: float # 60日均线斜率
  27. volatility_60d: float # 60日波动率
  28. class DragonSelector:
  29. """
  30. 品种选择器
  31. 每日对三个品种进行评分,选择得分最高且超过阈值的品种交易
  32. """
  33. # 品种生态偏好配置
  34. REGIME_PREFERENCE = {
  35. "csi300": {
  36. MacroRegime.SUMMER: 0,
  37. MacroRegime.AUTUMN: 5,
  38. MacroRegime.WINTER: 8,
  39. MacroRegime.SPRING: 3,
  40. },
  41. "csi500": {
  42. MacroRegime.SUMMER: 3,
  43. MacroRegime.AUTUMN: 3,
  44. MacroRegime.WINTER: 5,
  45. MacroRegime.SPRING: 8,
  46. },
  47. "chinext50": {
  48. MacroRegime.SUMMER: 10,
  49. MacroRegime.AUTUMN: 0,
  50. MacroRegime.WINTER: -10,
  51. MacroRegime.SPRING: 5,
  52. },
  53. }
  54. # 评分阈值
  55. SCORE_THRESHOLD = 60
  56. # 切换成本
  57. SWITCHING_COST = 0.001 # 0.1%
  58. def __init__(
  59. self,
  60. score_threshold: float = 60,
  61. switching_cost: float = 0.001,
  62. volatility_lookback: int = 60
  63. ):
  64. self.score_threshold = score_threshold
  65. self.switching_cost = switching_cost
  66. self.volatility_lookback = volatility_lookback
  67. # 当前选中的品种
  68. self.current_symbol: Optional[str] = None
  69. # 历史评分记录
  70. self.score_history: List[Dict] = []
  71. def select(
  72. self,
  73. data_dict: Dict[str, pd.DataFrame],
  74. ecosystem: UnifiedEcosystem,
  75. current_date: Optional[datetime] = None
  76. ) -> Tuple[Optional[str], Optional[SymbolScore]]:
  77. """
  78. 选择最优品种
  79. Args:
  80. data_dict: 三个品种的数据字典
  81. ecosystem: 当前市场生态
  82. current_date: 当前日期(回测用)
  83. Returns:
  84. (selected_symbol, score_details): 选中的品种和评分详情
  85. """
  86. scores = []
  87. # 计算每个品种的评分
  88. for symbol in data_dict.keys():
  89. score = self._calculate_score(
  90. symbol,
  91. data_dict[symbol],
  92. ecosystem,
  93. current_date
  94. )
  95. scores.append(score)
  96. # 按总分排序
  97. scores.sort(key=lambda x: x.total_score, reverse=True)
  98. # 记录历史
  99. self.score_history.append({
  100. "date": current_date,
  101. "scores": scores,
  102. "ecosystem": ecosystem.macro.regime.value if ecosystem else "unknown"
  103. })
  104. # 选择最高分品种
  105. best_score = scores[0]
  106. # 检查是否超过阈值
  107. if best_score.total_score < self.score_threshold:
  108. return None, best_score
  109. selected = best_score.symbol
  110. # 检查是否需要切换
  111. if self.current_symbol is not None and self.current_symbol != selected:
  112. # 考虑切换成本,只有当得分差 > 15 时才切换
  113. current_score = next((s for s in scores if s.symbol == self.current_symbol), None)
  114. if current_score:
  115. score_diff = best_score.total_score - current_score.total_score
  116. if score_diff < 15:
  117. # 不切换,继续持有可能表现更好的品种
  118. selected = self.current_symbol
  119. best_score = current_score
  120. self.current_symbol = selected
  121. return selected, best_score
  122. def _calculate_score(
  123. self,
  124. symbol: str,
  125. df: pd.DataFrame,
  126. ecosystem: UnifiedEcosystem,
  127. current_date: Optional[datetime] = None
  128. ) -> SymbolScore:
  129. """计算单个品种的评分"""
  130. # 获取数据窗口
  131. if current_date is not None:
  132. df = df[df.index <= current_date]
  133. if len(df) < 60:
  134. # 数据不足,返回低分
  135. return SymbolScore(
  136. symbol=symbol,
  137. trend_score=0,
  138. ecosystem_score=0,
  139. volatility_score=0,
  140. total_score=0,
  141. price_vs_20ma=0,
  142. price_vs_60ma=0,
  143. ma20_slope=0,
  144. ma60_slope=0,
  145. volatility_60d=0
  146. )
  147. close = df['close']
  148. # 计算均线
  149. ma20 = close.rolling(20).mean()
  150. ma60 = close.rolling(60).mean()
  151. current_price = close.iloc[-1]
  152. current_ma20 = ma20.iloc[-1]
  153. current_ma60 = ma60.iloc[-1]
  154. # 计算相对位置
  155. price_vs_20ma = (current_price - current_ma20) / current_ma20
  156. price_vs_60ma = (current_price - current_ma60) / current_ma60
  157. # 计算均线斜率(20日变化率)
  158. ma20_slope = (ma20.iloc[-1] - ma20.iloc[-20]) / ma20.iloc[-20] if len(ma20) >= 20 else 0
  159. ma60_slope = (ma60.iloc[-1] - ma60.iloc[-20]) / ma60.iloc[-20] if len(ma60) >= 20 else 0
  160. # 1. 趋势强度评分 (0-40分)
  161. trend_score = self._calculate_trend_score(
  162. price_vs_20ma, price_vs_60ma, ma20_slope, ma60_slope
  163. )
  164. # 2. 生态匹配度评分 (0-30分)
  165. ecosystem_score = self._calculate_ecosystem_score(symbol, ecosystem)
  166. # 3. 波动率调整评分 (0-30分)
  167. returns = close.pct_change().dropna()
  168. volatility_60d = returns.iloc[-60:].std() * np.sqrt(252)
  169. volatility_score = self._calculate_volatility_score(volatility_60d)
  170. # 总分
  171. total_score = trend_score + ecosystem_score + volatility_score
  172. return SymbolScore(
  173. symbol=symbol,
  174. trend_score=trend_score,
  175. ecosystem_score=ecosystem_score,
  176. volatility_score=volatility_score,
  177. total_score=total_score,
  178. price_vs_20ma=price_vs_20ma,
  179. price_vs_60ma=price_vs_60ma,
  180. ma20_slope=ma20_slope,
  181. ma60_slope=ma60_slope,
  182. volatility_60d=volatility_60d
  183. )
  184. def _calculate_trend_score(
  185. self,
  186. price_vs_20ma: float,
  187. price_vs_60ma: float,
  188. ma20_slope: float,
  189. ma60_slope: float
  190. ) -> float:
  191. """
  192. 计算趋势强度评分 (0-40分)
  193. 完美趋势:价格 > 20MA > 60MA,且均线向上
  194. """
  195. score = 0.0
  196. # 价格位置 (0-15分)
  197. if price_vs_20ma > 0.03: # 价格在20日均线上方3%
  198. score += 15
  199. elif price_vs_20ma > 0:
  200. score += 10
  201. elif price_vs_20ma > -0.02:
  202. score += 5
  203. # 均线排列 (0-15分)
  204. if price_vs_60ma > 0.05: # 价格在60日均线上方5%
  205. score += 15
  206. elif price_vs_60ma > 0:
  207. score += 10
  208. elif price_vs_60ma > -0.03:
  209. score += 5
  210. # 均线斜率 (0-10分)
  211. if ma20_slope > 0.001 and ma60_slope > 0: # 均线向上
  212. score += 10
  213. elif ma20_slope > 0:
  214. score += 5
  215. return min(40, score)
  216. def _calculate_ecosystem_score(
  217. self,
  218. symbol: str,
  219. ecosystem: UnifiedEcosystem
  220. ) -> float:
  221. """
  222. 计算生态匹配度评分 (0-30分)
  223. 基础分20分,根据品种-生态偏好调整
  224. """
  225. base_score = 20.0
  226. if ecosystem and hasattr(ecosystem, 'macro'):
  227. regime = ecosystem.macro.regime
  228. preference = self.REGIME_PREFERENCE.get(symbol, {}).get(regime, 0)
  229. # 加上偏好分数
  230. score = base_score + preference
  231. # 限制在0-30分
  232. return max(0, min(30, score))
  233. return base_score
  234. def _calculate_volatility_score(self, volatility: float) -> float:
  235. """
  236. 计算波动率调整评分 (0-30分)
  237. 低波动加分,高波动减分
  238. """
  239. if volatility < 0.15: # 波动率 < 15%
  240. return 30
  241. elif volatility < 0.20: # 波动率 15-20%
  242. return 25
  243. elif volatility < 0.25: # 波动率 20-25%
  244. return 20
  245. elif volatility < 0.30: # 波动率 25-30%
  246. return 15
  247. elif volatility < 0.35: # 波动率 30-35%
  248. return 10
  249. else: # 波动率 > 35%
  250. return 5
  251. def should_switch(
  252. self,
  253. current_symbol: str,
  254. new_symbol: str,
  255. current_score: float,
  256. new_score: float
  257. ) -> bool:
  258. """
  259. 判断是否值得切换品种
  260. 考虑切换成本,只有当预期收益 > 成本时才切换
  261. """
  262. if current_symbol == new_symbol:
  263. return False
  264. # 得分差需要超过15分才值得切换
  265. score_diff = new_score - current_score
  266. # 切换成本约0.1%,需要预期有额外收益才切换
  267. return score_diff > 15
  268. def get_selection_summary(self) -> Dict:
  269. """获取选择器统计摘要"""
  270. if not self.score_history:
  271. return {}
  272. total_days = len(self.score_history)
  273. # 统计每个品种被选中的次数
  274. symbol_counts = {}
  275. for record in self.score_history:
  276. # 找到最高分品种
  277. scores = record['scores']
  278. if scores:
  279. best = max(scores, key=lambda x: x.total_score)
  280. if best.total_score >= self.score_threshold:
  281. symbol_counts[best.symbol] = symbol_counts.get(best.symbol, 0) + 1
  282. return {
  283. "total_days": total_days,
  284. "symbol_selections": symbol_counts,
  285. "cash_days": total_days - sum(symbol_counts.values())
  286. }