| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- """
- 多维度风险管理体系 (Multi-Layer Risk Manager)
- 三层止损保护:
- 1. 单笔硬止损:-8%
- 2. 单日损失限制:-5%(减仓50%)
- 3. 账户熔断:-15%(清仓+冷静一周)
- 移动止盈:
- - 15%利润:保本
- - 30%利润:锁定15%
- - 10%回撤:止盈出场
- """
- from typing import Dict, Optional, List
- from dataclasses import dataclass, field
- from datetime import datetime, timedelta
- from enum import Enum
class RiskStatus(Enum):
    """Overall state reported by the risk manager.

    The string values are what ``get_risk_summary`` exposes under the
    ``"status"`` key, so they double as the serialised form of the state.
    """

    # No risk rule is currently firing.
    NORMAL = "normal"
    # A stop / limit fired for this check, but trading is still allowed.
    WARNING = "warning"
    # Cooling-off state (declared here; not set by the code in this module).
    COOLING = "cooling"
    # Account-level circuit breaker is active: trading is halted.
    CIRCUIT_BREAKER = "circuit_breaker"
@dataclass
class RiskCheckResult:
    """Outcome of a single risk check, telling the caller what to do next."""

    # Whether new trades may be opened.
    can_trade: bool
    # Whether the checked position should be closed entirely.
    should_close_position: bool
    # Whether the checked position should be partially reduced.
    should_reduce_position: bool
    # Fraction of the position to cut when a reduction is requested (0.5 = half).
    reduction_pct: float
    # Risk state associated with this result.
    status: RiskStatus
    # Human-readable explanation of the rule that produced this result.
    message: str
@dataclass
class PositionRisk:
    """Mutable per-position state tracked by the risk manager."""

    # Instrument identifier; also the key in the manager's position map.
    symbol: str
    # Price at which the position was opened.
    entry_price: float
    # Most recent price seen for the position.
    current_price: float
    # Highest price seen since the position was opened.
    highest_price: float
    # Size of the position (unit is not established in this module).
    position_size: float
    # (current_price - entry_price) / entry_price, as a fraction.
    unrealized_pnl_pct: float
    # (highest_price - current_price) / highest_price, as a fraction.
    drawdown_from_peak: float
