realtime_signal.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 创业板50实时信号计算器
  5. 在任意时刻获取实时行情,计算当前信号状态
  6. """
  7. import pandas as pd
  8. import numpy as np
  9. import akshare as ak
  10. import sys
  11. import warnings
  12. warnings.filterwarnings('ignore')
  13. sys.path.insert(0, '/root/.openclaw/workspace/cat-fly')
  14. from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator
  15. from datetime import datetime, timedelta
  16. def get_realtime_price():
  17. """获取创业板50实时价格"""
  18. try:
  19. # 使用新浪接口获取实时行情(更稳定)
  20. df = ak.stock_zh_index_spot_sina()
  21. cyb50 = df[df['代码'] == 'sz399673']
  22. if len(cyb50) > 0:
  23. return {
  24. 'price': float(cyb50.iloc[0]['最新价']),
  25. 'open': float(cyb50.iloc[0]['今开']),
  26. 'high': float(cyb50.iloc[0]['最高']),
  27. 'low': float(cyb50.iloc[0]['最低']),
  28. 'volume': float(cyb50.iloc[0]['成交量']),
  29. 'time': datetime.now().strftime('%H:%M:%S')
  30. }
  31. except Exception as e:
  32. print(f"获取实时行情失败: {e}")
  33. return None
  34. def calculate_signal_at_time(current_price, last_kline_data):
  35. """
  36. 基于最新价格和最后一根K线数据,估算当前信号状态
  37. 参数:
  38. current_price: 当前实时价格
  39. last_kline_data: 最后一根完整K线的技术指标数据 (Series)
  40. """
  41. score = 0
  42. signals = []
  43. # 1. RSI估算 - 假设价格变化与RSI变化大致线性
  44. last_close = last_kline_data['Close']
  45. price_change_pct = (current_price - last_close) / last_close
  46. # 简化估算:价格每跌1%,RSI约跌2-3个点
  47. estimated_rsi = last_kline_data['RSI'] + price_change_pct * 250
  48. if estimated_rsi < 30:
  49. score += 2
  50. signals.append(f"RSI超卖(估{estimated_rsi:.1f})")
  51. elif estimated_rsi < 35:
  52. score += 1
  53. signals.append(f"RSI偏弱(估{estimated_rsi:.1f})")
  54. # 2. KDJ估算 - 价格影响J值较大
  55. estimated_j = last_kline_data['J'] + price_change_pct * 300
  56. if estimated_j < 0:
  57. score += 1
  58. signals.append(f"KDJ极端超卖(估J={estimated_j:.1f})")
  59. # 3. 布林带位置判断
  60. bb_lower = last_kline_data['BB_lower']
  61. bb_upper = last_kline_data['BB_upper']
  62. if current_price <= bb_lower * 1.005:
  63. score += 2
  64. signals.append("触及下轨")
  65. elif current_price <= bb_lower * 1.02:
  66. score += 1
  67. signals.append("接近下轨")
  68. # 4. 价格动量 - 使用实时价格与开盘/昨日收盘比较
  69. if price_change_pct < -0.015:
  70. score += 1
  71. signals.append(f"动量超卖({price_change_pct*100:.2f}%)")
  72. # 5. MA趋势判断(基于已有数据)
  73. if last_kline_data['MA6'] > last_kline_data['MA12']:
  74. score += 1
  75. signals.append("MA短期上行")
  76. else:
  77. score -= 1
  78. signals.append("MA下降趋势惩罚")
  79. return {
  80. 'score': score,
  81. 'signals': signals,
  82. 'estimated_rsi': estimated_rsi,
  83. 'estimated_j': estimated_j,
  84. 'price_change_pct': price_change_pct,
  85. 'bb_lower': bb_lower,
  86. 'bb_upper': bb_upper,
  87. 'current_price': current_price,
  88. 'last_close': last_close
  89. }
  90. def generate_realtime_report():
  91. """生成实时信号报告"""
  92. print("="*80)
  93. print("🚀 创业板50实时信号检测")
  94. print(f"检测时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  95. print("="*80)
  96. # 1. 获取历史30分钟K线数据(用于计算技术指标)
  97. print("\n📊 获取历史数据...")
  98. config_manager = ConfigManager('config.json')
  99. fetcher = IntradayDataFetcher(config_manager)
  100. end_date = datetime.now()
  101. start_date = end_date - timedelta(days=30) # 取30天足够计算指标
  102. raw_data = fetcher.fetch_30min_data(start_date, end_date)
  103. if raw_data is None or len(raw_data) == 0:
  104. print("❌ 数据获取失败")
  105. return None
  106. # 2. 计算技术指标
  107. data_with_indicators = fetcher.calculate_intraday_indicators(raw_data)
  108. last_kline = data_with_indicators.iloc[-1]
  109. print(f" 最后一根K线: {data_with_indicators.index[-1]}")
  110. print(f" 收盘价: {last_kline['Close']:.2f}")
  111. # 3. 获取实时价格
  112. print("\n📈 获取实时行情...")
  113. realtime = get_realtime_price()
  114. if realtime is None:
  115. print("❌ 获取实时价格失败")
  116. return None
  117. print(f" 当前时间: {realtime['time']}")
  118. print(f" 当前价格: {realtime['price']:.2f}")
  119. print(f" 今日最高: {realtime['high']:.2f}")
  120. print(f" 今日最低: {realtime['low']:.2f}")
  121. # 4. 计算当前信号
  122. print("\n🎯 计算实时信号...")
  123. signal_result = calculate_signal_at_time(realtime['price'], last_kline)
  124. print(f"\n{'='*80}")
  125. print("实时信号评估结果")
  126. print(f"{'='*80}")
  127. print(f"当前价格: {signal_result['current_price']:.2f}")
  128. print(f"最后一根K线收盘: {signal_result['last_close']:.2f}")
  129. print(f"价格变化: {signal_result['price_change_pct']*100:+.2f}%")
  130. print(f"\n技术指标估算:")
  131. print(f" RSI(估算): {signal_result['estimated_rsi']:.2f}")
  132. print(f" KDJ J(估算): {signal_result['estimated_j']:.2f}")
  133. print(f" 布林带: [{signal_result['bb_lower']:.2f}, {signal_result['bb_upper']:.2f}]")
  134. print(f"\n信号评分: {signal_result['score']}/4 (阈值: 4分触发买入)")
  135. print(f"触发信号: {', '.join(signal_result['signals']) if signal_result['signals'] else '无'}")
  136. if signal_result['score'] >= 4:
  137. print(f"\n🟢 建议: 触发买入信号!")
  138. elif signal_result['score'] >= 3:
  139. print(f"\n🟡 建议: 接近触发,建议关注")
  140. else:
  141. print(f"\n⚪ 建议: 未触发买入信号")
  142. print(f"{'='*80}")
  143. return signal_result
  144. if __name__ == "__main__":
  145. generate_realtime_report()