"""
Multi-Layer Risk Manager.

Three layers of stop-loss protection:
1. Per-position hard stop: -8%
2. Daily loss limit: -5% (reduce positions by 50%)
3. Account circuit breaker: -15% (liquidate and cool off for one week)

Trailing take-profit:
- 15% profit: move the stop to breakeven
- 30% profit: lock in 15%
- 10% pullback from the peak: take profit and exit
"""

from typing import Dict, Optional, List
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum


class RiskStatus(Enum):
    """Overall risk state of the account."""

    NORMAL = "normal"
    WARNING = "warning"
    COOLING = "cooling"
    CIRCUIT_BREAKER = "circuit_breaker"


@dataclass
class RiskCheckResult:
    """Outcome of a single risk check."""

    can_trade: bool                # whether new trades are allowed
    should_close_position: bool    # close the checked position entirely
    should_reduce_position: bool   # scale the position down by reduction_pct
    reduction_pct: float           # fraction to cut (0.5 means 50%)
    status: RiskStatus
    message: str


@dataclass
class PositionRisk:
    """Risk-tracking state of one open position."""

    symbol: str
    entry_price: float
    current_price: float
    highest_price: float           # highest price seen since registration
    position_size: float
    unrealized_pnl_pct: float      # (current - entry) / entry
    drawdown_from_peak: float      # (highest - current) / highest


