position_manager.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. """
  2. 动态仓位管理器 (Position Manager)
  3. 核心规则:
  4. 1. 总仓位上限 80%
  5. 2. 单品种最大仓位:沪深300 40% / 中证500 50% / 创业板50 60%
  6. 3. 波动率调整:>30%降仓40%,<15%加仓20%
  7. 4. 每周再平衡
  8. """
  9. from typing import Dict, Optional
  10. from dataclasses import dataclass
  11. from datetime import datetime
  12. import pandas as pd
  13. import numpy as np
  14. @dataclass
  15. class PositionConfig:
  16. """仓位配置"""
  17. symbol: str
  18. base_position: float # 基础仓位比例
  19. max_position: float # 最大仓位限制
  20. volatility: float # 当前波动率
  21. adjusted_position: float # 调整后仓位
  22. class PositionManager:
  23. """
  24. 动态仓位管理器
  25. 根据品种特性和市场波动率动态调整仓位
  26. """
  27. # 品种最大仓位限制
  28. SYMBOL_MAX_POSITION = {
  29. "csi300": 0.40, # 沪深300:低波动,最大40%
  30. "csi500": 0.50, # 中证500:中等波动,最大50%
  31. "chinext50": 0.60, # 创业板50:高波动,最大60%
  32. }
  33. # 总仓位上限
  34. TOTAL_MAX_POSITION = 0.80
  35. # 波动率调整阈值
  36. VOL_LOW_THRESHOLD = 0.15 # 低波动阈值
  37. VOL_HIGH_THRESHOLD = 0.30 # 高波动阈值
  38. VOL_ADJUST_LOW = 1.20 # 低波动加仓20%
  39. VOL_ADJUST_HIGH = 0.60 # 高波动降仓40%
  40. def __init__(
  41. self,
  42. total_max: float = 0.80,
  43. rebalance_freq: int = 5 # 每5天再平衡
  44. ):
  45. self.total_max = total_max
  46. self.rebalance_freq = rebalance_freq
  47. # 当前持仓
  48. self.current_positions: Dict[str, float] = {}
  49. self.total_position = 0.0
  50. # 上次再平衡日期
  51. self.last_rebalance: Optional[datetime] = None
  52. # 历史记录
  53. self.position_history: list = []
  54. def calculate_position(
  55. self,
  56. symbol: str,
  57. confidence: float,
  58. volatility: float,
  59. current_date: Optional[datetime] = None
  60. ) -> float:
  61. """
  62. 计算建议仓位
  63. Args:
  64. symbol: 品种代码
  65. confidence: 信号置信度 (0-1)
  66. volatility: 60日年化波动率
  67. current_date: 当前日期
  68. Returns:
  69. float: 建议仓位比例 (0-1)
  70. """
  71. # 1. 基础仓位 = 最大仓位 × 置信度
  72. max_pos = self.SYMBOL_MAX_POSITION.get(symbol, 0.40)
  73. base_position = max_pos * confidence
  74. # 2. 波动率调整
  75. vol_adjustment = self._calculate_volatility_adjustment(volatility)
  76. adjusted_position = base_position * vol_adjustment
  77. # 3. 检查总仓位限制
  78. other_positions = sum(
  79. pos for sym, pos in self.current_positions.items() if sym != symbol
  80. )
  81. available = self.total_max - other_positions
  82. # 4. 返回最终仓位(不超过可用额度)
  83. final_position = min(adjusted_position, available)
  84. final_position = max(0, final_position) # 不小于0
  85. # 记录
  86. self.current_positions[symbol] = final_position
  87. self.total_position = sum(self.current_positions.values())
  88. if current_date:
  89. self.position_history.append({
  90. "date": current_date,
  91. "symbol": symbol,
  92. "base": base_position,
  93. "vol_adjust": vol_adjustment,
  94. "adjusted": adjusted_position,
  95. "final": final_position,
  96. "total": self.total_position
  97. })
  98. return final_position
  99. def _calculate_volatility_adjustment(self, volatility: float) -> float:
  100. """
  101. 计算波动率调整系数
  102. 低波动加仓,高波动减仓
  103. """
  104. if volatility < self.VOL_LOW_THRESHOLD:
  105. # 低波动:加仓20%
  106. return self.VOL_ADJUST_LOW
  107. elif volatility > self.VOL_HIGH_THRESHOLD:
  108. # 高波动:降仓40%
  109. return self.VOL_ADJUST_HIGH
  110. else:
  111. # 中等波动:线性插值
  112. # 15% -> 1.2, 30% -> 0.6
  113. ratio = (volatility - self.VOL_LOW_THRESHOLD) / \
  114. (self.VOL_HIGH_THRESHOLD - self.VOL_LOW_THRESHOLD)
  115. return self.VOL_ADJUST_LOW - ratio * (self.VOL_ADJUST_LOW - self.VOL_ADJUST_HIGH)
  116. def should_rebalance(self, current_date: datetime) -> bool:
  117. """检查是否需要再平衡"""
  118. if self.last_rebalance is None:
  119. return True
  120. days_since = (current_date - self.last_rebalance).days
  121. return days_since >= self.rebalance_freq
  122. def rebalance(
  123. self,
  124. data_dict: Dict[str, pd.DataFrame],
  125. current_date: datetime
  126. ) -> Dict[str, float]:
  127. """
  128. 执行再平衡
  129. 根据最新波动率调整所有持仓
  130. """
  131. new_positions = {}
  132. for symbol, current_pos in self.current_positions.items():
  133. if symbol not in data_dict:
  134. continue
  135. df = data_dict[symbol]
  136. df = df[df.index <= current_date]
  137. if len(df) < 60:
  138. continue
  139. # 重新计算波动率
  140. returns = df['close'].pct_change().dropna()
  141. volatility = returns.iloc[-60:].std() * np.sqrt(252)
  142. # 重新计算仓位
  143. vol_adjust = self._calculate_volatility_adjustment(volatility)
  144. max_pos = self.SYMBOL_MAX_POSITION.get(symbol, 0.40)
  145. # 保持原置信度,调整波动率
  146. if current_pos > 0:
  147. confidence = current_pos / (max_pos * vol_adjust)
  148. confidence = min(1.0, confidence)
  149. else:
  150. confidence = 0
  151. new_pos = max_pos * confidence * vol_adjust
  152. new_positions[symbol] = new_pos
  153. # 归一化确保总仓位不超限制
  154. total = sum(new_positions.values())
  155. if total > self.total_max:
  156. scale = self.total_max / total
  157. new_positions = {k: v * scale for k, v in new_positions.items()}
  158. self.current_positions = new_positions
  159. self.total_position = sum(new_positions.values())
  160. self.last_rebalance = current_date
  161. return new_positions
  162. def reduce_position(self, symbol: str, reduction_pct: float) -> float:
  163. """
  164. 减仓指定比例
  165. 用于风控触发时的紧急减仓
  166. """
  167. if symbol not in self.current_positions:
  168. return 0.0
  169. current = self.current_positions[symbol]
  170. new_pos = current * (1 - reduction_pct)
  171. new_pos = max(0, new_pos)
  172. self.current_positions[symbol] = new_pos
  173. self.total_position = sum(self.current_positions.values())
  174. return new_pos
  175. def close_position(self, symbol: str) -> float:
  176. """清仓指定品种"""
  177. if symbol not in self.current_positions:
  178. return 0.0
  179. closed_amount = self.current_positions[symbol]
  180. self.current_positions[symbol] = 0.0
  181. self.total_position = sum(self.current_positions.values())
  182. return closed_amount
  183. def close_all_positions(self) -> Dict[str, float]:
  184. """清仓所有品种"""
  185. closed = self.current_positions.copy()
  186. self.current_positions = {}
  187. self.total_position = 0.0
  188. return closed
  189. def get_position_summary(self) -> Dict:
  190. """获取仓位摘要"""
  191. return {
  192. "total_position": self.total_position,
  193. "available": self.total_max - self.total_position,
  194. "positions": self.current_positions.copy(),
  195. "last_rebalance": self.last_rebalance
  196. }
  197. def can_open_position(
  198. self,
  199. symbol: str,
  200. desired_position: float
  201. ) -> bool:
  202. """检查是否可以开仓"""
  203. other_positions = sum(
  204. pos for sym, pos in self.current_positions.items() if sym != symbol
  205. )
  206. return other_positions + desired_position <= self.total_max