t0_signal_analyzer.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. #!/usr/bin/env python3
  2. # coding=utf-8
  3. """
  4. T0交易信号分析器
  5. 支持日内T0交易,打印最近10天的买卖交易信号及原因
  6. """
  7. import sys
  8. import io
  9. import pandas as pd
  10. import numpy as np
  11. from datetime import datetime, timedelta
  12. from typing import Dict, List, Optional
  13. from dataclasses import dataclass
  14. # 设置标准输出为UTF-8编码
  15. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
  16. from data_fetcher_v2 import DataFetcherV2, DataManagerV2
  17. import MyTT
  18. # 尝试导入requests用于实时数据
  19. try:
  20. import requests
  21. REQUESTS_AVAILABLE = True
  22. except ImportError:
  23. REQUESTS_AVAILABLE = False
  24. print("⚠️ requests未安装,无法获取实时数据")
  25. @dataclass
  26. class TradingSignal:
  27. """交易信号记录"""
  28. date: datetime
  29. symbol: str
  30. close_price: float
  31. y0: float
  32. y1: float
  33. y2: float
  34. y3: float
  35. h1: float
  36. h2: float
  37. a1: float
  38. b1: float
  39. cross_y0_y1: bool
  40. cross_y1_y0: bool
  41. signal_type: str # 'BUY', 'SELL', 'NONE'
  42. reason: str # 信号原因或不触发原因
  43. position_status: str # 当前持仓状态
  44. class T0StrategyAnalyzer:
  45. """T0策略分析器 - 支持日内交易"""
  46. def __init__(self):
  47. self.maPeriod = 26
  48. self.stdPeriod = 150
  49. self.stdRange = 1
  50. self.symbol = '399673'
  51. self.period = max(self.maPeriod, self.stdPeriod, self.stdRange) + 1
  52. # T0相关变量
  53. self.t0_bought_today = {} # 当日买入的股票 {symbol: volume}
  54. self.t0_available_to_sell = {} # T0可用卖出数量
  55. # 信号记录
  56. self.signals: List[TradingSignal] = []
  57. def fetch_realtime_data(self) -> Optional[pd.Series]:
  58. """获取实时数据 - 使用新浪API(策略计算不需要成交量)"""
  59. if not REQUESTS_AVAILABLE:
  60. print("⚠️ requests模块不可用")
  61. return None
  62. try:
  63. print("正在获取实时行情数据...")
  64. # 新浪指数实时行情API
  65. code = self.symbol.replace('sz', '').replace('sh', '')
  66. # 判断前缀 - 399xxx是深交所指数,使用sz前缀
  67. if self.symbol.startswith('sz') or (len(code) == 6 and code.startswith('3')):
  68. prefix = 'sz'
  69. elif self.symbol.startswith('sh') or (len(code) == 6 and code.startswith('0')):
  70. prefix = 'sh'
  71. else:
  72. prefix = 'sz'
  73. sina_url = f'http://hq.sinajs.cn/list={prefix}{code}'
  74. headers = {
  75. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
  76. 'Referer': 'http://finance.sina.com.cn'
  77. }
  78. response = requests.get(sina_url, headers=headers, timeout=10)
  79. if response.status_code == 200:
  80. # 修复编码问题
  81. response.encoding = 'gbk'
  82. content = response.text
  83. if content and '"' in content:
  84. # 解析新浪数据格式
  85. data_str = content.split('"')[1]
  86. parts = data_str.split(',')
  87. # 新浪数据格式: 名称,今开,昨收,现价,最高,最低,...
  88. if len(parts) >= 6:
  89. try:
  90. name = parts[0]
  91. open_price = float(parts[1])
  92. close_prev = float(parts[2])
  93. current_price = float(parts[3])
  94. high_price = float(parts[4])
  95. low_price = float(parts[5])
  96. # 数据有效性检查
  97. if current_price > 0:
  98. change_pct = ((current_price - close_prev) / close_prev) * 100
  99. print(f"✅ {name} 现价={current_price:.2f} ({change_pct:+.2f}%), "
  100. f"今开={open_price:.2f}, 最高={high_price:.2f}, 最低={low_price:.2f}")
  101. return pd.Series({
  102. 'date': datetime.now(),
  103. 'open': open_price,
  104. 'high': high_price,
  105. 'low': low_price,
  106. 'close': current_price,
  107. 'volume': 0 # 策略不需要成交量
  108. })
  109. except (ValueError, IndexError) as e:
  110. print(f"⚠️ 数据解析失败: {e}")
  111. except requests.exceptions.Timeout:
  112. print("⚠️ 请求超时")
  113. except requests.exceptions.ConnectionError:
  114. print("⚠️ 网络连接失败")
  115. except Exception as e:
  116. print(f"⚠️ 获取实时数据失败: {e}")
  117. return None
  118. def _format_index_code(self, symbol: str) -> str:
  119. """格式化指数代码"""
  120. symbol = symbol.strip()
  121. if len(symbol) == 6:
  122. if symbol.startswith(('00', '30')):
  123. return f"sz{symbol}"
  124. elif symbol.startswith(('60', '68')):
  125. return f"sh{symbol}"
  126. return symbol.lower()
  127. def fetch_data(self, start_date: str, end_date: str) -> pd.DataFrame:
  128. """获取数据"""
  129. print(f"获取历史数据: {start_date} 至 {end_date}")
  130. data_fetcher = DataFetcherV2()
  131. data = data_fetcher.fetch_index_data_v2(
  132. symbol=self.symbol,
  133. start_date=start_date,
  134. end_date=end_date
  135. )
  136. if data.empty:
  137. print("❌ 数据获取失败")
  138. return pd.DataFrame()
  139. # 检查数据的实际日期范围
  140. actual_start = data.index[0].strftime('%Y-%m-%d')
  141. actual_end = data.index[-1].strftime('%Y-%m-%d')
  142. print(f"✅ 获取到 {len(data)} 条历史数据")
  143. print(f" 数据范围: {actual_start} 至 {actual_end}")
  144. # 检查是否有今天的数据
  145. today = datetime.now()
  146. today_str = today.strftime('%Y-%m-%d')
  147. actual_end_dt = pd.to_datetime(actual_end)
  148. if actual_end_dt.date() < today.date():
  149. print(f"\n⚠️ 历史数据未包含今天({today_str}),最后交易日为 {actual_end}")
  150. # 检查今天是否是交易日(排除周末)
  151. if today.weekday() < 5: # 0-4是周一到周五
  152. print(f"今天是{['周一', '周二', '周三', '周四', '周五'][today.weekday()]},尝试获取实时数据...")
  153. # 尝试获取实时数据
  154. realtime = self.fetch_realtime_data()
  155. if realtime is not None:
  156. # 将实时数据添加到历史数据
  157. realtime_df = pd.DataFrame([realtime])
  158. realtime_df = realtime_df.set_index('date')
  159. realtime_df.index = pd.to_datetime(realtime_df.index)
  160. # 使用concat合并数据
  161. data = pd.concat([data, realtime_df], ignore_index=False)
  162. # 重新排序索引
  163. data = data.sort_index()
  164. print(f" ✅ 已添加今日({today_str})实时数据,总数据量: {len(data)} 条")
  165. else:
  166. print(f" ⚠️ 未能获取实时数据,将使用历史数据分析")
  167. else:
  168. print(f" ℹ️ 今天是周末,非交易日")
  169. return data
  170. def calculate_indicators(self, data: pd.DataFrame) -> dict:
  171. """计算技术指标"""
  172. if data.empty or len(data) < self.period:
  173. return None
  174. close = data['close'].values
  175. low = data['low'].values
  176. high = data['high'].values
  177. # 计算EMA
  178. H1_5 = MyTT.EMA(close, 8)
  179. H2_5 = MyTT.EMA(H1_5, 20)
  180. # 计算CROSS
  181. try:
  182. H1H2_CROSS = MyTT.CROSS(H1_5, H2_5)
  183. H2H1_CROSS = MyTT.CROSS(H2_5, H1_5)
  184. except:
  185. H1H2_CROSS = (H1_5 > H2_5) & (np.roll(H1_5, 1) <= np.roll(H2_5, 1))
  186. H2H1_CROSS = (H2_5 > H1_5) & (np.roll(H2_5, 1) <= np.roll(H1_5, 1))
  187. # KDJ相关计算
  188. rsv = (close - MyTT.LLV(low, 7)) / (MyTT.HHV(high, 7) - MyTT.LLV(low, 7)) * 100
  189. rsv = np.nan_to_num(rsv)
  190. Y0 = MyTT.SMA(rsv, 3, 1)
  191. Y0 = np.nan_to_num(Y0)
  192. Y1 = MyTT.SMA(Y0, 3, 1)
  193. Y1 = np.nan_to_num(Y1)
  194. try:
  195. CROSS_Y0_Y1 = MyTT.CROSS(Y0, Y1)
  196. CROSS_Y1_Y0 = MyTT.CROSS(Y1, Y0)
  197. except:
  198. CROSS_Y0_Y1 = (Y0 > Y1) & (np.roll(Y0, 1) <= np.roll(Y1, 1))
  199. CROSS_Y1_Y0 = (Y1 > Y0) & (np.roll(Y1, 1) <= np.roll(Y0, 1))
  200. RSV1 = (close - MyTT.LLV(low, 38)) / (MyTT.HHV(high, 38) - MyTT.LLV(low, 38)) * 100
  201. RSV1 = np.nan_to_num(RSV1)
  202. Y2 = MyTT.SMA(RSV1, 5, 1)
  203. Y2 = np.nan_to_num(Y2)
  204. Y3 = MyTT.SMA(Y2, 10, 1)
  205. Y3 = np.nan_to_num(Y3)
  206. a1 = (H1_5[-1] - H2_5[-1]) / ((H1_5[-1] + H2_5[-1]) / 2) if (H1_5[-1] + H2_5[-1]) / 2 != 0 else 0
  207. b1 = (Y2[-1] - Y3[-1]) / 100
  208. def safe_bool(value):
  209. if isinstance(value, (bool, np.bool_)):
  210. return bool(value)
  211. elif isinstance(value, (int, float)):
  212. return bool(value)
  213. else:
  214. try:
  215. return bool(value[-1] if hasattr(value, '__iter__') else value)
  216. except:
  217. return False
  218. return {
  219. 'H1_5': H1_5[-1],
  220. 'H2_5': H2_5[-1],
  221. 'Y0': Y0[-1],
  222. 'Y1': Y1[-1],
  223. 'Y2': Y2[-1],
  224. 'Y3': Y3[-1],
  225. 'a1': a1,
  226. 'b1': b1,
  227. 'cross_y0_y1': safe_bool(CROSS_Y0_Y1[-1]),
  228. 'cross_y1_y0': safe_bool(CROSS_Y1_Y0[-1]),
  229. }
  230. def analyze_signal(
  231. self,
  232. indicators: dict,
  233. has_position: bool,
  234. current_date: datetime,
  235. close_price: float
  236. ) -> TradingSignal:
  237. """分析交易信号"""
  238. y0 = indicators['Y0']
  239. y1 = indicators['Y1']
  240. y2 = indicators['Y2']
  241. y3 = indicators['Y3']
  242. h1 = indicators['H1_5']
  243. h2 = indicators['H2_5']
  244. a1 = indicators['a1']
  245. b1 = indicators['b1']
  246. cross_y0_y1 = indicators['cross_y0_y1']
  247. cross_y1_y0 = indicators['cross_y1_y0']
  248. signal_type = 'NONE'
  249. reason = ''
  250. # 1. 首先检查交叉信号条件
  251. if not cross_y0_y1 or not cross_y1_y0:
  252. # 交叉条件不满足
  253. cross_status = []
  254. if not cross_y0_y1:
  255. cross_status.append("Y0未上穿Y1")
  256. if not cross_y1_y0:
  257. cross_status.append("Y1未上穿Y0")
  258. reason = f"交叉条件不满足: {', '.join(cross_status)}"
  259. # 2. 交叉条件满足后的买入判断
  260. elif y0 > y1 and b1 > 0 and (a1 > -0.02 or a1 < 0.02):
  261. # 检查排除条件
  262. if a1 < -0.04:
  263. reason = f"买入信号排除: a1({a1:.4f}) < -0.04 (趋势过弱)"
  264. elif b1 < -0.17:
  265. reason = f"买入信号排除: b1({b1:.4f}) < -0.17 (动量过弱)"
  266. elif not has_position:
  267. signal_type = 'BUY'
  268. reason = f"买入信号触发: Y0({y0:.2f})>Y1({y1:.2f}), b1({b1:.4f})>0"
  269. else:
  270. reason = f"已有持仓,不追加买入: Y0({y0:.2f})>Y1({y1:.2f}), b1({b1:.4f})>0"
  271. # 3. 交叉条件满足后的卖出判断
  272. elif y0 <= y1 and (a1 > -0.02 or a1 < 0.02):
  273. # 检查排除条件
  274. if a1 > 0.05:
  275. reason = f"卖出信号排除: a1({a1:.4f}) > 0.05 (趋势过强)"
  276. elif has_position:
  277. signal_type = 'SELL'
  278. reason = f"卖出信号触发: Y0({y0:.2f})<=Y1({y1:.2f}), a1({a1:.4f})条件满足"
  279. else:
  280. reason = f"无持仓,无法卖出: Y0({y0:.2f})<=Y1({y1:.2f})"
  281. # 4. 其他情况
  282. else:
  283. conditions = []
  284. if y0 <= y1:
  285. conditions.append(f"Y0({y0:.2f})<=Y1({y1:.2f})")
  286. else:
  287. conditions.append(f"Y0({y0:.2f})>Y1({y1:.2f})")
  288. if b1 <= 0:
  289. conditions.append(f"b1({b1:.4f})<=0")
  290. if not (a1 > -0.02 or a1 < 0.02):
  291. conditions.append(f"a1({a1:.4f})不在[-0.02, 0.02]范围")
  292. reason = f"不满足任何交易条件: {', '.join(conditions)}"
  293. return TradingSignal(
  294. date=current_date,
  295. symbol=self.symbol,
  296. close_price=close_price,
  297. y0=y0,
  298. y1=y1,
  299. y2=y2,
  300. y3=y3,
  301. h1=h1,
  302. h2=h2,
  303. a1=a1,
  304. b1=b1,
  305. cross_y0_y1=cross_y0_y1,
  306. cross_y1_y0=cross_y1_y0,
  307. signal_type=signal_type,
  308. reason=reason,
  309. position_status="持仓" if has_position else "空仓"
  310. )
  311. def run_t0_backtest(
  312. self,
  313. start_date: str,
  314. end_date: str,
  315. initial_cash: float = 1000000
  316. ):
  317. """运行T0回测并分析信号"""
  318. print("=" * 80)
  319. print("T0策略信号分析")
  320. print("=" * 80)
  321. # 获取数据
  322. data = self.fetch_data(start_date, end_date)
  323. if data.empty:
  324. return
  325. # 模拟持仓和现金
  326. cash = initial_cash
  327. position_volume = 0
  328. position_cost = 0
  329. # 滑动窗口分析
  330. window_size = self.period
  331. print(f"\n开始分析交易信号...")
  332. print("-" * 80)
  333. for i in range(window_size, len(data)):
  334. current_date = data.index[i]
  335. current_data = data.iloc[:i+1]
  336. # 检查是否有足够数据
  337. if len(current_data) < self.period:
  338. continue
  339. # 计算指标
  340. indicators = self.calculate_indicators(current_data)
  341. if indicators is None:
  342. continue
  343. close_price = data.iloc[i]['close']
  344. # 分析信号
  345. has_position = position_volume > 0
  346. signal = self.analyze_signal(
  347. indicators,
  348. has_position,
  349. current_date,
  350. close_price
  351. )
  352. self.signals.append(signal)
  353. # T0交易模拟
  354. if signal.signal_type == 'BUY':
  355. # 计算可买入数量
  356. volume = int(cash / close_price)
  357. volume = max(100, (volume // 100) * 100)
  358. if volume > 0:
  359. cost = volume * close_price
  360. cash -= cost
  361. # T0: 更新持仓成本和数量(使用加权平均)
  362. if position_volume > 0:
  363. total_cost = position_cost * position_volume + cost
  364. position_volume += volume
  365. position_cost = total_cost / position_volume
  366. else:
  367. position_volume = volume
  368. position_cost = close_price
  369. elif signal.signal_type == 'SELL':
  370. if position_volume > 0:
  371. # T0: 卖出全部持仓
  372. proceeds = position_volume * close_price
  373. cash += proceeds
  374. position_volume = 0
  375. position_cost = 0
  376. # 打印最近10天的信号
  377. self.print_recent_signals(days=10)
  378. def print_recent_signals(self, days: int = 10):
  379. """打印最近N天的交易信号"""
  380. print("\n" + "=" * 140)
  381. print(f"最近 {days} 个交易日信号分析")
  382. print("=" * 140)
  383. recent_signals = self.signals[-days:] if len(self.signals) >= days else self.signals
  384. for signal in recent_signals:
  385. # 打印日期和价格
  386. print(f"\n📅 {signal.date.strftime('%Y-%m-%d')} | 收盘价: {signal.close_price:.2f} | 持仓: {signal.position_status}")
  387. # 打印指标值
  388. print(f" 指标: Y0={signal.y0:.2f} Y1={signal.y1:.2f} Y2={signal.y2:.2f} Y3={signal.y3:.2f} | "
  389. f"H1={signal.h1:.2f} H2={signal.h2:.2f} | a1={signal.a1:.4f} b1={signal.b1:.4f}")
  390. # 打印交叉状态
  391. cross_str = ""
  392. if signal.cross_y0_y1:
  393. cross_str += "Y0↑Y1 "
  394. if signal.cross_y1_y0:
  395. cross_str += "Y1↑Y0 "
  396. print(f" 交叉: {cross_str if cross_str else '无交叉'}")
  397. # 打印信号
  398. if signal.signal_type == 'BUY':
  399. print(f" 🟢 买入信号: {signal.reason}")
  400. elif signal.signal_type == 'SELL':
  401. print(f" 🔴 卖出信号: {signal.reason}")
  402. else:
  403. print(f" ⚪ 无信号: {signal.reason}")
  404. print("\n" + "=" * 140)
  405. def export_signals_to_csv(self, filename: str = "t0_signals.csv"):
  406. """导出信号到CSV"""
  407. if not self.signals:
  408. print("没有信号可导出")
  409. return
  410. data = []
  411. for signal in self.signals:
  412. data.append({
  413. 'date': signal.date.strftime('%Y-%m-%d'),
  414. 'symbol': signal.symbol,
  415. 'close_price': signal.close_price,
  416. 'y0': signal.y0,
  417. 'y1': signal.y1,
  418. 'y2': signal.y2,
  419. 'y3': signal.y3,
  420. 'h1': signal.h1,
  421. 'h2': signal.h2,
  422. 'a1': signal.a1,
  423. 'b1': signal.b1,
  424. 'cross_y0_y1': signal.cross_y0_y1,
  425. 'cross_y1_y0': signal.cross_y1_y0,
  426. 'signal_type': signal.signal_type,
  427. 'reason': signal.reason,
  428. 'position_status': signal.position_status
  429. })
  430. df = pd.DataFrame(data)
  431. df.to_csv(filename, index=False, encoding='utf-8-sig')
  432. print(f"✅ 信号已导出到: {filename}")
  433. def main():
  434. """主函数"""
  435. # 配置 - 扩大时间范围以确保有足够数据
  436. END_DATE = datetime.now().strftime('%Y-%m-%d')
  437. START_DATE = (datetime.now() - timedelta(days=365*2)).strftime('%Y-%m-%d') # 获取2年数据
  438. print(f"分析期间: {START_DATE} 至 {END_DATE}")
  439. # 创建分析器
  440. analyzer = T0StrategyAnalyzer()
  441. # 运行分析
  442. analyzer.run_t0_backtest(
  443. start_date=START_DATE,
  444. end_date=END_DATE,
  445. initial_cash=1000000
  446. )
  447. # 导出信号
  448. analyzer.export_signals_to_csv("t0_signals.csv")
  449. if __name__ == '__main__':
  450. main()