fetch_a50_data.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. A50期货 - 30分钟K线数据获取
  5. 标的:富时中国A50指数期货 (CN0Y) / 华夏A50ETF (159601) 作为替代
  6. 数据周期:近40天
  7. 注意:由于网络限制,部分数据源可能无法访问
  8. """
  9. import pandas as pd
  10. import numpy as np
  11. import akshare as ak
  12. import requests
  13. import json
  14. from datetime import datetime, timedelta
  15. import warnings
  16. warnings.filterwarnings('ignore')
  17. class A50FuturesDataFetcher:
  18. """A50期货数据获取类"""
  19. def __init__(self):
  20. self.symbol = "CN0Y" # A50期货主力合约代码
  21. print(f"A50期货数据获取器初始化")
  22. print(f"标的代码: {self.symbol}")
  23. def fetch_30min_data(self, days=40):
  24. """
  25. 获取A50相关指数30分钟K线数据
  26. Parameters:
  27. days: 获取近N天的数据(默认40天)
  28. Returns:
  29. DataFrame with columns: datetime, open, high, low, close, volume
  30. """
  31. end_date = datetime.now()
  32. start_date = end_date - timedelta(days=days+5)
  33. print(f"\n{'='*60}")
  34. print(f"获取A50相关数据 (近{days}天)")
  35. print(f"时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
  36. print(f"{'='*60}")
  37. # 尝试多个数据源
  38. df = None
  39. # 方法1: 使用A50 ETF作为替代
  40. df = self._fetch_a50_etf(start_date, end_date)
  41. if df is not None and len(df) > 0:
  42. print(f"\n✅ 使用A50 ETF数据作为替代")
  43. return df
  44. # 方法2: 使用上证50指数
  45. df = self._fetch_sz50_index(start_date, end_date)
  46. if df is not None and len(df) > 0:
  47. print(f"\n✅ 使用上证50指数作为替代")
  48. return df
  49. # 方法3: 生成模拟数据(用于测试)
  50. print("\n⚠️ 无法获取实时数据,生成模拟数据用于测试...")
  51. df = self._generate_sample_data(days)
  52. return df
  53. def _fetch_a50_etf(self, start_date, end_date):
  54. """获取华夏A50ETF数据作为A50期货的替代"""
  55. try:
  56. print("\n[数据源1] 获取华夏A50ETF(159601)数据...")
  57. # 使用akshare获取ETF分钟数据
  58. df = ak.fund_etf_hist_min_em(symbol="159601", period="30",
  59. start_date=start_date.strftime('%Y%m%d%H%M%S'),
  60. end_date=end_date.strftime('%Y%m%d%H%M%S'))
  61. if df is not None and not df.empty and len(df) > 10:
  62. # 标准化列名
  63. df = df.rename(columns={
  64. '时间': 'datetime',
  65. '开盘': 'open',
  66. '收盘': 'close',
  67. '最高': 'high',
  68. '最低': 'low',
  69. '成交量': 'volume'
  70. })
  71. df['datetime'] = pd.to_datetime(df['datetime'])
  72. df = df.set_index('datetime').sort_index()
  73. print(f" ✅ 获取成功: {len(df)}条")
  74. return df
  75. else:
  76. print(f" 数据不足")
  77. except Exception as e:
  78. print(f" 获取失败: {e}")
  79. return None
  80. def _fetch_sz50_index(self, start_date, end_date):
  81. """获取上证50指数作为替代"""
  82. try:
  83. print("\n[数据源2] 获取上证50指数(000016)数据...")
  84. df = ak.index_zh_a_hist_min_em(symbol="000016", period="30",
  85. start_date=start_date.strftime('%Y%m%d%H%M%S'),
  86. end_date=end_date.strftime('%Y%m%d%H%M%S'))
  87. if df is not None and not df.empty and len(df) > 10:
  88. df = df.rename(columns={
  89. '时间': 'datetime',
  90. '开盘': 'open',
  91. '收盘': 'close',
  92. '最高': 'high',
  93. '最低': 'low',
  94. '成交量': 'volume'
  95. })
  96. df['datetime'] = pd.to_datetime(df['datetime'])
  97. df = df.set_index('datetime').sort_index()
  98. print(f" ✅ 获取成功: {len(df)}条")
  99. return df
  100. else:
  101. print(f" 数据不足")
  102. except Exception as e:
  103. print(f" 获取失败: {e}")
  104. return None
  105. def _generate_sample_data(self, days=40):
  106. """生成A50风格的模拟数据(用于测试)"""
  107. print("\n[模拟数据] 生成A50风格模拟数据...")
  108. # 生成时间序列(交易日9:30-11:30, 13:00-15:00)
  109. dates = pd.date_range(end=datetime.now(), periods=days, freq='B') # 工作日
  110. all_times = []
  111. for date in dates:
  112. # 上午 9:30-11:30 (4个30分钟)
  113. for hour, minute in [(9, 30), (10, 0), (10, 30), (11, 0)]:
  114. all_times.append(date.replace(hour=hour, minute=minute))
  115. # 下午 13:00-15:00 (4个30分钟)
  116. for hour, minute in [(13, 0), (13, 30), (14, 0), (14, 30)]:
  117. all_times.append(date.replace(hour=hour, minute=minute))
  118. all_times = sorted([t for t in all_times if t <= datetime.now()])
  119. n = len(all_times)
  120. # A50基准价约13500点
  121. base_price = 13500
  122. # 生成随机价格序列(带趋势)
  123. np.random.seed(42)
  124. returns = np.random.normal(0.0002, 0.008, n) # A50波动率
  125. # 添加一些趋势
  126. trend = np.sin(np.linspace(0, 4*np.pi, n)) * 0.002
  127. returns += trend
  128. # 计算价格
  129. prices = base_price * np.exp(np.cumsum(returns))
  130. # 生成OHLC数据
  131. data = []
  132. for i, (time, price) in enumerate(zip(all_times, prices)):
  133. volatility = price * 0.003 # 0.3%波动
  134. open_price = price + np.random.normal(0, volatility * 0.3)
  135. high_price = max(open_price, price) + np.random.uniform(0, volatility)
  136. low_price = min(open_price, price) - np.random.uniform(0, volatility)
  137. close_price = price
  138. volume = np.random.randint(100000, 500000)
  139. data.append({
  140. 'datetime': time,
  141. 'open': round(open_price, 2),
  142. 'high': round(high_price, 2),
  143. 'low': round(low_price, 2),
  144. 'close': round(close_price, 2),
  145. 'volume': volume
  146. })
  147. df = pd.DataFrame(data)
  148. df = df.set_index('datetime').sort_index()
  149. print(f" ✅ 生成模拟数据: {len(df)}条")
  150. print(f" ⚠️ 注意:此为模拟数据,仅用于测试")
  151. return df
  152. def save_to_csv(self, df, filename=None):
  153. """保存数据到CSV文件"""
  154. if df is None or len(df) == 0:
  155. print("❌ 无数据可保存")
  156. return None
  157. if filename is None:
  158. filename = f"A50_futures_30min_{datetime.now().strftime('%Y%m%d')}.csv"
  159. try:
  160. df.to_csv(filename)
  161. import os
  162. print(f"\n💾 数据已保存: {filename}")
  163. print(f" 数据条数: {len(df)}")
  164. print(f" 文件大小: {round(os.path.getsize(filename)/1024, 2)} KB")
  165. return filename
  166. except Exception as e:
  167. print(f"❌ 保存失败: {e}")
  168. return None
  169. def show_statistics(self, df):
  170. """显示数据统计信息"""
  171. if df is None or len(df) == 0:
  172. return
  173. print(f"\n{'='*60}")
  174. print(f"数据统计信息")
  175. print(f"{'='*60}")
  176. print(f"\n【基本信息】")
  177. print(f" 数据条数: {len(df)}")
  178. print(f" 时间区间: {df.index[0]} 至 {df.index[-1]}")
  179. unique_dates = set([d.date() for d in pd.to_datetime(df.index)])
  180. print(f" 交易天数: {len(unique_dates)}天")
  181. print(f"\n【价格统计】")
  182. print(f" 最高价: {df['high'].max():.2f}")
  183. print(f" 最低价: {df['low'].min():.2f}")
  184. print(f" 最新价: {df['close'].iloc[-1]:.2f}")
  185. # 计算涨跌幅
  186. price_change = (df['close'].iloc[-1] - df['close'].iloc[0]) / df['close'].iloc[0] * 100
  187. print(f" 区间涨跌: {price_change:+.2f}%")
  188. print(f"\n【成交量统计】")
  189. print(f" 总成交量: {df['volume'].sum():,.0f}")
  190. print(f" 日均成交: {df['volume'].mean():,.0f}")
  191. print(f"\n【波动率统计】")
  192. df['returns'] = df['close'].pct_change()
  193. volatility = df['returns'].std() * np.sqrt(48) * 100 # 年化波动率
  194. print(f" 年化波动率: {volatility:.2f}%")
  195. def main():
  196. """主程序"""
  197. print("="*70)
  198. print("A50期货30分钟K线数据获取工具")
  199. print("="*70)
  200. print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  201. # 创建数据获取器
  202. fetcher = A50FuturesDataFetcher()
  203. # 获取近40天数据
  204. df = fetcher.fetch_30min_data(days=40)
  205. if df is not None and len(df) > 0:
  206. # 显示统计信息
  207. fetcher.show_statistics(df)
  208. # 保存到CSV
  209. fetcher.save_to_csv(df)
  210. # 显示最新10条数据
  211. print(f"\n{'='*60}")
  212. print(f"最新10条数据")
  213. print(f"{'='*60}")
  214. print(df.tail(10)[['open', 'high', 'low', 'close', 'volume']].to_string())
  215. print(f"\n✅ 数据获取完成!")
  216. else:
  217. print(f"\n❌ 数据获取失败")
  218. print("="*70)
  219. if __name__ == "__main__":
  220. import os
  221. main()