| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- #!/usr/bin/env python3
- """
- 创业板50指数数据获取模块 (纯标准库版本)
- 使用 mairui API 获取 K线数据,无需安装 pandas
- """
- import urllib.request
- import urllib.error
- import json
- import os
- import csv
- from datetime import datetime
- from typing import Optional, List, Dict
- class MairuiDataFetcher:
- """mairui 数据获取类 (纯标准库)"""
-
- # API 配置
- BASE_URL = "https://api.mairuiapi.com/hsindex/history"
- TOKEN = "AE17EE23-AAE4-492F-A959-EC883DFA5A76"
-
- # 指数代码映射
- INDEX_CODES = {
- "cyb50": "399673.SZ",
- "cy": "399006.SZ",
- "sh": "000001.SH",
- "hs300": "000300.SH",
- "sz": "399001.SZ",
- }
-
- def __init__(self, data_dir: str = "./data"):
- self.data_dir = data_dir
- os.makedirs(data_dir, exist_ok=True)
-
- def fetch_data(
- self,
- index_code: str = "399673.SZ",
- timeframe: str = "30",
- start_date: Optional[str] = None,
- end_date: Optional[str] = None
- ) -> List[Dict]:
- """
- 获取K线数据
-
- Args:
- index_code: 指数代码
- timeframe: K线周期 (d=日线, 30=30分钟, 60=60分钟)
- start_date: 开始日期 (YYYY-MM-DD)
- end_date: 结束日期 (YYYY-MM-DD)
-
- Returns:
- 数据列表,每个元素是一个字典
- """
- # 日期格式转换
- if start_date:
- st = start_date.replace("-", "")
- else:
- st = "20230101"
-
- if end_date:
- et = end_date.replace("-", "")
- else:
- et = datetime.now().strftime("%Y%m%d")
-
- print(f"正在获取 {index_code} 的{timeframe}分钟K线数据...")
- print(f"时间范围: {start_date or st} 至 {end_date or et}")
-
- # 构建API URL
- url = f"{self.BASE_URL}/{index_code}/{timeframe}/{self.TOKEN}?st={st}&et={et}"
- print(f"API URL: {url}")
-
- try:
- # 使用标准库发送请求
- req = urllib.request.Request(url, headers={
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.0'
- })
-
- with urllib.request.urlopen(req, timeout=60) as response:
- data = json.loads(response.read().decode('utf-8'))
-
- # 解析数据
- if isinstance(data, list):
- records = data
- elif isinstance(data, dict):
- records = data.get("data", data.get("list", []))
- else:
- records = []
-
- if not records:
- print("✗ API返回空数据")
- return []
-
- print(f"✓ 获取到 {len(records)} 条数据")
-
- # 标准化数据
- standardized_records = []
- for record in records:
- std_record = {
- "datetime": record.get("d") or record.get("t"),
- "open": record.get("o"),
- "high": record.get("h"),
- "low": record.get("l"),
- "close": record.get("c"),
- "volume": record.get("v"),
- "amount": record.get("a"),
- }
- standardized_records.append(std_record)
-
- # 按时间排序
- standardized_records.sort(key=lambda x: x["datetime"])
-
- if standardized_records:
- print(f" 时间范围: {standardized_records[0]['datetime']} 至 {standardized_records[-1]['datetime']}")
-
- return standardized_records
-
- except urllib.error.URLError as e:
- print(f"✗ 网络请求失败: {e}")
- return []
- except json.JSONDecodeError as e:
- print(f"✗ JSON解析失败: {e}")
- return []
- except Exception as e:
- print(f"✗ 处理失败: {e}")
- import traceback
- traceback.print_exc()
- return []
-
- def save_to_csv(self, records: List[Dict], filename: str) -> str:
- """保存数据到CSV文件"""
- if not records:
- print("✗ 没有数据可保存")
- return ""
-
- filepath = os.path.join(self.data_dir, filename)
-
- # 获取所有字段名
- fieldnames = ["datetime", "open", "high", "low", "close", "volume", "amount"]
-
- with open(filepath, 'w', newline='', encoding='utf-8') as f:
- writer = csv.DictWriter(f, fieldnames=fieldnames)
- writer.writeheader()
- writer.writerows(records)
-
- print(f"✓ 数据已保存到: {filepath}")
- return filepath
-
- def load_from_csv(self, filename: str) -> List[Dict]:
- """从CSV文件加载数据"""
- filepath = os.path.join(self.data_dir, filename)
- if not os.path.exists(filepath):
- print(f"✗ 文件不存在: {filepath}")
- return []
-
- records = []
- with open(filepath, 'r', encoding='utf-8') as f:
- reader = csv.DictReader(f)
- for row in reader:
- records.append(dict(row))
-
- print(f"✓ 已从 {filepath} 加载 {len(records)} 条记录")
- return records
-
- def print_preview(self, records: List[Dict], head: int = 10, tail: int = 5):
- """打印数据预览"""
- if not records:
- print("没有数据")
- return
-
- print(f"\n数据预览 (前{head}条):")
- print("-" * 80)
- print(f"{'datetime':<20} {'open':<10} {'high':<10} {'low':<10} {'close':<10} {'volume':<12}")
- print("-" * 80)
- for r in records[:head]:
- print(f"{r.get('datetime', ''):<20} {r.get('open', ''):<10} {r.get('high', ''):<10} "
- f"{r.get('low', ''):<10} {r.get('close', ''):<10} {r.get('volume', ''):<12}")
-
- if len(records) > head + tail:
- print(f"\n... ({len(records) - head - tail} 条数据省略) ...\n")
-
- print(f"数据预览 (后{tail}条):")
- print("-" * 80)
- for r in records[-tail:]:
- print(f"{r.get('datetime', ''):<20} {r.get('open', ''):<10} {r.get('high', ''):<10} "
- f"{r.get('low', ''):<10} {r.get('close', ''):<10} {r.get('volume', ''):<12}")
- print("-" * 80)
- def fetch_cyb50(
- timeframe: str = "d",
- start_date: str = "2023-01-01",
- end_date: Optional[str] = None,
- save: bool = True
- ) -> List[Dict]:
- """获取创业板50指数数据(便捷函数)"""
- fetcher = MairuiDataFetcher(data_dir="./data")
-
- records = fetcher.fetch_data(
- index_code="399673.SZ",
- timeframe=timeframe,
- start_date=start_date,
- end_date=end_date
- )
-
- if save and records:
- end_str = end_date.replace("-", "") if end_date else datetime.now().strftime("%Y%m%d")
- tf_name = "day" if timeframe == "d" else f"{timeframe}min"
- filename = f"cyb50_{tf_name}_{start_date.replace('-', '')}_{end_str}.csv"
- fetcher.save_to_csv(records, filename)
-
- return records
- if __name__ == "__main__":
- import argparse
-
- parser = argparse.ArgumentParser(description="获取指数K线数据")
- parser.add_argument("--code", default="399673.SZ", help="指数代码")
- parser.add_argument("--tf", default="d", help="K线周期: d=日线, 30=30分钟")
- parser.add_argument("--start", default="2015-01-01", help="开始日期 (YYYY-MM-DD)")
- parser.add_argument("--end", default=None, help="结束日期 (YYYY-MM-DD)")
- parser.add_argument("--no-save", action="store_true", help="不保存到文件")
-
- args = parser.parse_args()
-
- print("=" * 80)
- print("mairui 指数数据获取工具 (纯标准库版本)")
- print("=" * 80)
-
- fetcher = MairuiDataFetcher(data_dir="./data")
- records = fetcher.fetch_data(
- index_code=args.code,
- timeframe=args.tf,
- start_date=args.start,
- end_date=args.end
- )
-
- if records:
- fetcher.print_preview(records)
-
- print(f"\n数据统计:")
- print(f" 总记录数: {len(records)}")
-
- if not args.no_save:
- end_str = args.end.replace("-", "") if args.end else datetime.now().strftime("%Y%m%d")
- tf_name = "day" if args.tf == "d" else f"{args.tf}min"
- filename = f"{args.code.replace('.', '_')}_{tf_name}_{args.start.replace('-', '')}_{end_str}.csv"
- fetcher.save_to_csv(records, filename)
- else:
- print("\n✗ 数据获取失败")
- exit(1)
-
- print("\n" + "=" * 80)
- print("完成!")
- print("=" * 80)
|