test_eastmoney_data.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. import akshare as ak
  2. import pandas as pd
  3. import numpy as np
  4. from datetime import datetime, timedelta
  5. import warnings
  6. warnings.filterwarnings('ignore')
  7. print("测试东方财富数据源...")
  8. print("=" * 50)
  9. try:
  10. # 获取创业板50的30分钟K线数据
  11. symbol = "399673" # 创业板50指数
  12. print(f"正在获取{symbol}的30分钟K线数据...")
  13. data = ak.index_zh_a_hist_min_em(symbol=symbol, period="30")
  14. if not data.empty:
  15. print(f"[SUCCESS] 成功获取{len(data)}条30分钟数据")
  16. print(f"数据列名: {data.columns.tolist()}")
  17. print(f"数据时间范围: {data.index[0]} 到 {data.index[-1]}")
  18. # 检查最新数据时间
  19. latest_time = data.index[-1]
  20. current_time = datetime.now()
  21. print(f"[LATEST] 最新数据时间: {latest_time}")
  22. print(f"[CURRENT] 当前时间: {current_time}")
  23. # 检查数据列是否有具体时间
  24. if hasattr(latest_time, 'hour'):
  25. time_diff = current_time - latest_time
  26. print(f"[DELAY] 数据延迟: {time_diff}")
  27. else:
  28. print(f"[INFO] 数据可能只有日期,没有具体时间")
  29. # 显示最近几条数据
  30. print(f"\n最近5条数据:")
  31. print(data.tail())
  32. else:
  33. print("[ERROR] 获取的数据为空")
  34. except Exception as e:
  35. print(f"[ERROR] 获取数据失败: {e}")
  36. import traceback
  37. traceback.print_exc()
  38. print("=" * 50)
  39. print("测试完成")