class MultiLayerRiskManager:
    """
    Multi-layer risk management.

    Implements the three stop-loss layers, the trailing take-profit rules and
    the stay-in-cash decision.
    """

    # Stop-loss thresholds
    SINGLE_STOP_PCT = 0.08  # per position: -8%
    DAILY_LOSS_PCT = 0.05  # per day: -5%
    CIRCUIT_BREAKER_PCT = 0.15  # whole account: -15%

    # Trailing take-profit thresholds
    PROFIT_LOCK_1 = 0.15  # 15% peak profit: stop moves to breakeven
    PROFIT_LOCK_2 = 0.30  # 30% peak profit: lock in 15%
    TRAILING_STOP = 0.10  # exit on a 10% pullback from the peak

    # Cooling periods, applied as calendar days via timedelta.
    # NOTE(review): the module docstring says "one week" but the value is 5
    # -- confirm whether 5 trading days or 7 calendar days is intended.
    COOLING_DAYS = 5
    CIRCUIT_BREAKER_DAYS = 5

    def __init__(self):
        # Current state
        self.status = RiskStatus.NORMAL
        self.cooling_end_date: Optional[datetime] = None

        # Account state
        self.peak_equity = 0.0
        self.current_equity = 0.0
        self.daily_pnl = 0.0
        # Equity at the start of the current day; baseline for daily_pnl.
        self.day_start_equity = 0.0
        self.last_reset_date: Optional[datetime] = None

        # Position tracking
        self.positions: Dict[str, PositionRisk] = {}

        # History of triggered risk events
        self.risk_events: List[Dict] = []

    @staticmethod
    def _calendar_day(moment):
        """Return the calendar day of *moment*; plain dates pass through."""
        return moment.date() if isinstance(moment, datetime) else moment

    def _drawdown(self) -> float:
        """Return equity drawdown from the peak (<= 0), or 0.0 with no peak."""
        if self.peak_equity <= 0:
            return 0.0
        return (self.current_equity - self.peak_equity) / self.peak_equity

    def _daily_return(self) -> float:
        """Return today's P&L as a fraction of start-of-day equity."""
        if self.day_start_equity <= 0:
            return 0.0
        return self.daily_pnl / self.day_start_equity

    def initialize(self, initial_equity: float, current_date: datetime):
        """Initialise the manager with the starting equity and date."""
        self.peak_equity = initial_equity
        self.current_equity = initial_equity
        self.day_start_equity = initial_equity
        self.daily_pnl = 0.0
        self.last_reset_date = current_date

    def update_equity(self, equity: float, current_date: datetime):
        """
        Record a new account equity value.

        The daily P&L is measured against the equity held when the calendar
        day began, so several intraday updates accumulate instead of each
        one overwriting the previous delta.
        """
        today = self._calendar_day(current_date)
        new_day = (
            self.last_reset_date is None
            or self._calendar_day(self.last_reset_date) != today
        )
        if new_day:
            # The previous equity is the baseline; fall back to the incoming
            # value when no positive equity has been recorded yet.
            if self.current_equity > 0:
                self.day_start_equity = self.current_equity
            else:
                self.day_start_equity = equity
            self.last_reset_date = current_date

        self.daily_pnl = equity - self.day_start_equity
        self.current_equity = equity

        # Track the high-water mark
        if equity > self.peak_equity:
            self.peak_equity = equity

    def register_position(
        self,
        symbol: str,
        entry_price: float,
        position_size: float,
        current_date: datetime
    ):
        """Start tracking a new position (current_date is currently unused)."""
        self.positions[symbol] = PositionRisk(
            symbol=symbol,
            entry_price=entry_price,
            current_price=entry_price,
            highest_price=entry_price,
            position_size=position_size,
            unrealized_pnl_pct=0.0,
            drawdown_from_peak=0.0
        )

    def update_position(self, symbol: str, current_price: float):
        """Refresh price, peak, P&L and drawdown of a tracked position."""
        pos = self.positions.get(symbol)
        if pos is None:
            return

        pos.current_price = current_price
        pos.highest_price = max(pos.highest_price, current_price)

        # Unrealised P&L relative to entry (guard a zero entry price)
        if pos.entry_price:
            pos.unrealized_pnl_pct = (
                (current_price - pos.entry_price) / pos.entry_price
            )
        else:
            pos.unrealized_pnl_pct = 0.0

        # Pullback from the highest price seen (guard a zero peak)
        if pos.highest_price:
            pos.drawdown_from_peak = (
                (pos.highest_price - current_price) / pos.highest_price
            )
        else:
            pos.drawdown_from_peak = 0.0

    def check_risk(
        self,
        symbol: str,
        current_date: datetime
    ) -> RiskCheckResult:
        """
        Run the risk checks for *symbol*.

        Priority: circuit breaker > daily loss > per-position stop >
        trailing take-profit. The first rule that fires decides the result.
        """
        # 1. An active circuit breaker halts everything
        if self.status == RiskStatus.CIRCUIT_BREAKER:
            if self.cooling_end_date and current_date < self.cooling_end_date:
                return RiskCheckResult(
                    can_trade=False,
                    should_close_position=True,
                    should_reduce_position=False,
                    reduction_pct=0.0,
                    status=self.status,
                    message="Circuit breaker active, trading halted"
                )
            # Cooling period is over: resume trading. Re-base the peak on the
            # current equity; otherwise the old drawdown would re-trigger the
            # breaker immediately and lock the account out forever.
            self.status = RiskStatus.NORMAL
            self.cooling_end_date = None
            self.peak_equity = self.current_equity

        # 2. Account-level circuit breaker
        total_drawdown = self._drawdown()
        if total_drawdown <= -self.CIRCUIT_BREAKER_PCT:
            self._trigger_circuit_breaker(current_date)
            return RiskCheckResult(
                can_trade=False,
                should_close_position=True,
                should_reduce_position=False,
                reduction_pct=0.0,
                status=self.status,
                message=f"Circuit breaker triggered: drawdown {total_drawdown:.2%}"
            )

        # 3. Daily loss limit
        daily_return = self._daily_return()
        if daily_return <= -self.DAILY_LOSS_PCT:
            return RiskCheckResult(
                can_trade=True,
                should_close_position=False,
                should_reduce_position=True,
                reduction_pct=0.5,
                status=RiskStatus.WARNING,
                message=f"Daily loss limit: {daily_return:.2%}"
            )

        # 4. Position-level checks
        if symbol in self.positions:
            pos = self.positions[symbol]

            # Hard stop on a single position
            if pos.unrealized_pnl_pct <= -self.SINGLE_STOP_PCT:
                return RiskCheckResult(
                    can_trade=True,
                    should_close_position=True,
                    should_reduce_position=False,
                    reduction_pct=0.0,
                    status=RiskStatus.WARNING,
                    message=f"Single position stop: {pos.unrealized_pnl_pct:.2%}"
                )

            # Trailing take-profit
            trailing_check = self._check_trailing_stop(pos)
            if trailing_check:
                return trailing_check

        return RiskCheckResult(
            can_trade=True,
            should_close_position=False,
            should_reduce_position=False,
            reduction_pct=0.0,
            status=RiskStatus.NORMAL,
            message="Risk check passed"
        )

    @staticmethod
    def _close_signal(message: str) -> RiskCheckResult:
        """Build the 'close this position' result used by take-profit rules."""
        return RiskCheckResult(
            can_trade=True,
            should_close_position=True,
            should_reduce_position=False,
            reduction_pct=0.0,
            status=RiskStatus.WARNING,
            message=message
        )

    def _check_trailing_stop(self, pos: PositionRisk) -> Optional[RiskCheckResult]:
        """
        Evaluate the trailing take-profit rules for *pos*.

        The breakeven and profit-lock rules are armed by the best profit the
        position has reached (from highest_price), not by the current profit:
        a position that is currently 15% up cannot also be at or below its
        entry price, so testing the current profit would never fire.

        Returns a close signal, or None when no rule fires.
        """
        if pos.entry_price:
            peak_profit = (pos.highest_price - pos.entry_price) / pos.entry_price
        else:
            peak_profit = 0.0

        # Reached 15% profit: never let the trade turn into a loss
        if peak_profit >= self.PROFIT_LOCK_1 and pos.current_price <= pos.entry_price:
            return self._close_signal("Breakeven stop triggered")

        # Reached 30% profit: lock in 15%
        if peak_profit >= self.PROFIT_LOCK_2:
            lock_price = pos.entry_price * (1 + self.PROFIT_LOCK_1)
            if pos.current_price <= lock_price:
                return self._close_signal("Profit lock stop triggered")

        # 10% pullback from the peak
        if pos.drawdown_from_peak >= self.TRAILING_STOP:
            return self._close_signal(
                f"Trailing stop: {pos.drawdown_from_peak:.2%} from peak"
            )

        return None

    def _trigger_circuit_breaker(self, current_date: datetime):
        """Enter circuit-breaker state and log the event."""
        self.status = RiskStatus.CIRCUIT_BREAKER
        self.cooling_end_date = current_date + timedelta(days=self.CIRCUIT_BREAKER_DAYS)

        self.risk_events.append({
            "date": current_date,
            "type": "circuit_breaker",
            "drawdown": self._drawdown(),
            "equity": self.current_equity
        })

    def close_position(self, symbol: str):
        """Stop tracking *symbol*; unknown symbols are ignored."""
        self.positions.pop(symbol, None)

    def should_stay_cash(
        self,
        scores: Dict[str, float],
        regime: str,
        winter_days: int,
        current_date: datetime
    ) -> bool:
        """
        Decide whether the account should stay fully in cash.

        Returns True when either condition holds:
        1. Every instrument score is below 60 (also true for empty scores).
        2. The regime is "winter" and has lasted more than 30 days.

        A third rule (realised loss for the month above 5%) is described in
        the design but is NOT implemented here; current_date is unused.
        """
        all_below_threshold = all(score < 60 for score in scores.values())
        extended_winter = regime == "winter" and winter_days > 30
        return all_below_threshold or extended_winter

    def get_risk_summary(self) -> Dict:
        """Return a snapshot of the account-level risk figures."""
        return {
            "status": self.status.value,
            "current_equity": self.current_equity,
            "peak_equity": self.peak_equity,
            "drawdown": self._drawdown(),
            "daily_pnl": self.daily_pnl,
            "daily_return": self._daily_return(),
            "positions": len(self.positions),
            "cooling_end": self.cooling_end_date
        }