instant.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. """
  2. 瞬时生态识别器
  3. 基于分钟级数据计算买卖盘不平衡、大单流向、跳动率突变
  4. """
  5. from dataclasses import dataclass, field
  6. from typing import Dict, List, Optional
  7. from enum import Enum
  8. import numpy as np
  9. import pandas as pd
  10. class ImbalanceDirection(Enum):
  11. """买卖盘不平衡方向"""
  12. BID_DOMINANT = "bid_dominant" # 买盘占优
  13. ASK_DOMINANT = "ask_dominant" # 卖盘占优
  14. BALANCED = "balanced" # 平衡
  15. class TickActivity(Enum):
  16. """跳动率活跃度"""
  17. NORMAL = "normal"
  18. ELEVATED = "elevated" # 活跃度上升
  19. DEPRESSED = "depressed" # 活跃度下降
  20. SPIKE = "spike" # 突变
  21. @dataclass
  22. class InstantEcosystem:
  23. """瞬时生态数据结构"""
  24. timestamp: pd.Timestamp
  25. # 买卖盘不平衡
  26. imbalance_direction: ImbalanceDirection
  27. imbalance_ratio: float # 买卖盘量比
  28. imbalance_duration: int # 持续分钟数
  29. # 大单流向
  30. block_trade_flow: float # 大单净流入(万元)
  31. block_buy_count: int
  32. block_sell_count: int
  33. # 跳动率
  34. tick_activity: TickActivity
  35. tick_rate_change: float # 较前5分钟变化率
  36. current_tick_rate: int # 当前跳动率(笔/分钟)
  37. # 综合信号
  38. signals: List[str] = field(default_factory=list)
  39. def is_trading_opportunity(self) -> bool:
  40. """判断是否为交易机会"""
  41. return (
  42. self.imbalance_direction != ImbalanceDirection.BALANCED and
  43. abs(self.block_trade_flow) > 500 and # 500万元
  44. self.tick_activity != TickActivity.NORMAL
  45. )
  46. class InstantEcosystemIdentifier:
  47. """
  48. 瞬时生态识别器
  49. 基于分钟级tick数据计算:
  50. 1. 买卖盘不平衡度(Bid-Ask Imbalance)
  51. 2. 大单流向(Block Trade Flow)
  52. 3. 跳动率突变(Tick Rate Change)
  53. """
  54. def __init__(
  55. self,
  56. imbalance_ratio_threshold: float = 2.0,
  57. imbalance_duration: int = 3,
  58. block_trade_min_value: float = 500_000, # 50万元
  59. tick_rate_change_threshold: float = 0.5,
  60. tick_window_minutes: int = 5
  61. ):
  62. self.imbalance_ratio_threshold = imbalance_ratio_threshold
  63. self.imbalance_duration = imbalance_duration
  64. self.block_trade_min_value = block_trade_min_value
  65. self.tick_rate_change_threshold = tick_rate_change_threshold
  66. self.tick_window_minutes = tick_window_minutes
  67. def identify(
  68. self,
  69. tick_data: pd.DataFrame,
  70. order_book_data: Optional[pd.DataFrame] = None,
  71. timestamp: Optional[pd.Timestamp] = None
  72. ) -> InstantEcosystem:
  73. """
  74. 识别瞬时生态
  75. Args:
  76. tick_data: 分钟级tick数据,包含 ['price', 'volume', 'side', 'bid', 'ask']
  77. order_book_data: 订单簿数据(可选)
  78. timestamp: 当前时间戳
  79. Returns:
  80. InstantEcosystem: 瞬时生态识别结果
  81. """
  82. if timestamp is None:
  83. timestamp = pd.Timestamp.now()
  84. signals = []
  85. # 1. 计算买卖盘不平衡
  86. imbalance = self._calculate_imbalance(tick_data, order_book_data)
  87. if imbalance['direction'] == ImbalanceDirection.BID_DOMINANT:
  88. signals.append(f"买盘占优 (比率: {imbalance['ratio']:.2f})")
  89. elif imbalance['direction'] == ImbalanceDirection.ASK_DOMINANT:
  90. signals.append(f"卖盘占优 (比率: {imbalance['ratio']:.2f})")
  91. # 2. 计算大单流向
  92. block_flow = self._calculate_block_flow(tick_data)
  93. if abs(block_flow['net_flow']) > 1000: # 1000万元
  94. direction = "流入" if block_flow['net_flow'] > 0 else "流出"
  95. signals.append(f"大单{direction}: {abs(block_flow['net_flow']):.0f}万元")
  96. # 3. 计算跳动率突变
  97. tick_activity = self._calculate_tick_activity(tick_data)
  98. if tick_activity['activity'] == TickActivity.SPIKE:
  99. signals.append(f"跳动率突变: {tick_activity['change']:.1%}")
  100. elif tick_activity['activity'] == TickActivity.ELEVATED:
  101. signals.append(f"活跃度上升: {tick_activity['change']:.1%}")
  102. return InstantEcosystem(
  103. timestamp=timestamp,
  104. imbalance_direction=imbalance['direction'],
  105. imbalance_ratio=imbalance['ratio'],
  106. imbalance_duration=imbalance['duration'],
  107. block_trade_flow=block_flow['net_flow'],
  108. block_buy_count=block_flow['buy_count'],
  109. block_sell_count=block_flow['sell_count'],
  110. tick_activity=tick_activity['activity'],
  111. tick_rate_change=tick_activity['change'],
  112. current_tick_rate=tick_activity['current_rate'],
  113. signals=signals
  114. )
  115. def _calculate_imbalance(
  116. self,
  117. tick_data: pd.DataFrame,
  118. order_book_data: Optional[pd.DataFrame] = None
  119. ) -> Dict:
  120. """
  121. 计算买卖盘不平衡
  122. 方法1: 使用订单簿深度(如果有)
  123. 方法2: 使用tick数据中的主动买卖
  124. """
  125. # 方法1: 订单簿深度
  126. if order_book_data is not None and not order_book_data.empty:
  127. if 'bid_volume' in order_book_data and 'ask_volume' in order_book_data:
  128. bid_vol = order_book_data['bid_volume'].iloc[-1]
  129. ask_vol = order_book_data['ask_volume'].iloc[-1]
  130. if ask_vol > 0:
  131. ratio = bid_vol / ask_vol
  132. else:
  133. ratio = 1.0
  134. # 判断方向
  135. if ratio > self.imbalance_ratio_threshold:
  136. direction = ImbalanceDirection.BID_DOMINANT
  137. elif ratio < 1 / self.imbalance_ratio_threshold:
  138. direction = ImbalanceDirection.ASK_DOMINANT
  139. else:
  140. direction = ImbalanceDirection.BALANCED
  141. # 计算持续时间
  142. duration = self._calculate_imbalance_duration(
  143. order_book_data, direction
  144. )
  145. return {
  146. 'direction': direction,
  147. 'ratio': ratio,
  148. 'duration': duration
  149. }
  150. # 方法2: 使用tick数据的主动买卖
  151. if 'side' in tick_data.columns:
  152. buy_volume = tick_data[tick_data['side'] == 'buy']['volume'].sum()
  153. sell_volume = tick_data[tick_data['side'] == 'sell']['volume'].sum()
  154. else:
  155. # 通过价格与买卖价关系推断
  156. buy_volume = tick_data[
  157. tick_data['price'] >= tick_data.get('ask', tick_data['price'])
  158. ]['volume'].sum()
  159. sell_volume = tick_data[
  160. tick_data['price'] <= tick_data.get('bid', tick_data['price'])
  161. ]['volume'].sum()
  162. total_volume = buy_volume + sell_volume
  163. if total_volume == 0:
  164. return {
  165. 'direction': ImbalanceDirection.BALANCED,
  166. 'ratio': 1.0,
  167. 'duration': 0
  168. }
  169. ratio = buy_volume / sell_volume if sell_volume > 0 else 1.0
  170. if ratio > self.imbalance_ratio_threshold:
  171. direction = ImbalanceDirection.BID_DOMINANT
  172. elif ratio < 1 / self.imbalance_ratio_threshold:
  173. direction = ImbalanceDirection.ASK_DOMINANT
  174. else:
  175. direction = ImbalanceDirection.BALANCED
  176. return {
  177. 'direction': direction,
  178. 'ratio': ratio,
  179. 'duration': self.imbalance_duration # 简化处理
  180. }
  181. def _calculate_imbalance_duration(
  182. self,
  183. order_book_data: pd.DataFrame,
  184. current_direction: ImbalanceDirection
  185. ) -> int:
  186. """计算不平衡持续时间"""
  187. if len(order_book_data) < self.imbalance_duration:
  188. return len(order_book_data)
  189. duration = 0
  190. for i in range(1, min(len(order_book_data), 10) + 1):
  191. bid_vol = order_book_data['bid_volume'].iloc[-i]
  192. ask_vol = order_book_data['ask_volume'].iloc[-i]
  193. if ask_vol > 0:
  194. ratio = bid_vol / ask_vol
  195. else:
  196. ratio = 1.0
  197. if current_direction == ImbalanceDirection.BID_DOMINANT and ratio > self.imbalance_ratio_threshold:
  198. duration += 1
  199. elif current_direction == ImbalanceDirection.ASK_DOMINANT and ratio < 1 / self.imbalance_ratio_threshold:
  200. duration += 1
  201. elif current_direction == ImbalanceDirection.BALANCED:
  202. duration += 1
  203. else:
  204. break
  205. return duration
  206. def _calculate_block_flow(self, tick_data: pd.DataFrame) -> Dict:
  207. """
  208. 计算大单流向
  209. 返回净流入(万元)
  210. """
  211. if tick_data.empty:
  212. return {
  213. 'net_flow': 0.0,
  214. 'buy_count': 0,
  215. 'sell_count': 0
  216. }
  217. # 计算平均成交金额
  218. avg_value = (tick_data['price'] * tick_data['volume']).mean()
  219. # 识别大单
  220. tick_data = tick_data.copy()
  221. tick_data['value'] = tick_data['price'] * tick_data['volume']
  222. block_trades = tick_data[
  223. tick_data['value'] > self.block_trade_min_value
  224. ]
  225. if block_trades.empty:
  226. return {
  227. 'net_flow': 0.0,
  228. 'buy_count': 0,
  229. 'sell_count': 0
  230. }
  231. # 分类买卖
  232. if 'side' in block_trades.columns:
  233. buy_trades = block_trades[block_trades['side'] == 'buy']
  234. sell_trades = block_trades[block_trades['side'] == 'sell']
  235. else:
  236. # 通过价格位置推断
  237. buy_trades = block_trades[block_trades['price'] >= block_trades.get('ask', block_trades['price'])]
  238. sell_trades = block_trades[block_trades['price'] <= block_trades.get('bid', block_trades['price'])]
  239. # 计算净流入(万元)
  240. buy_flow = buy_trades['value'].sum() / 10_000
  241. sell_flow = sell_trades['value'].sum() / 10_000
  242. net_flow = buy_flow - sell_flow
  243. return {
  244. 'net_flow': net_flow,
  245. 'buy_count': len(buy_trades),
  246. 'sell_count': len(sell_trades)
  247. }
  248. def _calculate_tick_activity(self, tick_data: pd.DataFrame) -> Dict:
  249. """
  250. 计算跳动率突变
  251. 跳动率 = 单位时间内的成交笔数
  252. """
  253. if len(tick_data) < self.tick_window_minutes + 1:
  254. return {
  255. 'activity': TickActivity.NORMAL,
  256. 'change': 0.0,
  257. 'current_rate': len(tick_data)
  258. }
  259. # 计算当前跳动率(最近1分钟)
  260. current_rate = len(tick_data.iloc[-1:])
  261. # 计算前N分钟平均跳动率
  262. prev_data = tick_data.iloc[-(self.tick_window_minutes + 1):-1]
  263. prev_rate = len(prev_data) / self.tick_window_minutes if self.tick_window_minutes > 0 else 1
  264. # 计算变化率
  265. if prev_rate > 0:
  266. change = (current_rate - prev_rate) / prev_rate
  267. else:
  268. change = 0.0
  269. # 判断活跃度
  270. if abs(change) > self.tick_rate_change_threshold:
  271. activity = TickActivity.SPIKE
  272. elif change > 0.2:
  273. activity = TickActivity.ELEVATED
  274. elif change < -0.2:
  275. activity = TickActivity.DEPRESSED
  276. else:
  277. activity = TickActivity.NORMAL
  278. return {
  279. 'activity': activity,
  280. 'change': change,
  281. 'current_rate': current_rate
  282. }