#!/usr/bin/env python3 # coding=utf-8 """ T0交易信号分析器 支持日内T0交易,打印最近10天的买卖交易信号及原因 """ import sys import io import pandas as pd import numpy as np from datetime import datetime, timedelta from typing import Dict, List, Optional from dataclasses import dataclass # 设置标准输出为UTF-8编码 sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') from data_fetcher_v2 import DataFetcherV2, DataManagerV2 import MyTT # 尝试导入requests用于实时数据 try: import requests REQUESTS_AVAILABLE = True except ImportError: REQUESTS_AVAILABLE = False print("⚠️ requests未安装,无法获取实时数据") @dataclass class TradingSignal: """交易信号记录""" date: datetime symbol: str close_price: float y0: float y1: float y2: float y3: float h1: float h2: float a1: float b1: float cross_y0_y1: bool cross_y1_y0: bool signal_type: str # 'BUY', 'SELL', 'NONE' reason: str # 信号原因或不触发原因 position_status: str # 当前持仓状态 class T0StrategyAnalyzer: """T0策略分析器 - 支持日内交易""" def __init__(self): self.maPeriod = 26 self.stdPeriod = 150 self.stdRange = 1 self.symbol = '399673' self.period = max(self.maPeriod, self.stdPeriod, self.stdRange) + 1 # T0相关变量 self.t0_bought_today = {} # 当日买入的股票 {symbol: volume} self.t0_available_to_sell = {} # T0可用卖出数量 # 信号记录 self.signals: List[TradingSignal] = [] def fetch_realtime_data(self) -> Optional[pd.Series]: """获取实时数据 - 使用新浪API(策略计算不需要成交量)""" if not REQUESTS_AVAILABLE: print("⚠️ requests模块不可用") return None try: print("正在获取实时行情数据...") # 新浪指数实时行情API code = self.symbol.replace('sz', '').replace('sh', '') # 判断前缀 - 399xxx是深交所指数,使用sz前缀 if self.symbol.startswith('sz') or (len(code) == 6 and code.startswith('3')): prefix = 'sz' elif self.symbol.startswith('sh') or (len(code) == 6 and code.startswith('0')): prefix = 'sh' else: prefix = 'sz' sina_url = f'http://hq.sinajs.cn/list={prefix}{code}' headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Referer': 'http://finance.sina.com.cn' } response = requests.get(sina_url, headers=headers, timeout=10) if response.status_code == 200: # 修复编码问题 response.encoding = 'gbk' content = response.text if content and '"' in content: # 解析新浪数据格式 data_str = content.split('"')[1] parts = data_str.split(',') # 新浪数据格式: 名称,今开,昨收,现价,最高,最低,... if len(parts) >= 6: try: name = parts[0] open_price = float(parts[1]) close_prev = float(parts[2]) current_price = float(parts[3]) high_price = float(parts[4]) low_price = float(parts[5]) # 数据有效性检查 if current_price > 0: change_pct = ((current_price - close_prev) / close_prev) * 100 print(f"✅ {name} 现价={current_price:.2f} ({change_pct:+.2f}%), " f"今开={open_price:.2f}, 最高={high_price:.2f}, 最低={low_price:.2f}") return pd.Series({ 'date': datetime.now(), 'open': open_price, 'high': high_price, 'low': low_price, 'close': current_price, 'volume': 0 # 策略不需要成交量 }) except (ValueError, IndexError) as e: print(f"⚠️ 数据解析失败: {e}") except requests.exceptions.Timeout: print("⚠️ 请求超时") except requests.exceptions.ConnectionError: print("⚠️ 网络连接失败") except Exception as e: print(f"⚠️ 获取实时数据失败: {e}") return None def _format_index_code(self, symbol: str) -> str: """格式化指数代码""" symbol = symbol.strip() if len(symbol) == 6: if symbol.startswith(('00', '30')): return f"sz{symbol}" elif symbol.startswith(('60', '68')): return f"sh{symbol}" return symbol.lower() def fetch_data(self, start_date: str, end_date: str) -> pd.DataFrame: """获取数据""" print(f"获取历史数据: {start_date} 至 {end_date}") data_fetcher = DataFetcherV2() data = data_fetcher.fetch_index_data_v2( symbol=self.symbol, start_date=start_date, end_date=end_date ) if data.empty: print("❌ 数据获取失败") return pd.DataFrame() # 检查数据的实际日期范围 actual_start = data.index[0].strftime('%Y-%m-%d') actual_end = data.index[-1].strftime('%Y-%m-%d') print(f"✅ 获取到 {len(data)} 条历史数据") print(f" 数据范围: {actual_start} 至 {actual_end}") # 检查是否有今天的数据 today = datetime.now() today_str = today.strftime('%Y-%m-%d') actual_end_dt = pd.to_datetime(actual_end) if actual_end_dt.date() < today.date(): print(f"\n⚠️ 历史数据未包含今天({today_str}),最后交易日为 {actual_end}") # 检查今天是否是交易日(排除周末) if today.weekday() < 5: # 0-4是周一到周五 print(f"今天是{['周一', '周二', '周三', '周四', '周五'][today.weekday()]},尝试获取实时数据...") # 尝试获取实时数据 realtime = self.fetch_realtime_data() if realtime is not None: # 将实时数据添加到历史数据 realtime_df = pd.DataFrame([realtime]) realtime_df = realtime_df.set_index('date') realtime_df.index = pd.to_datetime(realtime_df.index) # 使用concat合并数据 data = pd.concat([data, realtime_df], ignore_index=False) # 重新排序索引 data = data.sort_index() print(f" ✅ 已添加今日({today_str})实时数据,总数据量: {len(data)} 条") else: print(f" ⚠️ 未能获取实时数据,将使用历史数据分析") else: print(f" ℹ️ 今天是周末,非交易日") return data def calculate_indicators(self, data: pd.DataFrame) -> dict: """计算技术指标""" if data.empty or len(data) < self.period: return None close = data['close'].values low = data['low'].values high = data['high'].values # 计算EMA H1_5 = MyTT.EMA(close, 8) H2_5 = MyTT.EMA(H1_5, 20) # 计算CROSS try: H1H2_CROSS = MyTT.CROSS(H1_5, H2_5) H2H1_CROSS = MyTT.CROSS(H2_5, H1_5) except: H1H2_CROSS = (H1_5 > H2_5) & (np.roll(H1_5, 1) <= np.roll(H2_5, 1)) H2H1_CROSS = (H2_5 > H1_5) & (np.roll(H2_5, 1) <= np.roll(H1_5, 1)) # KDJ相关计算 rsv = (close - MyTT.LLV(low, 7)) / (MyTT.HHV(high, 7) - MyTT.LLV(low, 7)) * 100 rsv = np.nan_to_num(rsv) Y0 = MyTT.SMA(rsv, 3, 1) Y0 = np.nan_to_num(Y0) Y1 = MyTT.SMA(Y0, 3, 1) Y1 = np.nan_to_num(Y1) try: CROSS_Y0_Y1 = MyTT.CROSS(Y0, Y1) CROSS_Y1_Y0 = MyTT.CROSS(Y1, Y0) except: CROSS_Y0_Y1 = (Y0 > Y1) & (np.roll(Y0, 1) <= np.roll(Y1, 1)) CROSS_Y1_Y0 = (Y1 > Y0) & (np.roll(Y1, 1) <= np.roll(Y0, 1)) RSV1 = (close - MyTT.LLV(low, 38)) / (MyTT.HHV(high, 38) - MyTT.LLV(low, 38)) * 100 RSV1 = np.nan_to_num(RSV1) Y2 = MyTT.SMA(RSV1, 5, 1) Y2 = np.nan_to_num(Y2) Y3 = MyTT.SMA(Y2, 10, 1) Y3 = np.nan_to_num(Y3) a1 = (H1_5[-1] - H2_5[-1]) / ((H1_5[-1] + H2_5[-1]) / 2) if (H1_5[-1] + H2_5[-1]) / 2 != 0 else 0 b1 = (Y2[-1] - Y3[-1]) / 100 def safe_bool(value): if isinstance(value, (bool, np.bool_)): return bool(value) elif isinstance(value, (int, float)): return bool(value) else: try: return bool(value[-1] if hasattr(value, '__iter__') else value) except: return False return { 'H1_5': H1_5[-1], 'H2_5': H2_5[-1], 'Y0': Y0[-1], 'Y1': Y1[-1], 'Y2': Y2[-1], 'Y3': Y3[-1], 'a1': a1, 'b1': b1, 'cross_y0_y1': safe_bool(CROSS_Y0_Y1[-1]), 'cross_y1_y0': safe_bool(CROSS_Y1_Y0[-1]), } def analyze_signal( self, indicators: dict, has_position: bool, current_date: datetime, close_price: float ) -> TradingSignal: """分析交易信号""" y0 = indicators['Y0'] y1 = indicators['Y1'] y2 = indicators['Y2'] y3 = indicators['Y3'] h1 = indicators['H1_5'] h2 = indicators['H2_5'] a1 = indicators['a1'] b1 = indicators['b1'] cross_y0_y1 = indicators['cross_y0_y1'] cross_y1_y0 = indicators['cross_y1_y0'] signal_type = 'NONE' reason = '' # 1. 首先检查交叉信号条件 if not cross_y0_y1 or not cross_y1_y0: # 交叉条件不满足 cross_status = [] if not cross_y0_y1: cross_status.append("Y0未上穿Y1") if not cross_y1_y0: cross_status.append("Y1未上穿Y0") reason = f"交叉条件不满足: {', '.join(cross_status)}" # 2. 交叉条件满足后的买入判断 elif y0 > y1 and b1 > 0 and (a1 > -0.02 or a1 < 0.02): # 检查排除条件 if a1 < -0.04: reason = f"买入信号排除: a1({a1:.4f}) < -0.04 (趋势过弱)" elif b1 < -0.17: reason = f"买入信号排除: b1({b1:.4f}) < -0.17 (动量过弱)" elif not has_position: signal_type = 'BUY' reason = f"买入信号触发: Y0({y0:.2f})>Y1({y1:.2f}), b1({b1:.4f})>0" else: reason = f"已有持仓,不追加买入: Y0({y0:.2f})>Y1({y1:.2f}), b1({b1:.4f})>0" # 3. 交叉条件满足后的卖出判断 elif y0 <= y1 and (a1 > -0.02 or a1 < 0.02): # 检查排除条件 if a1 > 0.05: reason = f"卖出信号排除: a1({a1:.4f}) > 0.05 (趋势过强)" elif has_position: signal_type = 'SELL' reason = f"卖出信号触发: Y0({y0:.2f})<=Y1({y1:.2f}), a1({a1:.4f})条件满足" else: reason = f"无持仓,无法卖出: Y0({y0:.2f})<=Y1({y1:.2f})" # 4. 其他情况 else: conditions = [] if y0 <= y1: conditions.append(f"Y0({y0:.2f})<=Y1({y1:.2f})") else: conditions.append(f"Y0({y0:.2f})>Y1({y1:.2f})") if b1 <= 0: conditions.append(f"b1({b1:.4f})<=0") if not (a1 > -0.02 or a1 < 0.02): conditions.append(f"a1({a1:.4f})不在[-0.02, 0.02]范围") reason = f"不满足任何交易条件: {', '.join(conditions)}" return TradingSignal( date=current_date, symbol=self.symbol, close_price=close_price, y0=y0, y1=y1, y2=y2, y3=y3, h1=h1, h2=h2, a1=a1, b1=b1, cross_y0_y1=cross_y0_y1, cross_y1_y0=cross_y1_y0, signal_type=signal_type, reason=reason, position_status="持仓" if has_position else "空仓" ) def run_t0_backtest( self, start_date: str, end_date: str, initial_cash: float = 1000000 ): """运行T0回测并分析信号""" print("=" * 80) print("T0策略信号分析") print("=" * 80) # 获取数据 data = self.fetch_data(start_date, end_date) if data.empty: return # 模拟持仓和现金 cash = initial_cash position_volume = 0 position_cost = 0 # 滑动窗口分析 window_size = self.period print(f"\n开始分析交易信号...") print("-" * 80) for i in range(window_size, len(data)): current_date = data.index[i] current_data = data.iloc[:i+1] # 检查是否有足够数据 if len(current_data) < self.period: continue # 计算指标 indicators = self.calculate_indicators(current_data) if indicators is None: continue close_price = data.iloc[i]['close'] # 分析信号 has_position = position_volume > 0 signal = self.analyze_signal( indicators, has_position, current_date, close_price ) self.signals.append(signal) # T0交易模拟 if signal.signal_type == 'BUY': # 计算可买入数量 volume = int(cash / close_price) volume = max(100, (volume // 100) * 100) if volume > 0: cost = volume * close_price cash -= cost # T0: 更新持仓成本和数量(使用加权平均) if position_volume > 0: total_cost = position_cost * position_volume + cost position_volume += volume position_cost = total_cost / position_volume else: position_volume = volume position_cost = close_price elif signal.signal_type == 'SELL': if position_volume > 0: # T0: 卖出全部持仓 proceeds = position_volume * close_price cash += proceeds position_volume = 0 position_cost = 0 # 打印最近10天的信号 self.print_recent_signals(days=10) def print_recent_signals(self, days: int = 10): """打印最近N天的交易信号""" print("\n" + "=" * 140) print(f"最近 {days} 个交易日信号分析") print("=" * 140) recent_signals = self.signals[-days:] if len(self.signals) >= days else self.signals for signal in recent_signals: # 打印日期和价格 print(f"\n📅 {signal.date.strftime('%Y-%m-%d')} | 收盘价: {signal.close_price:.2f} | 持仓: {signal.position_status}") # 打印指标值 print(f" 指标: Y0={signal.y0:.2f} Y1={signal.y1:.2f} Y2={signal.y2:.2f} Y3={signal.y3:.2f} | " f"H1={signal.h1:.2f} H2={signal.h2:.2f} | a1={signal.a1:.4f} b1={signal.b1:.4f}") # 打印交叉状态 cross_str = "" if signal.cross_y0_y1: cross_str += "Y0↑Y1 " if signal.cross_y1_y0: cross_str += "Y1↑Y0 " print(f" 交叉: {cross_str if cross_str else '无交叉'}") # 打印信号 if signal.signal_type == 'BUY': print(f" 🟢 买入信号: {signal.reason}") elif signal.signal_type == 'SELL': print(f" 🔴 卖出信号: {signal.reason}") else: print(f" ⚪ 无信号: {signal.reason}") print("\n" + "=" * 140) def export_signals_to_csv(self, filename: str = "t0_signals.csv"): """导出信号到CSV""" if not self.signals: print("没有信号可导出") return data = [] for signal in self.signals: data.append({ 'date': signal.date.strftime('%Y-%m-%d'), 'symbol': signal.symbol, 'close_price': signal.close_price, 'y0': signal.y0, 'y1': signal.y1, 'y2': signal.y2, 'y3': signal.y3, 'h1': signal.h1, 'h2': signal.h2, 'a1': signal.a1, 'b1': signal.b1, 'cross_y0_y1': signal.cross_y0_y1, 'cross_y1_y0': signal.cross_y1_y0, 'signal_type': signal.signal_type, 'reason': signal.reason, 'position_status': signal.position_status }) df = pd.DataFrame(data) df.to_csv(filename, index=False, encoding='utf-8-sig') print(f"✅ 信号已导出到: {filename}") def main(): """主函数""" # 配置 - 扩大时间范围以确保有足够数据 END_DATE = datetime.now().strftime('%Y-%m-%d') START_DATE = (datetime.now() - timedelta(days=365*2)).strftime('%Y-%m-%d') # 获取2年数据 print(f"分析期间: {START_DATE} 至 {END_DATE}") # 创建分析器 analyzer = T0StrategyAnalyzer() # 运行分析 analyzer.run_t0_backtest( start_date=START_DATE, end_date=END_DATE, initial_cash=1000000 ) # 导出信号 analyzer.export_signals_to_csv("t0_signals.csv") if __name__ == '__main__': main()