#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CYB50 T+1 舒适区交易信号报告 ────────────────────────────────────────────────────────────────────── 在 auto_report_long_only_t1.py 的基础上融合最终策略(Version F): 流程: 1. 加载本地30分钟K线数据 2. 运行多空双向策略 → 提取做多交易 → T+1规则转换 3. 计算市场环境指标(MarketEnvironmentAnalyzer) 4. 应用 Version F 过滤: - 死亡区(下跌趋势低波 / 震荡低波)→ 跳过 - 加分模型评分 ≥ 5 → 正常仓位 - 命中组合规则 → 仓位 ×1.5 5. 计算今日信号(当日开仓 / 当日平仓 / 无操作) 6. 生成 HTML 报告并发送邮件 """ import sys import os if sys.platform == 'win32': import io sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') import pandas as pd import numpy as np from datetime import datetime, timedelta import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.header import Header import warnings warnings.filterwarnings('ignore') from cyb50_30min_dual_direction import ( ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor ) from t1_converter import simulate_t1_trades from comfort_zone_analyzer import MarketEnvironmentAnalyzer, ComfortZoneAnalyzer # ══════════════════════════════════════════════════════════════════ # 配置项 # ══════════════════════════════════════════════════════════════════ EMAIL_CONFIG = { "smtp_server": "localhost", "smtp_port": 25, "sender_email": "cyb50-t1@erwin.wang", "receiver_emails": ["380880504@qq.com"], } INITIAL_CAPITAL = 1_000_000 FETCH_DAYS = 90 # 拉取最近N天数据(含指标预热期) REPORT_DAYS = 60 # 报告统计区间(最近N天交易) # Version F 策略参数 DEATH_ZONES = {'下跌趋势低波', '震荡低波'} SCORE_THRESH = 5 # 最低入场评分 BOOST_MULT = 1.5 # 命中组合规则时的仓位乘数 # ══════════════════════════════════════════════════════════════════ # 最终策略核心逻辑(与 final_strategy.py 保持一致) # ══════════════════════════════════════════════════════════════════ def vol_bin(v): if pd.isna(v): return 'unknown' if v < 0.20: return 'vol极低' if v < 0.40: return 'vol低' if v < 0.60: return 'vol中' if v < 0.80: return 'vol高' return 'vol极高' def rsi_bin(v): if pd.isna(v): return 'unknown' if v < 0.05: return 'rsi极底' if v < 0.10: return 'rsi底' if v < 0.20: return 'rsi低' if v < 0.40: return 'rsi偏低' if v < 0.60: return 'rsi中' if v < 0.80: return 'rsi偏高' return 'rsi高' def ts_bin(v): if pd.isna(v): return 'unknown' if v < 1.0: return 'ts弱' if v < 1.5: return 'ts中弱' if v < 2.5: return 'ts中' if v < 4.0: return 'ts强' return 'ts极强' def comfort_score(row) -> int: s = 0 ms = str(row.get('市场状态', '')) vl = str(row.get('波动率水平', '')) rsi = str(row.get('RSI区域', '')) vq = row.get('波动率分位', float('nan')) rq = row.get('RSI分位', float('nan')) ts = row.get('趋势强度', float('nan')) t1 = str(row.get('T+1调整', row.get('T1调整', ''))) if ms == '下跌趋势高波' and vl == '极低': s += 3 if pd.notna(rq) and 0.05 <= rq < 0.10: s += 3 if pd.notna(vq) and vq < 0.30: s += 2 if pd.notna(ts) and 1.5 <= ts < 4.0: s += 2 if rsi == '中性偏弱': s += 2 if 'T0' not in t1: s += 2 if vl in ('极低', '低', '中等'): s += 1 if pd.notna(rq) and rq >= 0.60: s += 1 return s COMBO_RULES = [ {'RSI区域': '中性偏弱', 'rsi_bin': 'rsi偏低'}, {'vol_bin': 'vol中', 'ts_bin': 'ts强'}, {'波动率水平': '中等', 'ts_bin': 'ts强'}, {'市场状态': '震荡高波', 'RSI区域': '中性偏弱'}, {'RSI区域': '中性偏弱', 'ts_bin': 'ts弱'}, ] def hits_combo(row) -> bool: for rule in COMBO_RULES: if all(str(row.get(col, '')) == str(val) for col, val in rule.items()): return True return False def add_strategy_labels(df: pd.DataFrame) -> pd.DataFrame: """为已包含市场环境列的 DataFrame 追加评分、组合命中、仓位乘数列""" df = df.copy() df['vol_bin'] = df['波动率分位'].apply(vol_bin) df['rsi_bin'] = df['RSI分位'].apply(rsi_bin) df['ts_bin'] = df['趋势强度'].apply(ts_bin) df['评分'] = df.apply(comfort_score, axis=1) df['组合命中'] = df.apply(hits_combo, axis=1) df['死亡区'] = df['市场状态'].isin(DEATH_ZONES) df['入场有效'] = (~df['死亡区']) & (df['评分'] >= SCORE_THRESH) df['仓位乘数'] = df.apply( lambda r: BOOST_MULT if (r['入场有效'] and r['组合命中']) else 1.0, axis=1 ) df['实际盈亏'] = df['盈亏金额'] * df.apply( lambda r: r['仓位乘数'] if r['入场有效'] else 0.0, axis=1 ) df['入场建议'] = df.apply(_entry_advice, axis=1) return df def _entry_advice(row) -> str: if row['死亡区']: return f'⛔ 跳过(死亡区:{row["市场状态"]})' if row['评分'] < SCORE_THRESH: return f'⏸ 观望(评分{row["评分"]} < {SCORE_THRESH})' if row['组合命中']: return f'✅ 入场 {BOOST_MULT}x 仓(评分{row["评分"]},组合规则命中)' return f'✅ 入场 1x 仓(评分{row["评分"]})' # ══════════════════════════════════════════════════════════════════ # 实时数据获取 # ══════════════════════════════════════════════════════════════════ def fetch_data(fetcher) -> pd.DataFrame: """ 拉取最近 FETCH_DAYS 天的30分钟K线(东方财富主,新浪备用)。 多取 FETCH_DAYS 天是为了保证 MarketEnvironmentAnalyzer 的指标预热(需要 120 根以上K线)。 实际回测报告只展示最近 REPORT_DAYS 天的交易。 """ end_date = datetime.now() start_date = end_date - timedelta(days=FETCH_DAYS) print(f'📡 在线获取30分钟K线数据(最近 {FETCH_DAYS} 天)...') print(f' 请求区间: {start_date.strftime("%Y-%m-%d")} ~ {end_date.strftime("%Y-%m-%d")}') df = fetcher.fetch_30min_data(start_date=start_date, end_date=end_date) if df is None or df.empty: raise RuntimeError('数据获取失败,所有数据源均返回空') print(f' 实际区间: {df.index[0]} ~ {df.index[-1]} ({len(df)} 条K线)') return df # ══════════════════════════════════════════════════════════════════ # 今日信号分析 # ══════════════════════════════════════════════════════════════════ def analyse_today(labeled_df: pd.DataFrame, data_with_ind: pd.DataFrame, market_analyzer, cza): """ 返回当日相关交易与当前市场快照。 labeled_df: 经过 add_strategy_labels() 处理的完整 T+1 交易记录 data_with_ind: 含技术指标的 K 线数据(用于获取当前市场状态) market_analyzer: 已初始化的 MarketEnvironmentAnalyzer cza: 已初始化的 ComfortZoneAnalyzer(用于分类函数) """ today = datetime.now().date() # 当日开仓(买入信号,T+1 明日平仓) opened_today = labeled_df[ pd.to_datetime(labeled_df['开仓时间']).dt.date == today ] # 当日平仓(昨日买入,今日卖出) closing_today = labeled_df[ pd.to_datetime(labeled_df['平仓时间']).dt.date == today ] # 当前市场快照(取最新一根K线) last_bar = data_with_ind.iloc[-1] env = market_analyzer.get_environment_at_time(data_with_ind.index[-1]) if env is None: env = {} # 将 env 转为伪行以便计算评分 current_row = { '市场状态': env.get('market_regime', '未知'), '波动率水平': cza._classify_volatility(env.get('volatility_percentile', 0.5)), 'RSI区域': cza._classify_rsi(env.get('rsi', 50)), '波动率分位': env.get('volatility_percentile', float('nan')), 'RSI分位': env.get('rsi_percentile', float('nan')), '趋势强度': env.get('trend_strength', float('nan')), 'T+1调整': '', 'vol_bin': vol_bin(env.get('volatility_percentile', float('nan'))), 'rsi_bin': rsi_bin(env.get('rsi_percentile', float('nan'))), 'ts_bin': ts_bin(env.get('trend_strength', float('nan'))), } current_score = comfort_score(current_row) current_combo = hits_combo(current_row) current_dead = current_row['市场状态'] in DEATH_ZONES current_advice = _entry_advice({ '死亡区': current_dead, '评分': current_score, '组合命中': current_combo, '市场状态': current_row['市场状态'], }) snapshot = { 'time': str(data_with_ind.index[-1]), 'close': last_bar['Close'], 'market_regime': current_row['市场状态'], 'vol_level': current_row['波动率水平'], 'rsi_zone': current_row['RSI区域'], 'vol_pct': env.get('volatility_percentile', float('nan')), 'rsi_pct': env.get('rsi_percentile', float('nan')), 'trend_strength': env.get('trend_strength', float('nan')), 'rsi_value': env.get('rsi', float('nan')), 'score': current_score, 'combo_hit': current_combo, 'dead_zone': current_dead, 'advice': current_advice, } return opened_today, closing_today, snapshot # ══════════════════════════════════════════════════════════════════ # HTML 报告生成 # ══════════════════════════════════════════════════════════════════ CSS = """ body{font-family:Arial,sans-serif;margin:20px;color:#333} h1{border-bottom:3px solid #007bff;padding-bottom:8px;color:#222} h2{color:#555;margin-top:28px;border-left:4px solid #007bff;padding-left:8px} table{border-collapse:collapse;width:100%;margin:12px 0;font-size:13px} th,td{border:1px solid #ddd;padding:7px 10px;text-align:left} th{background:#007bff;color:#fff} tr:nth-child(even){background:#f7f9fc} .pos{color:#0a8a0a;font-weight:bold} .neg{color:#cc1111;font-weight:bold} .tag-green{background:#d4edda;color:#155724;border-radius:4px;padding:2px 7px;font-size:12px} .tag-red{background:#f8d7da;color:#721c24;border-radius:4px;padding:2px 7px;font-size:12px} .tag-yellow{background:#fff3cd;color:#856404;border-radius:4px;padding:2px 7px;font-size:12px} .tag-gray{background:#e2e3e5;color:#383d41;border-radius:4px;padding:2px 7px;font-size:12px} .signal-box{border-radius:8px;padding:16px 20px;margin:12px 0;font-size:15px} .signal-enter{background:#d4edda;border-left:5px solid #28a745} .signal-boost{background:#cce5ff;border-left:5px solid #007bff} .signal-watch{background:#fff3cd;border-left:5px solid #ffc107} .signal-skip{background:#f8d7da;border-left:5px solid #dc3545} """ def _pnl_cls(v): return 'pos' if v >= 0 else 'neg' def _score_tag(score): if score >= 5: return f'评分 {score} ✓' return f'评分 {score} ✗' def _regime_tag(regime): if regime in DEATH_ZONES: return f'{regime} ⚠' if '高波' in regime: return f'{regime}' return f'{regime}' def _advice_box(advice): if '⛔' in advice: cls = 'signal-skip' elif '⏸' in advice: cls = 'signal-watch' elif f'{BOOST_MULT}x' in advice: cls = 'signal-boost' else: cls = 'signal-enter' return f'
生成时间:{now_str} | 策略版本:Version F(排死亡区 + 评分≥{SCORE_THRESH} + 组合规则加仓{BOOST_MULT}x)
数据来源:在线实时(东方财富/新浪)| 拉取 {FETCH_DAYS} 天K线(含指标预热)| 报告统计近 {REPORT_DAYS} 天
| 指标 | 数值 | 说明 |
|---|---|---|
| 最新K线时间 | {snapshot['time']} | |
| 最新收盘价 | {snapshot['close']:.3f} | |
| 市场状态 | {_regime_tag(snapshot['market_regime'])} | {'⚠ 死亡区,新信号跳过' if snapshot['dead_zone'] else '非死亡区,信号有效'} |
| 波动率水平 | {snapshot['vol_level']} | 分位 {snapshot['vol_pct']:.2f} |
| RSI 状态 | {snapshot['rsi_zone']} | RSI={snapshot['rsi_value']:.1f},分位 {snapshot['rsi_pct']:.2f} |
| 趋势强度 | {snapshot['trend_strength']:.2f} | |
| 加分模型评分 | {_score_tag(snapshot['score'])} | 阈值 {SCORE_THRESH},{'通过 ✓' if snapshot['score'] >= SCORE_THRESH else '未通过 ✗'} |
| 组合规则 | {'命中 → 仓位×1.5' if snapshot['combo_hit'] else '未命中 → 仓位×1.0'} |
| 开仓时间 | 开仓价 | 市场状态 | 评分 | 组合命中 | 入场建议 | 仓位乘数 |
|---|---|---|---|---|---|---|
| {r['开仓时间']} | {r['开仓价格']:.3f} | {_regime_tag(r.get('市场状态','—'))} | {_score_tag(r.get('评分',0))} | {'是' if r.get('组合命中') else '否'} | {r.get('入场建议','—')} | {r.get('仓位乘数',1.0):.1f}x |
今日暂无新开仓信号。
' # ── 今日平仓 ───────────────────────────────────────────────── html += f'| 开仓时间 | 平仓时间 | 开仓价 | 平仓价 | 盈亏 | 评分 | 仓位乘数 | 实际盈亏 |
|---|---|---|---|---|---|---|---|
| {r['开仓时间']} | {r['平仓时间']} | {r['开仓价格']:.3f} | {r['平仓价格']:.3f} | {orig:+,.0f}元 | {_score_tag(r.get('评分',0))} | {r.get('仓位乘数',1.0):.1f}x | {pnl:+,.0f}元 |
今日暂无平仓操作。
' # ── 整体绩效(Version F) ───────────────────────────────────── html += f"""| 指标 | 数值 |
|---|---|
| 参与交易 | {total_n} 笔 |
| 胜率 | {wr:.1f}% |
| 盈亏比 | {plr:.2f} |
| 总盈亏 | {total_pnl:+,.0f} 元 |
| 总收益率 | {total_ret:+.2f}% |
| 最终资金 | {vF_cap:,.0f} 元 |
| 最大回撤 | {max_dd:.1f}% |
| 年份 | 笔数 | 胜率 | 总盈亏 | 年收益率 | 期末资金 |
|---|---|---|---|---|---|
| {y} | {len(sy)} | ' f'{wr_y:.1f}% | ' f'{pnl:+,.0f} | ' f'{ret_y:+.2f}% | ' f'{end:,.0f} |
| # | 开仓时间 | 平仓时间 | 开仓价 | 平仓价 | 市场状态 | 评分 | 仓位 | 盈亏 | 实际盈亏 | 资金余额 |
|---|---|---|---|---|---|---|---|---|---|---|
| {i+1} | ' f'{str(r["开仓时间"])[:16]} | ' f'{str(r["平仓时间"])[:16]} | ' f'{r["开仓价格"]:.3f} | {r["平仓价格"]:.3f} | ' f'{_regime_tag(r.get("市场状态","—"))} | ' f'{_score_tag(r.get("评分",0))} | ' f'{r.get("仓位乘数",1.0):.1f}x | ' f'{orig:+,.0f} | ' f'{pnl:+,.0f} | ' f'{r["资金余额"]:,.0f} |
暂无符合条件的交易记录。
' html += '' # ── 纯文本摘要 ──────────────────────────────────────────────── text = f"""CYB50 T+1 舒适区交易信号报告 生成时间: {now_str} 数据来源: 在线实时 | 拉取{FETCH_DAYS}天K线 | 报告统计近{REPORT_DAYS}天 【当前市场状态】 市场状态: {snapshot['market_regime']} {'⚠ 死亡区' if snapshot['dead_zone'] else ''} 波动率: {snapshot['vol_level']} (分位 {snapshot['vol_pct']:.2f}) RSI: {snapshot['rsi_zone']} ({snapshot['rsi_value']:.1f}, 分位 {snapshot['rsi_pct']:.2f}) 趋势强度: {snapshot['trend_strength']:.2f} 加分评分: {snapshot['score']} {'✓ 入场' if snapshot['score'] >= SCORE_THRESH else '✗ 观望'} 组合规则: {'命中 → ×1.5' if snapshot['combo_hit'] else '未命中 → ×1.0'} 【入场建议】{snapshot['advice']} 【今日信号】 新开仓: {len(opened_today)} 笔 平仓: {len(closing_today)} 笔 【Version F 整体绩效】 总收益率: {total_ret:+.2f}% 胜率: {wr:.1f}% 盈亏比: {plr:.2f} 总盈亏: {total_pnl:+,.0f}元 最大回撤: {max_dd:.1f}% """ return html, text # ══════════════════════════════════════════════════════════════════ # 邮件发送 # ══════════════════════════════════════════════════════════════════ def send_email(subject, html_content, text_content=''): try: msg = MIMEMultipart('alternative') msg['Subject'] = Header(subject, 'utf-8') msg['From'] = EMAIL_CONFIG['sender_email'] msg['To'] = ', '.join(EMAIL_CONFIG['receiver_emails']) msg.attach(MIMEText(text_content, 'plain', 'utf-8')) msg.attach(MIMEText(html_content, 'html', 'utf-8')) with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as s: s.sendmail(EMAIL_CONFIG['sender_email'], EMAIL_CONFIG['receiver_emails'], msg.as_string()) print(f'✅ 邮件发送成功: {subject}') return True except Exception as e: print(f'❌ 邮件发送失败: {e}') return False # ══════════════════════════════════════════════════════════════════ # 主流程 # ══════════════════════════════════════════════════════════════════ def main(): print('=' * 72) print(' CYB50 T+1 舒适区交易信号报告系统') print(f' 执行时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}') print('=' * 72) # ── 步骤1:在线获取数据 ────────────────────────────────────── config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) try: raw_data = fetch_data(fetcher) except Exception as e: print(f'❌ 数据获取失败: {e}') return if raw_data is None or len(raw_data) < 50: print('❌ 数据不足,退出') return # ── 步骤2:计算技术指标 ────────────────────────────────────── print('\n📈 步骤2: 计算技术指标并运行多空双向策略...') data_with_ind = fetcher.calculate_intraday_indicators(raw_data) signal_gen = DualDirectionSignalGenerator() signals_df = signal_gen.generate_dual_direction_signals(data_with_ind) executor = DualDirectionExecutor(initial_capital=INITIAL_CAPITAL) _, trades_df = executor.execute_dual_direction_trades(signals_df) long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() print(f' 做多交易: {len(long_trades)} 笔') # ── 步骤3:T+1 规则转换 ────────────────────────────────────── print('\n🔄 步骤3: T+1 规则转换...') t1_trades = simulate_t1_trades(data_with_ind, long_trades, INITIAL_CAPITAL) print(f' T+1 交易记录: {len(t1_trades)} 笔') # ── 步骤4:计算市场环境 ────────────────────────────────────── print('\n🔬 步骤4: 计算市场环境指标...') market_analyzer = MarketEnvironmentAnalyzer(data_with_ind) cza = ComfortZoneAnalyzer(t1_trades, market_analyzer) cza._enrich_trades_with_environment() # 结果存入 cza.enriched_trades enriched = cza.enriched_trades # ── 步骤5:应用 Version F 过滤 ─────────────────────────────── print('\n🎯 步骤5: 应用 Version F 策略过滤...') labeled = add_strategy_labels(enriched) total = len(labeled) passed = labeled['入场有效'].sum() dead = labeled['死亡区'].sum() low_sc = (~labeled['死亡区'] & (labeled['评分'] < SCORE_THRESH)).sum() combo = (labeled['入场有效'] & labeled['组合命中']).sum() print(f' 全量: {total}笔 | 死亡区过滤: {dead}笔 | 低分过滤: {low_sc}笔') print(f' 入场有效: {passed}笔 | 其中加仓×{BOOST_MULT}: {combo}笔') # ── 步骤6:今日信号分析 ────────────────────────────────────── print('\n📅 步骤6: 分析今日信号...') opened_today, closing_today, snapshot = analyse_today( labeled, data_with_ind, market_analyzer, cza ) print(f' 今日开仓: {len(opened_today)}笔 今日平仓: {len(closing_today)}笔') print(f' 当前市场: {snapshot["market_regime"]} 评分: {snapshot["score"]} 建议: {snapshot["advice"]}') # ── 步骤7:生成报告 ────────────────────────────────────────── print('\n📝 步骤7: 生成 HTML 报告...') html_report, text_report = generate_report(labeled, snapshot, opened_today, closing_today) # ── 步骤8:决定是否发送邮件 ────────────────────────────────── now = datetime.now() is_post_close = (now.hour == 15 and 0 <= now.minute <= 30) has_today = len(opened_today) > 0 or len(closing_today) > 0 should_send = has_today or is_post_close cutoff = datetime.now() - timedelta(days=REPORT_DAYS) vF_trades = labeled[ labeled['入场有效'] & (pd.to_datetime(labeled['开仓时间']) >= cutoff) ] vF_pnl = vF_trades['实际盈亏'].sum() if len(vF_trades) else 0 vF_ret = vF_pnl / INITIAL_CAPITAL * 100 subject = ( f'📡 CYB50-T1信号 {now.strftime("%m-%d %H:%M")} | ' f'{snapshot["market_regime"]} | ' f'评分{snapshot["score"]} | ' f'建议:{snapshot["advice"][:6]} | ' f'累计收益{vF_ret:+.1f}%' ) print(f'\n📧 步骤8: 发送邮件...') if should_send: send_email(subject, html_report, text_report) else: print(' 当天无交易且非盘后时间,跳过发送') print(f' 邮件主题预览: {subject}') # ── 本地保存报告 ───────────────────────────────────────────── ts = now.strftime('%Y%m%d_%H%M%S') rpt_path = os.path.join(os.path.dirname(__file__), f'signal_report_{ts}.html') with open(rpt_path, 'w', encoding='utf-8') as f: f.write(html_report) print(f' 报告已保存: {os.path.basename(rpt_path)}') print('\n' + '=' * 72) print(' 完成') print('=' * 72) if __name__ == '__main__': main()