""" 品种选择器 (Dragon Selector) 基于三维度评分系统选择最优交易品种: 1. 趋势强度 (40分): 价格与均线的关系 2. 生态匹配度 (30分): 品种与当前生态的适配 3. 波动率调整 (30分): 低波动加分,高波动减分 """ from typing import Dict, List, Optional, Tuple from dataclasses import dataclass from datetime import datetime import pandas as pd import numpy as np from core.ecosystem import UnifiedEcosystem, MacroRegime @dataclass class SymbolScore: """品种评分结果""" symbol: str trend_score: float # 0-40 ecosystem_score: float # 0-30 volatility_score: float # 0-30 total_score: float # 0-100 # 详细数据 price_vs_20ma: float # 价格相对于20日均线的位置 price_vs_60ma: float # 价格相对于60日均线的位置 ma20_slope: float # 20日均线斜率 ma60_slope: float # 60日均线斜率 volatility_60d: float # 60日波动率 class DragonSelector: """ 品种选择器 每日对三个品种进行评分,选择得分最高且超过阈值的品种交易 """ # 品种生态偏好配置 REGIME_PREFERENCE = { "csi300": { MacroRegime.SUMMER: 0, MacroRegime.AUTUMN: 5, MacroRegime.WINTER: 8, MacroRegime.SPRING: 3, }, "csi500": { MacroRegime.SUMMER: 3, MacroRegime.AUTUMN: 3, MacroRegime.WINTER: 5, MacroRegime.SPRING: 8, }, "chinext50": { MacroRegime.SUMMER: 10, MacroRegime.AUTUMN: 0, MacroRegime.WINTER: -10, MacroRegime.SPRING: 5, }, } # 评分阈值 SCORE_THRESHOLD = 60 # 切换成本 SWITCHING_COST = 0.001 # 0.1% def __init__( self, score_threshold: float = 60, switching_cost: float = 0.001, volatility_lookback: int = 60 ): self.score_threshold = score_threshold self.switching_cost = switching_cost self.volatility_lookback = volatility_lookback # 当前选中的品种 self.current_symbol: Optional[str] = None # 历史评分记录 self.score_history: List[Dict] = [] def select( self, data_dict: Dict[str, pd.DataFrame], ecosystem: UnifiedEcosystem, current_date: Optional[datetime] = None ) -> Tuple[Optional[str], Optional[SymbolScore]]: """ 选择最优品种 Args: data_dict: 三个品种的数据字典 ecosystem: 当前市场生态 current_date: 当前日期(回测用) Returns: (selected_symbol, score_details): 选中的品种和评分详情 """ scores = [] # 计算每个品种的评分 for symbol in data_dict.keys(): score = self._calculate_score( symbol, data_dict[symbol], ecosystem, current_date ) scores.append(score) # 按总分排序 scores.sort(key=lambda x: x.total_score, reverse=True) # 记录历史 self.score_history.append({ "date": current_date, "scores": scores, "ecosystem": ecosystem.macro.regime.value if ecosystem else "unknown" }) # 选择最高分品种 best_score = scores[0] # 检查是否超过阈值 if best_score.total_score < self.score_threshold: return None, best_score selected = best_score.symbol # 检查是否需要切换 if self.current_symbol is not None and self.current_symbol != selected: # 考虑切换成本,只有当得分差 > 15 时才切换 current_score = next((s for s in scores if s.symbol == self.current_symbol), None) if current_score: score_diff = best_score.total_score - current_score.total_score if score_diff < 15: # 不切换,继续持有可能表现更好的品种 selected = self.current_symbol best_score = current_score self.current_symbol = selected return selected, best_score def _calculate_score( self, symbol: str, df: pd.DataFrame, ecosystem: UnifiedEcosystem, current_date: Optional[datetime] = None ) -> SymbolScore: """计算单个品种的评分""" # 获取数据窗口 if current_date is not None: df = df[df.index <= current_date] if len(df) < 60: # 数据不足,返回低分 return SymbolScore( symbol=symbol, trend_score=0, ecosystem_score=0, volatility_score=0, total_score=0, price_vs_20ma=0, price_vs_60ma=0, ma20_slope=0, ma60_slope=0, volatility_60d=0 ) close = df['close'] # 计算均线 ma20 = close.rolling(20).mean() ma60 = close.rolling(60).mean() current_price = close.iloc[-1] current_ma20 = ma20.iloc[-1] current_ma60 = ma60.iloc[-1] # 计算相对位置 price_vs_20ma = (current_price - current_ma20) / current_ma20 price_vs_60ma = (current_price - current_ma60) / current_ma60 # 计算均线斜率(20日变化率) ma20_slope = (ma20.iloc[-1] - ma20.iloc[-20]) / ma20.iloc[-20] if len(ma20) >= 20 else 0 ma60_slope = (ma60.iloc[-1] - ma60.iloc[-20]) / ma60.iloc[-20] if len(ma60) >= 20 else 0 # 1. 趋势强度评分 (0-40分) trend_score = self._calculate_trend_score( price_vs_20ma, price_vs_60ma, ma20_slope, ma60_slope ) # 2. 生态匹配度评分 (0-30分) ecosystem_score = self._calculate_ecosystem_score(symbol, ecosystem) # 3. 波动率调整评分 (0-30分) returns = close.pct_change().dropna() volatility_60d = returns.iloc[-60:].std() * np.sqrt(252) volatility_score = self._calculate_volatility_score(volatility_60d) # 总分 total_score = trend_score + ecosystem_score + volatility_score return SymbolScore( symbol=symbol, trend_score=trend_score, ecosystem_score=ecosystem_score, volatility_score=volatility_score, total_score=total_score, price_vs_20ma=price_vs_20ma, price_vs_60ma=price_vs_60ma, ma20_slope=ma20_slope, ma60_slope=ma60_slope, volatility_60d=volatility_60d ) def _calculate_trend_score( self, price_vs_20ma: float, price_vs_60ma: float, ma20_slope: float, ma60_slope: float ) -> float: """ 计算趋势强度评分 (0-40分) 完美趋势:价格 > 20MA > 60MA,且均线向上 """ score = 0.0 # 价格位置 (0-15分) if price_vs_20ma > 0.03: # 价格在20日均线上方3% score += 15 elif price_vs_20ma > 0: score += 10 elif price_vs_20ma > -0.02: score += 5 # 均线排列 (0-15分) if price_vs_60ma > 0.05: # 价格在60日均线上方5% score += 15 elif price_vs_60ma > 0: score += 10 elif price_vs_60ma > -0.03: score += 5 # 均线斜率 (0-10分) if ma20_slope > 0.001 and ma60_slope > 0: # 均线向上 score += 10 elif ma20_slope > 0: score += 5 return min(40, score) def _calculate_ecosystem_score( self, symbol: str, ecosystem: UnifiedEcosystem ) -> float: """ 计算生态匹配度评分 (0-30分) 基础分20分,根据品种-生态偏好调整 """ base_score = 20.0 if ecosystem and hasattr(ecosystem, 'macro'): regime = ecosystem.macro.regime preference = self.REGIME_PREFERENCE.get(symbol, {}).get(regime, 0) # 加上偏好分数 score = base_score + preference # 限制在0-30分 return max(0, min(30, score)) return base_score def _calculate_volatility_score(self, volatility: float) -> float: """ 计算波动率调整评分 (0-30分) 低波动加分,高波动减分 """ if volatility < 0.15: # 波动率 < 15% return 30 elif volatility < 0.20: # 波动率 15-20% return 25 elif volatility < 0.25: # 波动率 20-25% return 20 elif volatility < 0.30: # 波动率 25-30% return 15 elif volatility < 0.35: # 波动率 30-35% return 10 else: # 波动率 > 35% return 5 def should_switch( self, current_symbol: str, new_symbol: str, current_score: float, new_score: float ) -> bool: """ 判断是否值得切换品种 考虑切换成本,只有当预期收益 > 成本时才切换 """ if current_symbol == new_symbol: return False # 得分差需要超过15分才值得切换 score_diff = new_score - current_score # 切换成本约0.1%,需要预期有额外收益才切换 return score_diff > 15 def get_selection_summary(self) -> Dict: """获取选择器统计摘要""" if not self.score_history: return {} total_days = len(self.score_history) # 统计每个品种被选中的次数 symbol_counts = {} for record in self.score_history: # 找到最高分品种 scores = record['scores'] if scores: best = max(scores, key=lambda x: x.total_score) if best.total_score >= self.score_threshold: symbol_counts[best.symbol] = symbol_counts.get(best.symbol, 0) + 1 return { "total_days": total_days, "symbol_selections": symbol_counts, "cash_days": total_days - sum(symbol_counts.values()) }