| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- """
- 瞬时生态识别器
- 基于分钟级数据计算买卖盘不平衡、大单流向、跳动率突变
- """
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional
- from enum import Enum
- import numpy as np
- import pandas as pd
- class ImbalanceDirection(Enum):
- """买卖盘不平衡方向"""
- BID_DOMINANT = "bid_dominant" # 买盘占优
- ASK_DOMINANT = "ask_dominant" # 卖盘占优
- BALANCED = "balanced" # 平衡
- class TickActivity(Enum):
- """跳动率活跃度"""
- NORMAL = "normal"
- ELEVATED = "elevated" # 活跃度上升
- DEPRESSED = "depressed" # 活跃度下降
- SPIKE = "spike" # 突变
- @dataclass
- class InstantEcosystem:
- """瞬时生态数据结构"""
- timestamp: pd.Timestamp
- # 买卖盘不平衡
- imbalance_direction: ImbalanceDirection
- imbalance_ratio: float # 买卖盘量比
- imbalance_duration: int # 持续分钟数
- # 大单流向
- block_trade_flow: float # 大单净流入(万元)
- block_buy_count: int
- block_sell_count: int
- # 跳动率
- tick_activity: TickActivity
- tick_rate_change: float # 较前5分钟变化率
- current_tick_rate: int # 当前跳动率(笔/分钟)
- # 综合信号
- signals: List[str] = field(default_factory=list)
- def is_trading_opportunity(self) -> bool:
- """判断是否为交易机会"""
- return (
- self.imbalance_direction != ImbalanceDirection.BALANCED and
- abs(self.block_trade_flow) > 500 and # 500万元
- self.tick_activity != TickActivity.NORMAL
- )
- class InstantEcosystemIdentifier:
- """
- 瞬时生态识别器
- 基于分钟级tick数据计算:
- 1. 买卖盘不平衡度(Bid-Ask Imbalance)
- 2. 大单流向(Block Trade Flow)
- 3. 跳动率突变(Tick Rate Change)
- """
- def __init__(
- self,
- imbalance_ratio_threshold: float = 2.0,
- imbalance_duration: int = 3,
- block_trade_min_value: float = 500_000, # 50万元
- tick_rate_change_threshold: float = 0.5,
- tick_window_minutes: int = 5
- ):
- self.imbalance_ratio_threshold = imbalance_ratio_threshold
- self.imbalance_duration = imbalance_duration
- self.block_trade_min_value = block_trade_min_value
- self.tick_rate_change_threshold = tick_rate_change_threshold
- self.tick_window_minutes = tick_window_minutes
- def identify(
- self,
- tick_data: pd.DataFrame,
- order_book_data: Optional[pd.DataFrame] = None,
- timestamp: Optional[pd.Timestamp] = None
- ) -> InstantEcosystem:
- """
- 识别瞬时生态
- Args:
- tick_data: 分钟级tick数据,包含 ['price', 'volume', 'side', 'bid', 'ask']
- order_book_data: 订单簿数据(可选)
- timestamp: 当前时间戳
- Returns:
- InstantEcosystem: 瞬时生态识别结果
- """
- if timestamp is None:
- timestamp = pd.Timestamp.now()
- signals = []
- # 1. 计算买卖盘不平衡
- imbalance = self._calculate_imbalance(tick_data, order_book_data)
- if imbalance['direction'] == ImbalanceDirection.BID_DOMINANT:
- signals.append(f"买盘占优 (比率: {imbalance['ratio']:.2f})")
- elif imbalance['direction'] == ImbalanceDirection.ASK_DOMINANT:
- signals.append(f"卖盘占优 (比率: {imbalance['ratio']:.2f})")
- # 2. 计算大单流向
- block_flow = self._calculate_block_flow(tick_data)
- if abs(block_flow['net_flow']) > 1000: # 1000万元
- direction = "流入" if block_flow['net_flow'] > 0 else "流出"
- signals.append(f"大单{direction}: {abs(block_flow['net_flow']):.0f}万元")
- # 3. 计算跳动率突变
- tick_activity = self._calculate_tick_activity(tick_data)
- if tick_activity['activity'] == TickActivity.SPIKE:
- signals.append(f"跳动率突变: {tick_activity['change']:.1%}")
- elif tick_activity['activity'] == TickActivity.ELEVATED:
- signals.append(f"活跃度上升: {tick_activity['change']:.1%}")
- return InstantEcosystem(
- timestamp=timestamp,
- imbalance_direction=imbalance['direction'],
- imbalance_ratio=imbalance['ratio'],
- imbalance_duration=imbalance['duration'],
- block_trade_flow=block_flow['net_flow'],
- block_buy_count=block_flow['buy_count'],
- block_sell_count=block_flow['sell_count'],
- tick_activity=tick_activity['activity'],
- tick_rate_change=tick_activity['change'],
- current_tick_rate=tick_activity['current_rate'],
- signals=signals
- )
- def _calculate_imbalance(
- self,
- tick_data: pd.DataFrame,
- order_book_data: Optional[pd.DataFrame] = None
- ) -> Dict:
- """
- 计算买卖盘不平衡
- 方法1: 使用订单簿深度(如果有)
- 方法2: 使用tick数据中的主动买卖
- """
- # 方法1: 订单簿深度
- if order_book_data is not None and not order_book_data.empty:
- if 'bid_volume' in order_book_data and 'ask_volume' in order_book_data:
- bid_vol = order_book_data['bid_volume'].iloc[-1]
- ask_vol = order_book_data['ask_volume'].iloc[-1]
- if ask_vol > 0:
- ratio = bid_vol / ask_vol
- else:
- ratio = 1.0
- # 判断方向
- if ratio > self.imbalance_ratio_threshold:
- direction = ImbalanceDirection.BID_DOMINANT
- elif ratio < 1 / self.imbalance_ratio_threshold:
- direction = ImbalanceDirection.ASK_DOMINANT
- else:
- direction = ImbalanceDirection.BALANCED
- # 计算持续时间
- duration = self._calculate_imbalance_duration(
- order_book_data, direction
- )
- return {
- 'direction': direction,
- 'ratio': ratio,
- 'duration': duration
- }
- # 方法2: 使用tick数据的主动买卖
- if 'side' in tick_data.columns:
- buy_volume = tick_data[tick_data['side'] == 'buy']['volume'].sum()
- sell_volume = tick_data[tick_data['side'] == 'sell']['volume'].sum()
- else:
- # 通过价格与买卖价关系推断
- buy_volume = tick_data[
- tick_data['price'] >= tick_data.get('ask', tick_data['price'])
- ]['volume'].sum()
- sell_volume = tick_data[
- tick_data['price'] <= tick_data.get('bid', tick_data['price'])
- ]['volume'].sum()
- total_volume = buy_volume + sell_volume
- if total_volume == 0:
- return {
- 'direction': ImbalanceDirection.BALANCED,
- 'ratio': 1.0,
- 'duration': 0
- }
- ratio = buy_volume / sell_volume if sell_volume > 0 else 1.0
- if ratio > self.imbalance_ratio_threshold:
- direction = ImbalanceDirection.BID_DOMINANT
- elif ratio < 1 / self.imbalance_ratio_threshold:
- direction = ImbalanceDirection.ASK_DOMINANT
- else:
- direction = ImbalanceDirection.BALANCED
- return {
- 'direction': direction,
- 'ratio': ratio,
- 'duration': self.imbalance_duration # 简化处理
- }
- def _calculate_imbalance_duration(
- self,
- order_book_data: pd.DataFrame,
- current_direction: ImbalanceDirection
- ) -> int:
- """计算不平衡持续时间"""
- if len(order_book_data) < self.imbalance_duration:
- return len(order_book_data)
- duration = 0
- for i in range(1, min(len(order_book_data), 10) + 1):
- bid_vol = order_book_data['bid_volume'].iloc[-i]
- ask_vol = order_book_data['ask_volume'].iloc[-i]
- if ask_vol > 0:
- ratio = bid_vol / ask_vol
- else:
- ratio = 1.0
- if current_direction == ImbalanceDirection.BID_DOMINANT and ratio > self.imbalance_ratio_threshold:
- duration += 1
- elif current_direction == ImbalanceDirection.ASK_DOMINANT and ratio < 1 / self.imbalance_ratio_threshold:
- duration += 1
- elif current_direction == ImbalanceDirection.BALANCED:
- duration += 1
- else:
- break
- return duration
- def _calculate_block_flow(self, tick_data: pd.DataFrame) -> Dict:
- """
- 计算大单流向
- 返回净流入(万元)
- """
- if tick_data.empty:
- return {
- 'net_flow': 0.0,
- 'buy_count': 0,
- 'sell_count': 0
- }
- # 计算平均成交金额
- avg_value = (tick_data['price'] * tick_data['volume']).mean()
- # 识别大单
- tick_data = tick_data.copy()
- tick_data['value'] = tick_data['price'] * tick_data['volume']
- block_trades = tick_data[
- tick_data['value'] > self.block_trade_min_value
- ]
- if block_trades.empty:
- return {
- 'net_flow': 0.0,
- 'buy_count': 0,
- 'sell_count': 0
- }
- # 分类买卖
- if 'side' in block_trades.columns:
- buy_trades = block_trades[block_trades['side'] == 'buy']
- sell_trades = block_trades[block_trades['side'] == 'sell']
- else:
- # 通过价格位置推断
- buy_trades = block_trades[block_trades['price'] >= block_trades.get('ask', block_trades['price'])]
- sell_trades = block_trades[block_trades['price'] <= block_trades.get('bid', block_trades['price'])]
- # 计算净流入(万元)
- buy_flow = buy_trades['value'].sum() / 10_000
- sell_flow = sell_trades['value'].sum() / 10_000
- net_flow = buy_flow - sell_flow
- return {
- 'net_flow': net_flow,
- 'buy_count': len(buy_trades),
- 'sell_count': len(sell_trades)
- }
- def _calculate_tick_activity(self, tick_data: pd.DataFrame) -> Dict:
- """
- 计算跳动率突变
- 跳动率 = 单位时间内的成交笔数
- """
- if len(tick_data) < self.tick_window_minutes + 1:
- return {
- 'activity': TickActivity.NORMAL,
- 'change': 0.0,
- 'current_rate': len(tick_data)
- }
- # 计算当前跳动率(最近1分钟)
- current_rate = len(tick_data.iloc[-1:])
- # 计算前N分钟平均跳动率
- prev_data = tick_data.iloc[-(self.tick_window_minutes + 1):-1]
- prev_rate = len(prev_data) / self.tick_window_minutes if self.tick_window_minutes > 0 else 1
- # 计算变化率
- if prev_rate > 0:
- change = (current_rate - prev_rate) / prev_rate
- else:
- change = 0.0
- # 判断活跃度
- if abs(change) > self.tick_rate_change_threshold:
- activity = TickActivity.SPIKE
- elif change > 0.2:
- activity = TickActivity.ELEVATED
- elif change < -0.2:
- activity = TickActivity.DEPRESSED
- else:
- activity = TickActivity.NORMAL
- return {
- 'activity': activity,
- 'change': change,
- 'current_rate': current_rate
- }
|