""" 智能订单路由器 根据微观结构选择最优执行策略 """ from dataclasses import dataclass from typing import Optional, Dict from enum import Enum class ExecutionStrategy(Enum): """执行策略类型""" MARKET = "market" # 市价单 TWAP = "twap" # 时间加权平均 VWAP = "vwap" # 成交量加权平均 ICEBERG = "iceberg" # 冰山订单 @dataclass class ExecutionPlan: """执行计划""" strategy: ExecutionStrategy num_slices: int # 拆单数量 duration_minutes: int # 执行时长 display_size: Optional[int] = None # 冰山订单显示量 reasoning: str = "" class OrderRouter: """ 智能订单路由器 根据市场微观结构选择最优执行策略: - 高流动性+低波动:市价单 - 大单+低流动性:TWAP/VWAP - 不希望暴露意图:冰山订单 """ def route_order( self, order_size: int, avg_daily_volume: float, spread_pct: float, volatility: float, urgency: str = "normal" ) -> ExecutionPlan: """ 路由订单 Args: order_size: 订单数量 avg_daily_volume: 日均成交量 spread_pct: 买卖价差百分比 volatility: 波动率 urgency: 紧急程度 (high/normal/low) Returns: ExecutionPlan: 执行计划 """ adv_participation = order_size / avg_daily_volume if avg_daily_volume > 0 else 1.0 # 高流动性且紧急:市价单 if spread_pct < 0.001 and volatility < 0.02 and urgency == "high": return ExecutionPlan( strategy=ExecutionStrategy.MARKET, num_slices=1, duration_minutes=0, reasoning="高流动性+紧急,使用市价单" ) # 大单:TWAP/VWAP if adv_participation > 0.05: return ExecutionPlan( strategy=ExecutionStrategy.TWAP, num_slices=20, duration_minutes=30, reasoning=f"大单占ADV{adv_participation:.1%},使用TWAP拆单" ) # 中等订单:VWAP if adv_participation > 0.02: return ExecutionPlan( strategy=ExecutionStrategy.VWAP, num_slices=10, duration_minutes=15, reasoning=f"中等订单占ADV{adv_participation:.1%},使用VWAP" ) # 默认:小单直接市价 return ExecutionPlan( strategy=ExecutionStrategy.MARKET, num_slices=1, duration_minutes=0, reasoning="小单,直接市价成交" )