#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50指数 - 多数据源交叉验证版 确保数据准确性 """ import sys sys.path.insert(0, '/root/.openclaw/workspace/cat-fly') sys.path.insert(0, '/root/.openclaw/workspace/quant') import pandas as pd import numpy as np from datetime import datetime, timedelta import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header import warnings import requests import json import time import baostock as bs warnings.filterwarnings('ignore') # ==================== 邮件配置 ==================== EMAIL_CONFIG = { "smtp_server": "localhost", "smtp_port": 25, "sender_email": "catfly@openclaw.local", "receiver_email": "380880504@qq.com" } # ==================== 数据源配置 ==================== DATA_SOURCES = { 'baostock': {'priority': 1, 'weight': 0.4}, 'akshare': {'priority': 2, 'weight': 0.3}, 'sina': {'priority': 3, 'weight': 0.15}, 'tencent': {'priority': 4, 'weight': 0.15} } # ==================== 多数据源获取 ==================== class DataValidator: """数据验证器 - 交叉验证多个数据源""" PRICE_TOLERANCE = 0.02 # 价格差异容忍度 2% @staticmethod def validate_prices(sources_data): """ 验证多个数据源的价格一致性 返回: (validated_data, warnings) """ if not sources_data: return None, ["无可用数据源"] if len(sources_data) == 1: return sources_data[0]['data'], [] # 提取收盘价 prices = {} for src in sources_data: if src and 'data' in src and src['data']: prices[src['source']] = src['data'].get('close', 0) if len(prices) < 2: return sources_data[0]['data'], [] # 计算价格统计 price_values = list(prices.values()) price_mean = np.mean(price_values) price_std = np.std(price_values) price_max = max(price_values) price_min = min(price_values) price_range = (price_max - price_min) / price_mean if price_mean > 0 else 0 warnings = [] # 检查价格差异 if price_range > DataValidator.PRICE_TOLERANCE: warnings.append(f"⚠️ 价格差异过大: {price_range*100:.2f}% (容忍度: {DataValidator.PRICE_TOLERANCE*100}%)") for src, price in prices.items(): deviation = (price - price_mean) / price_mean * 100 warnings.append(f" - {src}: {price:.2f} (偏离均值: {deviation:+.2f}%)") # 使用加权平均价格 weighted_price = 0 total_weight = 0 for src in sources_data: if src and 'data' in src and src['data']: source_name = src['source'] weight = DATA_SOURCES.get(source_name, {}).get('weight', 0.25) weighted_price += src['data']['close'] * weight total_weight += weight if total_weight > 0: weighted_price /= total_weight # 选择最接近加权平均的数据源 best_source = None min_diff = float('inf') for src in sources_data: if src and 'data' in src and src['data']: diff = abs(src['data']['close'] - weighted_price) if diff < min_diff: min_diff = diff best_source = src if warnings: warnings.append(f"✅ 使用加权平均价格: {weighted_price:.2f}") if best_source: warnings.append(f"✅ 选择数据源: {best_source['source']} (价格: {best_source['data']['close']:.2f})") return best_source['data'] if best_source else sources_data[0]['data'], warnings def fetch_sina_realtime(): """新浪实时行情接口""" try: url = "https://hq.sinajs.cn/list=sz399673" headers = { 'Referer': 'https://finance.sina.com.cn', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) response.encoding = 'gb2312' data_str = response.text if 'var hq_str_sz399673="' in data_str: data_str = data_str.split('var hq_str_sz399673="')[1].split('"')[0] parts = data_str.split(',') if len(parts) >= 32: date_str = parts[30].strip() if len(date_str) == 8 and date_str.isdigit(): date = pd.Timestamp(f"{date_str[:4]}-{date_str[4:6]}-{date_str[6:8]}") else: date = pd.Timestamp.now().normalize() return { 'source': 'sina', 'data': { 'open': float(parts[1]), 'high': float(parts[4]), 'low': float(parts[5]), 'close': float(parts[3]), 'volume': int(parts[8]), 'date': date } } except Exception as e: print(f" 新浪实时数据获取失败: {e}") return None def fetch_tencent_realtime(): """腾讯实时行情接口""" try: url = "https://qt.gtimg.cn/q=sz399673" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } response = requests.get(url, headers=headers, timeout=10) response.encoding = 'gb2312' data_str = response.text if 'v_sz399673="' in data_str: data_str = data_str.split('v_sz399673="')[1].split('"')[0] parts = data_str.split('~') if len(parts) >= 45: date_str = parts[30] date = pd.Timestamp(date_str) return { 'source': 'tencent', 'data': { 'close': float(parts[3]), 'open': float(parts[5]), 'high': float(parts[33]), 'low': float(parts[34]), 'volume': int(parts[36]), 'date': date } } except Exception as e: print(f" 腾讯实时数据获取失败: {e}") return None def fetch_akshare_hist(start_date, end_date): """使用akshare获取历史数据""" try: import akshare as ak print(f" [akshare] 获取 {start_date} 到 {end_date} 的历史数据...") df = ak.index_zh_a_hist(symbol="399673", period="daily", start_date=start_date, end_date=end_date) if df is not None and len(df) > 0: data_list = [] for _, row in df.iterrows(): data_list.append({ 'date': pd.Timestamp(row['日期']), 'open': float(row['开盘']), 'high': float(row['最高']), 'low': float(row['最低']), 'close': float(row['收盘']), 'volume': int(row['成交量']) }) print(f" [akshare] ✅ 获取成功: {len(data_list)} 条") return data_list except Exception as e: print(f" [akshare] ❌ 获取失败: {e}") return None def fetch_baostock_hist(start_date, end_date): """使用baostock获取历史数据""" try: print(f" [baostock] 获取 {start_date} 到 {end_date} 的历史数据...") lg = bs.login() if lg.error_code != '0': print(f" [baostock] ❌ 登录失败: {lg.error_msg}") return None rs = bs.query_history_k_data_plus("sz.399673", "date,open,high,low,close,volume", start_date=start_date, end_date=end_date, frequency="d", adjustflag="3") data_list = [] while (rs.error_code == '0') & rs.next(): row = rs.get_row_data() if row[0]: data_list.append({ 'date': pd.Timestamp(row[0]), 'open': float(row[1]) if row[1] else 0, 'high': float(row[2]) if row[2] else 0, 'low': float(row[3]) if row[3] else 0, 'close': float(row[4]) if row[4] else 0, 'volume': int(float(row[5])) if row[5] else 0 }) bs.logout() if data_list: print(f" [baostock] ✅ 获取成功: {len(data_list)} 条") return data_list except Exception as e: print(f" [baostock] ❌ 获取失败: {e}") return None def fetch_multi_source_realtime(): """从多个源获取实时数据并交叉验证""" print("\n📊 多数据源获取实时数据...") sources = [] # 获取新浪数据 sina_data = fetch_sina_realtime() if sina_data: sources.append(sina_data) print(f" [新浪] 收盘价: {sina_data['data']['close']:.2f}") # 获取腾讯数据 tencent_data = fetch_tencent_realtime() if tencent_data: sources.append(tencent_data) print(f" [腾讯] 收盘价: {tencent_data['data']['close']:.2f}") # 交叉验证 validated_data, warnings = DataValidator.validate_prices(sources) if warnings: print("\n ⚠️ 数据验证警告:") for w in warnings: print(f" {w}") if validated_data: print(f"\n ✅ 最终使用收盘价: {validated_data['close']:.2f}") return validated_data, sources return None, [] def fetch_missing_data_multi_source(last_date, today): """使用多个源获取缺失的历史数据""" if last_date >= today: return [], [] start_str = (last_date + timedelta(days=1)).strftime('%Y-%m-%d') end_str = (today - timedelta(days=1)).strftime('%Y-%m-%d') print(f"\n📊 补全缺失数据: {start_str} 到 {end_str}") all_warnings = [] # 尝试baostock (优先级高) data = fetch_baostock_hist(start_str, end_str) if data: return data, all_warnings # 尝试akshare data = fetch_akshare_hist(start_str.replace('-', ''), end_str.replace('-', '')) if data: return data, all_warnings all_warnings.append(f"⚠️ 无法获取 {start_str} 到 {end_str} 的历史数据") return [], all_warnings # ==================== 趋势跟踪策略 ==================== class TrendTrackingStrategy: """趋势跟踪策略 - 多数据源验证版""" def __init__(self): self.data = None self.warnings = [] self.data_sources = [] def load_and_merge_data(self, csv_file='cyb50_baostock.csv'): """加载历史数据并合并实时数据""" try: # 加载历史数据 df = pd.read_csv(f'/root/.openclaw/workspace/quant/{csv_file}') df['date'] = pd.to_datetime(df['date']) df = df.set_index('date').sort_index() for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') last_hist_date = df.index[-1] today = pd.Timestamp.now().normalize() print(f"历史数据最新日期: {last_hist_date.date()}") print(f"当前日期: {today.date()}") # 获取缺失的历史数据 missing_data, warnings = fetch_missing_data_multi_source(last_hist_date, today) self.warnings.extend(warnings) if missing_data: for item in missing_data: new_row = pd.DataFrame({ 'open': [item['open']], 'high': [item['high']], 'low': [item['low']], 'close': [item['close']], 'volume': [item['volume']] }, index=[item['date']]) df = pd.concat([df, new_row]) df = df.sort_index() last_hist_date = df.index[-1] print(f"✅ 已合并历史数据,最新日期: {last_hist_date.date()}") # 获取今天的实时数据(多源验证) if last_hist_date < today: realtime_data, sources = fetch_multi_source_realtime() self.data_sources = sources if realtime_data: date = realtime_data['date'] if date > last_hist_date: new_row = pd.DataFrame({ 'open': [realtime_data['open']], 'high': [realtime_data['high']], 'low': [realtime_data['low']], 'close': [realtime_data['close']], 'volume': [realtime_data['volume']] }, index=[date]) df = pd.concat([df, new_row]) print(f"✅ 已合并实时数据: {date.date()} 收盘价 {realtime_data['close']:.2f}") else: print(f"⚠️ 实时数据日期({date.date()})不大于历史最新日期") else: self.warnings.append("⚠️ 未获取到实时数据") else: print("✅ 历史数据已是最新") self.data = df.sort_index() print(f"\n数据范围: {self.data.index[0].date()} ~ {self.data.index[-1].date()}") print(f"数据条数: {len(self.data)}") return True except Exception as e: print(f"❌ 数据加载失败: {e}") import traceback traceback.print_exc() return False def calculate_indicators(self): """计算技术指标""" df = self.data.copy() # 均线 df['ma10'] = df['close'].rolling(window=10, min_periods=1).mean() df['ma30'] = df['close'].rolling(window=30, min_periods=1).mean() # 20日高低点 df['high_20'] = df['high'].rolling(window=20).max() df['low_20'] = df['low'].rolling(window=20).min() # 10日涨幅 df['ret_10'] = df['close'].pct_change(periods=10) # ATR tr1 = df['high'] - df['low'] tr2 = abs(df['high'] - df['close'].shift(1)) tr3 = abs(df['low'] - df['close'].shift(1)) df['tr'] = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) df['atr'] = df['tr'].rolling(window=20).mean() self.data = df return df def generate_signals(self): """生成交易信号""" df = self.data # 买入条件 buy_cond = ( (df['close'] > df['ma10']) & (df['ma10'] > df['ma30']) & (df['close'] >= df['high_20'] * 0.995) & (df['ret_10'] > 0.02) ) # 卖出条件 sell_cond = ( (df['close'] < df['ma30']) | (df['close'] <= df['low_20'] * 1.005) ) df['signal'] = 0 df.loc[buy_cond, 'signal'] = 1 df.loc[sell_cond, 'signal'] = -1 return df def backtest(self, initial_capital=1000000): """回测计算""" df = self.generate_signals() position = 0 entry_price = 0 peak_price = 0 capital = initial_capital trades = [] for i in range(30, len(df)): date = df.index[i] price = df['close'].iloc[i] signal = df['signal'].iloc[i] # 移动止损检查 if position > 0: if price > peak_price: peak_price = price if price < peak_price * 0.90: signal = -1 # 买入 if signal == 1 and position == 0: position = 1 entry_price = price peak_price = price trades.append({ 'date': date, 'action': 'BUY', 'price': price, 'capital': capital }) # 卖出 elif signal == -1 and position == 1: pnl = (price / entry_price - 1) * capital capital += pnl position = 0 trades.append({ 'date': date, 'action': 'SELL', 'price': price, 'capital': capital, 'pnl': pnl, 'return_pct': (price / entry_price - 1) * 100 }) current_position = position current_price = df['close'].iloc[-1] if position == 1: unrealized_pnl = (current_price / entry_price - 1) * capital total_value = capital + unrealized_pnl else: total_value = capital total_return = (total_value / initial_capital - 1) * 100 return { 'trades': trades, 'current_position': current_position, 'current_price': current_price, 'entry_price': entry_price if position == 1 else None, 'capital': capital, 'total_value': total_value, 'total_return': total_return, 'trade_count': len([t for t in trades if t['action'] == 'SELL']), 'data_end_date': df.index[-1].strftime('%Y-%m-%d') } def get_recent_indicators(self, days=20): """获取近N天指标详情""" df = self.data.tail(days).copy() indicators = [] for date, row in df.iterrows(): indicators.append({ 'date': date.strftime('%Y-%m-%d'), 'close': round(row['close'], 2), 'ma10': round(row['ma10'], 2) if not pd.isna(row['ma10']) else '-', 'ma30': round(row['ma30'], 2) if not pd.isna(row['ma30']) else '-', 'high_20': round(row['high_20'], 2) if not pd.isna(row['high_20']) else '-', 'ret_10': f"{row['ret_10']*100:.2f}%" if not pd.isna(row['ret_10']) else '-', 'signal': '买入' if row['signal'] == 1 else ('卖出' if row['signal'] == -1 else '持有'), 'atr': round(row['atr'], 2) if not pd.isna(row['atr']) else '-' }) return indicators def get_recent_trades(self, n=20): """获取近N次交易详情""" result = self.backtest() trades = result['trades'][-n:] return trades def generate_report(): """生成完整报告""" strategy = TrendTrackingStrategy() if not strategy.load_and_merge_data(): return None, None, None strategy.calculate_indicators() result = strategy.backtest() recent_indicators = strategy.get_recent_indicators(20) recent_trades = strategy.get_recent_trades(20) # 数据源信息 source_info = "" if strategy.data_sources: source_info = "
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{warnings_html} {source_info}入场价格: {result['entry_price']:.2f} 元
当前浮盈: {unrealized:+.2f}%
| 日期 | 收盘价 | MA10 | MA30 | 20日高 | 10日涨幅 | 信号 | ATR |
|---|---|---|---|---|---|---|---|
| {ind['date']} | {ind['close']} | {ind['ma10']} | {ind['ma30']} | {ind['high_20']} | {ind['ret_10']} | {ind['signal']} | {ind['atr']} |
| 日期 | 操作 | 价格 | 盈亏金额 | 盈亏比例 | 总资产 |
|---|---|---|---|---|---|
| {date_str} | {trade['action']} | {trade['price']:.2f} | {f'{pnl:+,.0f}元' if has_pnl else '-'} | {ret_text} | {trade['capital']:,.0f}元 |