#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50指数 - 实时交易报告系统 (多数据源) 支持新浪、腾讯、东方财富接口 """ import sys sys.path.insert(0, '/root/.openclaw/workspace/cat-fly') sys.path.insert(0, '/root/.openclaw/workspace/quant') import pandas as pd import numpy as np from datetime import datetime, timedelta import smtplib import ssl from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header import warnings import requests import json import time warnings.filterwarnings('ignore') # ==================== 邮件配置 ==================== EMAIL_CONFIG = { "smtp_server": "localhost", "smtp_port": 25, "sender_email": "catfly@erwin.wang", "receiver_email": "380880504@qq.com" } # ==================== 多数据源获取 ==================== def fetch_sina_realtime(): """新浪实时行情接口""" try: # 新浪股票代码格式: sh000001(上证), sz399673(创业板50) url = "https://hq.sinajs.cn/list=sz399673" headers = { 'Referer': 'https://finance.sina.com.cn', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) response.encoding = 'gb2312' # 解析返回数据 data_str = response.text if 'var hq_str_sz399673="' in data_str: data_str = data_str.split('var hq_str_sz399673="')[1].split('"')[0] parts = data_str.split(',') if len(parts) >= 32: return { 'open': float(parts[1]), 'high': float(parts[4]), 'low': float(parts[5]), 'close': float(parts[3]), 'volume': int(parts[8]), 'name': parts[0], 'time': parts[31], 'date': parts[30] } except Exception as e: print(f"新浪实时数据获取失败: {e}") return None def fetch_tencent_realtime(): """腾讯实时行情接口""" try: # 腾讯股票代码格式: sh000001, sz399673 url = "https://qt.gtimg.cn/q=sz399673" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) response.encoding = 'gb2312' data_str = response.text if 'v_sz399673="' in data_str: data_str = data_str.split('v_sz399673="')[1].split('"')[0] parts = data_str.split('~') if len(parts) >= 45: return { 'name': parts[1], 'close': float(parts[3]), 'open': float(parts[5]), 'high': float(parts[33]), 'low': float(parts[34]), 'volume': int(parts[36]), 'date': parts[30] } except Exception as e: print(f"腾讯实时数据获取失败: {e}") return None def fetch_sina_hist_t1(): """新浪历史数据 - T-1日""" try: # 新浪历史行情接口 today = datetime.now() url = f"https://quotes.sina.cn/cn/api/quotes.php?symbol=sz399673" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) if response.status_code == 200: data = response.json() if 'data' in data and len(data['data']) > 0: item = data['data'][0] return { 'date': item.get('date'), 'open': float(item.get('open', 0)), 'high': float(item.get('high', 0)), 'low': float(item.get('low', 0)), 'close': float(item.get('close', 0)), 'volume': int(item.get('volume', 0)) } except Exception as e: print(f"新浪历史数据获取失败: {e}") return None def fetch_163_hist(): """网易财经历史数据接口""" try: url = "http://quotes.money.163.com/service/chddata.html" params = { 'code': '1399673', # 创业板50代码 'start': (datetime.now() - timedelta(days=30)).strftime('%Y%m%d'), 'end': datetime.now().strftime('%Y%m%d'), 'fields': 'TCLOSE;HIGH;LOW;TOPEN;CHG;PCHG;VOTURNOVER' } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(url, params=params, headers=headers, timeout=10) if response.status_code == 200: # 解析CSV格式数据 lines = response.text.strip().split('\n') if len(lines) > 1: # 取最新一条 latest = lines[-1].split(',') if len(latest) >= 6: return { 'date': latest[0], 'open': float(latest[1]), 'high': float(latest[2]), 'low': float(latest[3]), 'close': float(latest[4]), 'volume': int(float(latest[5])) if latest[5] else 0 } except Exception as e: print(f"网易历史数据获取失败: {e}") return None def fetch_realtime_data(): """获取实时数据,尝试多个数据源""" print("尝试获取实时数据...") # 尝试新浪实时 print(" 尝试新浪实时接口...") data = fetch_sina_realtime() if data: print(f" ✅ 新浪实时数据获取成功") return data, 'sina_realtime' # 尝试腾讯实时 print(" 尝试腾讯实时接口...") data = fetch_tencent_realtime() if data: print(f" ✅ 腾讯实时数据获取成功") return data, 'tencent_realtime' # 尝试获取T-1日数据 print(" 尝试获取T-1历史数据...") # 网易历史数据 data = fetch_163_hist() if data: print(f" ✅ 网易历史数据获取成功") return data, '163_hist' print(" ❌ 所有数据源均失败") return None, None # ==================== 趋势跟踪策略 ==================== class TrendTrackingStrategy: """趋势跟踪策略 - 实时计算版""" def __init__(self): self.data = None def load_and_merge_data(self, csv_file='cyb50_baostock.csv'): """加载历史数据并合并实时数据""" try: # 加载历史数据 df = pd.read_csv(f'/root/.openclaw/workspace/quant/{csv_file}') df['date'] = pd.to_datetime(df['date']) df = df.set_index('date').sort_index() # 转换数据类型 for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') # 获取最新历史数据日期 last_hist_date = df.index[-1] today = pd.Timestamp.now().normalize() print(f"历史数据最新日期: {last_hist_date.date()}") print(f"当前日期: {today.date()}") # 如果历史数据不是最新的,尝试获取实时数据 realtime_data = None source = None if last_hist_date < today: realtime_data, source = fetch_realtime_data() if realtime_data: # 处理日期格式 if 'date' in realtime_data and realtime_data['date']: try: # 尝试不同日期格式 date_str = str(realtime_data['date']) if len(date_str) == 8: # YYYYMMDD date = pd.Timestamp(date_str) else: date = today except: date = today else: date = today # 检查是否已有该日期数据 if date > last_hist_date: # 创建新数据行 new_row = pd.DataFrame({ 'open': [realtime_data['open']], 'high': [realtime_data['high']], 'low': [realtime_data['low']], 'close': [realtime_data['close']], 'volume': [realtime_data['volume']] }, index=[date]) # 合并数据 df = pd.concat([df, new_row]) print(f"✅ 已合并{source}数据: {date.date()} 收盘价 {realtime_data['close']}") else: print(f"⚠️ 获取的数据日期({date.date()})不大于历史最新日期") else: print("⚠️ 未获取到更新的实时数据,使用历史数据") else: print("✅ 历史数据已是最新") self.data = df print(f"数据范围: {df.index[0].date()} ~ {df.index[-1].date()}") return True except Exception as e: print(f"数据加载失败: {e}") return False def calculate_indicators(self): """计算技术指标""" df = self.data.copy() # 均线 df['ma10'] = df['close'].rolling(window=10).mean() df['ma30'] = df['close'].rolling(window=30).mean() # 20日高低点 df['high_20'] = df['high'].rolling(window=20).max() df['low_20'] = df['low'].rolling(window=20).min() # 10日涨幅 df['ret_10'] = df['close'].pct_change(periods=10) # ATR tr1 = df['high'] - df['low'] tr2 = abs(df['high'] - df['close'].shift(1)) tr3 = abs(df['low'] - df['close'].shift(1)) df['tr'] = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) df['atr'] = df['tr'].rolling(window=20).mean() self.data = df return df def generate_signals(self): """生成交易信号""" df = self.data # 买入条件 buy_cond = ( (df['close'] > df['ma10']) & (df['ma10'] > df['ma30']) & (df['close'] >= df['high_20'] * 0.995) & (df['ret_10'] > 0.02) ) # 卖出条件 sell_cond = ( (df['close'] < df['ma30']) | (df['close'] <= df['low_20'] * 1.005) ) df['signal'] = 0 df.loc[buy_cond, 'signal'] = 1 df.loc[sell_cond, 'signal'] = -1 return df def backtest(self, initial_capital=1000000): """回测计算""" df = self.generate_signals() position = 0 entry_price = 0 peak_price = 0 capital = initial_capital trades = [] for i in range(30, len(df)): date = df.index[i] price = df['close'].iloc[i] signal = df['signal'].iloc[i] # 移动止损检查 if position > 0: if price > peak_price: peak_price = price if price < peak_price * 0.90: # 10%回撤止损 signal = -1 # 执行交易 if signal == 1 and position == 0: position = 1 entry_price = price peak_price = price trades.append({ 'date': date, 'action': 'BUY', 'price': price, 'capital': capital }) elif signal == -1 and position == 1: pnl = (price / entry_price - 1) * capital capital += pnl position = 0 trades.append({ 'date': date, 'action': 'SELL', 'price': price, 'capital': capital, 'pnl': pnl, 'return_pct': (price / entry_price - 1) * 100 }) # 计算当前持仓 current_position = position current_price = df['close'].iloc[-1] if position == 1: unrealized_pnl = (current_price / entry_price - 1) * capital total_value = capital + unrealized_pnl else: total_value = capital total_return = (total_value / initial_capital - 1) * 100 return { 'trades': trades, 'current_position': current_position, 'current_price': current_price, 'entry_price': entry_price if position == 1 else None, 'capital': capital, 'total_value': total_value, 'total_return': total_return, 'trade_count': len([t for t in trades if t['action'] == 'SELL']), 'data_end_date': df.index[-1].strftime('%Y-%m-%d') } def get_recent_indicators(self, days=20): """获取近N天指标详情""" df = self.data.tail(days).copy() indicators = [] for date, row in df.iterrows(): indicators.append({ 'date': date.strftime('%Y-%m-%d'), 'close': round(row['close'], 2), 'ma10': round(row['ma10'], 2) if not pd.isna(row['ma10']) else '-', 'ma30': round(row['ma30'], 2) if not pd.isna(row['ma30']) else '-', 'high_20': round(row['high_20'], 2) if not pd.isna(row['high_20']) else '-', 'ret_10': f"{row['ret_10']*100:.2f}%" if not pd.isna(row['ret_10']) else '-', 'signal': '买入' if row['signal'] == 1 else ('卖出' if row['signal'] == -1 else '持有'), 'atr': round(row['atr'], 2) if not pd.isna(row['atr']) else '-' }) return indicators def get_recent_trades(self, n=20): """获取近N次交易详情""" result = self.backtest() trades = result['trades'][-n:] return trades def generate_report(): """生成完整报告""" strategy = TrendTrackingStrategy() if not strategy.load_and_merge_data(): return None, None, None strategy.calculate_indicators() result = strategy.backtest() # 获取近期数据 recent_indicators = strategy.get_recent_indicators(20) recent_trades = strategy.get_recent_trades(20) # 生成HTML报告 html = f"""
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
入场价格: {result['entry_price']:.2f} 元
当前浮盈: {unrealized:+.2f}%
| 日期 | 收盘价 | MA10 | MA30 | 20日高 | 10日涨幅 | 信号 | ATR |
|---|---|---|---|---|---|---|---|
| {ind['date']} | {ind['close']} | {ind['ma10']} | {ind['ma30']} | {ind['high_20']} | {ind['ret_10']} | {ind['signal']} | {ind['atr']} |
| 日期 | 操作 | 价格 | 盈亏金额 | 盈亏比例 | 总资产 |
|---|---|---|---|---|---|
| {trade['date'].strftime('%Y-%m-%d') if isinstance(trade['date'], pd.Timestamp) else trade['date']} | {trade['action']} | {trade['price']:.2f} | {f'{pnl:+,.0f}元' if 'pnl' in trade else '-'} | {f'{ret:+.2f}%' if 'return_pct' in trade else '-'} | {trade['capital']:,.0f}元 |