#!/usr/bin/env python3
"""
ChiNext 50 index data fetcher (standard-library only).

Downloads K-line data through the mairui API; pandas is not required.
"""

import argparse
import csv
import json
import os
import sys
import traceback
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from typing import Any, Dict, List, Optional

# Start date (YYYYMMDD) used whenever the caller does not supply one.
_DEFAULT_START = "20230101"


def _compact_date(value: Optional[str], fallback: str) -> str:
    """Strip the dashes from a YYYY-MM-DD string, or return *fallback* if empty."""
    return value.replace("-", "") if value else fallback


def _today() -> str:
    """Return today's local date formatted as YYYYMMDD."""
    return datetime.now().strftime("%Y%m%d")


def _build_filename(
    prefix: str,
    timeframe: str,
    start_date: Optional[str],
    end_date: Optional[str],
) -> str:
    """Build the CSV file name ``<prefix>_<period>_<start>_<end>.csv``."""
    tf_name = "day" if timeframe == "d" else f"{timeframe}min"
    start_str = _compact_date(start_date, _DEFAULT_START)
    end_str = _compact_date(end_date, _today())
    return f"{prefix}_{tf_name}_{start_str}_{end_str}.csv"


class MairuiDataFetcher:
    """Fetch, normalize, save, load and preview K-line data from the mairui API."""

    # API configuration
    BASE_URL = "https://api.mairuiapi.com/hsindex/history"
    # NOTE(security): this token is hard-coded in the source file. It is kept
    # only so existing callers keep working; set the environment variable
    # named by TOKEN_ENV_VAR to use a different token without editing code.
    TOKEN = "AE17EE23-AAE4-492F-A959-EC883DFA5A76"
    TOKEN_ENV_VAR = "MAIRUI_TOKEN"
    USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.0"

    # Short alias -> index code. fetch_data() accepts either form.
    INDEX_CODES = {
        "cyb50": "399673.SZ",
        "cy": "399006.SZ",
        "sh": "000001.SH",
        "hs300": "000300.SH",
        "sz": "399001.SZ",
    }

    # Column order of the CSV files written by save_to_csv().
    CSV_FIELDS = ("datetime", "open", "high", "low", "close", "volume", "amount")

    # (column, width) pairs used by print_preview(); "amount" is not shown.
    PREVIEW_COLUMNS = (
        ("datetime", 20),
        ("open", 10),
        ("high", 10),
        ("low", 10),
        ("close", 10),
        ("volume", 12),
    )

    def __init__(self, data_dir: str = "./data"):
        """Remember *data_dir* and create it if it does not exist yet."""
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)

    def fetch_data(
        self,
        index_code: str = "399673.SZ",
        timeframe: str = "30",
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> List[Dict]:
        """
        Download K-line data and return it in a normalized form.

        Args:
            index_code: Index code, or one of the aliases in INDEX_CODES.
            timeframe: K-line period (d=daily, 30=30 minutes, 60=60 minutes).
            start_date: Start date (YYYY-MM-DD); defaults to 2023-01-01.
            end_date: End date (YYYY-MM-DD); defaults to today.

        Returns:
            A list of dicts with the keys datetime/open/high/low/close/
            volume/amount, sorted by datetime. An empty list is returned on
            any failure (network error, bad JSON, empty response); this
            method reports errors on stdout instead of raising.
        """
        index_code = self.INDEX_CODES.get(index_code, index_code)
        st = _compact_date(start_date, _DEFAULT_START)
        et = _compact_date(end_date, _today())

        # "d" is not a number of minutes, so it gets its own label.
        period_label = "日线" if timeframe == "d" else f"{timeframe}分钟K线"
        print(f"正在获取 {index_code} 的{period_label}数据...")
        print(f"时间范围: {start_date or st} 至 {end_date or et}")

        token = os.environ.get(self.TOKEN_ENV_VAR) or self.TOKEN
        url = self._build_url(index_code, timeframe, token, st, et)
        # Never echo the token itself: stdout frequently ends up in logs.
        quoted_token = urllib.parse.quote(str(token), safe="")
        shown_url = url.replace(quoted_token, "***") if quoted_token else url
        print(f"API URL: {shown_url}")

        try:
            request = urllib.request.Request(
                url, headers={"User-Agent": self.USER_AGENT}
            )
            with urllib.request.urlopen(request, timeout=60) as response:
                payload = json.loads(response.read().decode("utf-8"))

            records = self._extract_records(payload)
            if not records:
                print("✗ API返回空数据")
                return []

            print(f"✓ 获取到 {len(records)} 条数据")

            standardized_records = self._normalize_records(records)
            if standardized_records:
                first = standardized_records[0]["datetime"]
                last = standardized_records[-1]["datetime"]
                print(f" 时间范围: {first} 至 {last}")
            return standardized_records

        except urllib.error.URLError as e:
            print(f"✗ 网络请求失败: {e}")
            return []
        except json.JSONDecodeError as e:
            print(f"✗ JSON解析失败: {e}")
            return []
        except Exception as e:
            # Boundary handler: keep the "never raises" contract but leave a
            # traceback so unexpected failures remain diagnosable.
            print(f"✗ 处理失败: {e}")
            traceback.print_exc()
            return []

    def _build_url(
        self, index_code: str, timeframe: str, token: str, st: str, et: str
    ) -> str:
        """Assemble the request URL, percent-encoding every dynamic part."""
        path = "/".join(
            urllib.parse.quote(str(part), safe="")
            for part in (index_code, timeframe, token)
        )
        query = urllib.parse.urlencode({"st": st, "et": et})
        return f"{self.BASE_URL}/{path}?{query}"

    @staticmethod
    def _extract_records(payload: Any) -> List[Any]:
        """Return the record list from a payload that is a list or a dict."""
        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict):
            candidate = payload.get("data", payload.get("list", []))
            return candidate if isinstance(candidate, list) else []
        return []

    @staticmethod
    def _normalize_records(records: List[Any]) -> List[Dict[str, Any]]:
        """Map raw API records to the standard keys and sort them by datetime.

        Entries that are not dicts are skipped. Records carrying neither a
        ``d`` nor a ``t`` field keep ``datetime=None`` and are sorted last, so
        one incomplete record cannot make the whole sort fail.
        """
        normalized = [
            {
                "datetime": record.get("d") or record.get("t"),
                "open": record.get("o"),
                "high": record.get("h"),
                "low": record.get("l"),
                "close": record.get("c"),
                "volume": record.get("v"),
                "amount": record.get("a"),
            }
            for record in records
            if isinstance(record, dict)
        ]
        # The leading boolean keeps None from being compared with a string.
        normalized.sort(key=lambda r: (r["datetime"] is None, r["datetime"]))
        return normalized

    def save_to_csv(self, records: List[Dict], filename: str) -> str:
        """Write *records* to ``data_dir/filename`` as CSV.

        Returns:
            The path of the written file, or "" when *records* is empty.
        """
        if not records:
            print("✗ 没有数据可保存")
            return ""

        filepath = os.path.join(self.data_dir, filename)
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=list(self.CSV_FIELDS))
            writer.writeheader()
            writer.writerows(records)

        print(f"✓ 数据已保存到: {filepath}")
        return filepath

    def load_from_csv(self, filename: str) -> List[Dict]:
        """Read ``data_dir/filename`` back into a list of dicts.

        Every value comes back as a string, because that is what the csv
        module yields. An empty list is returned if the file does not exist.
        """
        filepath = os.path.join(self.data_dir, filename)
        if not os.path.exists(filepath):
            print(f"✗ 文件不存在: {filepath}")
            return []

        # newline='' lets the csv module handle embedded line breaks itself.
        with open(filepath, 'r', newline='', encoding='utf-8') as f:
            records = [dict(row) for row in csv.DictReader(f)]

        print(f"✓ 已从 {filepath} 加载 {len(records)} 条记录")
        return records

    def _format_row(self, record: Dict) -> str:
        """Render one record as a fixed-width preview line."""
        cells = []
        for column, width in self.PREVIEW_COLUMNS:
            value = record.get(column, '')
            # None does not accept an alignment format spec; show it as blank.
            text = '' if value is None else str(value)
            cells.append(f"{text:<{width}}")
        return " ".join(cells)

    def print_preview(self, records: List[Dict], head: int = 10, tail: int = 5):
        """Print the first *head* records and, if there are more than
        ``head + tail`` records, the last *tail* ones as well."""
        if not records:
            print("没有数据")
            return

        rule = "-" * 80
        header = " ".join(
            f"{column:<{width}}" for column, width in self.PREVIEW_COLUMNS
        )

        print(f"\n数据预览 (前{head}条):")
        print(rule)
        print(header)
        print(rule)
        for record in records[:head]:
            print(self._format_row(record))

        if len(records) > head + tail:
            print(f"\n... ({len(records) - head - tail} 条数据省略) ...\n")
            print(f"数据预览 (后{tail}条):")
            print(rule)
            # Counting from the front keeps tail=0 from selecting everything,
            # which records[-0:] would do.
            for record in records[len(records) - tail:]:
                print(self._format_row(record))

        print(rule)


