| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- """
- 品种选择器 (Dragon Selector)
- 基于三维度评分系统选择最优交易品种:
- 1. 趋势强度 (40分): 价格与均线的关系
- 2. 生态匹配度 (30分): 品种与当前生态的适配
- 3. 波动率调整 (30分): 低波动加分,高波动减分
- """
- from typing import Dict, List, Optional, Tuple
- from dataclasses import dataclass
- from datetime import datetime
- import pandas as pd
- import numpy as np
- from core.ecosystem import UnifiedEcosystem, MacroRegime
- @dataclass
- class SymbolScore:
- """品种评分结果"""
- symbol: str
- trend_score: float # 0-40
- ecosystem_score: float # 0-30
- volatility_score: float # 0-30
- total_score: float # 0-100
- # 详细数据
- price_vs_20ma: float # 价格相对于20日均线的位置
- price_vs_60ma: float # 价格相对于60日均线的位置
- ma20_slope: float # 20日均线斜率
- ma60_slope: float # 60日均线斜率
- volatility_60d: float # 60日波动率
- class DragonSelector:
- """
- 品种选择器
- 每日对三个品种进行评分,选择得分最高且超过阈值的品种交易
- """
- # 品种生态偏好配置
- REGIME_PREFERENCE = {
- "csi300": {
- MacroRegime.SUMMER: 0,
- MacroRegime.AUTUMN: 5,
- MacroRegime.WINTER: 8,
- MacroRegime.SPRING: 3,
- },
- "csi500": {
- MacroRegime.SUMMER: 3,
- MacroRegime.AUTUMN: 3,
- MacroRegime.WINTER: 5,
- MacroRegime.SPRING: 8,
- },
- "chinext50": {
- MacroRegime.SUMMER: 10,
- MacroRegime.AUTUMN: 0,
- MacroRegime.WINTER: -10,
- MacroRegime.SPRING: 5,
- },
- }
- # 评分阈值
- SCORE_THRESHOLD = 60
- # 切换成本
- SWITCHING_COST = 0.001 # 0.1%
- def __init__(
- self,
- score_threshold: float = 60,
- switching_cost: float = 0.001,
- volatility_lookback: int = 60
- ):
- self.score_threshold = score_threshold
- self.switching_cost = switching_cost
- self.volatility_lookback = volatility_lookback
- # 当前选中的品种
- self.current_symbol: Optional[str] = None
- # 历史评分记录
- self.score_history: List[Dict] = []
- def select(
- self,
- data_dict: Dict[str, pd.DataFrame],
- ecosystem: UnifiedEcosystem,
- current_date: Optional[datetime] = None
- ) -> Tuple[Optional[str], Optional[SymbolScore]]:
- """
- 选择最优品种
- Args:
- data_dict: 三个品种的数据字典
- ecosystem: 当前市场生态
- current_date: 当前日期(回测用)
- Returns:
- (selected_symbol, score_details): 选中的品种和评分详情
- """
- scores = []
- # 计算每个品种的评分
- for symbol in data_dict.keys():
- score = self._calculate_score(
- symbol,
- data_dict[symbol],
- ecosystem,
- current_date
- )
- scores.append(score)
- # 按总分排序
- scores.sort(key=lambda x: x.total_score, reverse=True)
- # 记录历史
- self.score_history.append({
- "date": current_date,
- "scores": scores,
- "ecosystem": ecosystem.macro.regime.value if ecosystem else "unknown"
- })
- # 选择最高分品种
- best_score = scores[0]
- # 检查是否超过阈值
- if best_score.total_score < self.score_threshold:
- return None, best_score
- selected = best_score.symbol
- # 检查是否需要切换
- if self.current_symbol is not None and self.current_symbol != selected:
- # 考虑切换成本,只有当得分差 > 15 时才切换
- current_score = next((s for s in scores if s.symbol == self.current_symbol), None)
- if current_score:
- score_diff = best_score.total_score - current_score.total_score
- if score_diff < 15:
- # 不切换,继续持有可能表现更好的品种
- selected = self.current_symbol
- best_score = current_score
- self.current_symbol = selected
- return selected, best_score
- def _calculate_score(
- self,
- symbol: str,
- df: pd.DataFrame,
- ecosystem: UnifiedEcosystem,
- current_date: Optional[datetime] = None
- ) -> SymbolScore:
- """计算单个品种的评分"""
- # 获取数据窗口
- if current_date is not None:
- df = df[df.index <= current_date]
- if len(df) < 60:
- # 数据不足,返回低分
- return SymbolScore(
- symbol=symbol,
- trend_score=0,
- ecosystem_score=0,
- volatility_score=0,
- total_score=0,
- price_vs_20ma=0,
- price_vs_60ma=0,
- ma20_slope=0,
- ma60_slope=0,
- volatility_60d=0
- )
- close = df['close']
- # 计算均线
- ma20 = close.rolling(20).mean()
- ma60 = close.rolling(60).mean()
- current_price = close.iloc[-1]
- current_ma20 = ma20.iloc[-1]
- current_ma60 = ma60.iloc[-1]
- # 计算相对位置
- price_vs_20ma = (current_price - current_ma20) / current_ma20
- price_vs_60ma = (current_price - current_ma60) / current_ma60
- # 计算均线斜率(20日变化率)
- ma20_slope = (ma20.iloc[-1] - ma20.iloc[-20]) / ma20.iloc[-20] if len(ma20) >= 20 else 0
- ma60_slope = (ma60.iloc[-1] - ma60.iloc[-20]) / ma60.iloc[-20] if len(ma60) >= 20 else 0
- # 1. 趋势强度评分 (0-40分)
- trend_score = self._calculate_trend_score(
- price_vs_20ma, price_vs_60ma, ma20_slope, ma60_slope
- )
- # 2. 生态匹配度评分 (0-30分)
- ecosystem_score = self._calculate_ecosystem_score(symbol, ecosystem)
- # 3. 波动率调整评分 (0-30分)
- returns = close.pct_change().dropna()
- volatility_60d = returns.iloc[-60:].std() * np.sqrt(252)
- volatility_score = self._calculate_volatility_score(volatility_60d)
- # 总分
- total_score = trend_score + ecosystem_score + volatility_score
- return SymbolScore(
- symbol=symbol,
- trend_score=trend_score,
- ecosystem_score=ecosystem_score,
- volatility_score=volatility_score,
- total_score=total_score,
- price_vs_20ma=price_vs_20ma,
- price_vs_60ma=price_vs_60ma,
- ma20_slope=ma20_slope,
- ma60_slope=ma60_slope,
- volatility_60d=volatility_60d
- )
- def _calculate_trend_score(
- self,
- price_vs_20ma: float,
- price_vs_60ma: float,
- ma20_slope: float,
- ma60_slope: float
- ) -> float:
- """
- 计算趋势强度评分 (0-40分)
- 完美趋势:价格 > 20MA > 60MA,且均线向上
- """
- score = 0.0
- # 价格位置 (0-15分)
- if price_vs_20ma > 0.03: # 价格在20日均线上方3%
- score += 15
- elif price_vs_20ma > 0:
- score += 10
- elif price_vs_20ma > -0.02:
- score += 5
- # 均线排列 (0-15分)
- if price_vs_60ma > 0.05: # 价格在60日均线上方5%
- score += 15
- elif price_vs_60ma > 0:
- score += 10
- elif price_vs_60ma > -0.03:
- score += 5
- # 均线斜率 (0-10分)
- if ma20_slope > 0.001 and ma60_slope > 0: # 均线向上
- score += 10
- elif ma20_slope > 0:
- score += 5
- return min(40, score)
- def _calculate_ecosystem_score(
- self,
- symbol: str,
- ecosystem: UnifiedEcosystem
- ) -> float:
- """
- 计算生态匹配度评分 (0-30分)
- 基础分20分,根据品种-生态偏好调整
- """
- base_score = 20.0
- if ecosystem and hasattr(ecosystem, 'macro'):
- regime = ecosystem.macro.regime
- preference = self.REGIME_PREFERENCE.get(symbol, {}).get(regime, 0)
- # 加上偏好分数
- score = base_score + preference
- # 限制在0-30分
- return max(0, min(30, score))
- return base_score
- def _calculate_volatility_score(self, volatility: float) -> float:
- """
- 计算波动率调整评分 (0-30分)
- 低波动加分,高波动减分
- """
- if volatility < 0.15: # 波动率 < 15%
- return 30
- elif volatility < 0.20: # 波动率 15-20%
- return 25
- elif volatility < 0.25: # 波动率 20-25%
- return 20
- elif volatility < 0.30: # 波动率 25-30%
- return 15
- elif volatility < 0.35: # 波动率 30-35%
- return 10
- else: # 波动率 > 35%
- return 5
- def should_switch(
- self,
- current_symbol: str,
- new_symbol: str,
- current_score: float,
- new_score: float
- ) -> bool:
- """
- 判断是否值得切换品种
- 考虑切换成本,只有当预期收益 > 成本时才切换
- """
- if current_symbol == new_symbol:
- return False
- # 得分差需要超过15分才值得切换
- score_diff = new_score - current_score
- # 切换成本约0.1%,需要预期有额外收益才切换
- return score_diff > 15
- def get_selection_summary(self) -> Dict:
- """获取选择器统计摘要"""
- if not self.score_history:
- return {}
- total_days = len(self.score_history)
- # 统计每个品种被选中的次数
- symbol_counts = {}
- for record in self.score_history:
- # 找到最高分品种
- scores = record['scores']
- if scores:
- best = max(scores, key=lambda x: x.total_score)
- if best.total_score >= self.score_threshold:
- symbol_counts[best.symbol] = symbol_counts.get(best.symbol, 0) + 1
- return {
- "total_days": total_days,
- "symbol_selections": symbol_counts,
- "cash_days": total_days - sum(symbol_counts.values())
- }
|