#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 趋势质量评估器 - 交易日15:06定时推送 检查是否为交易日,如果是则发送报告 """ import sys sys.path.insert(0, '/root/.openclaw/workspace/trend-quality-evaluator') import numpy as np import pandas as pd from trend_quality_evaluator import fetch_stock_data, TrendQualityEvaluator from datetime import datetime import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.header import Header from email import encoders import warnings warnings.filterwarnings('ignore') print("="*60) print(f"TQE交易日定时推送 - {datetime.now().strftime('%Y-%m-%d %H:%M')}") print("="*60) # 获取数据 df = fetch_stock_data("399673", "2025-01-01", "2026-12-31", "d") if df is None: print("❌ 数据获取失败,跳过") exit(0) # 检查今天是否有数据(是否为交易日) today = datetime.now().strftime('%Y-%m-%d') if today not in df.index.strftime('%Y-%m-%d').values: print(f"📅 {today} 非交易日,跳过推送") exit(0) print(f"✅ {today} 是交易日,生成报告...") # 获取最近365天数据用于统计 last_365 = df.tail(365).copy() # 评估今日趋势质量 evaluator = TrendQualityEvaluator() score = evaluator.evaluate(df) # 获取最新数据 latest = df.iloc[-1] prev = df.iloc[-2] if len(df) > 1 else latest # 计算涨跌 daily_change = latest['close'] - prev['close'] daily_change_pct = daily_change / prev['close'] * 100 # 计算近期趋势 ret_5d = (latest['close'] / df['close'].iloc[-6] - 1) * 100 if len(df) >= 6 else 0 ret_20d = (latest['close'] / df['close'].iloc[-21] - 1) * 100 if len(df) >= 21 else 0 # 生成邮件内容 state_name = ['震荡', '趋势', '反转'][0] # TQE不输出状态,只输出评分 is_tradeable = score.is_tradeable html = f"""

📊 Trend-Quality-Evaluator 每日质量报告

📈 今日评估 ({today})

收盘价: {latest['close']:.2f}

日涨跌: {daily_change:+.2f} ({daily_change_pct:+.2f}%)

趋势质量评分: {score.total_score:.1f}分

交易建议: {'✅ 可交易 (≥60分)' if is_tradeable else '❌ 观望 (<60分)'}

📊 各因子得分详情

因子得分满分原始指标阈值
ADX趋势强度{score.adx_score:.1f}30ADX={score.adx_value:.2f}>25
均线斜率{score.ma_slope_score:.1f}25斜率={score.ma_slope:.4f}>1.002
波动率收缩{score.volatility_score:.1f}20ATR比={score.volatility_ratio:.3f}<0.8
时间框架共振{score.timeframe_score:.1f}15日周共振-
成交量确认{score.volume_score:.1f}10量比={score.volume_ratio:.2f}x>1.5

📈 近期趋势

5日涨跌: {ret_5d:+.2f}%

20日涨跌: {ret_20d:+.2f}%

365天最高: {last_365['close'].max():.2f} ({last_365['close'].idxmax().strftime('%y-%m-%d')})

365天最低: {last_365['close'].min():.2f} ({last_365['close'].idxmin().strftime('%y-%m-%d')})

💡 交易建议说明


推送时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}
评估标的: 创业板50指数 (sz399673)
模型版本: Trend-Quality-Evaluator v1.0 (参数已优化)

""" # 发送邮件 EMAIL_CONFIG = { "smtp_server": "localhost", "smtp_port": 25, "sender_email": "regime@erwin.wang", "receiver_email": "380880504@qq.com" } msg = MIMEMultipart('related') msg['Subject'] = Header(f"📊 TQE每日质量报告 [{today}] {'✅可交易' if is_tradeable else '❌观望'} {score.total_score:.0f}分", 'utf-8') msg['From'] = EMAIL_CONFIG['sender_email'] msg['To'] = EMAIL_CONFIG['receiver_email'] msg.attach(MIMEText(html, 'html', 'utf-8')) try: with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as server: server.sendmail(EMAIL_CONFIG['sender_email'], EMAIL_CONFIG['receiver_email'], msg.as_string()) print(f"✅ 邮件发送成功! [{today}] 评分: {score.total_score:.1f}分 {'可交易' if is_tradeable else '观望'}") except Exception as e: print(f"❌ 邮件发送失败: {e}") print("="*60)