def fetch_cyb50(
    timeframe: str = "d",
    start_date: str = "2023-01-01",
    end_date: Optional[str] = None,
    save: bool = True
) -> List[Dict]:
    """Fetch ChiNext 50 (399673.SZ) data; convenience wrapper.

    Args:
        timeframe: K-line period, passed through to fetch_data().
        start_date: Start date (YYYY-MM-DD).
        end_date: End date (YYYY-MM-DD); defaults to today.
        save: When true and data was returned, also write it to ./data.

    Returns:
        The normalized records, or an empty list if the fetch failed.
    """
    fetcher = MairuiDataFetcher(data_dir="./data")
    records = fetcher.fetch_data(
        index_code="399673.SZ",
        timeframe=timeframe,
        start_date=start_date,
        end_date=end_date
    )

    if save and records:
        filename = _build_filename("cyb50", timeframe, start_date, end_date)
        fetcher.save_to_csv(records, filename)

    return records


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point; returns the process exit status."""
    parser = argparse.ArgumentParser(description="获取指数K线数据")
    parser.add_argument("--code", default="399673.SZ", help="指数代码")
    parser.add_argument("--tf", default="d", help="K线周期: d=日线, 30=30分钟")
    parser.add_argument("--start", default="2015-01-01", help="开始日期 (YYYY-MM-DD)")
    parser.add_argument("--end", default=None, help="结束日期 (YYYY-MM-DD)")
    parser.add_argument("--no-save", action="store_true", help="不保存到文件")
    args = parser.parse_args(argv)

    print("=" * 80)
    print("mairui 指数数据获取工具 (纯标准库版本)")
    print("=" * 80)

    fetcher = MairuiDataFetcher(data_dir="./data")
    records = fetcher.fetch_data(
        index_code=args.code,
        timeframe=args.tf,
        start_date=args.start,
        end_date=args.end
    )

    if not records:
        print("\n✗ 数据获取失败")
        return 1

    fetcher.print_preview(records)
    print("\n数据统计:")
    print(f" 总记录数: {len(records)}")

    if not args.no_save:
        filename = _build_filename(
            args.code.replace('.', '_'), args.tf, args.start, args.end
        )
        fetcher.save_to_csv(records, filename)

    print("\n" + "=" * 80)
    print("完成!")
    print("=" * 80)
    return 0


if __name__ == "__main__":
    sys.exit(main())