comfort_zone_analyzer.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. CYB50 策略舒适区分析器
  5. 寻找策略表现较好的市场环境特征,进行标记
  6. """
  7. import sys
  8. import io
  9. # 强制UTF-8输出
  10. if sys.platform == 'win32':
  11. sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
  12. sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
  13. import pandas as pd
  14. import numpy as np
  15. from datetime import datetime, timedelta
  16. import warnings
  17. warnings.filterwarnings('ignore')
  18. from cyb50_30min_dual_direction import (
  19. ConfigManager, IntradayDataFetcher,
  20. DualDirectionSignalGenerator, DualDirectionExecutor
  21. )
  22. from t1_converter import simulate_t1_trades
  23. class MarketEnvironmentAnalyzer:
  24. """市场环境分析器 - 计算各种市场特征指标"""
  25. def __init__(self, data_df):
  26. self.data = data_df.copy()
  27. self._calculate_market_features()
  28. def _calculate_market_features(self):
  29. """计算市场环境特征指标"""
  30. df = self.data
  31. # 1. 趋势特征
  32. df['MA48'] = df['Close'].rolling(window=48).mean() # 24小时
  33. df['Trend_Short'] = np.where(df['Close'] > df['MA6'], 1, -1) # 短期趋势
  34. df['Trend_Mid'] = np.where(df['Close'] > df['MA24'], 1, -1) # 中期趋势
  35. df['Trend_Long'] = np.where(df['MA24'] > df['MA48'], 1, -1) # 长期趋势 (增加MA48)
  36. # 趋势强度 (ADX简化版)
  37. df['Trend_Strength'] = abs(df['Close'] - df['MA24']) / df['ATR']
  38. # 2. 波动率特征
  39. df['Volatility_Short'] = df['Returns'].rolling(12).std() * np.sqrt(48) # 日波动率年化
  40. df['Volatility_Mid'] = df['Returns'].rolling(48).std() * np.sqrt(48)
  41. df['Volatility_Long'] = df['Returns'].rolling(120).std() * np.sqrt(48)
  42. # 波动率分位
  43. df['Volatility_Percentile'] = df['Volatility_Mid'].rolling(120).apply(
  44. lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5
  45. )
  46. # 3. 成交量特征
  47. df['Volume_Percentile'] = df['Volume'].rolling(48).apply(
  48. lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5
  49. )
  50. # 4. 布林带位置
  51. df['BB_Position'] = (df['Close'] - df['BB_lower']) / (df['BB_upper'] - df['BB_lower'])
  52. # 5. 市场状态分类
  53. df['Market_Regime'] = self._classify_market_regime(df)
  54. # 6. 日内波动特征
  55. df['Intraday_Range'] = (df['High'] - df['Low']) / df['Close']
  56. df['Intraday_Range_Percentile'] = df['Intraday_Range'].rolling(48).apply(
  57. lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5
  58. )
  59. # 7. 动量特征
  60. df['Momentum_1d'] = (df['Close'] - df['Close'].shift(8)) / df['Close'].shift(8) # 1日动量
  61. df['Momentum_3d'] = (df['Close'] - df['Close'].shift(24)) / df['Close'].shift(24) # 3日动量
  62. # 8. RSI分位
  63. df['RSI_Percentile'] = df['RSI'].rolling(48).apply(
  64. lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5
  65. )
  66. def _classify_market_regime(self, df):
  67. """分类市场状态"""
  68. regime = []
  69. # 预先计算波动率分位数阈值
  70. vol_low_threshold = df['Volatility_Mid'].quantile(0.3)
  71. for i in range(len(df)):
  72. if i < 48:
  73. regime.append('未知')
  74. continue
  75. row = df.iloc[i]
  76. # 基于趋势和波动率分类
  77. if row['Trend_Mid'] > 0 and row['Trend_Strength'] > 1.0:
  78. if row['Volatility_Mid'] < vol_low_threshold:
  79. regime.append('强趋势低波')
  80. else:
  81. regime.append('强趋势高波')
  82. elif row['Trend_Mid'] < 0 and row['Trend_Strength'] > 1.0:
  83. if row['Volatility_Mid'] < vol_low_threshold:
  84. regime.append('下跌趋势低波')
  85. else:
  86. regime.append('下跌趋势高波')
  87. else:
  88. if row['Volatility_Mid'] < vol_low_threshold:
  89. regime.append('震荡低波')
  90. else:
  91. regime.append('震荡高波')
  92. return regime
  93. def get_environment_at_time(self, timestamp):
  94. """获取指定时间的市场环境"""
  95. try:
  96. idx = self.data.index.get_loc(timestamp)
  97. if isinstance(idx, slice):
  98. idx = idx.start
  99. row = self.data.iloc[idx]
  100. return {
  101. 'timestamp': timestamp,
  102. 'trend_short': row['Trend_Short'],
  103. 'trend_mid': row['Trend_Mid'],
  104. 'trend_long': row['Trend_Long'],
  105. 'trend_strength': row['Trend_Strength'],
  106. 'volatility_short': row['Volatility_Short'],
  107. 'volatility_mid': row['Volatility_Mid'],
  108. 'volatility_percentile': row['Volatility_Percentile'],
  109. 'volume_percentile': row['Volume_Percentile'],
  110. 'bb_position': row['BB_Position'],
  111. 'market_regime': row['Market_Regime'],
  112. 'intraday_range_pct': row['Intraday_Range_Percentile'],
  113. 'momentum_1d': row['Momentum_1d'],
  114. 'rsi_percentile': row['RSI_Percentile'],
  115. 'close': row['Close'],
  116. 'rsi': row['RSI']
  117. }
  118. except Exception as e:
  119. return None
  120. class ComfortZoneAnalyzer:
  121. """策略舒适区分析器"""
  122. def __init__(self, trades_df, market_analyzer):
  123. self.trades = trades_df.copy()
  124. self.market = market_analyzer
  125. self.enriched_trades = None
  126. def analyze(self):
  127. """执行舒适区分析"""
  128. print("\n" + "="*80)
  129. print("策略舒适区分析")
  130. print("="*80)
  131. # 1. 为每笔交易添加市场环境标签
  132. self._enrich_trades_with_environment()
  133. # 2. 按市场环境统计表现
  134. self._analyze_by_market_regime()
  135. # 3. 按波动率统计表现
  136. self._analyze_by_volatility()
  137. # 4. 按趋势状态统计表现
  138. self._analyze_by_trend()
  139. # 5. 按布林带位置统计表现
  140. self._analyze_by_bb_position()
  141. # 6. 按RSI状态统计表现
  142. self._analyze_by_rsi()
  143. # 7. 识别最佳组合条件
  144. self._find_best_combinations()
  145. return self.enriched_trades
  146. def _enrich_trades_with_environment(self):
  147. """为交易添加市场环境标签"""
  148. print("\n【步骤1】为每笔交易添加市场环境标签...")
  149. enriched = []
  150. for _, trade in self.trades.iterrows():
  151. entry_time = trade['开仓时间']
  152. # 获取开仓时的市场环境
  153. env = self.market.get_environment_at_time(entry_time)
  154. if env:
  155. trade_data = trade.to_dict()
  156. trade_data.update({
  157. '市场状态': env['market_regime'],
  158. '趋势短期': '上涨' if env['trend_short'] > 0 else '下跌',
  159. '趋势中期': '上涨' if env['trend_mid'] > 0 else '下跌',
  160. '趋势强度': env['trend_strength'],
  161. '波动率分位': env['volatility_percentile'],
  162. '波动率水平': self._classify_volatility(env['volatility_percentile']),
  163. '成交量分位': env['volume_percentile'],
  164. '布林带位置': env['bb_position'],
  165. '布林带区域': self._classify_bb_position(env['bb_position']),
  166. 'RSI分位': env['rsi_percentile'],
  167. 'RSI区域': self._classify_rsi(env['rsi']),
  168. '1日动量': env['momentum_1d'],
  169. '入场价格': env['close']
  170. })
  171. enriched.append(trade_data)
  172. self.enriched_trades = pd.DataFrame(enriched)
  173. print(f"✅ 成功标记 {len(self.enriched_trades)} 笔交易")
  174. def _classify_volatility(self, percentile):
  175. """分类波动率水平"""
  176. if percentile < 0.2:
  177. return '极低'
  178. elif percentile < 0.4:
  179. return '低'
  180. elif percentile < 0.6:
  181. return '中等'
  182. elif percentile < 0.8:
  183. return '高'
  184. else:
  185. return '极高'
  186. def _classify_bb_position(self, position):
  187. """分类布林带位置"""
  188. if position < 0.1:
  189. return '下轨极低位'
  190. elif position < 0.3:
  191. return '下轨低位'
  192. elif position < 0.45:
  193. return '下轨中位'
  194. elif position < 0.55:
  195. return '中轨'
  196. elif position < 0.7:
  197. return '上轨中位'
  198. elif position < 0.9:
  199. return '上轨高位'
  200. else:
  201. return '上轨极高位'
  202. def _classify_rsi(self, rsi):
  203. """分类RSI状态"""
  204. if rsi < 20:
  205. return '极度超卖'
  206. elif rsi < 30:
  207. return '超卖'
  208. elif rsi < 40:
  209. return '偏弱'
  210. elif rsi < 50:
  211. return '中性偏弱'
  212. elif rsi < 60:
  213. return '中性偏强'
  214. elif rsi < 70:
  215. return '偏强'
  216. elif rsi < 80:
  217. return '超买'
  218. else:
  219. return '极度超买'
  220. def _analyze_by_market_regime(self):
  221. """按市场状态分析"""
  222. print("\n【分析1】按市场状态统计")
  223. print("-" * 60)
  224. stats = self._calculate_stats(self.enriched_trades, '市场状态')
  225. print(stats.to_string(index=False))
  226. # 找出舒适区
  227. comfort_zones = stats[(stats['胜率'] > 50) & (stats['平均盈亏'] > 0)]
  228. if len(comfort_zones) > 0:
  229. print("\n✅ 识别到的舒适区市场状态:")
  230. for _, row in comfort_zones.iterrows():
  231. print(f" • {row['市场状态']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元")
  232. def _analyze_by_volatility(self):
  233. """按波动率分析"""
  234. print("\n【分析2】按波动率水平统计")
  235. print("-" * 60)
  236. stats = self._calculate_stats(self.enriched_trades, '波动率水平')
  237. print(stats.to_string(index=False))
  238. # 排序找出最佳波动率区间
  239. stats_sorted = stats.sort_values('平均盈亏', ascending=False)
  240. print(f"\n📊 波动率舒适区排序:")
  241. for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1):
  242. print(f" {i}. {row['波动率水平']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元")
  243. def _analyze_by_trend(self):
  244. """按趋势状态分析"""
  245. print("\n【分析3】按趋势状态统计")
  246. print("-" * 60)
  247. # 短期趋势
  248. stats_short = self._calculate_stats(self.enriched_trades, '趋势短期')
  249. print("短期趋势:")
  250. print(stats_short.to_string(index=False))
  251. # 中期趋势
  252. stats_mid = self._calculate_stats(self.enriched_trades, '趋势中期')
  253. print("\n中期趋势:")
  254. print(stats_mid.to_string(index=False))
  255. def _analyze_by_bb_position(self):
  256. """按布林带位置分析"""
  257. print("\n【分析4】按布林带位置统计")
  258. print("-" * 60)
  259. stats = self._calculate_stats(self.enriched_trades, '布林带区域')
  260. print(stats.to_string(index=False))
  261. # 找出最佳入场区域
  262. stats_sorted = stats.sort_values('平均盈亏', ascending=False)
  263. print(f"\n📊 最佳入场区域排序:")
  264. for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1):
  265. print(f" {i}. {row['布林带区域']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元")
  266. def _analyze_by_rsi(self):
  267. """按RSI状态分析"""
  268. print("\n【分析5】按RSI状态统计")
  269. print("-" * 60)
  270. stats = self._calculate_stats(self.enriched_trades, 'RSI区域')
  271. print(stats.to_string(index=False))
  272. # 找出最佳RSI区域
  273. stats_sorted = stats.sort_values('平均盈亏', ascending=False)
  274. print(f"\n📊 最佳RSI入场区域排序:")
  275. for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1):
  276. print(f" {i}. {row['RSI区域']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元")
  277. def _calculate_stats(self, df, groupby_col):
  278. """计算分组统计"""
  279. stats = []
  280. for group_name, group in df.groupby(groupby_col):
  281. if len(group) < 3: # 样本太少跳过
  282. continue
  283. total_trades = len(group)
  284. win_trades = len(group[group['盈亏金额'] > 0])
  285. win_rate = win_trades / total_trades * 100
  286. total_pnl = group['盈亏金额'].sum()
  287. avg_pnl = group['盈亏金额'].mean()
  288. stats.append({
  289. groupby_col: group_name,
  290. '交易次数': total_trades,
  291. '胜率': win_rate,
  292. '总盈亏': total_pnl,
  293. '平均盈亏': avg_pnl,
  294. '盈亏比': abs(group[group['盈亏金额'] > 0]['盈亏金额'].mean() /
  295. group[group['盈亏金额'] < 0]['盈亏金额'].mean()) if len(group[group['盈亏金额'] < 0]) > 0 else 0
  296. })
  297. return pd.DataFrame(stats).sort_values('平均盈亏', ascending=False)
  298. def _find_best_combinations(self):
  299. """寻找最佳组合条件"""
  300. print("\n【分析6】最佳组合条件识别")
  301. print("-" * 60)
  302. df = self.enriched_trades
  303. # 定义可能的舒适区组合
  304. combinations = [
  305. {
  306. 'name': '下跌超卖反弹',
  307. 'condition': (df['趋势中期'] == '下跌') & (df['RSI区域'].isin(['超卖', '极度超卖'])) & (df['布林带区域'].isin(['下轨极低位', '下轨低位']))
  308. },
  309. {
  310. 'name': '震荡下轨做多',
  311. 'condition': (df['市场状态'].str.contains('震荡')) & (df['布林带区域'].isin(['下轨极低位', '下轨低位']))
  312. },
  313. {
  314. 'name': '低波动趋势上涨',
  315. 'condition': (df['趋势中期'] == '上涨') & (df['波动率水平'].isin(['低', '极低']))
  316. },
  317. {
  318. 'name': '放量突破中轨',
  319. 'condition': (df['布林带区域'] == '中轨') & (df['成交量分位'] > 0.6) & (df['趋势短期'] == '上涨')
  320. },
  321. {
  322. 'name': '强趋势低波回调',
  323. 'condition': (df['市场状态'] == '强趋势低波') & (df['布林带区域'].isin(['下轨中位', '下轨低位']))
  324. },
  325. {
  326. 'name': 'RSI超买区域',
  327. 'condition': df['RSI区域'].isin(['超买', '极度超买'])
  328. },
  329. {
  330. 'name': 'RSI超卖区域',
  331. 'condition': df['RSI区域'].isin(['超卖', '极度超卖'])
  332. },
  333. {
  334. 'name': '极低波动率环境',
  335. 'condition': df['波动率水平'] == '极低'
  336. },
  337. {
  338. 'name': '极高波动率环境',
  339. 'condition': df['波动率水平'] == '极高'
  340. }
  341. ]
  342. results = []
  343. for combo in combinations:
  344. filtered = df[combo['condition']]
  345. if len(filtered) >= 3:
  346. win_rate = (filtered['盈亏金额'] > 0).sum() / len(filtered) * 100
  347. avg_pnl = filtered['盈亏金额'].mean()
  348. total_pnl = filtered['盈亏金额'].sum()
  349. results.append({
  350. '组合名称': combo['name'],
  351. '交易次数': len(filtered),
  352. '胜率': win_rate,
  353. '平均盈亏': avg_pnl,
  354. '总盈亏': total_pnl,
  355. '舒适评分': win_rate * 0.5 + (avg_pnl / 1000) * 0.5 # 自定义评分
  356. })
  357. results_df = pd.DataFrame(results).sort_values('舒适评分', ascending=False)
  358. print("\n组合条件表现排名:")
  359. print(results_df.to_string(index=False))
  360. # 标记前3名为舒适区
  361. print("\n" + "="*60)
  362. print("🎯 策略舒适区标记 (Top 3)")
  363. print("="*60)
  364. for i, (_, row) in enumerate(results_df.head(3).iterrows(), 1):
  365. print(f"\n舒适区 #{i}: {row['组合名称']}")
  366. print(f" 交易次数: {row['交易次数']}笔")
  367. print(f" 胜率: {row['胜率']:.1f}%")
  368. print(f" 平均盈亏: {row['平均盈亏']:+,.0f}元")
  369. print(f" 总盈亏: {row['总盈亏']:+,.0f}元")
  370. print(f" 舒适评分: {row['舒适评分']:.2f}")
  371. # 标记警告区(表现差的)
  372. print("\n" + "="*60)
  373. print("⚠️ 策略危险区标记 (Bottom 3)")
  374. print("="*60)
  375. for i, (_, row) in enumerate(results_df.tail(3).iterrows(), 1):
  376. print(f"\n危险区 #{i}: {row['组合名称']}")
  377. print(f" 交易次数: {row['交易次数']}笔")
  378. print(f" 胜率: {row['胜率']:.1f}%")
  379. print(f" 平均盈亏: {row['平均盈亏']:+,.0f}元")
  380. def export_comfort_zones(self, filename='comfort_zones.json'):
  381. """导出舒适区配置"""
  382. import json
  383. comfort_zones = {
  384. 'best_conditions': [
  385. {
  386. 'name': '下跌超卖反弹',
  387. 'filters': {
  388. 'trend_mid': '下跌',
  389. 'rsi_zone': ['超卖', '极度超卖'],
  390. 'bb_zone': ['下轨极低位', '下轨低位']
  391. },
  392. 'description': '中期下跌趋势中,RSI超卖且价格触及布林带下轨'
  393. },
  394. {
  395. 'name': '强趋势低波回调',
  396. 'filters': {
  397. 'market_regime': '强趋势低波',
  398. 'bb_zone': ['下轨中位', '下轨低位']
  399. },
  400. 'description': '强趋势低波动环境下,价格回调至布林带下轨区域'
  401. }
  402. ],
  403. 'avoid_conditions': [
  404. {
  405. 'name': '极高波动率',
  406. 'filters': {'volatility_level': '极高'},
  407. 'description': '极高波动率环境下避免交易'
  408. }
  409. ]
  410. }
  411. with open(filename, 'w', encoding='utf-8') as f:
  412. json.dump(comfort_zones, f, ensure_ascii=False, indent=2)
  413. print(f"\n✅ 舒适区配置已导出: {filename}")
  414. def main():
  415. """主程序 - 运行舒适区分析"""
  416. print("="*80)
  417. print("CYB50 T+1 策略舒适区分析器")
  418. print("寻找策略表现较好的市场环境")
  419. print("="*80)
  420. # 1. 加载数据
  421. print("\n【数据加载】")
  422. df = pd.read_csv('cyb50_30min_2023_to_20260325.csv')
  423. df['DateTime'] = pd.to_datetime(df['DateTime'])
  424. df.set_index('DateTime', inplace=True)
  425. df.sort_index(inplace=True)
  426. if 'Open' not in df.columns and 'o' in df.columns:
  427. df.rename(columns={'o': 'Open', 'h': 'High', 'l': 'Low', 'c': 'Close', 'v': 'Volume'}, inplace=True)
  428. df['Returns'] = df['Close'].pct_change()
  429. df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
  430. df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open'] # 添加缺失的列
  431. df.ffill(inplace=True)
  432. df.dropna(inplace=True)
  433. print(f"数据区间: {df.index[0]} ~ {df.index[-1]}")
  434. print(f"数据条数: {len(df)}")
  435. # 2. 运行原策略获取交易记录
  436. print("\n【运行原策略】")
  437. config_manager = ConfigManager('config.json')
  438. fetcher = IntradayDataFetcher(config_manager)
  439. data_with_indicators = fetcher.calculate_intraday_indicators(df)
  440. signal_generator = DualDirectionSignalGenerator()
  441. signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
  442. initial_capital = 1000000
  443. executor = DualDirectionExecutor(initial_capital=initial_capital)
  444. results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
  445. # 3. 应用T+1转换
  446. long_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
  447. t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital)
  448. print(f"\nT+1交易记录: {len(t1_trades)}笔")
  449. # 4. 市场环境分析
  450. print("\n【市场环境特征计算】")
  451. market_analyzer = MarketEnvironmentAnalyzer(data_with_indicators)
  452. # 5. 舒适区分析
  453. comfort_analyzer = ComfortZoneAnalyzer(t1_trades, market_analyzer)
  454. enriched_trades = comfort_analyzer.analyze()
  455. # 6. 导出结果
  456. comfort_analyzer.export_comfort_zones()
  457. # 7. 导出详细交易记录
  458. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  459. output_file = f't1_trades_with_environment_{timestamp}.csv'
  460. enriched_trades.to_csv(output_file, index=False, encoding='utf-8-sig')
  461. print(f"✅ 详细交易记录已导出: {output_file}")
  462. print("\n" + "="*80)
  463. print("舒适区分析完成!")
  464. print("="*80)
  465. if __name__ == "__main__":
  466. main()