| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- """
- 动态仓位管理器 (Position Manager)
- 核心规则:
- 1. 总仓位上限 80%
- 2. 单品种最大仓位:沪深300 40% / 中证500 50% / 创业板50 60%
- 3. 波动率调整:>30%降仓40%,<15%加仓20%
- 4. 每周再平衡
- """
- from typing import Dict, Optional
- from dataclasses import dataclass
- from datetime import datetime
- import pandas as pd
- import numpy as np
- @dataclass
- class PositionConfig:
- """仓位配置"""
- symbol: str
- base_position: float # 基础仓位比例
- max_position: float # 最大仓位限制
- volatility: float # 当前波动率
- adjusted_position: float # 调整后仓位
- class PositionManager:
- """
- 动态仓位管理器
- 根据品种特性和市场波动率动态调整仓位
- """
- # 品种最大仓位限制
- SYMBOL_MAX_POSITION = {
- "csi300": 0.40, # 沪深300:低波动,最大40%
- "csi500": 0.50, # 中证500:中等波动,最大50%
- "chinext50": 0.60, # 创业板50:高波动,最大60%
- }
- # 总仓位上限
- TOTAL_MAX_POSITION = 0.80
- # 波动率调整阈值
- VOL_LOW_THRESHOLD = 0.15 # 低波动阈值
- VOL_HIGH_THRESHOLD = 0.30 # 高波动阈值
- VOL_ADJUST_LOW = 1.20 # 低波动加仓20%
- VOL_ADJUST_HIGH = 0.60 # 高波动降仓40%
- def __init__(
- self,
- total_max: float = 0.80,
- rebalance_freq: int = 5 # 每5天再平衡
- ):
- self.total_max = total_max
- self.rebalance_freq = rebalance_freq
- # 当前持仓
- self.current_positions: Dict[str, float] = {}
- self.total_position = 0.0
- # 上次再平衡日期
- self.last_rebalance: Optional[datetime] = None
- # 历史记录
- self.position_history: list = []
- def calculate_position(
- self,
- symbol: str,
- confidence: float,
- volatility: float,
- current_date: Optional[datetime] = None
- ) -> float:
- """
- 计算建议仓位
- Args:
- symbol: 品种代码
- confidence: 信号置信度 (0-1)
- volatility: 60日年化波动率
- current_date: 当前日期
- Returns:
- float: 建议仓位比例 (0-1)
- """
- # 1. 基础仓位 = 最大仓位 × 置信度
- max_pos = self.SYMBOL_MAX_POSITION.get(symbol, 0.40)
- base_position = max_pos * confidence
- # 2. 波动率调整
- vol_adjustment = self._calculate_volatility_adjustment(volatility)
- adjusted_position = base_position * vol_adjustment
- # 3. 检查总仓位限制
- other_positions = sum(
- pos for sym, pos in self.current_positions.items() if sym != symbol
- )
- available = self.total_max - other_positions
- # 4. 返回最终仓位(不超过可用额度)
- final_position = min(adjusted_position, available)
- final_position = max(0, final_position) # 不小于0
- # 记录
- self.current_positions[symbol] = final_position
- self.total_position = sum(self.current_positions.values())
- if current_date:
- self.position_history.append({
- "date": current_date,
- "symbol": symbol,
- "base": base_position,
- "vol_adjust": vol_adjustment,
- "adjusted": adjusted_position,
- "final": final_position,
- "total": self.total_position
- })
- return final_position
- def _calculate_volatility_adjustment(self, volatility: float) -> float:
- """
- 计算波动率调整系数
- 低波动加仓,高波动减仓
- """
- if volatility < self.VOL_LOW_THRESHOLD:
- # 低波动:加仓20%
- return self.VOL_ADJUST_LOW
- elif volatility > self.VOL_HIGH_THRESHOLD:
- # 高波动:降仓40%
- return self.VOL_ADJUST_HIGH
- else:
- # 中等波动:线性插值
- # 15% -> 1.2, 30% -> 0.6
- ratio = (volatility - self.VOL_LOW_THRESHOLD) / \
- (self.VOL_HIGH_THRESHOLD - self.VOL_LOW_THRESHOLD)
- return self.VOL_ADJUST_LOW - ratio * (self.VOL_ADJUST_LOW - self.VOL_ADJUST_HIGH)
- def should_rebalance(self, current_date: datetime) -> bool:
- """检查是否需要再平衡"""
- if self.last_rebalance is None:
- return True
- days_since = (current_date - self.last_rebalance).days
- return days_since >= self.rebalance_freq
- def rebalance(
- self,
- data_dict: Dict[str, pd.DataFrame],
- current_date: datetime
- ) -> Dict[str, float]:
- """
- 执行再平衡
- 根据最新波动率调整所有持仓
- """
- new_positions = {}
- for symbol, current_pos in self.current_positions.items():
- if symbol not in data_dict:
- continue
- df = data_dict[symbol]
- df = df[df.index <= current_date]
- if len(df) < 60:
- continue
- # 重新计算波动率
- returns = df['close'].pct_change().dropna()
- volatility = returns.iloc[-60:].std() * np.sqrt(252)
- # 重新计算仓位
- vol_adjust = self._calculate_volatility_adjustment(volatility)
- max_pos = self.SYMBOL_MAX_POSITION.get(symbol, 0.40)
- # 保持原置信度,调整波动率
- if current_pos > 0:
- confidence = current_pos / (max_pos * vol_adjust)
- confidence = min(1.0, confidence)
- else:
- confidence = 0
- new_pos = max_pos * confidence * vol_adjust
- new_positions[symbol] = new_pos
- # 归一化确保总仓位不超限制
- total = sum(new_positions.values())
- if total > self.total_max:
- scale = self.total_max / total
- new_positions = {k: v * scale for k, v in new_positions.items()}
- self.current_positions = new_positions
- self.total_position = sum(new_positions.values())
- self.last_rebalance = current_date
- return new_positions
- def reduce_position(self, symbol: str, reduction_pct: float) -> float:
- """
- 减仓指定比例
- 用于风控触发时的紧急减仓
- """
- if symbol not in self.current_positions:
- return 0.0
- current = self.current_positions[symbol]
- new_pos = current * (1 - reduction_pct)
- new_pos = max(0, new_pos)
- self.current_positions[symbol] = new_pos
- self.total_position = sum(self.current_positions.values())
- return new_pos
- def close_position(self, symbol: str) -> float:
- """清仓指定品种"""
- if symbol not in self.current_positions:
- return 0.0
- closed_amount = self.current_positions[symbol]
- self.current_positions[symbol] = 0.0
- self.total_position = sum(self.current_positions.values())
- return closed_amount
- def close_all_positions(self) -> Dict[str, float]:
- """清仓所有品种"""
- closed = self.current_positions.copy()
- self.current_positions = {}
- self.total_position = 0.0
- return closed
- def get_position_summary(self) -> Dict:
- """获取仓位摘要"""
- return {
- "total_position": self.total_position,
- "available": self.total_max - self.total_position,
- "positions": self.current_positions.copy(),
- "last_rebalance": self.last_rebalance
- }
- def can_open_position(
- self,
- symbol: str,
- desired_position: float
- ) -> bool:
- """检查是否可以开仓"""
- other_positions = sum(
- pos for sym, pos in self.current_positions.items() if sym != symbol
- )
- return other_positions + desired_position <= self.total_max
|