class MultiLayerRiskManager:
    """Multi-layer risk manager: three stop layers, trailing take-profit, cash rule.

    Stop layers, from highest to lowest priority in ``check_risk``:

    1. Account circuit breaker: drawdown from the equity high-water mark of
       ``CIRCUIT_BREAKER_PCT`` halts trading for ``CIRCUIT_BREAKER_DAYS``.
    2. Daily loss limit: a loss of ``DAILY_LOSS_PCT`` relative to start-of-day
       equity asks for a 50% position reduction.
    3. Per-position hard stop: an unrealised loss of ``SINGLE_STOP_PCT``.

    Trailing take-profit (``_check_trailing_stop``) then protects open gains.

    All thresholds are fractions (0.08 means 8%).
    """

    # Stop-loss thresholds.
    SINGLE_STOP_PCT = 0.08  # per-position hard stop
    DAILY_LOSS_PCT = 0.05  # daily loss limit
    CIRCUIT_BREAKER_PCT = 0.15  # account-level drawdown limit

    # Trailing take-profit thresholds.
    PROFIT_LOCK_1 = 0.15  # peak profit that arms the breakeven stop
    PROFIT_LOCK_2 = 0.30  # peak profit that arms the +PROFIT_LOCK_1 stop
    TRAILING_STOP = 0.10  # drawdown from the position's peak price that exits

    # Cooling periods, applied as calendar days via ``timedelta(days=...)``.
    COOLING_DAYS = 5  # not referenced by the code in this class
    CIRCUIT_BREAKER_DAYS = 5

    # Stay-in-cash thresholds used by ``should_stay_cash``.
    MIN_TRADABLE_SCORE = 60
    MAX_WINTER_DAYS = 30

    def __init__(self):
        """Create a manager with zeroed account state and no positions."""
        # Current state.
        self.status = RiskStatus.NORMAL
        self.cooling_end_date: Optional[datetime] = None
        # Account state.
        self.peak_equity = 0.0
        self.current_equity = 0.0
        # Equity at the start of the current day; the base for daily P&L.
        self.day_start_equity = 0.0
        self.daily_pnl = 0.0
        self.last_reset_date: Optional[datetime] = None
        # Open positions keyed by symbol.
        self.positions: Dict[str, PositionRisk] = {}
        # Log of triggered risk events.
        self.risk_events: List[Dict] = []

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _ratio(numerator: float, denominator: float) -> float:
        """Return ``numerator / denominator``, or 0.0 for a non-positive base."""
        return numerator / denominator if denominator > 0 else 0.0

    @staticmethod
    def _calendar_day(moment):
        """Reduce a ``datetime`` to its date so intraday stamps share one day."""
        return moment.date() if isinstance(moment, datetime) else moment

    def _drawdown(self) -> float:
        """Return account drawdown from the high-water mark (<= 0, 0.0 if unset)."""
        return self._ratio(self.current_equity - self.peak_equity, self.peak_equity)

    def _daily_return(self) -> float:
        """Return today's P&L as a fraction of start-of-day equity."""
        return self._ratio(self.daily_pnl, self.day_start_equity)

    @staticmethod
    def _make_result(
        status,
        message: str,
        *,
        can_trade: bool = True,
        close: bool = False,
        reduce_by: float = 0.0,
    ) -> RiskCheckResult:
        """Build a ``RiskCheckResult``; a positive ``reduce_by`` requests a reduction."""
        return RiskCheckResult(
            can_trade=can_trade,
            should_close_position=close,
            should_reduce_position=reduce_by > 0.0,
            reduction_pct=reduce_by,
            status=status,
            message=message,
        )

    # ------------------------------------------------------------------
    # Account tracking
    # ------------------------------------------------------------------
    def initialize(self, initial_equity: float, current_date: datetime):
        """Seed equity, high-water mark and the daily baseline with ``initial_equity``."""
        self.peak_equity = initial_equity
        self.current_equity = initial_equity
        self.day_start_equity = initial_equity
        self.daily_pnl = 0.0
        self.last_reset_date = current_date

    def update_equity(self, equity: float, current_date: datetime):
        """Record the latest account equity and refresh daily P&L and the peak.

        Daily P&L is cumulative for the calendar day: it is measured against
        the last equity recorded before the day changed, so several updates
        within one day do not overwrite each other.
        """
        is_new_day = (
            self.last_reset_date is None
            or self._calendar_day(self.last_reset_date) != self._calendar_day(current_date)
        )
        if is_new_day:
            # The previous equity reading becomes today's baseline.
            self.day_start_equity = self.current_equity
            self.last_reset_date = current_date

        self.daily_pnl = equity - self.day_start_equity
        self.current_equity = equity

        if equity > self.peak_equity:
            self.peak_equity = equity

    # ------------------------------------------------------------------
    # Position tracking
    # ------------------------------------------------------------------
    def register_position(
        self,
        symbol: str,
        entry_price: float,
        position_size: float,
        current_date: datetime,
    ):
        """Start tracking a new position, replacing any existing entry for ``symbol``.

        ``current_date`` is accepted for interface compatibility and is not used.
        """
        self.positions[symbol] = PositionRisk(
            symbol=symbol,
            entry_price=entry_price,
            current_price=entry_price,
            highest_price=entry_price,
            position_size=position_size,
            unrealized_pnl_pct=0.0,
            drawdown_from_peak=0.0,
        )

    def update_position(self, symbol: str, current_price: float):
        """Update price, peak, unrealised P&L and peak drawdown for ``symbol``.

        Unknown symbols are ignored. Ratios fall back to 0.0 when their base
        price is not positive, instead of raising ``ZeroDivisionError``.
        """
        pos = self.positions.get(symbol)
        if pos is None:
            return

        pos.current_price = current_price
        pos.highest_price = max(pos.highest_price, current_price)
        pos.unrealized_pnl_pct = self._ratio(
            current_price - pos.entry_price, pos.entry_price
        )
        pos.drawdown_from_peak = self._ratio(
            pos.highest_price - current_price, pos.highest_price
        )

    def close_position(self, symbol: str):
        """Stop tracking ``symbol``; a no-op when it is not tracked."""
        self.positions.pop(symbol, None)

    # ------------------------------------------------------------------
    # Risk checks
    # ------------------------------------------------------------------
    def check_risk(self, symbol: str, current_date: datetime) -> RiskCheckResult:
        """Run the layered risk check for ``symbol``.

        Priority: active circuit breaker > new circuit breaker > daily loss
        limit > per-position hard stop > trailing take-profit. The first rule
        that fires determines the result.
        """
        # 1. An already-active circuit breaker.
        if self.status == RiskStatus.CIRCUIT_BREAKER:
            if self.cooling_end_date and current_date < self.cooling_end_date:
                return self._make_result(
                    self.status,
                    "Circuit breaker active, trading halted",
                    can_trade=False,
                    close=True,
                )
            # Cooling period is over. Restart the high-water mark from the
            # current equity; otherwise the old peak would still show a
            # drawdown beyond the limit and re-trip the breaker immediately.
            self.status = RiskStatus.NORMAL
            self.cooling_end_date = None
            self.peak_equity = self.current_equity

        # 2. Account-level drawdown.
        total_drawdown = self._drawdown()
        if total_drawdown <= -self.CIRCUIT_BREAKER_PCT:
            self._trigger_circuit_breaker(current_date)
            return self._make_result(
                self.status,
                f"Circuit breaker triggered: drawdown {total_drawdown:.2%}",
                can_trade=False,
                close=True,
            )

        # 3. Daily loss limit, measured against start-of-day equity.
        daily_return = self._daily_return()
        if daily_return <= -self.DAILY_LOSS_PCT:
            return self._make_result(
                RiskStatus.WARNING,
                f"Daily loss limit: {daily_return:.2%}",
                reduce_by=0.5,
            )

        # 4. Position-level rules.
        pos = self.positions.get(symbol)
        if pos is not None:
            if pos.unrealized_pnl_pct <= -self.SINGLE_STOP_PCT:
                return self._make_result(
                    RiskStatus.WARNING,
                    f"Single position stop: {pos.unrealized_pnl_pct:.2%}",
                    close=True,
                )
            trailing_check = self._check_trailing_stop(pos)
            if trailing_check:
                return trailing_check

        return self._make_result(RiskStatus.NORMAL, "Risk check passed")

    def _check_trailing_stop(self, pos: PositionRisk) -> Optional[RiskCheckResult]:
        """Return a close signal if a trailing take-profit rule fired, else ``None``.

        The lock levels are armed by the best profit the position has ever
        reached (from ``highest_price``), not by its current profit: a
        position cannot be both 15% in profit and at or below its entry price.

        Rules, tightest first:
        - peak profit >= ``PROFIT_LOCK_2``: exit at or below entry * (1 + ``PROFIT_LOCK_1``)
        - peak profit >= ``PROFIT_LOCK_1``: exit at or below the entry price
        - drawdown from the peak price >= ``TRAILING_STOP``
        """
        peak_profit = self._ratio(pos.highest_price - pos.entry_price, pos.entry_price)

        if peak_profit >= self.PROFIT_LOCK_2:
            lock_price = pos.entry_price * (1 + self.PROFIT_LOCK_1)
            if pos.current_price <= lock_price:
                return self._make_result(
                    RiskStatus.WARNING, "Profit lock stop triggered", close=True
                )

        if peak_profit >= self.PROFIT_LOCK_1 and pos.current_price <= pos.entry_price:
            return self._make_result(
                RiskStatus.WARNING, "Breakeven stop triggered", close=True
            )

        if pos.drawdown_from_peak >= self.TRAILING_STOP:
            return self._make_result(
                RiskStatus.WARNING,
                f"Trailing stop: {pos.drawdown_from_peak:.2%} from peak",
                close=True,
            )

        return None

    def _trigger_circuit_breaker(self, current_date: datetime):
        """Enter circuit-breaker state, set the cooling end date and log the event."""
        self.status = RiskStatus.CIRCUIT_BREAKER
        self.cooling_end_date = current_date + timedelta(days=self.CIRCUIT_BREAKER_DAYS)
        self.risk_events.append({
            "date": current_date,
            "type": "circuit_breaker",
            "drawdown": self._drawdown(),
            "equity": self.current_equity,
        })

    # ------------------------------------------------------------------
    # Allocation gate and reporting
    # ------------------------------------------------------------------
    def should_stay_cash(
        self,
        scores: Dict[str, float],
        regime: str,
        winter_days: int,
        current_date: datetime,
    ) -> bool:
        """Decide whether the portfolio should stay fully in cash.

        Returns ``True`` when either holds:
        1. every score is below ``MIN_TRADABLE_SCORE`` (an empty ``scores``
           mapping counts as "nothing tradable"), or
        2. ``regime`` is ``"winter"`` and has lasted more than
           ``MAX_WINTER_DAYS`` days.

        NOTE(review): a third condition, realised loss for the current month
        above 5%, was described for this method but is not implemented;
        ``current_date`` is currently unused.
        """
        nothing_tradable = all(
            score < self.MIN_TRADABLE_SCORE for score in scores.values()
        )
        extended_winter = regime == "winter" and winter_days > self.MAX_WINTER_DAYS
        return nothing_tradable or extended_winter

    def get_risk_summary(self) -> Dict:
        """Return a snapshot of the account-level risk state as a plain dict."""
        return {
            "status": self.status.value,
            "current_equity": self.current_equity,
            "peak_equity": self.peak_equity,
            "drawdown": self._drawdown(),
            "daily_pnl": self.daily_pnl,
            "daily_return": self._daily_return(),
            "positions": len(self.positions),
            "cooling_end": self.cooling_end_date,
        }
|