#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Kalman策略 - 运行并发送邮件报告 (修复版) """ import sys sys.path.insert(0, '/root/.openclaw/workspace/kalman-filter') import subprocess import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.header import Header from email import encoders import os from datetime import datetime # 邮件配置 EMAIL_CONFIG = { "smtp_server": "localhost", "smtp_port": 25, "sender_email": "kalman@erwin.wang", "receiver_email": "380880504@qq.com" } def run_kalman_strategy(): """运行 Kalman 策略""" print("="*60) print("📈 运行 Kalman 策略...") print("="*60) result = subprocess.run( ['python3', 'cyb50_kalman_filter_daily.py'], cwd='/root/.openclaw/workspace/kalman-filter', capture_output=True, text=True, timeout=300 ) return result.stdout, result.stderr, result.returncode def parse_results(output): """解析策略输出结果""" lines = output.split('\n') summary = { 'total_return': 'N/A', 'max_drawdown': 'N/A', 'sharpe': 'N/A', 'num_trades': 'N/A', 'win_rate': 'N/A', 'final_value': 'N/A' } for line in lines: if '总收益率:' in line: summary['total_return'] = line.split(':')[1].strip() elif '最大回撤:' in line: summary['max_drawdown'] = line.split(':')[1].strip() elif '夏普比率:' in line: summary['sharpe'] = line.split(':')[1].strip() elif '总交易对数' in line: summary['num_trades'] = line.split(':')[1].strip() elif '胜率:' in line: summary['win_rate'] = line.split(':')[1].strip() elif '最终资产:' in line: summary['final_value'] = line.split(':')[1].strip() return summary def simplify_reason(reason): """简化退出原因""" if '早期止损' in reason: return '早期止损' elif '趋势衰减' in reason: return '趋势衰减' elif '追踪止损' in reason: return '追踪止损' elif '最大亏损' in reason: return '最大亏损止损' elif '趋势卖出' in reason: return '趋势卖出' else: return reason.split('(')[0] if '(' in reason else reason def extract_recent_trades(output): """提取最近20次交易(优先从逐笔交易分析中提取,补充所有交易信号)""" lines = output.split('\n') trades = [] # 方法1: 从"逐笔交易盈亏分析"中提取(包含完整交易信息) in_trade_section = False for line in lines: if '逐笔交易盈亏分析' in line: in_trade_section = True continue if in_trade_section: stripped = line.strip() # 匹配格式: "21 2026-02-26 3500.75 2026-03-04 3310.59 4 1.0x -5.43% 早期止损(...)" if len(stripped) > 60: parts = stripped.split() if len(parts) >= 9: try: # 检查第一列是数字,第二列是日期 if parts[0].isdigit() and len(parts[1]) == 10 and parts[1][4] == '-': # 简化退出原因 parts[-1] = simplify_reason(parts[-1]) trade_line = ' '.join(parts[:9]) trades.append(trade_line) except: pass if len(trades) >= 25: # 多取一些,确保包含最近的交易 break if '退出原因统计' in line: break # 方法2: 如果逐笔交易不足,从"所有交易信号"中补充配对 if len(trades) < 20: signals = [] in_signal_section = False for line in lines: if '所有交易信号详情' in line: in_signal_section = True continue if in_signal_section: stripped = line.strip() if len(stripped) > 30: parts = stripped.split() if len(parts) >= 3 and parts[0].isdigit() and len(parts[1]) == 10 and parts[1][4] == '-': signals.append({ 'num': parts[0], 'date': parts[1], 'action': parts[2], 'price': parts[3] if len(parts) > 3 else '', }) if '当前市场状态' in line: break # 配对买卖信号(取全部信号,不只是最后40个) for i in range(len(signals) - 1): if signals[i]['action'] == '买入' and signals[i+1]['action'] == '卖出': buy = signals[i] sell = signals[i+1] # 检查是否已经存在(通过日期判断) trade_date = f"{buy['date']}买入" if not any(trade_date in t for t in trades): trade_line = f"{buy['num']:<4} {buy['date']:<12} {buy['price']:<10} {sell['date']:<12} {sell['price']:<10} {'?':<7} {'1.0x':<6} {'?':<8} {'信号配对'}" trades.append(trade_line) # 按交易编号排序,取最近20条 def get_trade_num(trade_line): try: return int(trade_line.split()[0]) except: return 0 trades.sort(key=get_trade_num) return trades[-20:] def extract_recent_signals(output): """提取最近20天信号详情""" lines = output.split('\n') signals = [] in_signal_section = False for line in lines: if '最近' in line and '个交易日信号判断详情' in line: in_signal_section = True continue if in_signal_section: stripped = line.strip() if len(stripped) > 80 and stripped[0:4].isdigit(): signals.append(stripped) if len(signals) >= 20: break if '分析完成' in line or '当前市场状态' in line: break return signals def extract_current_status(output): """提取当前市场状态""" lines = output.split('\n') status = [] in_status = False for line in lines: if '当前市场状态' in line: in_status = True if in_status: status.append(line.strip()) if len(status) >= 8: break return status def send_email(subject, html_content, text_content, attachments=None): """发送邮件""" try: msg = MIMEMultipart('alternative') msg['Subject'] = Header(subject, 'utf-8') msg['From'] = EMAIL_CONFIG['sender_email'] msg['To'] = EMAIL_CONFIG['receiver_email'] text_part = MIMEText(text_content, 'plain', 'utf-8') msg.attach(text_part) html_part = MIMEText(html_content, 'html', 'utf-8') msg.attach(html_part) if attachments: for filepath in attachments: if os.path.exists(filepath): with open(filepath, 'rb') as f: attachment = MIMEBase('application', 'octet-stream') attachment.set_payload(f.read()) encoders.encode_base64(attachment) filename = os.path.basename(filepath) attachment.add_header('Content-Disposition', f'attachment; filename="{filename}"') msg.attach(attachment) with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as server: server.sendmail( EMAIL_CONFIG['sender_email'], EMAIL_CONFIG['receiver_email'], msg.as_string() ) print(f"✅ 邮件发送成功: {subject}") return True except Exception as e: print(f"❌ 邮件发送失败: {e}") return False def main(): """主程序""" print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") stdout, stderr, returncode = run_kalman_strategy() if returncode != 0: print(f"❌ 策略运行失败: {stderr}") return summary = parse_results(stdout) recent_trades = extract_recent_trades(stdout) recent_signals = extract_recent_signals(stdout) current_status = extract_current_status(stdout) # 构建文本邮件 text = f""" Kalman策略报告 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} 【绩效摘要】 总收益率: {summary['total_return']} 最大回撤: {summary['max_drawdown']} 夏普比率: {summary['sharpe']} 总交易次数: {summary['num_trades']} 胜率: {summary['win_rate']} 最终资产: {summary['final_value']} 【当前市场状态】 """ for line in current_status[:8]: text += line + '\n' # 最近20次交易 text += "\n【最近20次交易】\n" for i, trade in enumerate(recent_trades, 1): text += f"{i:2d}. {trade}\n" # 最近20天信号 text += "\n【最近20天信号详情】\n" for signal in recent_signals: text += signal + '\n' # 构建HTML trades_html = "" for trade in recent_trades: parts = trade.split() if len(parts) >= 9: trades_html += "" # 只取前8列(编号到收益率),第9列是退出原因需要简化 for part in parts[:8]: trades_html += f"{part}" # 简化退出原因 reason = parts[8] if len(parts) > 8 else "" if '早期止损' in reason: reason = '早期止损' elif '趋势衰减' in reason: reason = '趋势衰减' elif '追踪止损' in reason: reason = '追踪止损' elif '最大亏损' in reason: reason = '最大亏损止损' elif '趋势卖出' in reason: reason = '趋势卖出' elif '信号配对' in reason: reason = '信号配对' trades_html += f"{reason}" trades_html += "" elif len(parts) >= 8: # 如果没有第9列,补充一个占位符 trades_html += "" for part in parts[:8]: trades_html += f"{part}" trades_html += "-" trades_html += "" signals_html = "" for signal in recent_signals: parts = signal.split() if len(parts) >= 11: signal_class = "" if '>>买入' in signal: signal_class = "buy" elif '>>卖出' in signal: signal_class = "sell" signals_html += f"" for part in parts[:11]: signals_html += f"{part}" signals_html += "" html = f"""

📊 Kalman策略报告

生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

📈 绩效摘要

总收益率
{summary['total_return']}
最大回撤
{summary['max_drawdown']}
夏普比率
{summary['sharpe']}
交易次数
{summary['num_trades']}
胜率
{summary['win_rate']}
最终资产
{summary['final_value']}

💼 最近20次交易

{trades_html}
#买入时间买入价卖出时间卖出价持仓天仓位收益率退出原因

📅 最近20天信号详情

{signals_html}
日期收盘Trend加速度波动率K偏离趋势加速波动偏离信号说明
附件:
• kalman_filter_analysis.png - 策略分析图表
• kalman_daily_signals.csv - 完整交易信号数据
""" subject = f"📊 Kalman策略 {datetime.now().strftime('%m-%d %H:%M')} | 收益{summary['total_return']}" attachments = [ '/root/.openclaw/workspace/kalman-filter/kalman_filter_analysis.png', '/root/.openclaw/workspace/kalman-filter/kalman_daily_signals.csv' ] send_email(subject, html, text, attachments) print(f"\n📊 提取到 {len(recent_trades)} 条交易记录") print(f"📊 提取到 {len(recent_signals)} 条信号记录") print("\n✅ 全部完成!") if __name__ == "__main__": main()