#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ChiNext 50 index (sz399673) trend-tracking report - multi-source cross-validated edition.

Loads the locally stored daily history, back-fills missing days from baostock
(falling back to akshare), appends today's realtime quote after cross-checking
Sina and Tencent, runs a trend-following backtest and e-mails an HTML + text
report.
"""
import sys
sys.path.insert(0, '/root/.openclaw/workspace/cat-fly')
sys.path.insert(0, '/root/.openclaw/workspace/quant')

import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from html import escape as html_escape
import warnings
import json
import time

# Third-party data-source clients are optional: when one is missing, the
# corresponding source reports a failure and the remaining sources are used
# (akshare is likewise imported lazily inside fetch_akshare_hist).
try:
    import requests
except ImportError:
    requests = None

try:
    import baostock as bs
except ImportError:
    bs = None

warnings.filterwarnings('ignore')

# ==================== E-mail configuration ====================
EMAIL_CONFIG = {
    "smtp_server": "localhost",
    "smtp_port": 25,
    "sender_email": "catfly@openclaw.local",
    "receiver_email": "380880504@qq.com"
}

# ==================== Data-source configuration ====================
# 'weight' is used by DataValidator to weight each source's close price.
DATA_SOURCES = {
    'baostock': {'priority': 1, 'weight': 0.4},
    'akshare': {'priority': 2, 'weight': 0.3},
    'sina': {'priority': 3, 'weight': 0.15},
    'tencent': {'priority': 4, 'weight': 0.15}
}


# ==================== Multi-source fetching ====================
class DataValidator:
    """Cross-check the close price reported by several realtime data sources."""

    PRICE_TOLERANCE = 0.02  # max relative spread (max - min) / mean before warning: 2%

    @staticmethod
    def validate_prices(sources_data):
        """
        Reconcile the quotes returned by several data sources.

        Args:
            sources_data: list of ``{'source': name, 'data': {..., 'close': float}}``
                dicts. ``None`` entries and entries without data are ignored.

        Returns:
            ``(data, alerts)``: *data* is the quote dict of the source whose close
            is nearest the weighted-average close (weights from ``DATA_SOURCES``,
            unknown sources weigh 0.25), or ``None`` when no source has data.
            *alerts* is a list of messages; it is non-empty only when no source
            is usable or the spread exceeds ``PRICE_TOLERANCE``.
        """
        valid = [src for src in (sources_data or []) if src and src.get('data')]
        if not valid:
            return None, ["无可用数据源"]
        if len(valid) == 1:
            return valid[0]['data'], []

        closes = [(src['source'], src['data'].get('close', 0)) for src in valid]
        values = [price for _, price in closes]
        mean_price = float(np.mean(values))
        spread = (max(values) - min(values)) / mean_price if mean_price > 0 else 0

        alerts = []
        if spread > DataValidator.PRICE_TOLERANCE:
            alerts.append(f"⚠️ 价格差异过大: {spread*100:.2f}% (容忍度: {DataValidator.PRICE_TOLERANCE*100}%)")
            for name, price in closes:
                deviation = (price - mean_price) / mean_price * 100
                alerts.append(f"  - {name}: {price:.2f} (偏离均值: {deviation:+.2f}%)")

        weights = [DATA_SOURCES.get(name, {}).get('weight', 0.25) for name, _ in closes]
        total_weight = sum(weights)
        weighted_price = sum(w * price for w, (_, price) in zip(weights, closes))
        if total_weight > 0:
            weighted_price /= total_weight

        # min() returns the first of several equally close sources.
        best = min(valid, key=lambda src: abs(src['data'].get('close', 0) - weighted_price))

        if alerts:
            alerts.append(f"✅ 使用加权平均价格: {weighted_price:.2f}")
            alerts.append(f"✅ 选择数据源: {best['source']} (价格: {best['data']['close']:.2f})")
        return best['data'], alerts


def _parse_quote_date(raw):
    """
    Return the calendar date at the start of *raw* as a midnight ``pd.Timestamp``.

    Accepts ``YYYY-MM-DD`` as well as ``YYYYMMDD`` / ``YYYYMMDDhhmmss`` (any
    non-digit characters are ignored). Returns ``None`` when no valid date can
    be read.
    """
    digits = ''.join(ch for ch in str(raw) if ch.isdigit())
    if len(digits) < 8:
        return None
    try:
        return pd.Timestamp(f"{digits[:4]}-{digits[4:6]}-{digits[6:8]}")
    except ValueError:
        return None


def _http_get_text(url, headers, timeout=10):
    """
    GET *url* and return the body decoded as GB2312.

    Raises:
        RuntimeError: when the optional ``requests`` package is not installed.
        requests.HTTPError: on a non-2xx response.
    """
    if requests is None:
        raise RuntimeError("requests 未安装")
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    response.encoding = 'gb2312'
    return response.text


def fetch_sina_realtime():
    """
    Fetch the current sz399673 quote from Sina (hq.sinajs.cn).

    Returns:
        ``{'source': 'sina', 'data': {open, high, low, close, volume, date}}``,
        or ``None`` when the request fails or the payload is not recognised.
    """
    marker = 'var hq_str_sz399673="'
    try:
        text = _http_get_text(
            "https://hq.sinajs.cn/list=sz399673",
            {
                'Referer': 'https://finance.sina.com.cn',
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            },
        )
        if marker in text:
            parts = text.split(marker)[1].split('"')[0].split(',')
            if len(parts) >= 32:
                # Field 30 is the quote date. NOTE(review): Sina usually sends it
                # as YYYY-MM-DD, which the previous 8-digit check never matched;
                # the parser accepts both forms. Only an unreadable value falls
                # back to today.
                date = _parse_quote_date(parts[30].strip())
                if date is None:
                    date = pd.Timestamp.now().normalize()
                return {
                    'source': 'sina',
                    'data': {
                        'open': float(parts[1]),
                        'high': float(parts[4]),
                        'low': float(parts[5]),
                        'close': float(parts[3]),
                        'volume': int(float(parts[8])),
                        'date': date
                    }
                }
    except Exception as e:
        print(f"  新浪实时数据获取失败: {e}")
    return None


def fetch_tencent_realtime():
    """
    Fetch the current sz399673 quote from Tencent (qt.gtimg.cn).

    Returns:
        ``{'source': 'tencent', 'data': {close, open, high, low, volume, date}}``,
        or ``None`` when the request fails, the payload is not recognised or
        its date cannot be parsed.
    """
    marker = 'v_sz399673="'
    try:
        text = _http_get_text(
            "https://qt.gtimg.cn/q=sz399673",
            {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'},
        )
        if marker in text:
            parts = text.split(marker)[1].split('"')[0].split('~')
            if len(parts) >= 45:
                # Field 30 presumably holds YYYYMMDDhhmmss; keep only the date so
                # the bar lines up with the midnight-indexed history.
                date = _parse_quote_date(parts[30])
                if date is None:
                    raise ValueError(f"无法解析日期: {parts[30]}")
                # NOTE(review): Tencent volume may be in lots of 100 while Sina's
                # is in shares; volume is not used by the strategy.
                return {
                    'source': 'tencent',
                    'data': {
                        'close': float(parts[3]),
                        'open': float(parts[5]),
                        'high': float(parts[33]),
                        'low': float(parts[34]),
                        'volume': int(float(parts[36])),
                        'date': date
                    }
                }
    except Exception as e:
        print(f"  腾讯实时数据获取失败: {e}")
    return None


def fetch_akshare_hist(start_date, end_date):
    """
    Fetch daily sz399673 bars from akshare.

    Args:
        start_date, end_date: inclusive bounds as ``YYYYMMDD`` strings.

    Returns:
        A list of ``{date, open, high, low, close, volume}`` dicts, or ``None``
        when akshare is unavailable, fails, or returns no rows.
    """
    try:
        import akshare as ak
        print(f"  [akshare] 获取 {start_date} 到 {end_date} 的历史数据...")
        df = ak.index_zh_a_hist(symbol="399673", period="daily",
                                start_date=start_date, end_date=end_date)
        if df is not None and len(df) > 0:
            data_list = [
                {
                    'date': pd.Timestamp(row['日期']),
                    'open': float(row['开盘']),
                    'high': float(row['最高']),
                    'low': float(row['最低']),
                    'close': float(row['收盘']),
                    'volume': int(row['成交量'])
                }
                for _, row in df.iterrows()
            ]
            print(f"  [akshare] ✅ 获取成功: {len(data_list)} 条")
            return data_list
    except Exception as e:
        print(f"  [akshare] ❌ 获取失败: {e}")
    return None


def fetch_baostock_hist(start_date, end_date):
    """
    Fetch daily (unadjusted, adjustflag="3") sz.399673 bars from baostock.

    Args:
        start_date, end_date: inclusive bounds as ``YYYY-MM-DD`` strings.

    Returns:
        A list of ``{date, open, high, low, close, volume}`` dicts, or ``None``
        when baostock is unavailable, login/query fails, or no rows come back.
        Rows lacking a date or close are skipped; a blank open/high/low falls
        back to the close.
    """
    if bs is None:
        print("  [baostock] ❌ 未安装 baostock")
        return None
    try:
        print(f"  [baostock] 获取 {start_date} 到 {end_date} 的历史数据...")
        lg = bs.login()
        if lg.error_code != '0':
            print(f"  [baostock] ❌ 登录失败: {lg.error_msg}")
            return None
        try:
            rs = bs.query_history_k_data_plus(
                "sz.399673", "date,open,high,low,close,volume",
                start_date=start_date, end_date=end_date,
                frequency="d", adjustflag="3")
            if rs.error_code != '0':
                print(f"  [baostock] ❌ 查询失败: {rs.error_msg}")
                return None
            data_list = []
            # `and` short-circuits, so next() is not called on a failed result set.
            while rs.error_code == '0' and rs.next():
                date_str, open_, high, low, close, volume = rs.get_row_data()[:6]
                # A 0 placeholder close would poison the MAs and 20-day lows.
                if not date_str or not close:
                    continue
                close_f = float(close)
                data_list.append({
                    'date': pd.Timestamp(date_str),
                    'open': float(open_) if open_ else close_f,
                    'high': float(high) if high else close_f,
                    'low': float(low) if low else close_f,
                    'close': close_f,
                    'volume': int(float(volume)) if volume else 0
                })
        finally:
            # Always release the session, including on exceptions.
            bs.logout()
        if data_list:
            print(f"  [baostock] ✅ 获取成功: {len(data_list)} 条")
            return data_list
    except Exception as e:
        print(f"  [baostock] ❌ 获取失败: {e}")
    return None


def fetch_multi_source_realtime():
    """
    Fetch the realtime quote from Sina and Tencent and cross-validate them.

    Returns:
        ``(validated_data, sources)``: the chosen quote dict and the raw list of
        successful source results, or ``(None, [])`` when nothing usable came back.
    """
    print("\n📊 多数据源获取实时数据...")
    sources = []
    for label, fetcher in (("新浪", fetch_sina_realtime), ("腾讯", fetch_tencent_realtime)):
        quote = fetcher()
        if quote:
            sources.append(quote)
            print(f"  [{label}] 收盘价: {quote['data']['close']:.2f}")

    validated_data, alerts = DataValidator.validate_prices(sources)
    if alerts:
        print("\n  ⚠️ 数据验证警告:")
        for alert in alerts:
            print(f"    {alert}")

    if validated_data:
        print(f"\n  ✅ 最终使用收盘价: {validated_data['close']:.2f}")
        return validated_data, sources
    return None, []


def fetch_missing_data_multi_source(last_date, today):
    """
    Back-fill the daily bars strictly between *last_date* and *today*.

    baostock is tried first, then akshare.

    Args:
        last_date: last date already present in the local history.
        today: current (midnight) date; today's bar is handled by the realtime fetch.

    Returns:
        ``(bars, alerts)``. *bars* is a list of bar dicts (possibly empty).
        *alerts* holds a warning only when a gap containing weekdays could not
        be filled.
    """
    start = last_date + timedelta(days=1)
    end = today - timedelta(days=1)
    # Nothing to fetch when the history already reaches yesterday, or when the
    # gap holds only weekend days (e.g. run on Monday with Friday's data).
    if start > end or len(pd.bdate_range(start, end)) == 0:
        return [], []

    start_str = start.strftime('%Y-%m-%d')
    end_str = end.strftime('%Y-%m-%d')
    print(f"\n📊 补全缺失数据: {start_str} 到 {end_str}")

    data = fetch_baostock_hist(start_str, end_str)
    if data:
        return data, []

    data = fetch_akshare_hist(start_str.replace('-', ''), end_str.replace('-', ''))
    if data:
        return data, []

    return [], [f"⚠️ 无法获取 {start_str} 到 {end_str} 的历史数据"]


# ==================== Trend-tracking strategy ====================
class TrendTrackingStrategy:
    """Trend-following strategy on ChiNext 50 daily bars (multi-source data)."""

    PRICE_COLUMNS = ('open', 'high', 'low', 'close', 'volume')

    def __init__(self):
        self.data = None          # date-indexed OHLCV DataFrame (plus indicators/signals)
        self.warnings = []        # human-readable data-quality warnings for the report
        self.data_sources = []    # raw realtime source results used for today's bar

    @staticmethod
    def _append_rows(df, rows):
        """Append bar dicts (each with a 'date' key) to *df* in one concat, keeping the last bar per date."""
        cols = TrendTrackingStrategy.PRICE_COLUMNS
        new_rows = pd.DataFrame(
            [{col: row[col] for col in cols} for row in rows],
            index=pd.DatetimeIndex([row['date'] for row in rows]),
        )
        merged = pd.concat([df, new_rows])
        return merged[~merged.index.duplicated(keep='last')].sort_index()

    def load_and_merge_data(self, csv_file='cyb50_baostock.csv',
                            data_dir='/root/.openclaw/workspace/quant'):
        """
        Load the history CSV, back-fill missing days and append today's quote.

        Args:
            csv_file: CSV file name with a ``date`` column plus OHLCV columns.
            data_dir: directory containing *csv_file*.

        Returns:
            ``True`` on success (``self.data`` is set), ``False`` on any error
            (the traceback is printed).
        """
        try:
            df = pd.read_csv(os.path.join(data_dir, csv_file))
            df['date'] = pd.to_datetime(df['date'])
            df = df.set_index('date').sort_index()
            for col in self.PRICE_COLUMNS:
                df[col] = pd.to_numeric(df[col], errors='coerce')

            last_hist_date = df.index[-1]
            today = pd.Timestamp.now().normalize()
            print(f"历史数据最新日期: {last_hist_date.date()}")
            print(f"当前日期: {today.date()}")

            missing_data, alerts = fetch_missing_data_multi_source(last_hist_date, today)
            self.warnings.extend(alerts)
            if missing_data:
                df = self._append_rows(df, missing_data)
                last_hist_date = df.index[-1]
                print(f"✅ 已合并历史数据,最新日期: {last_hist_date.date()}")

            if last_hist_date < today:
                realtime_data, sources = fetch_multi_source_realtime()
                self.data_sources = sources
                if realtime_data:
                    date = realtime_data['date']
                    if date > last_hist_date:
                        df = self._append_rows(df, [realtime_data])
                        print(f"✅ 已合并实时数据: {date.date()} 收盘价 {realtime_data['close']:.2f}")
                    else:
                        print(f"⚠️ 实时数据日期({date.date()})不大于历史最新日期")
                else:
                    self.warnings.append("⚠️ 未获取到实时数据")
            else:
                print("✅ 历史数据已是最新")

            self.data = df.sort_index()
            print(f"\n数据范围: {self.data.index[0].date()} ~ {self.data.index[-1].date()}")
            print(f"数据条数: {len(self.data)}")
            return True
        except Exception as e:
            print(f"❌ 数据加载失败: {e}")
            import traceback
            traceback.print_exc()
            return False

    def calculate_indicators(self):
        """Add ma10/ma30, 20-day high/low, 10-day return, true range and 20-day ATR to ``self.data``."""
        df = self.data.copy()
        df['ma10'] = df['close'].rolling(window=10, min_periods=1).mean()
        df['ma30'] = df['close'].rolling(window=30, min_periods=1).mean()
        df['high_20'] = df['high'].rolling(window=20).max()
        df['low_20'] = df['low'].rolling(window=20).min()
        df['ret_10'] = df['close'].pct_change(periods=10)

        prev_close = df['close'].shift(1)
        true_range = pd.concat(
            [df['high'] - df['low'], (df['high'] - prev_close).abs(), (df['low'] - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        df['tr'] = true_range
        df['atr'] = true_range.rolling(window=20).mean()
        self.data = df
        return df

    def generate_signals(self):
        """
        Write a 'signal' column into ``self.data`` in place: 1 buy, -1 sell, 0 hold.

        Sell conditions override buy conditions on the same bar.
        """
        df = self.data
        buy_cond = (
            (df['close'] > df['ma10']) &
            (df['ma10'] > df['ma30']) &
            (df['close'] >= df['high_20'] * 0.995) &
            (df['ret_10'] > 0.02)
        )
        sell_cond = (
            (df['close'] < df['ma30']) |
            (df['close'] <= df['low_20'] * 1.005)
        )
        df['signal'] = 0
        df.loc[buy_cond, 'signal'] = 1
        df.loc[sell_cond, 'signal'] = -1
        return df

    def backtest(self, initial_capital=1000000):
        """
        Run an all-in / all-out long-only backtest on the generated signals.

        The first 30 bars are skipped as warm-up. An open position is also
        closed when the close falls more than 10% below its highest close since
        entry.

        Returns:
            dict with the trade list, final position/prices, capital, total
            value (marking any open position to the last close), total return
            in %, number of closed trades and the last data date.
        """
        df = self.generate_signals()
        closes = df['close']
        signals = df['signal']

        position = 0
        entry_price = 0
        peak_price = 0
        capital = initial_capital
        trades = []

        for i in range(30, len(df)):
            date = df.index[i]
            price = closes.iloc[i]
            signal = signals.iloc[i]

            # Trailing stop
            if position > 0:
                peak_price = max(peak_price, price)
                if price < peak_price * 0.90:
                    signal = -1

            if signal == 1 and position == 0:
                position = 1
                entry_price = price
                peak_price = price
                trades.append({'date': date, 'action': 'BUY', 'price': price, 'capital': capital})
            elif signal == -1 and position == 1:
                pnl = (price / entry_price - 1) * capital
                capital += pnl
                position = 0
                trades.append({
                    'date': date,
                    'action': 'SELL',
                    'price': price,
                    'capital': capital,
                    'pnl': pnl,
                    'return_pct': (price / entry_price - 1) * 100
                })

        current_price = closes.iloc[-1]
        if position == 1:
            total_value = capital + (current_price / entry_price - 1) * capital
        else:
            total_value = capital

        return {
            'trades': trades,
            'current_position': position,
            'current_price': current_price,
            'entry_price': entry_price if position == 1 else None,
            'capital': capital,
            'total_value': total_value,
            'total_return': (total_value / initial_capital - 1) * 100,
            'trade_count': sum(1 for t in trades if t['action'] == 'SELL'),
            'data_end_date': df.index[-1].strftime('%Y-%m-%d')
        }

    def get_recent_indicators(self, days=20):
        """
        Return display rows for the last *days* bars.

        NaN indicators are shown as '-'. Requires ``calculate_indicators`` and
        ``generate_signals`` (via ``backtest``) to have run.
        """
        def fmt(value):
            return '-' if pd.isna(value) else round(value, 2)

        signal_names = {1: '买入', -1: '卖出'}
        return [
            {
                'date': date.strftime('%Y-%m-%d'),
                'close': round(row['close'], 2),
                'ma10': fmt(row['ma10']),
                'ma30': fmt(row['ma30']),
                'high_20': fmt(row['high_20']),
                'ret_10': '-' if pd.isna(row['ret_10']) else f"{row['ret_10']*100:.2f}%",
                'signal': signal_names.get(row['signal'], '持有'),
                'atr': fmt(row['atr'])
            }
            for date, row in self.data.tail(days).iterrows()
        ]

    def get_recent_trades(self, n=20, result=None):
        """
        Return the last *n* trades.

        Pass a ``backtest()`` *result* to avoid re-running the backtest.
        """
        if result is None:
            result = self.backtest()
        return result['trades'][-n:]


# ==================== Report rendering ====================
_REPORT_CSS = """
body { font-family: -apple-system, 'Microsoft YaHei', sans-serif; margin: 20px; color: #333; }
h1 { color: #1a73e8; }
h2 { border-bottom: 2px solid #1a73e8; padding-bottom: 4px; }
table { border-collapse: collapse; width: 100%; margin: 10px 0; }
th, td { border: 1px solid #ddd; padding: 6px 8px; text-align: center; }
th { background: #f1f3f4; }
.buy, .positive { color: #d93025; font-weight: bold; }
.sell, .negative { color: #188038; font-weight: bold; }
.warning { background: #fff4e5; border-left: 4px solid #f29900; padding: 8px 12px; margin: 10px 0; }
.source-info, .update { background: #e8f0fe; padding: 8px 12px; margin: 10px 0; }
.note { font-size: 12px; color: #666; margin-top: 20px; }
"""


def _format_trade_date(value):
    """Format a trade date as YYYY-MM-DD when it is date-like, else via str()."""
    return value.strftime('%Y-%m-%d') if hasattr(value, 'strftime') else str(value)


def _build_html_report(strategy, result, recent_indicators, recent_trades):
    """Render the HTML e-mail body; warnings and source names are HTML-escaped."""
    holding = result['current_position'] == 1
    parts = [
        "<html><head><meta charset='utf-8'><style>" + _REPORT_CSS + "</style></head><body>",
        "<h1>🚀 创业板50趋势跟踪策略报告 (多源验证版)</h1>",
        f"<p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>",
    ]

    if strategy.warnings:
        parts.append(
            "<div class='warning'><b>⚠️ 警告:</b><br>"
            + "<br>".join(html_escape(str(w)) for w in strategy.warnings)
            + "</div>"
        )
    if strategy.data_sources:
        quotes = " ".join(
            f"{html_escape(str(src['source']))}({src['data']['close']:.2f})"
            for src in strategy.data_sources
        )
        parts.append(f"<div class='source-info'><b>数据来源:</b> {quotes} | 已交叉验证</div>")

    parts.append(
        f"<div class='update'><b>数据更新:</b> 最新数据日期: {result['data_end_date']} | "
        f"数据范围: {strategy.data.index[0].date()} ~ {strategy.data.index[-1].date()}</div>"
    )

    return_class = 'positive' if result['total_return'] >= 0 else 'negative'
    parts += [
        "<h2>📊 总体绩效</h2>",
        "<table>",
        f"<tr><th>当前持仓</th><td>{'持有中' if holding else '空仓'}</td></tr>",
        f"<tr><th>当前价格</th><td>{result['current_price']:.2f}</td></tr>",
        f"<tr><th>累计收益率</th><td class='{return_class}'>{result['total_return']:+.2f}%</td></tr>",
        f"<tr><th>总资产</th><td>{result['total_value']:,.0f}元</td></tr>",
        f"<tr><th>交易次数</th><td>{result['trade_count']}</td></tr>",
        "</table>",
    ]

    if holding:
        unrealized = (result['current_price'] / result['entry_price'] - 1) * 100
        unrealized_class = 'positive' if unrealized >= 0 else 'negative'
        parts += [
            "<h2>📈 持仓详情</h2>",
            f"<p>入场价格: {result['entry_price']:.2f} 元</p>",
            f"<p>当前浮盈: <span class='{unrealized_class}'>{unrealized:+.2f}%</span></p>",
        ]

    parts += [
        "<h2>📅 近20天指标详情</h2>",
        "<table>",
        "<tr><th>日期</th><th>收盘价</th><th>MA10</th><th>MA30</th><th>20日高</th>"
        "<th>10日涨幅</th><th>信号</th><th>ATR</th></tr>",
    ]
    for ind in recent_indicators:
        signal_class = 'buy' if ind['signal'] == '买入' else ('sell' if ind['signal'] == '卖出' else '')
        parts.append(
            f"<tr><td>{ind['date']}</td><td>{ind['close']}</td><td>{ind['ma10']}</td>"
            f"<td>{ind['ma30']}</td><td>{ind['high_20']}</td><td>{ind['ret_10']}</td>"
            f"<td class='{signal_class}'>{ind['signal']}</td><td>{ind['atr']}</td></tr>"
        )
    parts.append("</table>")

    parts += [
        "<h2>💼 近20次交易详情</h2>",
        "<table>",
        "<tr><th>日期</th><th>操作</th><th>价格</th><th>盈亏金额</th><th>盈亏比例</th><th>总资产</th></tr>",
    ]
    for trade in recent_trades:
        action_class = 'buy' if trade['action'] == 'BUY' else 'sell'
        has_pnl = 'pnl' in trade
        ret = trade.get('return_pct', 0)
        ret_class = '' if not has_pnl else ('positive' if ret >= 0 else 'negative')
        pnl_text = f"{trade['pnl']:+,.0f}元" if has_pnl else '-'
        ret_text = f"{ret:+.2f}%" if has_pnl else '-'
        parts.append(
            f"<tr><td>{_format_trade_date(trade['date'])}</td>"
            f"<td class='{action_class}'>{trade['action']}</td>"
            f"<td>{trade['price']:.2f}</td><td class='{ret_class}'>{pnl_text}</td>"
            f"<td class='{ret_class}'>{ret_text}</td><td>{trade['capital']:,.0f}元</td></tr>"
        )
    parts.append("</table>")

    parts += [
        "<div class='note'><b>策略说明:</b><br>"
        "• 买入条件: 价格>MA10>MA30 且 突破20日高×0.995 且 10日涨幅>2%<br>"
        "• 卖出条件: 跌破MA30 或 创20日新低 或 回撤10%止损<br>"
        "• 数据验证: 多源交叉验证,差异>2%时报警</div>",
        "</body></html>",
    ]
    return "\n".join(parts)


def _build_text_report(strategy, result, recent_indicators, recent_trades):
    """Render the plain-text e-mail body."""
    lines = [
        "",
        "创业板50趋势跟踪策略报告 (多源验证版)",
        f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"数据最新日期: {result['data_end_date']}",
    ]
    if strategy.warnings:
        lines += ["", "警告:", *strategy.warnings]
    lines += [
        "",
        "【总体绩效】",
        f"当前持仓: {'持有中' if result['current_position'] == 1 else '空仓'}",
        f"当前价格: {result['current_price']:.2f}元",
        f"累计收益率: {result['total_return']:+.2f}%",
        f"总资产: {result['total_value']:,.0f}元",
        "",
        "【近20天指标】",
        "日期 收盘价 MA10 MA30 20日高 10日涨幅 信号",
    ]
    for ind in recent_indicators:
        lines.append(
            f"{ind['date']} {ind['close']:>8} {ind['ma10']:>8} {ind['ma30']:>8} "
            f"{ind['high_20']:>8} {ind['ret_10']:>8} {ind['signal']:>4}"
        )
    lines += ["", "【近20次交易】"]
    for trade in recent_trades:
        pnl_str = f"{trade['pnl']:+,.0f}元" if 'pnl' in trade else '-'
        ret_str = f"{trade['return_pct']:+.2f}%" if 'return_pct' in trade else '-'
        lines.append(
            f"{_format_trade_date(trade['date'])} {trade['action']:>6} "
            f"{trade['price']:>8.2f} {pnl_str:>12} {ret_str:>10}"
        )
    return "\n".join(lines) + "\n"


def generate_report():
    """
    Load data, run the backtest and render the reports.

    Returns:
        ``(html, text, result)``, or ``(None, None, None)`` when data loading fails.
    """
    strategy = TrendTrackingStrategy()
    if not strategy.load_and_merge_data():
        return None, None, None

    strategy.calculate_indicators()
    result = strategy.backtest()
    recent_indicators = strategy.get_recent_indicators(20)
    # Reuse the backtest that just ran instead of repeating it.
    recent_trades = strategy.get_recent_trades(20, result=result)

    html = _build_html_report(strategy, result, recent_indicators, recent_trades)
    text = _build_text_report(strategy, result, recent_indicators, recent_trades)
    return html, text, result


def send_email(subject, html_content, text_content):
    """Send a multipart (plain + HTML) e-mail via EMAIL_CONFIG; return True on success."""
    try:
        msg = MIMEMultipart('alternative')
        msg['Subject'] = Header(subject, 'utf-8')
        msg['From'] = EMAIL_CONFIG['sender_email']
        msg['To'] = EMAIL_CONFIG['receiver_email']
        msg.attach(MIMEText(text_content, 'plain', 'utf-8'))
        msg.attach(MIMEText(html_content, 'html', 'utf-8'))

        # A timeout keeps an unresponsive SMTP server from hanging the job.
        with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port'], timeout=30) as server:
            server.sendmail(
                EMAIL_CONFIG['sender_email'],
                EMAIL_CONFIG['receiver_email'],
                msg.as_string()
            )
        print(f"✅ 邮件发送成功: {subject}")
        return True
    except Exception as e:
        print(f"❌ 邮件发送失败: {e}")
        return False


def main():
    """Entry point: build the report and e-mail it."""
    print("=" * 60)
    print("🚀 创业板50趋势跟踪实时报告 (多数据源交叉验证版)")
    print("=" * 60)
    print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    html, text, result = generate_report()
    if html is None:
        print("❌ 报告生成失败")
        return

    print(f"\n✅ 报告生成完成")
    print(f"  数据最新日期: {result['data_end_date']}")
    print(f"  当前持仓: {'持有中' if result['current_position'] == 1 else '空仓'}")
    print(f"  累计收益: {result['total_return']:+.2f}%")

    print("\n📧 发送邮件...")
    position_status = "持仓" if result['current_position'] == 1 else "空仓"
    subject = (
        f"🚀 创业板50趋势报告 {datetime.now().strftime('%m-%d %H:%M')} | "
        f"{position_status} | 收益{result['total_return']:+.2f}%"
    )
    send_email(subject, html, text)

    print("\n✅ 全部完成!")
    print("=" * 60)


if __name__ == "__main__":
    main()