router.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. """
  2. 智能订单路由器
  3. 根据微观结构选择最优执行策略
  4. """
  5. from dataclasses import dataclass
  6. from typing import Optional, Dict
  7. from enum import Enum
  8. class ExecutionStrategy(Enum):
  9. """执行策略类型"""
  10. MARKET = "market" # 市价单
  11. TWAP = "twap" # 时间加权平均
  12. VWAP = "vwap" # 成交量加权平均
  13. ICEBERG = "iceberg" # 冰山订单
  14. @dataclass
  15. class ExecutionPlan:
  16. """执行计划"""
  17. strategy: ExecutionStrategy
  18. num_slices: int # 拆单数量
  19. duration_minutes: int # 执行时长
  20. display_size: Optional[int] = None # 冰山订单显示量
  21. reasoning: str = ""
  22. class OrderRouter:
  23. """
  24. 智能订单路由器
  25. 根据市场微观结构选择最优执行策略:
  26. - 高流动性+低波动:市价单
  27. - 大单+低流动性:TWAP/VWAP
  28. - 不希望暴露意图:冰山订单
  29. """
  30. def route_order(
  31. self,
  32. order_size: int,
  33. avg_daily_volume: float,
  34. spread_pct: float,
  35. volatility: float,
  36. urgency: str = "normal"
  37. ) -> ExecutionPlan:
  38. """
  39. 路由订单
  40. Args:
  41. order_size: 订单数量
  42. avg_daily_volume: 日均成交量
  43. spread_pct: 买卖价差百分比
  44. volatility: 波动率
  45. urgency: 紧急程度 (high/normal/low)
  46. Returns:
  47. ExecutionPlan: 执行计划
  48. """
  49. adv_participation = order_size / avg_daily_volume if avg_daily_volume > 0 else 1.0
  50. # 高流动性且紧急:市价单
  51. if spread_pct < 0.001 and volatility < 0.02 and urgency == "high":
  52. return ExecutionPlan(
  53. strategy=ExecutionStrategy.MARKET,
  54. num_slices=1,
  55. duration_minutes=0,
  56. reasoning="高流动性+紧急,使用市价单"
  57. )
  58. # 大单:TWAP/VWAP
  59. if adv_participation > 0.05:
  60. return ExecutionPlan(
  61. strategy=ExecutionStrategy.TWAP,
  62. num_slices=20,
  63. duration_minutes=30,
  64. reasoning=f"大单占ADV{adv_participation:.1%},使用TWAP拆单"
  65. )
  66. # 中等订单:VWAP
  67. if adv_participation > 0.02:
  68. return ExecutionPlan(
  69. strategy=ExecutionStrategy.VWAP,
  70. num_slices=10,
  71. duration_minutes=15,
  72. reasoning=f"中等订单占ADV{adv_participation:.1%},使用VWAP"
  73. )
  74. # 默认:小单直接市价
  75. return ExecutionPlan(
  76. strategy=ExecutionStrategy.MARKET,
  77. num_slices=1,
  78. duration_minutes=0,
  79. reasoning="小单,直接市价成交"
  80. )