| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- """
- 智能订单路由器
- 根据微观结构选择最优执行策略
- """
- from dataclasses import dataclass
- from typing import Optional, Dict
- from enum import Enum
- class ExecutionStrategy(Enum):
- """执行策略类型"""
- MARKET = "market" # 市价单
- TWAP = "twap" # 时间加权平均
- VWAP = "vwap" # 成交量加权平均
- ICEBERG = "iceberg" # 冰山订单
- @dataclass
- class ExecutionPlan:
- """执行计划"""
- strategy: ExecutionStrategy
- num_slices: int # 拆单数量
- duration_minutes: int # 执行时长
- display_size: Optional[int] = None # 冰山订单显示量
- reasoning: str = ""
- class OrderRouter:
- """
- 智能订单路由器
- 根据市场微观结构选择最优执行策略:
- - 高流动性+低波动:市价单
- - 大单+低流动性:TWAP/VWAP
- - 不希望暴露意图:冰山订单
- """
- def route_order(
- self,
- order_size: int,
- avg_daily_volume: float,
- spread_pct: float,
- volatility: float,
- urgency: str = "normal"
- ) -> ExecutionPlan:
- """
- 路由订单
- Args:
- order_size: 订单数量
- avg_daily_volume: 日均成交量
- spread_pct: 买卖价差百分比
- volatility: 波动率
- urgency: 紧急程度 (high/normal/low)
- Returns:
- ExecutionPlan: 执行计划
- """
- adv_participation = order_size / avg_daily_volume if avg_daily_volume > 0 else 1.0
- # 高流动性且紧急:市价单
- if spread_pct < 0.001 and volatility < 0.02 and urgency == "high":
- return ExecutionPlan(
- strategy=ExecutionStrategy.MARKET,
- num_slices=1,
- duration_minutes=0,
- reasoning="高流动性+紧急,使用市价单"
- )
- # 大单:TWAP/VWAP
- if adv_participation > 0.05:
- return ExecutionPlan(
- strategy=ExecutionStrategy.TWAP,
- num_slices=20,
- duration_minutes=30,
- reasoning=f"大单占ADV{adv_participation:.1%},使用TWAP拆单"
- )
- # 中等订单:VWAP
- if adv_participation > 0.02:
- return ExecutionPlan(
- strategy=ExecutionStrategy.VWAP,
- num_slices=10,
- duration_minutes=15,
- reasoning=f"中等订单占ADV{adv_participation:.1%},使用VWAP"
- )
- # 默认:小单直接市价
- return ExecutionPlan(
- strategy=ExecutionStrategy.MARKET,
- num_slices=1,
- duration_minutes=0,
- reasoning="小单,直接市价成交"
- )
|