| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50指数 - 自动化交易报告系统 (完整版)
- 基于原版 cyb50_30min_intraday_reversal.py 策略
- 功能:
- 1. 获取近2个月实时数据
- 2. 运行完整策略回测(做多翻转策略)
- 3. 生成详细报告
- 4. 发送邮件通知
- """
- import pandas as pd
- import numpy as np
- import akshare as ak
- import warnings
- import os
- import smtplib
- import ssl
- import requests
- import json
- from datetime import datetime, timedelta
- from email.mime.text import MIMEText
- from email.mime.multipart import MIMEMultipart
- from email.header import Header
- warnings.filterwarnings('ignore')
- # ==================== 邮件配置 ====================
- EMAIL_CONFIG = {
- "smtp_server": "localhost",
- "smtp_port": 25,
- "sender_email": "catfly@openclaw.local",
- "sender_password": "",
- "receiver_email": "380880504@qq.com"
- }
- def send_email(subject, html_content, text_content=""):
- """发送邮件"""
- try:
- msg = MIMEMultipart('alternative')
- msg['Subject'] = Header(subject, 'utf-8')
- msg['From'] = EMAIL_CONFIG['sender_email']
- msg['To'] = EMAIL_CONFIG['receiver_email']
-
- text_part = MIMEText(text_content, 'plain', 'utf-8')
- msg.attach(text_part)
-
- html_part = MIMEText(html_content, 'html', 'utf-8')
- msg.attach(html_part)
-
- with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as server:
- server.sendmail(
- EMAIL_CONFIG['sender_email'],
- EMAIL_CONFIG['receiver_email'],
- msg.as_string()
- )
- print(f"✅ 邮件发送成功: {subject}")
- return True
- except Exception as e:
- print(f"❌ 邮件发送失败: {e}")
- return False
- # ==================== 数据获取 ====================
- class DataFetcher:
- """数据获取类 - 双数据源"""
-
- @staticmethod
- def fetch_recent_2months():
- """获取近2个月数据"""
- end_date = datetime.now()
- start_date = end_date - timedelta(days=70)
-
- print(f"获取数据: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
-
- # 尝试东方财富
- df = DataFetcher._fetch_eastmoney_data(start_date, end_date)
- if df is not None:
- return df
-
- # 尝试新浪财经
- print("⚠️ 东方财富失败,尝试新浪财经...")
- df = DataFetcher._fetch_sina_data(start_date, end_date)
- if df is not None:
- return df
-
- raise Exception("无法获取实时数据,所有数据源均失败。")
-
- @staticmethod
- def _fetch_eastmoney_data(start_date, end_date):
- """东方财富数据源"""
- try:
- print("[数据源1] 东方财富30分钟K线...")
- df = ak.index_zh_a_hist_min_em(symbol="399673", period="30")
-
- if df is not None and not df.empty and len(df) >= 50:
- # 标准化列名(大写,与原版一致)
- df = df.rename(columns={
- '时间': 'DateTime',
- '开盘': 'Open',
- '收盘': 'Close',
- '最高': 'High',
- '最低': 'Low',
- '成交量': 'Volume'
- })
-
- df['DateTime'] = pd.to_datetime(df['DateTime'])
- df = df.set_index('DateTime').sort_index()
-
- # 计算基础指标
- df['Returns'] = df['Close'].pct_change()
- df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
- df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']
-
- # 保留近2个月
- backtest_start = end_date - timedelta(days=60)
- df_backtest = df[df.index >= backtest_start].copy()
-
- print(f"✅ 东方财富: {len(df_backtest)}条K线 ({df_backtest.index[0]} ~ {df_backtest.index[-1]})")
- return df_backtest
- except Exception as e:
- print(f"❌ 东方财富: {e}")
- return None
-
- @staticmethod
- def _fetch_sina_data(start_date, end_date):
- """新浪财经数据源"""
- try:
- print("[数据源2] 新浪财经30分钟K线...")
-
- symbol = "sz399673"
- url = f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var_{symbol}_30_/CN_MarketDataService.getKLineData?symbol={symbol}&scale=30&ma=no&datalen=1023"
-
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
- 'Referer': 'https://finance.sina.com.cn/'
- }
-
- response = requests.get(url, headers=headers, timeout=15)
- json_start = response.text.find('[')
- json_end = response.text.rfind(']') + 1
- json_str = response.text[json_start:json_end]
-
- data_dict = json.loads(json_str)
-
- if data_dict and isinstance(data_dict, list):
- data_list = []
- for item in data_dict:
- data_list.append({
- 'DateTime': item.get('day'),
- 'Open': float(item.get('open', 0)),
- 'High': float(item.get('high', 0)),
- 'Low': float(item.get('low', 0)),
- 'Close': float(item.get('close', 0)),
- 'Volume': float(item.get('volume', 0))
- })
-
- df = pd.DataFrame(data_list)
- df['DateTime'] = pd.to_datetime(df['DateTime'])
- df = df.set_index('DateTime').sort_index()
-
- # 计算基础指标
- df['Returns'] = df['Close'].pct_change()
- df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
- df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']
-
- # 保留近2个月
- backtest_start = end_date - timedelta(days=60)
- df_backtest = df[df.index >= backtest_start].copy()
-
- print(f"✅ 新浪财经: {len(df_backtest)}条K线 ({df_backtest.index[0]} ~ {df_backtest.index[-1]})")
- return df_backtest
- except Exception as e:
- print(f"❌ 新浪财经: {e}")
- return None
- # ==================== 策略类(完整版) ====================
- class CatFlyStrategy:
- """cat-fly完整策略 - 日内翻转做多策略"""
-
- def __init__(self, config=None):
- self.config = config or {
- 'initial_capital': 1000000,
- 'position_size_pct': 1.0,
- 'stop_loss_pct': 0.008,
- 'take_profit_pct': 0.015,
- 'max_hold_bars': 16,
- 'min_reversal_score': 4 # 原版阈值是4
- }
- self.initial_capital = self.config['initial_capital']
-
- def calculate_indicators(self, df):
- """计算完整技术指标(与原版一致)"""
- print("计算技术指标...")
-
- # 短期移动平均线
- df['MA6'] = df['Close'].rolling(window=6).mean()
- df['MA12'] = df['Close'].rolling(window=12).mean()
- df['MA24'] = df['Close'].rolling(window=24).mean()
-
- # RSI
- delta = df['Close'].diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
- rs = gain / loss
- df['RSI'] = 100 - (100 / (1 + rs))
-
- # 布林带
- df['BB_middle'] = df['Close'].rolling(window=20).mean()
- bb_std = df['Close'].rolling(window=20).std()
- df['BB_upper'] = df['BB_middle'] + (bb_std * 2)
- df['BB_lower'] = df['BB_middle'] - (bb_std * 2)
- df['BB_width'] = (df['BB_upper'] - df['BB_lower']) / df['BB_middle']
-
- # MACD
- exp1 = df['Close'].ewm(span=12, adjust=False).mean()
- exp2 = df['Close'].ewm(span=26, adjust=False).mean()
- df['MACD'] = exp1 - exp2
- df['MACD_signal'] = df['MACD'].ewm(span=9, adjust=False).mean()
- df['MACD_hist'] = df['MACD'] - df['MACD_signal']
-
- # KDJ
- low_9 = df['Low'].rolling(window=9).min()
- high_9 = df['High'].rolling(window=9).max()
- rsv = (df['Close'] - low_9) / (high_9 - low_9) * 100
- df['K'] = rsv.ewm(com=2, adjust=False).mean()
- df['D'] = df['K'].ewm(com=2, adjust=False).mean()
- df['J'] = 3 * df['K'] - 2 * df['D']
-
- # ATR
- high_low = df['High'] - df['Low']
- high_close = abs(df['High'] - df['Close'].shift())
- low_close = abs(df['Low'] - df['Close'].shift())
- true_range = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
- df['ATR'] = true_range.rolling(window=14).mean()
- df['ATR_Pct'] = df['ATR'] / df['Close']
-
- # 动量指标
- df['Momentum'] = df['Close'] / df['Close'].shift(4) - 1
-
- # 成交量变化
- df['Volume_MA'] = df['Volume'].rolling(window=12).mean()
- df['Volume_Ratio'] = df['Volume'] / df['Volume_MA']
-
- # 价格动量
- df['Price_Momentum'] = (df['Close'] - df['Close'].shift(6)) / df['Close'].shift(6)
-
- print("技术指标计算完成")
- return df
-
- def generate_signals(self, df):
- """生成日内翻转信号(完整版)"""
- print("生成交易信号...")
- df = self.calculate_indicators(df)
-
- signals = []
- signal_count = 0
-
- # 从第24个周期开始(需要足够历史数据)
- for i in range(24, len(df)):
- current_bar = df.iloc[i]
- current_time = df.index[i]
-
- # 跳过非交易时间
- if hasattr(current_time, 'hour'):
- hour = current_time.hour
- if hour < 9 or hour > 15:
- continue
-
- # 计算翻转信号分数
- reversal_score = 0
- reversal_signals = []
-
- # 1. RSI超卖翻转
- if current_bar['RSI'] < 30:
- reversal_score += 2
- reversal_signals.append("RSI超卖")
- elif current_bar['RSI'] < 35:
- reversal_score += 1
- reversal_signals.append("RSI偏弱")
-
- # 2. KDJ超卖翻转
- if current_bar['K'] < 20 and current_bar['D'] < 20:
- reversal_score += 2
- reversal_signals.append("KDJ超卖")
- elif current_bar['J'] < 0:
- reversal_score += 2
- reversal_signals.append("KDJ极端超卖")
-
- # 3. MACD金叉或改善
- if current_bar['MACD_hist'] > 0 and df.iloc[i-1]['MACD_hist'] <= 0:
- reversal_score += 2
- reversal_signals.append("MACD金叉")
- elif current_bar['MACD_hist'] > df.iloc[i-1]['MACD_hist']:
- reversal_score += 1
- reversal_signals.append("MACD改善")
-
- # 4. 价格触及布林带下轨
- if current_bar['Close'] <= current_bar['BB_lower'] * 1.005:
- reversal_score += 2
- reversal_signals.append("触及下轨")
- elif current_bar['Close'] <= current_bar['BB_lower'] * 1.01:
- reversal_score += 1
- reversal_signals.append("接近下轨")
-
- # 5. 连续下跌后的反转
- recent_returns = df.iloc[i-6:i]['Returns']
- if recent_returns.min() < -0.015:
- consecutive_decline = sum(recent_returns < 0)
- if consecutive_decline >= 4:
- reversal_score += 2
- reversal_signals.append("连续下跌反转")
-
- # 6. 价格动量超卖
- if current_bar['Price_Momentum'] < -0.02:
- reversal_score += 1
- reversal_signals.append("动量超卖")
-
- # 7. 成交量配合
- if current_bar['Volume_Ratio'] > 1.2:
- reversal_score += 1
- reversal_signals.append("放量配合")
-
- # 8. 日内低位
- try:
- daily_data = df[df.index.date == current_time.date()]
- if len(daily_data) > 0:
- daily_high = daily_data['High'].max()
- daily_low = daily_data['Low'].min()
- daily_range = daily_high - daily_low
- if daily_range > 0:
- position_in_day = (current_bar['Close'] - daily_low) / daily_range
- if position_in_day < 0.3:
- reversal_score += 1
- reversal_signals.append("日内低位")
- except:
- pass
-
- # 记录信号
- df.loc[df.index[i], 'Reversal_Score'] = reversal_score
- df.loc[df.index[i], 'Reversal_Signals'] = ', '.join(reversal_signals)
-
- # 生成买入信号(阈值4分)
- if reversal_score >= self.config['min_reversal_score']:
- df.loc[df.index[i], 'Signal'] = 1
- df.loc[df.index[i], 'Signal_Type'] = '做多翻转'
- signal_count += 1
- else:
- df.loc[df.index[i], 'Signal'] = 0
- df.loc[df.index[i], 'Signal_Type'] = ''
-
- print(f"信号生成完成: 共{signal_count}个翻转信号")
- return df
-
- def backtest(self, df):
- """回测执行"""
- df = self.generate_signals(df)
-
- trades = []
- capital = self.initial_capital
- position = 0
- entry_price = 0
- entry_time = None
- entry_signals = ""
- holding_bars = 0
-
- for i in range(24, len(df)):
- current_bar = df.iloc[i]
- price = current_bar['Close']
- current_time = df.index[i]
-
- # 检查是否在交易时间
- if hasattr(current_time, 'hour'):
- hour = current_time.hour
- minute = current_time.minute
- if hour == 11 and minute >= 30: # 午休前不新开仓
- pass
- if hour == 15: # 收盘前不新开仓
- pass
-
- # 无持仓时检查开仓
- if position == 0:
- if current_bar.get('Signal', 0) == 1:
- position_value = capital * self.config['position_size_pct']
- position_size = int(position_value / price)
-
- if position_size > 0:
- position = position_size
- entry_price = price
- entry_time = current_time
- entry_signals = current_bar.get('Reversal_Signals', '')
- holding_bars = 0
-
- # 有持仓时检查平仓
- elif position > 0:
- holding_bars += 1
- exit_signal = False
- exit_reason = ""
-
- # 止损
- stop_loss_price = entry_price * (1 - self.config['stop_loss_pct'])
- take_profit_price = entry_price * (1 + self.config['take_profit_pct'])
-
- if price <= stop_loss_price:
- exit_signal = True
- exit_reason = f"止损({self.config['stop_loss_pct']*100:.1f}%)"
-
- # 止盈
- elif price >= take_profit_price:
- exit_signal = True
- exit_reason = f"止盈({self.config['take_profit_pct']*100:.1f}%)"
-
- # 时间止损
- elif holding_bars >= self.config['max_hold_bars']:
- exit_signal = True
- exit_reason = f"时间止损({holding_bars}周期)"
-
- # RSI超买平仓
- elif current_bar['RSI'] > 70:
- exit_signal = True
- exit_reason = "RSI超买平仓"
-
- # 执行平仓
- if exit_signal:
- pnl = (price - entry_price) * position
- pnl_pct = (price - entry_price) / entry_price * 100
- capital += pnl
-
- trades.append({
- '方向': '做多',
- '开仓时间': entry_time,
- '平仓时间': current_time,
- '开仓价': entry_price,
- '平仓价': price,
- '持仓数量': position,
- '盈亏金额': pnl,
- '盈亏百分比': pnl_pct,
- '退出原因': exit_reason,
- '持仓周期': holding_bars,
- '信号详情': entry_signals,
- '平仓后资金': capital
- })
-
- position = 0
- entry_price = 0
- entry_time = None
- holding_bars = 0
-
- return df, pd.DataFrame(trades), capital
- # ==================== 报告生成 ====================
- def generate_report(trades_df, final_capital, initial_capital=1000000):
- """生成详细报告"""
-
- total_return = (final_capital - initial_capital) / initial_capital * 100
- total_trades = len(trades_df)
-
- if total_trades == 0:
- html = f"""
- <html><body>
- <h1>🚀 创业板50交易报告</h1>
- <p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
- <p>数据区间: 近2个月</p>
- <p><b>近2个月无交易信号触发</b></p>
- <p>初始资金: {initial_capital:,.0f}元</p>
- <p>最终资金: {final_capital:,.0f}元</p>
- <p>收益率: {total_return:+.2f}%</p>
- <p style="color: #666;">说明:策略在指定期间内未找到符合条件的翻转信号(需RSI<30、KDJ超卖、MACD金叉等多重条件同时满足)</p>
- </body></html>
- """
- text = f"""
- 创业板50交易报告
- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 数据区间: 近2个月
- 近2个月无交易信号触发
- 初始资金: {initial_capital:,.0f}元
- 最终资金: {final_capital:,.0f}元
- 收益率: {total_return:+.2f}%
- 说明:策略在指定期间内未找到符合条件的翻转信号
- """
- return html, text
-
- # 有交易数据时的报告...
- winning_trades = trades_df[trades_df['盈亏金额'] > 0]
- losing_trades = trades_df[trades_df['盈亏金额'] < 0]
-
- win_rate = len(winning_trades) / total_trades * 100
- total_profit = winning_trades['盈亏金额'].sum() if len(winning_trades) > 0 else 0
- total_loss = abs(losing_trades['盈亏金额'].sum()) if len(losing_trades) > 0 else 0
- profit_factor = total_profit / total_loss if total_loss > 0 else 0
-
- # HTML报告
- html = f"""
- <html><head><style>
- body {{ font-family: Arial, sans-serif; margin: 20px; }}
- h1 {{ color: #333; border-bottom: 2px solid #007bff; padding-bottom: 10px; }}
- h2 {{ color: #555; margin-top: 30px; }}
- table {{ border-collapse: collapse; width: 100%; margin: 15px 0; font-size: 14px; }}
- th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
- th {{ background-color: #007bff; color: white; }}
- tr:nth-child(even) {{ background-color: #f2f2f2; }}
- .positive {{ color: green; font-weight: bold; }}
- .negative {{ color: red; font-weight: bold; }}
- </style></head><body>
- <h1>🚀 创业板50交易报告</h1>
- <p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
- <p>数据区间: 近2个月</p>
-
- <h2>📊 总体绩效</h2>
- <table>
- <tr><th>指标</th><th>数值</th></tr>
- <tr><td>初始资金</td><td>{initial_capital:,.0f}元</td></tr>
- <tr><td>最终资金</td><td>{final_capital:,.0f}元</td></tr>
- <tr><td>总收益率</td><td class="{'positive' if total_return >= 0 else 'negative'}">{total_return:+.2f}%</td></tr>
- <tr><td>总交易次数</td><td>{total_trades}笔</td></tr>
- <tr><td>胜率</td><td>{win_rate:.1f}%</td></tr>
- <tr><td>盈亏比</td><td>{profit_factor:.2f}</td></tr>
- </table>
-
- <h2>📝 最近10笔交易明细</h2>
- <table>
- <tr>
- <th>开仓时间</th><th>平仓时间</th><th>开仓价</th><th>平仓价</th>
- <th>盈亏</th><th>盈亏%</th><th>退出原因</th><th>信号</th>
- </tr>
- """
-
- for _, trade in trades_df.tail(10).iterrows():
- pnl_class = "positive" if trade['盈亏金额'] >= 0 else "negative"
- html += f"""
- <tr>
- <td>{trade['开仓时间']}</td><td>{trade['平仓时间']}</td>
- <td>{trade['开仓价']:.2f}</td><td>{trade['平仓价']:.2f}</td>
- <td class="{pnl_class}">{trade['盈亏金额']:+.0f}</td>
- <td class="{pnl_class}">{trade['盈亏百分比']:+.2f}%</td>
- <td>{trade['退出原因']}</td><td>{trade['信号详情'][:20]}...</td>
- </tr>
- """
-
- html += "</table></body></html>"
-
- # 纯文本报告
- text = f"""
- 创业板50交易报告
- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 【总体绩效】
- 初始资金: {initial_capital:,.0f}元
- 最终资金: {final_capital:,.0f}元
- 总收益率: {total_return:+.2f}%
- 总交易次数: {total_trades}笔
- 胜率: {win_rate:.1f}%
- 盈亏比: {profit_factor:.2f}
- 【最近5笔交易】
- {trades_df.tail(5)[['开仓时间', '平仓时间', '盈亏金额', '退出原因']].to_string(index=False)}
- """
-
- return html, text
- # ==================== 主程序 ====================
- def main():
- """主程序"""
- print("="*80)
- print("🚀 cat-fly 自动交易报告系统 (完整版)")
- print("="*80)
- print(f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
-
- # 1. 获取数据
- print("\n📊 步骤1: 获取近2个月实时数据...")
- try:
- df = DataFetcher.fetch_recent_2months()
- except Exception as e:
- print(f"❌ 数据获取失败: {e}")
- return
-
- # 2. 运行策略
- print("\n📈 步骤2: 运行完整策略回测...")
- strategy = CatFlyStrategy()
- df, trades_df, final_capital = strategy.backtest(df)
-
- print(f"✅ 回测完成: 共{len(trades_df)}笔交易")
- print(f" 最终资金: {final_capital:,.0f}元")
- print(f" 收益率: {(final_capital/1000000-1)*100:+.2f}%")
-
- # 3. 生成报告
- print("\n📝 步骤3: 生成报告...")
- html_report, text_report = generate_report(trades_df, final_capital)
-
- # 4. 发送邮件
- print("\n📧 步骤4: 发送邮件...")
- subject = f"🚀 创业板50交易报告 {datetime.now().strftime('%m-%d %H:%M')} | 收益{(final_capital/1000000-1)*100:+.2f}% | {len(trades_df)}笔交易"
- send_email(subject, html_report, text_report)
-
- print("\n✅ 全部完成!")
- print("="*80)
- if __name__ == "__main__":
- main()
|