#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ CYB50 策略舒适区分析器 寻找策略表现较好的市场环境特征,进行标记 """ import sys import io # 强制UTF-8输出 if sys.platform == 'win32': sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8') import pandas as pd import numpy as np from datetime import datetime, timedelta import warnings warnings.filterwarnings('ignore') from cyb50_30min_dual_direction import ( ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor ) from t1_converter import simulate_t1_trades class MarketEnvironmentAnalyzer: """市场环境分析器 - 计算各种市场特征指标""" def __init__(self, data_df): self.data = data_df.copy() self._calculate_market_features() def _calculate_market_features(self): """计算市场环境特征指标""" df = self.data # 1. 趋势特征 df['MA48'] = df['Close'].rolling(window=48).mean() # 24小时 df['Trend_Short'] = np.where(df['Close'] > df['MA6'], 1, -1) # 短期趋势 df['Trend_Mid'] = np.where(df['Close'] > df['MA24'], 1, -1) # 中期趋势 df['Trend_Long'] = np.where(df['MA24'] > df['MA48'], 1, -1) # 长期趋势 (增加MA48) # 趋势强度 (ADX简化版) df['Trend_Strength'] = abs(df['Close'] - df['MA24']) / df['ATR'] # 2. 波动率特征 df['Volatility_Short'] = df['Returns'].rolling(12).std() * np.sqrt(48) # 日波动率年化 df['Volatility_Mid'] = df['Returns'].rolling(48).std() * np.sqrt(48) df['Volatility_Long'] = df['Returns'].rolling(120).std() * np.sqrt(48) # 波动率分位 df['Volatility_Percentile'] = df['Volatility_Mid'].rolling(120).apply( lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5 ) # 3. 成交量特征 df['Volume_Percentile'] = df['Volume'].rolling(48).apply( lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5 ) # 4. 布林带位置 df['BB_Position'] = (df['Close'] - df['BB_lower']) / (df['BB_upper'] - df['BB_lower']) # 5. 市场状态分类 df['Market_Regime'] = self._classify_market_regime(df) # 6. 日内波动特征 df['Intraday_Range'] = (df['High'] - df['Low']) / df['Close'] df['Intraday_Range_Percentile'] = df['Intraday_Range'].rolling(48).apply( lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5 ) # 7. 动量特征 df['Momentum_1d'] = (df['Close'] - df['Close'].shift(8)) / df['Close'].shift(8) # 1日动量 df['Momentum_3d'] = (df['Close'] - df['Close'].shift(24)) / df['Close'].shift(24) # 3日动量 # 8. RSI分位 df['RSI_Percentile'] = df['RSI'].rolling(48).apply( lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5 ) def _classify_market_regime(self, df): """分类市场状态""" regime = [] # 预先计算波动率分位数阈值 vol_low_threshold = df['Volatility_Mid'].quantile(0.3) for i in range(len(df)): if i < 48: regime.append('未知') continue row = df.iloc[i] # 基于趋势和波动率分类 if row['Trend_Mid'] > 0 and row['Trend_Strength'] > 1.0: if row['Volatility_Mid'] < vol_low_threshold: regime.append('强趋势低波') else: regime.append('强趋势高波') elif row['Trend_Mid'] < 0 and row['Trend_Strength'] > 1.0: if row['Volatility_Mid'] < vol_low_threshold: regime.append('下跌趋势低波') else: regime.append('下跌趋势高波') else: if row['Volatility_Mid'] < vol_low_threshold: regime.append('震荡低波') else: regime.append('震荡高波') return regime def get_environment_at_time(self, timestamp): """获取指定时间的市场环境""" try: idx = self.data.index.get_loc(timestamp) if isinstance(idx, slice): idx = idx.start row = self.data.iloc[idx] return { 'timestamp': timestamp, 'trend_short': row['Trend_Short'], 'trend_mid': row['Trend_Mid'], 'trend_long': row['Trend_Long'], 'trend_strength': row['Trend_Strength'], 'volatility_short': row['Volatility_Short'], 'volatility_mid': row['Volatility_Mid'], 'volatility_percentile': row['Volatility_Percentile'], 'volume_percentile': row['Volume_Percentile'], 'bb_position': row['BB_Position'], 'market_regime': row['Market_Regime'], 'intraday_range_pct': row['Intraday_Range_Percentile'], 'momentum_1d': row['Momentum_1d'], 'rsi_percentile': row['RSI_Percentile'], 'close': row['Close'], 'rsi': row['RSI'] } except Exception as e: return None class ComfortZoneAnalyzer: """策略舒适区分析器""" def __init__(self, trades_df, market_analyzer): self.trades = trades_df.copy() self.market = market_analyzer self.enriched_trades = None def analyze(self): """执行舒适区分析""" print("\n" + "="*80) print("策略舒适区分析") print("="*80) # 1. 为每笔交易添加市场环境标签 self._enrich_trades_with_environment() # 2. 按市场环境统计表现 self._analyze_by_market_regime() # 3. 按波动率统计表现 self._analyze_by_volatility() # 4. 按趋势状态统计表现 self._analyze_by_trend() # 5. 按布林带位置统计表现 self._analyze_by_bb_position() # 6. 按RSI状态统计表现 self._analyze_by_rsi() # 7. 识别最佳组合条件 self._find_best_combinations() return self.enriched_trades def _enrich_trades_with_environment(self): """为交易添加市场环境标签""" print("\n【步骤1】为每笔交易添加市场环境标签...") enriched = [] for _, trade in self.trades.iterrows(): entry_time = trade['开仓时间'] # 获取开仓时的市场环境 env = self.market.get_environment_at_time(entry_time) if env: trade_data = trade.to_dict() trade_data.update({ '市场状态': env['market_regime'], '趋势短期': '上涨' if env['trend_short'] > 0 else '下跌', '趋势中期': '上涨' if env['trend_mid'] > 0 else '下跌', '趋势强度': env['trend_strength'], '波动率分位': env['volatility_percentile'], '波动率水平': self._classify_volatility(env['volatility_percentile']), '成交量分位': env['volume_percentile'], '布林带位置': env['bb_position'], '布林带区域': self._classify_bb_position(env['bb_position']), 'RSI分位': env['rsi_percentile'], 'RSI区域': self._classify_rsi(env['rsi']), '1日动量': env['momentum_1d'], '入场价格': env['close'] }) enriched.append(trade_data) self.enriched_trades = pd.DataFrame(enriched) print(f"✅ 成功标记 {len(self.enriched_trades)} 笔交易") def _classify_volatility(self, percentile): """分类波动率水平""" if percentile < 0.2: return '极低' elif percentile < 0.4: return '低' elif percentile < 0.6: return '中等' elif percentile < 0.8: return '高' else: return '极高' def _classify_bb_position(self, position): """分类布林带位置""" if position < 0.1: return '下轨极低位' elif position < 0.3: return '下轨低位' elif position < 0.45: return '下轨中位' elif position < 0.55: return '中轨' elif position < 0.7: return '上轨中位' elif position < 0.9: return '上轨高位' else: return '上轨极高位' def _classify_rsi(self, rsi): """分类RSI状态""" if rsi < 20: return '极度超卖' elif rsi < 30: return '超卖' elif rsi < 40: return '偏弱' elif rsi < 50: return '中性偏弱' elif rsi < 60: return '中性偏强' elif rsi < 70: return '偏强' elif rsi < 80: return '超买' else: return '极度超买' def _analyze_by_market_regime(self): """按市场状态分析""" print("\n【分析1】按市场状态统计") print("-" * 60) stats = self._calculate_stats(self.enriched_trades, '市场状态') print(stats.to_string(index=False)) # 找出舒适区 comfort_zones = stats[(stats['胜率'] > 50) & (stats['平均盈亏'] > 0)] if len(comfort_zones) > 0: print("\n✅ 识别到的舒适区市场状态:") for _, row in comfort_zones.iterrows(): print(f" • {row['市场状态']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元") def _analyze_by_volatility(self): """按波动率分析""" print("\n【分析2】按波动率水平统计") print("-" * 60) stats = self._calculate_stats(self.enriched_trades, '波动率水平') print(stats.to_string(index=False)) # 排序找出最佳波动率区间 stats_sorted = stats.sort_values('平均盈亏', ascending=False) print(f"\n📊 波动率舒适区排序:") for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1): print(f" {i}. {row['波动率水平']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元") def _analyze_by_trend(self): """按趋势状态分析""" print("\n【分析3】按趋势状态统计") print("-" * 60) # 短期趋势 stats_short = self._calculate_stats(self.enriched_trades, '趋势短期') print("短期趋势:") print(stats_short.to_string(index=False)) # 中期趋势 stats_mid = self._calculate_stats(self.enriched_trades, '趋势中期') print("\n中期趋势:") print(stats_mid.to_string(index=False)) def _analyze_by_bb_position(self): """按布林带位置分析""" print("\n【分析4】按布林带位置统计") print("-" * 60) stats = self._calculate_stats(self.enriched_trades, '布林带区域') print(stats.to_string(index=False)) # 找出最佳入场区域 stats_sorted = stats.sort_values('平均盈亏', ascending=False) print(f"\n📊 最佳入场区域排序:") for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1): print(f" {i}. {row['布林带区域']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元") def _analyze_by_rsi(self): """按RSI状态分析""" print("\n【分析5】按RSI状态统计") print("-" * 60) stats = self._calculate_stats(self.enriched_trades, 'RSI区域') print(stats.to_string(index=False)) # 找出最佳RSI区域 stats_sorted = stats.sort_values('平均盈亏', ascending=False) print(f"\n📊 最佳RSI入场区域排序:") for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1): print(f" {i}. {row['RSI区域']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元") def _calculate_stats(self, df, groupby_col): """计算分组统计""" stats = [] for group_name, group in df.groupby(groupby_col): if len(group) < 3: # 样本太少跳过 continue total_trades = len(group) win_trades = len(group[group['盈亏金额'] > 0]) win_rate = win_trades / total_trades * 100 total_pnl = group['盈亏金额'].sum() avg_pnl = group['盈亏金额'].mean() stats.append({ groupby_col: group_name, '交易次数': total_trades, '胜率': win_rate, '总盈亏': total_pnl, '平均盈亏': avg_pnl, '盈亏比': abs(group[group['盈亏金额'] > 0]['盈亏金额'].mean() / group[group['盈亏金额'] < 0]['盈亏金额'].mean()) if len(group[group['盈亏金额'] < 0]) > 0 else 0 }) return pd.DataFrame(stats).sort_values('平均盈亏', ascending=False) def _find_best_combinations(self): """寻找最佳组合条件""" print("\n【分析6】最佳组合条件识别") print("-" * 60) df = self.enriched_trades # 定义可能的舒适区组合 combinations = [ { 'name': '下跌超卖反弹', 'condition': (df['趋势中期'] == '下跌') & (df['RSI区域'].isin(['超卖', '极度超卖'])) & (df['布林带区域'].isin(['下轨极低位', '下轨低位'])) }, { 'name': '震荡下轨做多', 'condition': (df['市场状态'].str.contains('震荡')) & (df['布林带区域'].isin(['下轨极低位', '下轨低位'])) }, { 'name': '低波动趋势上涨', 'condition': (df['趋势中期'] == '上涨') & (df['波动率水平'].isin(['低', '极低'])) }, { 'name': '放量突破中轨', 'condition': (df['布林带区域'] == '中轨') & (df['成交量分位'] > 0.6) & (df['趋势短期'] == '上涨') }, { 'name': '强趋势低波回调', 'condition': (df['市场状态'] == '强趋势低波') & (df['布林带区域'].isin(['下轨中位', '下轨低位'])) }, { 'name': 'RSI超买区域', 'condition': df['RSI区域'].isin(['超买', '极度超买']) }, { 'name': 'RSI超卖区域', 'condition': df['RSI区域'].isin(['超卖', '极度超卖']) }, { 'name': '极低波动率环境', 'condition': df['波动率水平'] == '极低' }, { 'name': '极高波动率环境', 'condition': df['波动率水平'] == '极高' } ] results = [] for combo in combinations: filtered = df[combo['condition']] if len(filtered) >= 3: win_rate = (filtered['盈亏金额'] > 0).sum() / len(filtered) * 100 avg_pnl = filtered['盈亏金额'].mean() total_pnl = filtered['盈亏金额'].sum() results.append({ '组合名称': combo['name'], '交易次数': len(filtered), '胜率': win_rate, '平均盈亏': avg_pnl, '总盈亏': total_pnl, '舒适评分': win_rate * 0.5 + (avg_pnl / 1000) * 0.5 # 自定义评分 }) results_df = pd.DataFrame(results).sort_values('舒适评分', ascending=False) print("\n组合条件表现排名:") print(results_df.to_string(index=False)) # 标记前3名为舒适区 print("\n" + "="*60) print("🎯 策略舒适区标记 (Top 3)") print("="*60) for i, (_, row) in enumerate(results_df.head(3).iterrows(), 1): print(f"\n舒适区 #{i}: {row['组合名称']}") print(f" 交易次数: {row['交易次数']}笔") print(f" 胜率: {row['胜率']:.1f}%") print(f" 平均盈亏: {row['平均盈亏']:+,.0f}元") print(f" 总盈亏: {row['总盈亏']:+,.0f}元") print(f" 舒适评分: {row['舒适评分']:.2f}") # 标记警告区(表现差的) print("\n" + "="*60) print("⚠️ 策略危险区标记 (Bottom 3)") print("="*60) for i, (_, row) in enumerate(results_df.tail(3).iterrows(), 1): print(f"\n危险区 #{i}: {row['组合名称']}") print(f" 交易次数: {row['交易次数']}笔") print(f" 胜率: {row['胜率']:.1f}%") print(f" 平均盈亏: {row['平均盈亏']:+,.0f}元") def export_comfort_zones(self, filename='comfort_zones.json'): """导出舒适区配置""" import json comfort_zones = { 'best_conditions': [ { 'name': '下跌超卖反弹', 'filters': { 'trend_mid': '下跌', 'rsi_zone': ['超卖', '极度超卖'], 'bb_zone': ['下轨极低位', '下轨低位'] }, 'description': '中期下跌趋势中,RSI超卖且价格触及布林带下轨' }, { 'name': '强趋势低波回调', 'filters': { 'market_regime': '强趋势低波', 'bb_zone': ['下轨中位', '下轨低位'] }, 'description': '强趋势低波动环境下,价格回调至布林带下轨区域' } ], 'avoid_conditions': [ { 'name': '极高波动率', 'filters': {'volatility_level': '极高'}, 'description': '极高波动率环境下避免交易' } ] } with open(filename, 'w', encoding='utf-8') as f: json.dump(comfort_zones, f, ensure_ascii=False, indent=2) print(f"\n✅ 舒适区配置已导出: {filename}") def main(): """主程序 - 运行舒适区分析""" print("="*80) print("CYB50 T+1 策略舒适区分析器") print("寻找策略表现较好的市场环境") print("="*80) # 1. 加载数据 print("\n【数据加载】") df = pd.read_csv('cyb50_30min_2023_to_20260325.csv') df['DateTime'] = pd.to_datetime(df['DateTime']) df.set_index('DateTime', inplace=True) df.sort_index(inplace=True) if 'Open' not in df.columns and 'o' in df.columns: df.rename(columns={'o': 'Open', 'h': 'High', 'l': 'Low', 'c': 'Close', 'v': 'Volume'}, inplace=True) df['Returns'] = df['Close'].pct_change() df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1) df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open'] # 添加缺失的列 df.ffill(inplace=True) df.dropna(inplace=True) print(f"数据区间: {df.index[0]} ~ {df.index[-1]}") print(f"数据条数: {len(df)}") # 2. 运行原策略获取交易记录 print("\n【运行原策略】") config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) data_with_indicators = fetcher.calculate_intraday_indicators(df) signal_generator = DualDirectionSignalGenerator() signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators) initial_capital = 1000000 executor = DualDirectionExecutor(initial_capital=initial_capital) results_df, trades_df = executor.execute_dual_direction_trades(signals_df) # 3. 应用T+1转换 long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital) print(f"\nT+1交易记录: {len(t1_trades)}笔") # 4. 市场环境分析 print("\n【市场环境特征计算】") market_analyzer = MarketEnvironmentAnalyzer(data_with_indicators) # 5. 舒适区分析 comfort_analyzer = ComfortZoneAnalyzer(t1_trades, market_analyzer) enriched_trades = comfort_analyzer.analyze() # 6. 导出结果 comfort_analyzer.export_comfort_zones() # 7. 导出详细交易记录 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = f't1_trades_with_environment_{timestamp}.csv' enriched_trades.to_csv(output_file, index=False, encoding='utf-8-sig') print(f"✅ 详细交易记录已导出: {output_file}") print("\n" + "="*80) print("舒适区分析完成!") print("="*80) if __name__ == "__main__": main()