| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- A50期货 - 30分钟K线数据获取
- 标的:富时中国A50指数期货 (CN0Y) / 华夏A50ETF (159601) 作为替代
- 数据周期:近40天
- 注意:由于网络限制,部分数据源可能无法访问
- """
- import pandas as pd
- import numpy as np
- import akshare as ak
- import requests
- import json
- from datetime import datetime, timedelta
- import warnings
- warnings.filterwarnings('ignore')
- class A50FuturesDataFetcher:
- """A50期货数据获取类"""
-
- def __init__(self):
- self.symbol = "CN0Y" # A50期货主力合约代码
- print(f"A50期货数据获取器初始化")
- print(f"标的代码: {self.symbol}")
-
- def fetch_30min_data(self, days=40):
- """
- 获取A50相关指数30分钟K线数据
-
- Parameters:
- days: 获取近N天的数据(默认40天)
-
- Returns:
- DataFrame with columns: datetime, open, high, low, close, volume
- """
- end_date = datetime.now()
- start_date = end_date - timedelta(days=days+5)
-
- print(f"\n{'='*60}")
- print(f"获取A50相关数据 (近{days}天)")
- print(f"时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
- print(f"{'='*60}")
-
- # 尝试多个数据源
- df = None
-
- # 方法1: 新浪财经A50期货
- df = self._fetch_sina_a50_futures(start_date, end_date)
- if df is not None and len(df) > 0:
- print(f"\n✅ 使用新浪财经A50期货数据")
- return df
-
- # 方法2: 使用A50 ETF作为替代
- df = self._fetch_a50_etf(start_date, end_date)
- if df is not None and len(df) > 0:
- print(f"\n✅ 使用A50 ETF数据作为替代")
- return df
-
- # 方法3: 使用上证50指数
- df = self._fetch_sz50_index(start_date, end_date)
- if df is not None and len(df) > 0:
- print(f"\n✅ 使用上证50指数作为替代")
- return df
-
- # 方法4: 生成模拟数据(用于测试)
- print("\n⚠️ 无法获取实时数据,生成模拟数据用于测试...")
- df = self._generate_sample_data(days)
- return df
-
- def _fetch_sina_a50_futures(self, start_date, end_date):
- """从新浪财经获取A50期货30分钟数据"""
- try:
- print("\n[数据源0] 新浪财经A50期货...")
-
- import requests
- import json
-
- # 新浪财经A50期货接口
- url = "https://stock.finance.sina.com.cn/futures/api/jsonp.php/var_a50=/GlobalService.getMink?symbol=CN0Y&scale=30"
-
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
- 'Referer': 'https://finance.sina.com.cn/'
- }
-
- response = requests.get(url, headers=headers, timeout=15)
-
- # 解析JSONP响应
- json_start = response.text.find('[')
- json_end = response.text.rfind(']') + 1
- json_str = response.text[json_start:json_end]
-
- data_dict = json.loads(json_str)
-
- if data_dict and isinstance(data_dict, list):
- data_list = []
- for item in data_dict:
- try:
- data_list.append({
- 'datetime': item.get('day'),
- 'open': float(item.get('open', 0)),
- 'high': float(item.get('high', 0)),
- 'low': float(item.get('low', 0)),
- 'close': float(item.get('close', 0)),
- 'volume': float(item.get('volume', 0))
- })
- except Exception:
- continue
-
- if data_list:
- df = pd.DataFrame(data_list)
- df['datetime'] = pd.to_datetime(df['datetime'])
- df = df.set_index('datetime').sort_index()
-
- # 筛选时间范围
- df = df[(df.index >= start_date) & (df.index <= end_date)]
-
- if len(df) > 10:
- print(f" ✅ 获取成功: {len(df)}条")
- return df
-
- print(f" 数据不足")
-
- except Exception as e:
- print(f" 获取失败: {e}")
-
- return None
-
- def _fetch_a50_etf(self, start_date, end_date):
- """获取华夏A50ETF数据作为A50期货的替代"""
- try:
- print("\n[数据源1] 获取华夏A50ETF(159601)数据...")
-
- # 使用akshare获取ETF分钟数据
- df = ak.fund_etf_hist_min_em(symbol="159601", period="30",
- start_date=start_date.strftime('%Y%m%d%H%M%S'),
- end_date=end_date.strftime('%Y%m%d%H%M%S'))
-
- if df is not None and not df.empty and len(df) > 10:
- # 标准化列名
- df = df.rename(columns={
- '时间': 'datetime',
- '开盘': 'open',
- '收盘': 'close',
- '最高': 'high',
- '最低': 'low',
- '成交量': 'volume'
- })
-
- df['datetime'] = pd.to_datetime(df['datetime'])
- df = df.set_index('datetime').sort_index()
-
- print(f" ✅ 获取成功: {len(df)}条")
- return df
- else:
- print(f" 数据不足")
-
- except Exception as e:
- print(f" 获取失败: {e}")
-
- return None
-
- def _fetch_sz50_index(self, start_date, end_date):
- """获取上证50指数作为替代"""
- try:
- print("\n[数据源2] 获取上证50指数(000016)数据...")
-
- df = ak.index_zh_a_hist_min_em(symbol="000016", period="30",
- start_date=start_date.strftime('%Y%m%d%H%M%S'),
- end_date=end_date.strftime('%Y%m%d%H%M%S'))
-
- if df is not None and not df.empty and len(df) > 10:
- df = df.rename(columns={
- '时间': 'datetime',
- '开盘': 'open',
- '收盘': 'close',
- '最高': 'high',
- '最低': 'low',
- '成交量': 'volume'
- })
-
- df['datetime'] = pd.to_datetime(df['datetime'])
- df = df.set_index('datetime').sort_index()
-
- print(f" ✅ 获取成功: {len(df)}条")
- return df
- else:
- print(f" 数据不足")
-
- except Exception as e:
- print(f" 获取失败: {e}")
-
- return None
-
- def _generate_sample_data(self, days=40):
- """生成A50风格的模拟数据(用于测试)"""
- print("\n[模拟数据] 生成A50风格模拟数据...")
-
- # 生成时间序列(交易日9:30-11:30, 13:00-15:00)
- dates = pd.date_range(end=datetime.now(), periods=days, freq='B') # 工作日
- all_times = []
-
- for date in dates:
- # 上午 9:30-11:30 (4个30分钟)
- for hour, minute in [(9, 30), (10, 0), (10, 30), (11, 0)]:
- all_times.append(date.replace(hour=hour, minute=minute))
- # 下午 13:00-15:00 (4个30分钟)
- for hour, minute in [(13, 0), (13, 30), (14, 0), (14, 30)]:
- all_times.append(date.replace(hour=hour, minute=minute))
-
- all_times = sorted([t for t in all_times if t <= datetime.now()])
- n = len(all_times)
-
- # A50基准价约13500点
- base_price = 13500
-
- # 生成随机价格序列(带趋势)
- np.random.seed(42)
- returns = np.random.normal(0.0002, 0.008, n) # A50波动率
-
- # 添加一些趋势
- trend = np.sin(np.linspace(0, 4*np.pi, n)) * 0.002
- returns += trend
-
- # 计算价格
- prices = base_price * np.exp(np.cumsum(returns))
-
- # 生成OHLC数据
- data = []
- for i, (time, price) in enumerate(zip(all_times, prices)):
- volatility = price * 0.003 # 0.3%波动
- open_price = price + np.random.normal(0, volatility * 0.3)
- high_price = max(open_price, price) + np.random.uniform(0, volatility)
- low_price = min(open_price, price) - np.random.uniform(0, volatility)
- close_price = price
- volume = np.random.randint(100000, 500000)
-
- data.append({
- 'datetime': time,
- 'open': round(open_price, 2),
- 'high': round(high_price, 2),
- 'low': round(low_price, 2),
- 'close': round(close_price, 2),
- 'volume': volume
- })
-
- df = pd.DataFrame(data)
- df = df.set_index('datetime').sort_index()
-
- print(f" ✅ 生成模拟数据: {len(df)}条")
- print(f" ⚠️ 注意:此为模拟数据,仅用于测试")
-
- return df
-
- def save_to_csv(self, df, filename=None):
- """保存数据到CSV文件"""
- if df is None or len(df) == 0:
- print("❌ 无数据可保存")
- return None
-
- if filename is None:
- filename = f"A50_futures_30min_{datetime.now().strftime('%Y%m%d')}.csv"
-
- try:
- df.to_csv(filename)
- import os
- print(f"\n💾 数据已保存: {filename}")
- print(f" 数据条数: {len(df)}")
- print(f" 文件大小: {round(os.path.getsize(filename)/1024, 2)} KB")
- return filename
- except Exception as e:
- print(f"❌ 保存失败: {e}")
- return None
-
- def show_statistics(self, df):
- """显示数据统计信息"""
- if df is None or len(df) == 0:
- return
-
- print(f"\n{'='*60}")
- print(f"数据统计信息")
- print(f"{'='*60}")
-
- print(f"\n【基本信息】")
- print(f" 数据条数: {len(df)}")
- print(f" 时间区间: {df.index[0]} 至 {df.index[-1]}")
- unique_dates = set([d.date() for d in pd.to_datetime(df.index)])
- print(f" 交易天数: {len(unique_dates)}天")
-
- print(f"\n【价格统计】")
- print(f" 最高价: {df['high'].max():.2f}")
- print(f" 最低价: {df['low'].min():.2f}")
- print(f" 最新价: {df['close'].iloc[-1]:.2f}")
-
- # 计算涨跌幅
- price_change = (df['close'].iloc[-1] - df['close'].iloc[0]) / df['close'].iloc[0] * 100
- print(f" 区间涨跌: {price_change:+.2f}%")
-
- print(f"\n【成交量统计】")
- print(f" 总成交量: {df['volume'].sum():,.0f}")
- print(f" 日均成交: {df['volume'].mean():,.0f}")
-
- print(f"\n【波动率统计】")
- df['returns'] = df['close'].pct_change()
- volatility = df['returns'].std() * np.sqrt(48) * 100 # 年化波动率
- print(f" 年化波动率: {volatility:.2f}%")
- def main():
- """主程序"""
- print("="*70)
- print("A50期货30分钟K线数据获取工具")
- print("="*70)
- print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-
- # 创建数据获取器
- fetcher = A50FuturesDataFetcher()
-
- # 获取近40天数据
- df = fetcher.fetch_30min_data(days=40)
-
- if df is not None and len(df) > 0:
- # 显示统计信息
- fetcher.show_statistics(df)
-
- # 保存到CSV
- fetcher.save_to_csv(df)
-
- # 显示最新10条数据
- print(f"\n{'='*60}")
- print(f"最新10条数据")
- print(f"{'='*60}")
- print(df.tail(10)[['open', 'high', 'low', 'close', 'volume']].to_string())
-
- print(f"\n✅ 数据获取完成!")
- else:
- print(f"\n❌ 数据获取失败")
-
- print("="*70)
- if __name__ == "__main__":
- import os
- main()
|