| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- #!/usr/bin/env python3
- # coding=utf-8
- """
- T0交易信号分析器
- 支持日内T0交易,打印最近10天的买卖交易信号及原因
- """
- import sys
- import io
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- from typing import Dict, List, Optional
- from dataclasses import dataclass
- # 设置标准输出为UTF-8编码
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
- from data_fetcher_v2 import DataFetcherV2, DataManagerV2
- import MyTT
- # 尝试导入requests用于实时数据
- try:
- import requests
- REQUESTS_AVAILABLE = True
- except ImportError:
- REQUESTS_AVAILABLE = False
- print("⚠️ requests未安装,无法获取实时数据")
- @dataclass
- class TradingSignal:
- """交易信号记录"""
- date: datetime
- symbol: str
- close_price: float
- y0: float
- y1: float
- y2: float
- y3: float
- h1: float
- h2: float
- a1: float
- b1: float
- cross_y0_y1: bool
- cross_y1_y0: bool
- signal_type: str # 'BUY', 'SELL', 'NONE'
- reason: str # 信号原因或不触发原因
- position_status: str # 当前持仓状态
- class T0StrategyAnalyzer:
- """T0策略分析器 - 支持日内交易"""
- def __init__(self):
- self.maPeriod = 26
- self.stdPeriod = 150
- self.stdRange = 1
- self.symbol = '399673'
- self.period = max(self.maPeriod, self.stdPeriod, self.stdRange) + 1
- # T0相关变量
- self.t0_bought_today = {} # 当日买入的股票 {symbol: volume}
- self.t0_available_to_sell = {} # T0可用卖出数量
- # 信号记录
- self.signals: List[TradingSignal] = []
- def fetch_realtime_data(self) -> Optional[pd.Series]:
- """获取实时数据 - 使用新浪API(策略计算不需要成交量)"""
- if not REQUESTS_AVAILABLE:
- print("⚠️ requests模块不可用")
- return None
- try:
- print("正在获取实时行情数据...")
- # 新浪指数实时行情API
- code = self.symbol.replace('sz', '').replace('sh', '')
- # 判断前缀 - 399xxx是深交所指数,使用sz前缀
- if self.symbol.startswith('sz') or (len(code) == 6 and code.startswith('3')):
- prefix = 'sz'
- elif self.symbol.startswith('sh') or (len(code) == 6 and code.startswith('0')):
- prefix = 'sh'
- else:
- prefix = 'sz'
- sina_url = f'http://hq.sinajs.cn/list={prefix}{code}'
- headers = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
- 'Referer': 'http://finance.sina.com.cn'
- }
- response = requests.get(sina_url, headers=headers, timeout=10)
- if response.status_code == 200:
- # 修复编码问题
- response.encoding = 'gbk'
- content = response.text
- if content and '"' in content:
- # 解析新浪数据格式
- data_str = content.split('"')[1]
- parts = data_str.split(',')
- # 新浪数据格式: 名称,今开,昨收,现价,最高,最低,...
- if len(parts) >= 6:
- try:
- name = parts[0]
- open_price = float(parts[1])
- close_prev = float(parts[2])
- current_price = float(parts[3])
- high_price = float(parts[4])
- low_price = float(parts[5])
- # 数据有效性检查
- if current_price > 0:
- change_pct = ((current_price - close_prev) / close_prev) * 100
- print(f"✅ {name} 现价={current_price:.2f} ({change_pct:+.2f}%), "
- f"今开={open_price:.2f}, 最高={high_price:.2f}, 最低={low_price:.2f}")
- return pd.Series({
- 'date': datetime.now(),
- 'open': open_price,
- 'high': high_price,
- 'low': low_price,
- 'close': current_price,
- 'volume': 0 # 策略不需要成交量
- })
- except (ValueError, IndexError) as e:
- print(f"⚠️ 数据解析失败: {e}")
- except requests.exceptions.Timeout:
- print("⚠️ 请求超时")
- except requests.exceptions.ConnectionError:
- print("⚠️ 网络连接失败")
- except Exception as e:
- print(f"⚠️ 获取实时数据失败: {e}")
- return None
- def _format_index_code(self, symbol: str) -> str:
- """格式化指数代码"""
- symbol = symbol.strip()
- if len(symbol) == 6:
- if symbol.startswith(('00', '30')):
- return f"sz{symbol}"
- elif symbol.startswith(('60', '68')):
- return f"sh{symbol}"
- return symbol.lower()
- def fetch_data(self, start_date: str, end_date: str) -> pd.DataFrame:
- """获取数据"""
- print(f"获取历史数据: {start_date} 至 {end_date}")
- data_fetcher = DataFetcherV2()
- data = data_fetcher.fetch_index_data_v2(
- symbol=self.symbol,
- start_date=start_date,
- end_date=end_date
- )
- if data.empty:
- print("❌ 数据获取失败")
- return pd.DataFrame()
- # 检查数据的实际日期范围
- actual_start = data.index[0].strftime('%Y-%m-%d')
- actual_end = data.index[-1].strftime('%Y-%m-%d')
- print(f"✅ 获取到 {len(data)} 条历史数据")
- print(f" 数据范围: {actual_start} 至 {actual_end}")
- # 检查是否有今天的数据
- today = datetime.now()
- today_str = today.strftime('%Y-%m-%d')
- actual_end_dt = pd.to_datetime(actual_end)
- if actual_end_dt.date() < today.date():
- print(f"\n⚠️ 历史数据未包含今天({today_str}),最后交易日为 {actual_end}")
- # 检查今天是否是交易日(排除周末)
- if today.weekday() < 5: # 0-4是周一到周五
- print(f"今天是{['周一', '周二', '周三', '周四', '周五'][today.weekday()]},尝试获取实时数据...")
- # 尝试获取实时数据
- realtime = self.fetch_realtime_data()
- if realtime is not None:
- # 将实时数据添加到历史数据
- realtime_df = pd.DataFrame([realtime])
- realtime_df = realtime_df.set_index('date')
- realtime_df.index = pd.to_datetime(realtime_df.index)
- # 使用concat合并数据
- data = pd.concat([data, realtime_df], ignore_index=False)
- # 重新排序索引
- data = data.sort_index()
- print(f" ✅ 已添加今日({today_str})实时数据,总数据量: {len(data)} 条")
- else:
- print(f" ⚠️ 未能获取实时数据,将使用历史数据分析")
- else:
- print(f" ℹ️ 今天是周末,非交易日")
- return data
- def calculate_indicators(self, data: pd.DataFrame) -> dict:
- """计算技术指标"""
- if data.empty or len(data) < self.period:
- return None
- close = data['close'].values
- low = data['low'].values
- high = data['high'].values
- # 计算EMA
- H1_5 = MyTT.EMA(close, 8)
- H2_5 = MyTT.EMA(H1_5, 20)
- # 计算CROSS
- try:
- H1H2_CROSS = MyTT.CROSS(H1_5, H2_5)
- H2H1_CROSS = MyTT.CROSS(H2_5, H1_5)
- except:
- H1H2_CROSS = (H1_5 > H2_5) & (np.roll(H1_5, 1) <= np.roll(H2_5, 1))
- H2H1_CROSS = (H2_5 > H1_5) & (np.roll(H2_5, 1) <= np.roll(H1_5, 1))
- # KDJ相关计算
- rsv = (close - MyTT.LLV(low, 7)) / (MyTT.HHV(high, 7) - MyTT.LLV(low, 7)) * 100
- rsv = np.nan_to_num(rsv)
- Y0 = MyTT.SMA(rsv, 3, 1)
- Y0 = np.nan_to_num(Y0)
- Y1 = MyTT.SMA(Y0, 3, 1)
- Y1 = np.nan_to_num(Y1)
- try:
- CROSS_Y0_Y1 = MyTT.CROSS(Y0, Y1)
- CROSS_Y1_Y0 = MyTT.CROSS(Y1, Y0)
- except:
- CROSS_Y0_Y1 = (Y0 > Y1) & (np.roll(Y0, 1) <= np.roll(Y1, 1))
- CROSS_Y1_Y0 = (Y1 > Y0) & (np.roll(Y1, 1) <= np.roll(Y0, 1))
- RSV1 = (close - MyTT.LLV(low, 38)) / (MyTT.HHV(high, 38) - MyTT.LLV(low, 38)) * 100
- RSV1 = np.nan_to_num(RSV1)
- Y2 = MyTT.SMA(RSV1, 5, 1)
- Y2 = np.nan_to_num(Y2)
- Y3 = MyTT.SMA(Y2, 10, 1)
- Y3 = np.nan_to_num(Y3)
- a1 = (H1_5[-1] - H2_5[-1]) / ((H1_5[-1] + H2_5[-1]) / 2) if (H1_5[-1] + H2_5[-1]) / 2 != 0 else 0
- b1 = (Y2[-1] - Y3[-1]) / 100
- def safe_bool(value):
- if isinstance(value, (bool, np.bool_)):
- return bool(value)
- elif isinstance(value, (int, float)):
- return bool(value)
- else:
- try:
- return bool(value[-1] if hasattr(value, '__iter__') else value)
- except:
- return False
- return {
- 'H1_5': H1_5[-1],
- 'H2_5': H2_5[-1],
- 'Y0': Y0[-1],
- 'Y1': Y1[-1],
- 'Y2': Y2[-1],
- 'Y3': Y3[-1],
- 'a1': a1,
- 'b1': b1,
- 'cross_y0_y1': safe_bool(CROSS_Y0_Y1[-1]),
- 'cross_y1_y0': safe_bool(CROSS_Y1_Y0[-1]),
- }
- def analyze_signal(
- self,
- indicators: dict,
- has_position: bool,
- current_date: datetime,
- close_price: float
- ) -> TradingSignal:
- """分析交易信号"""
- y0 = indicators['Y0']
- y1 = indicators['Y1']
- y2 = indicators['Y2']
- y3 = indicators['Y3']
- h1 = indicators['H1_5']
- h2 = indicators['H2_5']
- a1 = indicators['a1']
- b1 = indicators['b1']
- cross_y0_y1 = indicators['cross_y0_y1']
- cross_y1_y0 = indicators['cross_y1_y0']
- signal_type = 'NONE'
- reason = ''
- # 1. 首先检查交叉信号条件
- if not cross_y0_y1 or not cross_y1_y0:
- # 交叉条件不满足
- cross_status = []
- if not cross_y0_y1:
- cross_status.append("Y0未上穿Y1")
- if not cross_y1_y0:
- cross_status.append("Y1未上穿Y0")
- reason = f"交叉条件不满足: {', '.join(cross_status)}"
- # 2. 交叉条件满足后的买入判断
- elif y0 > y1 and b1 > 0 and (a1 > -0.02 or a1 < 0.02):
- # 检查排除条件
- if a1 < -0.04:
- reason = f"买入信号排除: a1({a1:.4f}) < -0.04 (趋势过弱)"
- elif b1 < -0.17:
- reason = f"买入信号排除: b1({b1:.4f}) < -0.17 (动量过弱)"
- elif not has_position:
- signal_type = 'BUY'
- reason = f"买入信号触发: Y0({y0:.2f})>Y1({y1:.2f}), b1({b1:.4f})>0"
- else:
- reason = f"已有持仓,不追加买入: Y0({y0:.2f})>Y1({y1:.2f}), b1({b1:.4f})>0"
- # 3. 交叉条件满足后的卖出判断
- elif y0 <= y1 and (a1 > -0.02 or a1 < 0.02):
- # 检查排除条件
- if a1 > 0.05:
- reason = f"卖出信号排除: a1({a1:.4f}) > 0.05 (趋势过强)"
- elif has_position:
- signal_type = 'SELL'
- reason = f"卖出信号触发: Y0({y0:.2f})<=Y1({y1:.2f}), a1({a1:.4f})条件满足"
- else:
- reason = f"无持仓,无法卖出: Y0({y0:.2f})<=Y1({y1:.2f})"
- # 4. 其他情况
- else:
- conditions = []
- if y0 <= y1:
- conditions.append(f"Y0({y0:.2f})<=Y1({y1:.2f})")
- else:
- conditions.append(f"Y0({y0:.2f})>Y1({y1:.2f})")
- if b1 <= 0:
- conditions.append(f"b1({b1:.4f})<=0")
- if not (a1 > -0.02 or a1 < 0.02):
- conditions.append(f"a1({a1:.4f})不在[-0.02, 0.02]范围")
- reason = f"不满足任何交易条件: {', '.join(conditions)}"
- return TradingSignal(
- date=current_date,
- symbol=self.symbol,
- close_price=close_price,
- y0=y0,
- y1=y1,
- y2=y2,
- y3=y3,
- h1=h1,
- h2=h2,
- a1=a1,
- b1=b1,
- cross_y0_y1=cross_y0_y1,
- cross_y1_y0=cross_y1_y0,
- signal_type=signal_type,
- reason=reason,
- position_status="持仓" if has_position else "空仓"
- )
- def run_t0_backtest(
- self,
- start_date: str,
- end_date: str,
- initial_cash: float = 1000000
- ):
- """运行T0回测并分析信号"""
- print("=" * 80)
- print("T0策略信号分析")
- print("=" * 80)
- # 获取数据
- data = self.fetch_data(start_date, end_date)
- if data.empty:
- return
- # 模拟持仓和现金
- cash = initial_cash
- position_volume = 0
- position_cost = 0
- # 滑动窗口分析
- window_size = self.period
- print(f"\n开始分析交易信号...")
- print("-" * 80)
- for i in range(window_size, len(data)):
- current_date = data.index[i]
- current_data = data.iloc[:i+1]
- # 检查是否有足够数据
- if len(current_data) < self.period:
- continue
- # 计算指标
- indicators = self.calculate_indicators(current_data)
- if indicators is None:
- continue
- close_price = data.iloc[i]['close']
- # 分析信号
- has_position = position_volume > 0
- signal = self.analyze_signal(
- indicators,
- has_position,
- current_date,
- close_price
- )
- self.signals.append(signal)
- # T0交易模拟
- if signal.signal_type == 'BUY':
- # 计算可买入数量
- volume = int(cash / close_price)
- volume = max(100, (volume // 100) * 100)
- if volume > 0:
- cost = volume * close_price
- cash -= cost
- # T0: 更新持仓成本和数量(使用加权平均)
- if position_volume > 0:
- total_cost = position_cost * position_volume + cost
- position_volume += volume
- position_cost = total_cost / position_volume
- else:
- position_volume = volume
- position_cost = close_price
- elif signal.signal_type == 'SELL':
- if position_volume > 0:
- # T0: 卖出全部持仓
- proceeds = position_volume * close_price
- cash += proceeds
- position_volume = 0
- position_cost = 0
- # 打印最近10天的信号
- self.print_recent_signals(days=10)
- def print_recent_signals(self, days: int = 10):
- """打印最近N天的交易信号"""
- print("\n" + "=" * 140)
- print(f"最近 {days} 个交易日信号分析")
- print("=" * 140)
- recent_signals = self.signals[-days:] if len(self.signals) >= days else self.signals
- for signal in recent_signals:
- # 打印日期和价格
- print(f"\n📅 {signal.date.strftime('%Y-%m-%d')} | 收盘价: {signal.close_price:.2f} | 持仓: {signal.position_status}")
- # 打印指标值
- print(f" 指标: Y0={signal.y0:.2f} Y1={signal.y1:.2f} Y2={signal.y2:.2f} Y3={signal.y3:.2f} | "
- f"H1={signal.h1:.2f} H2={signal.h2:.2f} | a1={signal.a1:.4f} b1={signal.b1:.4f}")
- # 打印交叉状态
- cross_str = ""
- if signal.cross_y0_y1:
- cross_str += "Y0↑Y1 "
- if signal.cross_y1_y0:
- cross_str += "Y1↑Y0 "
- print(f" 交叉: {cross_str if cross_str else '无交叉'}")
- # 打印信号
- if signal.signal_type == 'BUY':
- print(f" 🟢 买入信号: {signal.reason}")
- elif signal.signal_type == 'SELL':
- print(f" 🔴 卖出信号: {signal.reason}")
- else:
- print(f" ⚪ 无信号: {signal.reason}")
- print("\n" + "=" * 140)
- def export_signals_to_csv(self, filename: str = "t0_signals.csv"):
- """导出信号到CSV"""
- if not self.signals:
- print("没有信号可导出")
- return
- data = []
- for signal in self.signals:
- data.append({
- 'date': signal.date.strftime('%Y-%m-%d'),
- 'symbol': signal.symbol,
- 'close_price': signal.close_price,
- 'y0': signal.y0,
- 'y1': signal.y1,
- 'y2': signal.y2,
- 'y3': signal.y3,
- 'h1': signal.h1,
- 'h2': signal.h2,
- 'a1': signal.a1,
- 'b1': signal.b1,
- 'cross_y0_y1': signal.cross_y0_y1,
- 'cross_y1_y0': signal.cross_y1_y0,
- 'signal_type': signal.signal_type,
- 'reason': signal.reason,
- 'position_status': signal.position_status
- })
- df = pd.DataFrame(data)
- df.to_csv(filename, index=False, encoding='utf-8-sig')
- print(f"✅ 信号已导出到: {filename}")
- def main():
- """主函数"""
- # 配置 - 扩大时间范围以确保有足够数据
- END_DATE = datetime.now().strftime('%Y-%m-%d')
- START_DATE = (datetime.now() - timedelta(days=365*2)).strftime('%Y-%m-%d') # 获取2年数据
- print(f"分析期间: {START_DATE} 至 {END_DATE}")
- # 创建分析器
- analyzer = T0StrategyAnalyzer()
- # 运行分析
- analyzer.run_t0_backtest(
- start_date=START_DATE,
- end_date=END_DATE,
- initial_cash=1000000
- )
- # 导出信号
- analyzer.export_signals_to_csv("t0_signals.csv")
- if __name__ == '__main__':
- main()
|