| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- import akshare as ak
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- import warnings
- warnings.filterwarnings('ignore')
- print("测试东方财富数据源...")
- print("=" * 50)
- try:
- # 获取创业板50的30分钟K线数据
- symbol = "399673" # 创业板50指数
- print(f"正在获取{symbol}的30分钟K线数据...")
-
- data = ak.index_zh_a_hist_min_em(symbol=symbol, period="30")
-
- if not data.empty:
- print(f"[SUCCESS] 成功获取{len(data)}条30分钟数据")
- print(f"数据列名: {data.columns.tolist()}")
- print(f"数据时间范围: {data.index[0]} 到 {data.index[-1]}")
-
- # 检查最新数据时间
- latest_time = data.index[-1]
- current_time = datetime.now()
-
- print(f"[LATEST] 最新数据时间: {latest_time}")
- print(f"[CURRENT] 当前时间: {current_time}")
-
- # 检查数据列是否有具体时间
- if hasattr(latest_time, 'hour'):
- time_diff = current_time - latest_time
- print(f"[DELAY] 数据延迟: {time_diff}")
- else:
- print(f"[INFO] 数据可能只有日期,没有具体时间")
-
- # 显示最近几条数据
- print(f"\n最近5条数据:")
- print(data.tail())
-
- else:
- print("[ERROR] 获取的数据为空")
-
- except Exception as e:
- print(f"[ERROR] 获取数据失败: {e}")
- import traceback
- traceback.print_exc()
- print("=" * 50)
- print("测试完成")
|