""" 动态仓位管理器 (Position Manager) 核心规则: 1. 总仓位上限 80% 2. 单品种最大仓位:沪深300 40% / 中证500 50% / 创业板50 60% 3. 波动率调整:>30%降仓40%,<15%加仓20% 4. 每周再平衡 """ from typing import Dict, Optional from dataclasses import dataclass from datetime import datetime import pandas as pd import numpy as np @dataclass class PositionConfig: """仓位配置""" symbol: str base_position: float # 基础仓位比例 max_position: float # 最大仓位限制 volatility: float # 当前波动率 adjusted_position: float # 调整后仓位 class PositionManager: """ 动态仓位管理器 根据品种特性和市场波动率动态调整仓位 """ # 品种最大仓位限制 SYMBOL_MAX_POSITION = { "csi300": 0.40, # 沪深300:低波动,最大40% "csi500": 0.50, # 中证500:中等波动,最大50% "chinext50": 0.60, # 创业板50:高波动,最大60% } # 总仓位上限 TOTAL_MAX_POSITION = 0.80 # 波动率调整阈值 VOL_LOW_THRESHOLD = 0.15 # 低波动阈值 VOL_HIGH_THRESHOLD = 0.30 # 高波动阈值 VOL_ADJUST_LOW = 1.20 # 低波动加仓20% VOL_ADJUST_HIGH = 0.60 # 高波动降仓40% def __init__( self, total_max: float = 0.80, rebalance_freq: int = 5 # 每5天再平衡 ): self.total_max = total_max self.rebalance_freq = rebalance_freq # 当前持仓 self.current_positions: Dict[str, float] = {} self.total_position = 0.0 # 上次再平衡日期 self.last_rebalance: Optional[datetime] = None # 历史记录 self.position_history: list = [] def calculate_position( self, symbol: str, confidence: float, volatility: float, current_date: Optional[datetime] = None ) -> float: """ 计算建议仓位 Args: symbol: 品种代码 confidence: 信号置信度 (0-1) volatility: 60日年化波动率 current_date: 当前日期 Returns: float: 建议仓位比例 (0-1) """ # 1. 基础仓位 = 最大仓位 × 置信度 max_pos = self.SYMBOL_MAX_POSITION.get(symbol, 0.40) base_position = max_pos * confidence # 2. 波动率调整 vol_adjustment = self._calculate_volatility_adjustment(volatility) adjusted_position = base_position * vol_adjustment # 3. 检查总仓位限制 other_positions = sum( pos for sym, pos in self.current_positions.items() if sym != symbol ) available = self.total_max - other_positions # 4. 返回最终仓位(不超过可用额度) final_position = min(adjusted_position, available) final_position = max(0, final_position) # 不小于0 # 记录 self.current_positions[symbol] = final_position self.total_position = sum(self.current_positions.values()) if current_date: self.position_history.append({ "date": current_date, "symbol": symbol, "base": base_position, "vol_adjust": vol_adjustment, "adjusted": adjusted_position, "final": final_position, "total": self.total_position }) return final_position def _calculate_volatility_adjustment(self, volatility: float) -> float: """ 计算波动率调整系数 低波动加仓,高波动减仓 """ if volatility < self.VOL_LOW_THRESHOLD: # 低波动:加仓20% return self.VOL_ADJUST_LOW elif volatility > self.VOL_HIGH_THRESHOLD: # 高波动:降仓40% return self.VOL_ADJUST_HIGH else: # 中等波动:线性插值 # 15% -> 1.2, 30% -> 0.6 ratio = (volatility - self.VOL_LOW_THRESHOLD) / \ (self.VOL_HIGH_THRESHOLD - self.VOL_LOW_THRESHOLD) return self.VOL_ADJUST_LOW - ratio * (self.VOL_ADJUST_LOW - self.VOL_ADJUST_HIGH) def should_rebalance(self, current_date: datetime) -> bool: """检查是否需要再平衡""" if self.last_rebalance is None: return True days_since = (current_date - self.last_rebalance).days return days_since >= self.rebalance_freq def rebalance( self, data_dict: Dict[str, pd.DataFrame], current_date: datetime ) -> Dict[str, float]: """ 执行再平衡 根据最新波动率调整所有持仓 """ new_positions = {} for symbol, current_pos in self.current_positions.items(): if symbol not in data_dict: continue df = data_dict[symbol] df = df[df.index <= current_date] if len(df) < 60: continue # 重新计算波动率 returns = df['close'].pct_change().dropna() volatility = returns.iloc[-60:].std() * np.sqrt(252) # 重新计算仓位 vol_adjust = self._calculate_volatility_adjustment(volatility) max_pos = self.SYMBOL_MAX_POSITION.get(symbol, 0.40) # 保持原置信度,调整波动率 if current_pos > 0: confidence = current_pos / (max_pos * vol_adjust) confidence = min(1.0, confidence) else: confidence = 0 new_pos = max_pos * confidence * vol_adjust new_positions[symbol] = new_pos # 归一化确保总仓位不超限制 total = sum(new_positions.values()) if total > self.total_max: scale = self.total_max / total new_positions = {k: v * scale for k, v in new_positions.items()} self.current_positions = new_positions self.total_position = sum(new_positions.values()) self.last_rebalance = current_date return new_positions def reduce_position(self, symbol: str, reduction_pct: float) -> float: """ 减仓指定比例 用于风控触发时的紧急减仓 """ if symbol not in self.current_positions: return 0.0 current = self.current_positions[symbol] new_pos = current * (1 - reduction_pct) new_pos = max(0, new_pos) self.current_positions[symbol] = new_pos self.total_position = sum(self.current_positions.values()) return new_pos def close_position(self, symbol: str) -> float: """清仓指定品种""" if symbol not in self.current_positions: return 0.0 closed_amount = self.current_positions[symbol] self.current_positions[symbol] = 0.0 self.total_position = sum(self.current_positions.values()) return closed_amount def close_all_positions(self) -> Dict[str, float]: """清仓所有品种""" closed = self.current_positions.copy() self.current_positions = {} self.total_position = 0.0 return closed def get_position_summary(self) -> Dict: """获取仓位摘要""" return { "total_position": self.total_position, "available": self.total_max - self.total_position, "positions": self.current_positions.copy(), "last_rebalance": self.last_rebalance } def can_open_position( self, symbol: str, desired_position: float ) -> bool: """检查是否可以开仓""" other_positions = sum( pos for sym, pos in self.current_positions.items() if sym != symbol ) return other_positions + desired_position <= self.total_max