#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50指数 - 实时交易报告系统 (修复数据连续性) 支持新浪、腾讯、网易多数据源 """ import sys sys.path.insert(0, '/root/.openclaw/workspace/cat-fly') sys.path.insert(0, '/root/.openclaw/workspace/quant') import pandas as pd import numpy as np from datetime import datetime, timedelta import smtplib import ssl from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header import warnings import requests import json import time warnings.filterwarnings('ignore') # ==================== 邮件配置 ==================== EMAIL_CONFIG = { "smtp_server": "localhost", "smtp_port": 25, "sender_email": "catfly@openclaw.local", "receiver_email": "380880504@qq.com" } # ==================== 多数据源获取 ==================== def fetch_sina_realtime(): """新浪实时行情接口""" try: url = "https://hq.sinajs.cn/list=sz399673" headers = { 'Referer': 'https://finance.sina.com.cn', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) response.encoding = 'gb2312' data_str = response.text if 'var hq_str_sz399673="' in data_str: data_str = data_str.split('var hq_str_sz399673="')[1].split('"')[0] parts = data_str.split(',') if len(parts) >= 32: # 解析日期时间 date_str = parts[30].strip() # YYYYMMDD time_str = parts[31].strip() if len(parts) > 31 else "" if len(date_str) == 8 and date_str.isdigit(): date = pd.Timestamp(f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}") else: date = pd.Timestamp.now().normalize() return { 'open': float(parts[1]), 'high': float(parts[4]), 'low': float(parts[5]), 'close': float(parts[3]), 'volume': int(parts[8]), 'name': parts[0], 'date': date, 'time': time_str } except Exception as e: print(f"新浪实时数据获取失败: {e}") return None def fetch_tencent_realtime(): """腾讯实时行情接口""" try: url = "https://qt.gtimg.cn/q=sz399673" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) response.encoding = 'gb2312' data_str = response.text if 'v_sz399673="' in data_str: data_str = data_str.split('v_sz399673="')[1].split('"')[0] parts = data_str.split('~') if len(parts) >= 45: # 日期格式: YYYY-MM-DD date_str = parts[30] date = pd.Timestamp(date_str) return { 'name': parts[1], 'close': float(parts[3]), 'open': float(parts[5]), 'high': float(parts[33]), 'low': float(parts[34]), 'volume': int(parts[36]), 'date': date } except Exception as e: print(f"腾讯实时数据获取失败: {e}") return None def fetch_akshare_hist(start_date, end_date): """使用akshare获取历史数据""" try: import akshare as ak print(f" 使用akshare获取 {start_date} 到 {end_date} 的历史数据...") # 获取创业板50历史行情 df = ak.index_zh_a_hist(symbol="399673", period="daily", start_date=start_date, end_date=end_date) if df is not None and len(df) > 0: data_list = [] for _, row in df.iterrows(): data_list.append({ 'date': pd.Timestamp(row['日期']), 'open': float(row['开盘']), 'high': float(row['最高']), 'low': float(row['最低']), 'close': float(row['收盘']), 'volume': int(row['成交量']) }) return data_list except Exception as e: print(f" akshare获取失败: {e}") return None def fetch_baostock_hist(start_date, end_date): """使用baostock获取历史数据""" try: import baostock as bs print(f" 使用baostock获取 {start_date} 到 {end_date} 的历史数据...") lg = bs.login() if lg.error_code != '0': print(f" baostock登录失败: {lg.error_msg}") return None # 创业板50代码: sz.399673 rs = bs.query_history_k_data_plus("sz.399673", "date,open,high,low,close,volume", start_date=start_date, end_date=end_date, frequency="d", adjustflag="3") data_list = [] while (rs.error_code == '0') & rs.next(): row = rs.get_row_data() if row[0]: # date data_list.append({ 'date': pd.Timestamp(row[0]), 'open': float(row[1]) if row[1] else 0, 'high': float(row[2]) if row[2] else 0, 'low': float(row[3]) if row[3] else 0, 'close': float(row[4]) if row[4] else 0, 'volume': int(float(row[5])) if row[5] else 0 }) bs.logout() if data_list: return data_list except Exception as e: print(f" baostock获取失败: {e}") return None def fetch_missing_data(last_date, today): """获取缺失的历史数据""" if last_date >= today: return [] start_str = (last_date + timedelta(days=1)).strftime('%Y-%m-%d') end_str = (today - timedelta(days=1)).strftime('%Y-%m-%d') print(f"\n📊 需要补全数据: {start_str} 到 {end_str}") # 尝试akshare data = fetch_akshare_hist(start_str.replace('-', ''), end_str.replace('-', '')) if data: print(f" ✅ akshare获取成功: {len(data)} 条") return data # 尝试baostock data = fetch_baostock_hist(start_str, end_str) if data: print(f" ✅ baostock获取成功: {len(data)} 条") return data print(" ⚠️ 无法获取缺失数据,将仅使用实时数据") return [] def fetch_realtime_data(): """获取实时数据,尝试多个数据源""" print("尝试获取实时数据...") # 尝试新浪实时 print(" 尝试新浪实时接口...") data = fetch_sina_realtime() if data: print(f" ✅ 新浪实时数据获取成功") return data, 'sina_realtime' # 尝试腾讯实时 print(" 尝试腾讯实时接口...") data = fetch_tencent_realtime() if data: print(f" ✅ 腾讯实时数据获取成功") return data, 'tencent_realtime' print(" ❌ 所有数据源均失败") return None, None # ==================== 趋势跟踪策略 ==================== class TrendTrackingStrategy: """趋势跟踪策略 - 修复数据连续性版""" def __init__(self): self.data = None self.data_gaps = [] # 记录数据缺口 def load_and_merge_data(self, csv_file='cyb50_baostock.csv'): """加载历史数据并合并实时数据""" try: # 加载历史数据 df = pd.read_csv(f'/root/.openclaw/workspace/quant/{csv_file}') df['date'] = pd.to_datetime(df['date']) df = df.set_index('date').sort_index() # 转换数据类型 for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') # 获取最新历史数据日期 last_hist_date = df.index[-1] today = pd.Timestamp.now().normalize() print(f"历史数据最新日期: {last_hist_date.date()}") print(f"当前日期: {today.date()}") # 检查数据缺口 missing_data = [] if last_hist_date < today - timedelta(days=1): # 尝试获取缺失的历史数据 missing_data = fetch_missing_data(last_hist_date, today) if missing_data: # 合并缺失数据 for item in missing_data: new_row = pd.DataFrame({ 'open': [item['open']], 'high': [item['high']], 'low': [item['low']], 'close': [item['close']], 'volume': [item['volume']] }, index=[item['date']]) df = pd.concat([df, new_row]) df = df.sort_index() last_hist_date = df.index[-1] print(f"✅ 已合并缺失数据,最新历史日期: {last_hist_date.date()}") else: # 记录数据缺口 gap_start = last_hist_date + timedelta(days=1) gap_end = today - timedelta(days=1) self.data_gaps.append(f"{gap_start.date()} 到 {gap_end.date()}") print(f"⚠️ 数据缺口: {gap_start.date()} 到 {gap_end.date()}") # 获取今天的实时数据 if last_hist_date < today: realtime_data, source = fetch_realtime_data() if realtime_data: date = realtime_data['date'] # 检查是否已有该日期数据 if date > last_hist_date: new_row = pd.DataFrame({ 'open': [realtime_data['open']], 'high': [realtime_data['high']], 'low': [realtime_data['low']], 'close': [realtime_data['close']], 'volume': [realtime_data['volume']] }, index=[date]) df = pd.concat([df, new_row]) print(f"✅ 已合并{source}数据: {date.date()} 收盘价 {realtime_data['close']}") else: print(f"⚠️ 获取的数据日期({date.date()})不大于历史最新日期") else: print("⚠️ 未获取到实时数据") else: print("✅ 历史数据已是最新") self.data = df.sort_index() print(f"\n数据范围: {self.data.index[0].date()} ~ {self.data.index[-1].date()}") print(f"数据条数: {len(self.data)}") if self.data_gaps: print(f"⚠️ 警告: 存在数据缺口: {', '.join(self.data_gaps)}") return True except Exception as e: print(f"数据加载失败: {e}") import traceback traceback.print_exc() return False def calculate_indicators(self): """计算技术指标""" df = self.data.copy() # 均线 df['ma10'] = df['close'].rolling(window=10, min_periods=1).mean() df['ma30'] = df['close'].rolling(window=30, min_periods=1).mean() # 20日高低点 df['high_20'] = df['high'].rolling(window=20).max() df['low_20'] = df['low'].rolling(window=20).min() # 10日涨幅 df['ret_10'] = df['close'].pct_change(periods=10) # ATR tr1 = df['high'] - df['low'] tr2 = abs(df['high'] - df['close'].shift(1)) tr3 = abs(df['low'] - df['close'].shift(1)) df['tr'] = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) df['atr'] = df['tr'].rolling(window=20).mean() self.data = df return df def generate_signals(self): """生成交易信号""" df = self.data # 买入条件 buy_cond = ( (df['close'] > df['ma10']) & (df['ma10'] > df['ma30']) & (df['close'] >= df['high_20'] * 0.995) & (df['ret_10'] > 0.02) ) # 卖出条件 sell_cond = ( (df['close'] < df['ma30']) | (df['close'] <= df['low_20'] * 1.005) ) df['signal'] = 0 df.loc[buy_cond, 'signal'] = 1 df.loc[sell_cond, 'signal'] = -1 return df def backtest(self, initial_capital=1000000): """回测计算""" df = self.generate_signals() position = 0 entry_price = 0 peak_price = 0 capital = initial_capital trades = [] for i in range(30, len(df)): date = df.index[i] price = df['close'].iloc[i] signal = df['signal'].iloc[i] # 移动止损检查 if position > 0: if price > peak_price: peak_price = price if price < peak_price * 0.90: signal = -1 # 买入 if signal == 1 and position == 0: position = 1 entry_price = price peak_price = price trades.append({ 'date': date, 'action': 'BUY', 'price': price, 'capital': capital }) # 卖出 elif signal == -1 and position == 1: pnl = (price / entry_price - 1) * capital capital += pnl position = 0 trades.append({ 'date': date, 'action': 'SELL', 'price': price, 'capital': capital, 'pnl': pnl, 'return_pct': (price / entry_price - 1) * 100 }) current_position = position current_price = df['close'].iloc[-1] if position == 1: unrealized_pnl = (current_price / entry_price - 1) * capital total_value = capital + unrealized_pnl else: total_value = capital total_return = (total_value / initial_capital - 1) * 100 return { 'trades': trades, 'current_position': current_position, 'current_price': current_price, 'entry_price': entry_price if position == 1 else None, 'capital': capital, 'total_value': total_value, 'total_return': total_return, 'trade_count': len([t for t in trades if t['action'] == 'SELL']), 'data_end_date': df.index[-1].strftime('%Y-%m-%d'), 'data_gaps': self.data_gaps } def get_recent_indicators(self, days=20): """获取近N天指标详情""" df = self.data.tail(days).copy() indicators = [] for date, row in df.iterrows(): indicators.append({ 'date': date.strftime('%Y-%m-%d'), 'close': round(row['close'], 2), 'ma10': round(row['ma10'], 2) if not pd.isna(row['ma10']) else '-', 'ma30': round(row['ma30'], 2) if not pd.isna(row['ma30']) else '-', 'high_20': round(row['high_20'], 2) if not pd.isna(row['high_20']) else '-', 'ret_10': f"{row['ret_10']*100:.2f}%" if not pd.isna(row['ret_10']) else '-', 'signal': '买入' if row['signal'] == 1 else ('卖出' if row['signal'] == -1 else '持有'), 'atr': round(row['atr'], 2) if not pd.isna(row['atr']) else '-' }) return indicators def get_recent_trades(self, n=20): """获取近N次交易详情""" result = self.backtest() trades = result['trades'][-n:] return trades def generate_report(): """生成完整报告""" strategy = TrendTrackingStrategy() if not strategy.load_and_merge_data(): return None, None, None strategy.calculate_indicators() result = strategy.backtest() recent_indicators = strategy.get_recent_indicators(20) recent_trades = strategy.get_recent_trades(20) # 数据缺口警告 gap_warning = "" if result['data_gaps']: gap_warning = f"
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{gap_warning}入场价格: {result['entry_price']:.2f} 元
当前浮盈: {unrealized:+.2f}%
| 日期 | 收盘价 | MA10 | MA30 | 20日高 | 10日涨幅 | 信号 | ATR |
|---|---|---|---|---|---|---|---|
| {ind['date']} | {ind['close']} | {ind['ma10']} | {ind['ma30']} | {ind['high_20']} | {ind['ret_10']} | {ind['signal']} | {ind['atr']} |
| 日期 | 操作 | 价格 | 盈亏金额 | 盈亏比例 | 总资产 |
|---|---|---|---|---|---|
| {date_str} | {trade['action']} | {trade['price']:.2f} | {f'{pnl:+,.0f}元' if has_pnl else '-'} | {ret_text} | {trade['capital']:,.0f}元 |