#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50指数 - 实时交易报告系统 (含实时数据获取) 基于趋势跟踪策略,生成当前时点报告 """ import sys sys.path.insert(0, '/root/.openclaw/workspace/cat-fly') sys.path.insert(0, '/root/.openclaw/workspace/quant') import pandas as pd import numpy as np from datetime import datetime, timedelta import smtplib import ssl from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header import warnings warnings.filterwarnings('ignore') # ==================== 邮件配置 ==================== EMAIL_CONFIG = { "smtp_server": "localhost", "smtp_port": 25, "sender_email": "catfly@erwin.wang", "receiver_email": "380880504@qq.com" } # ==================== 实时数据获取 ==================== def fetch_realtime_data(): """获取实时数据,与历史数据合并""" try: # 使用东方财富或新浪财经接口获取最新数据 import requests import json # 尝试东方财富接口 url = "http://push2.eastmoney.com/api/qt/stock/get" params = { "secid": "0.399673", # 创业板50 "fields": "f43,f44,f45,f46,f47,f48,f50,f51,f52,f57,f58,f60,f107,f108,f109,f110,f111,f112,f113,f114,f115,f116,f117,f118,f119,f120,f121,f122,f123,f124,f125,f126,f127,f128,f129,f130,f131,f132,f133,f134,f135,f136,f137,f138,f139,f140,f141,f142,f143,f144,f145,f146,f147,f148,f149,f150,f151,f152,f153,f154,f155,f156,f157,f158,f159,f160,f161,f162,f163,f164,f165,f166,f167,f168,f169,f170,f171,f172,f173,f174,f175,f176,f177,f178,f179,f180,f181,f182,f183,f184,f185,f186,f187,f188,f189,f190,f191,f192,f193,f194,f195,f196,f197,f198,f199,f200" } response = requests.get(url, params=params, timeout=10) data = response.json() if 'data' in data and data['data']: d = data['data'] # 解析实时数据 current_price = float(d.get('f43', 0)) / 100 # 当前价 open_price = float(d.get('f46', 0)) / 100 # 开盘价 high_price = float(d.get('f44', 0)) / 100 # 最高价 low_price = float(d.get('f45', 0)) / 100 # 最低价 volume = int(d.get('f47', 0)) # 成交量 today = datetime.now().strftime('%Y-%m-%d') return { 'date': today, 'open': open_price, 'high': high_price, 'low': low_price, 'close': current_price, 'volume': volume } except Exception as e: print(f"实时数据获取失败: {e}") return None # ==================== 趋势跟踪策略 ==================== class TrendTrackingStrategy: """趋势跟踪策略 - 实时计算版""" def __init__(self): self.data = None def load_and_merge_data(self, csv_file='cyb50_baostock.csv'): """加载历史数据并合并实时数据""" try: # 加载历史数据 df = pd.read_csv(f'/root/.openclaw/workspace/quant/{csv_file}') df['date'] = pd.to_datetime(df['date']) df = df.set_index('date').sort_index() # 转换数据类型 for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') # 获取最新历史数据日期 last_hist_date = df.index[-1] today = pd.Timestamp.now().normalize() print(f"历史数据最新日期: {last_hist_date.date()}") print(f"当前日期: {today.date()}") # 如果历史数据不是最新的,尝试获取实时数据 if last_hist_date < today: print("历史数据不是最新,尝试获取实时数据...") realtime = fetch_realtime_data() if realtime and pd.Timestamp(realtime['date']) > last_hist_date: # 创建新数据行 new_row = pd.DataFrame([realtime]).set_index('date') new_row.index = pd.to_datetime(new_row.index) # 合并数据 df = pd.concat([df, new_row]) print(f"✅ 已合并实时数据: {realtime['date']}") else: print("⚠️ 未获取到更新的实时数据,使用历史数据") else: print("✅ 历史数据已是最新") self.data = df print(f"数据范围: {df.index[0].date()} ~ {df.index[-1].date()}") return True except Exception as e: print(f"数据加载失败: {e}") return False def calculate_indicators(self): """计算技术指标""" df = self.data.copy() # 均线 df['ma10'] = df['close'].rolling(window=10).mean() df['ma30'] = df['close'].rolling(window=30).mean() # 20日高低点 df['high_20'] = df['high'].rolling(window=20).max() df['low_20'] = df['low'].rolling(window=20).min() # 10日涨幅 df['ret_10'] = df['close'].pct_change(periods=10) # ATR tr1 = df['high'] - df['low'] tr2 = abs(df['high'] - df['close'].shift(1)) tr3 = abs(df['low'] - df['close'].shift(1)) df['tr'] = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) df['atr'] = df['tr'].rolling(window=20).mean() self.data = df return df def generate_signals(self): """生成交易信号""" df = self.data # 买入条件 buy_cond = ( (df['close'] > df['ma10']) & (df['ma10'] > df['ma30']) & (df['close'] >= df['high_20'] * 0.995) & (df['ret_10'] > 0.02) ) # 卖出条件 sell_cond = ( (df['close'] < df['ma30']) | (df['close'] <= df['low_20'] * 1.005) ) df['signal'] = 0 df.loc[buy_cond, 'signal'] = 1 df.loc[sell_cond, 'signal'] = -1 return df def backtest(self, initial_capital=1000000): """回测计算""" df = self.generate_signals() position = 0 entry_price = 0 peak_price = 0 capital = initial_capital trades = [] for i in range(30, len(df)): date = df.index[i] price = df['close'].iloc[i] signal = df['signal'].iloc[i] # 移动止损检查 if position > 0: if price > peak_price: peak_price = price if price < peak_price * 0.90: # 10%回撤止损 signal = -1 # 执行交易 if signal == 1 and position == 0: position = 1 entry_price = price peak_price = price trades.append({ 'date': date, 'action': 'BUY', 'price': price, 'capital': capital }) elif signal == -1 and position == 1: pnl = (price / entry_price - 1) * capital capital += pnl position = 0 trades.append({ 'date': date, 'action': 'SELL', 'price': price, 'capital': capital, 'pnl': pnl, 'return_pct': (price / entry_price - 1) * 100 }) # 计算当前持仓 current_position = position current_price = df['close'].iloc[-1] if position == 1: unrealized_pnl = (current_price / entry_price - 1) * capital total_value = capital + unrealized_pnl else: total_value = capital total_return = (total_value / initial_capital - 1) * 100 return { 'trades': trades, 'current_position': current_position, 'current_price': current_price, 'entry_price': entry_price if position == 1 else None, 'capital': capital, 'total_value': total_value, 'total_return': total_return, 'trade_count': len([t for t in trades if t['action'] == 'SELL']), 'data_end_date': df.index[-1].strftime('%Y-%m-%d') } def get_recent_indicators(self, days=20): """获取近N天指标详情""" df = self.data.tail(days).copy() indicators = [] for date, row in df.iterrows(): indicators.append({ 'date': date.strftime('%Y-%m-%d'), 'close': round(row['close'], 2), 'ma10': round(row['ma10'], 2) if not pd.isna(row['ma10']) else '-', 'ma30': round(row['ma30'], 2) if not pd.isna(row['ma30']) else '-', 'high_20': round(row['high_20'], 2) if not pd.isna(row['high_20']) else '-', 'ret_10': f"{row['ret_10']*100:.2f}%" if not pd.isna(row['ret_10']) else '-', 'signal': '买入' if row['signal'] == 1 else ('卖出' if row['signal'] == -1 else '持有'), 'atr': round(row['atr'], 2) if not pd.isna(row['atr']) else '-' }) return indicators def get_recent_trades(self, n=20): """获取近N次交易详情""" result = self.backtest() trades = result['trades'][-n:] return trades def generate_report(): """生成完整报告""" strategy = TrendTrackingStrategy() if not strategy.load_and_merge_data(): return None, None, None strategy.calculate_indicators() result = strategy.backtest() # 获取近期数据 recent_indicators = strategy.get_recent_indicators(20) recent_trades = strategy.get_recent_trades(20) # 生成HTML报告 html = f"""
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
入场价格: {result['entry_price']:.2f} 元
当前浮盈: {unrealized:+.2f}%
| 日期 | 收盘价 | MA10 | MA30 | 20日高 | 10日涨幅 | 信号 | ATR |
|---|---|---|---|---|---|---|---|
| {ind['date']} | {ind['close']} | {ind['ma10']} | {ind['ma30']} | {ind['high_20']} | {ind['ret_10']} | {ind['signal']} | {ind['atr']} |
| 日期 | 操作 | 价格 | 盈亏金额 | 盈亏比例 | 总资产 |
|---|---|---|---|---|---|
| {trade['date'].strftime('%Y-%m-%d') if isinstance(trade['date'], pd.Timestamp) else trade['date']} | {trade['action']} | {trade['price']:.2f} | {f'{pnl:+,.0f}元' if 'pnl' in trade else '-'} | {f'{ret:+.2f}%' if 'return_pct' in trade else '-'} | {trade['capital']:,.0f}元 |