| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- CYB50 策略舒适区分析器
- 寻找策略表现较好的市场环境特征,进行标记
- """
- import sys
- import io
- # 强制UTF-8输出
- if sys.platform == 'win32':
- sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
- sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- import warnings
- warnings.filterwarnings('ignore')
- from cyb50_30min_dual_direction import (
- ConfigManager, IntradayDataFetcher,
- DualDirectionSignalGenerator, DualDirectionExecutor
- )
- from t1_converter import simulate_t1_trades
- class MarketEnvironmentAnalyzer:
- """市场环境分析器 - 计算各种市场特征指标"""
- def __init__(self, data_df):
- self.data = data_df.copy()
- self._calculate_market_features()
- def _calculate_market_features(self):
- """计算市场环境特征指标"""
- df = self.data
- # 1. 趋势特征
- df['MA48'] = df['Close'].rolling(window=48).mean() # 24小时
- df['Trend_Short'] = np.where(df['Close'] > df['MA6'], 1, -1) # 短期趋势
- df['Trend_Mid'] = np.where(df['Close'] > df['MA24'], 1, -1) # 中期趋势
- df['Trend_Long'] = np.where(df['MA24'] > df['MA48'], 1, -1) # 长期趋势 (增加MA48)
- # 趋势强度 (ADX简化版)
- df['Trend_Strength'] = abs(df['Close'] - df['MA24']) / df['ATR']
- # 2. 波动率特征
- df['Volatility_Short'] = df['Returns'].rolling(12).std() * np.sqrt(48) # 日波动率年化
- df['Volatility_Mid'] = df['Returns'].rolling(48).std() * np.sqrt(48)
- df['Volatility_Long'] = df['Returns'].rolling(120).std() * np.sqrt(48)
- # 波动率分位
- df['Volatility_Percentile'] = df['Volatility_Mid'].rolling(120).apply(
- lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5
- )
- # 3. 成交量特征
- df['Volume_Percentile'] = df['Volume'].rolling(48).apply(
- lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5
- )
- # 4. 布林带位置
- df['BB_Position'] = (df['Close'] - df['BB_lower']) / (df['BB_upper'] - df['BB_lower'])
- # 5. 市场状态分类
- df['Market_Regime'] = self._classify_market_regime(df)
- # 6. 日内波动特征
- df['Intraday_Range'] = (df['High'] - df['Low']) / df['Close']
- df['Intraday_Range_Percentile'] = df['Intraday_Range'].rolling(48).apply(
- lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5
- )
- # 7. 动量特征
- df['Momentum_1d'] = (df['Close'] - df['Close'].shift(8)) / df['Close'].shift(8) # 1日动量
- df['Momentum_3d'] = (df['Close'] - df['Close'].shift(24)) / df['Close'].shift(24) # 3日动量
- # 8. RSI分位
- df['RSI_Percentile'] = df['RSI'].rolling(48).apply(
- lambda x: pd.Series(x).rank(pct=True).iloc[-1] if len(x) > 0 else 0.5
- )
- def _classify_market_regime(self, df):
- """分类市场状态"""
- regime = []
- # 预先计算波动率分位数阈值
- vol_low_threshold = df['Volatility_Mid'].quantile(0.3)
- for i in range(len(df)):
- if i < 48:
- regime.append('未知')
- continue
- row = df.iloc[i]
- # 基于趋势和波动率分类
- if row['Trend_Mid'] > 0 and row['Trend_Strength'] > 1.0:
- if row['Volatility_Mid'] < vol_low_threshold:
- regime.append('强趋势低波')
- else:
- regime.append('强趋势高波')
- elif row['Trend_Mid'] < 0 and row['Trend_Strength'] > 1.0:
- if row['Volatility_Mid'] < vol_low_threshold:
- regime.append('下跌趋势低波')
- else:
- regime.append('下跌趋势高波')
- else:
- if row['Volatility_Mid'] < vol_low_threshold:
- regime.append('震荡低波')
- else:
- regime.append('震荡高波')
- return regime
- def get_environment_at_time(self, timestamp):
- """获取指定时间的市场环境"""
- try:
- idx = self.data.index.get_loc(timestamp)
- if isinstance(idx, slice):
- idx = idx.start
- row = self.data.iloc[idx]
- return {
- 'timestamp': timestamp,
- 'trend_short': row['Trend_Short'],
- 'trend_mid': row['Trend_Mid'],
- 'trend_long': row['Trend_Long'],
- 'trend_strength': row['Trend_Strength'],
- 'volatility_short': row['Volatility_Short'],
- 'volatility_mid': row['Volatility_Mid'],
- 'volatility_percentile': row['Volatility_Percentile'],
- 'volume_percentile': row['Volume_Percentile'],
- 'bb_position': row['BB_Position'],
- 'market_regime': row['Market_Regime'],
- 'intraday_range_pct': row['Intraday_Range_Percentile'],
- 'momentum_1d': row['Momentum_1d'],
- 'rsi_percentile': row['RSI_Percentile'],
- 'close': row['Close'],
- 'rsi': row['RSI']
- }
- except Exception as e:
- return None
- class ComfortZoneAnalyzer:
- """策略舒适区分析器"""
- def __init__(self, trades_df, market_analyzer):
- self.trades = trades_df.copy()
- self.market = market_analyzer
- self.enriched_trades = None
- def analyze(self):
- """执行舒适区分析"""
- print("\n" + "="*80)
- print("策略舒适区分析")
- print("="*80)
- # 1. 为每笔交易添加市场环境标签
- self._enrich_trades_with_environment()
- # 2. 按市场环境统计表现
- self._analyze_by_market_regime()
- # 3. 按波动率统计表现
- self._analyze_by_volatility()
- # 4. 按趋势状态统计表现
- self._analyze_by_trend()
- # 5. 按布林带位置统计表现
- self._analyze_by_bb_position()
- # 6. 按RSI状态统计表现
- self._analyze_by_rsi()
- # 7. 识别最佳组合条件
- self._find_best_combinations()
- return self.enriched_trades
- def _enrich_trades_with_environment(self):
- """为交易添加市场环境标签"""
- print("\n【步骤1】为每笔交易添加市场环境标签...")
- enriched = []
- for _, trade in self.trades.iterrows():
- entry_time = trade['开仓时间']
- # 获取开仓时的市场环境
- env = self.market.get_environment_at_time(entry_time)
- if env:
- trade_data = trade.to_dict()
- trade_data.update({
- '市场状态': env['market_regime'],
- '趋势短期': '上涨' if env['trend_short'] > 0 else '下跌',
- '趋势中期': '上涨' if env['trend_mid'] > 0 else '下跌',
- '趋势强度': env['trend_strength'],
- '波动率分位': env['volatility_percentile'],
- '波动率水平': self._classify_volatility(env['volatility_percentile']),
- '成交量分位': env['volume_percentile'],
- '布林带位置': env['bb_position'],
- '布林带区域': self._classify_bb_position(env['bb_position']),
- 'RSI分位': env['rsi_percentile'],
- 'RSI区域': self._classify_rsi(env['rsi']),
- '1日动量': env['momentum_1d'],
- '入场价格': env['close']
- })
- enriched.append(trade_data)
- self.enriched_trades = pd.DataFrame(enriched)
- print(f"✅ 成功标记 {len(self.enriched_trades)} 笔交易")
- def _classify_volatility(self, percentile):
- """分类波动率水平"""
- if percentile < 0.2:
- return '极低'
- elif percentile < 0.4:
- return '低'
- elif percentile < 0.6:
- return '中等'
- elif percentile < 0.8:
- return '高'
- else:
- return '极高'
- def _classify_bb_position(self, position):
- """分类布林带位置"""
- if position < 0.1:
- return '下轨极低位'
- elif position < 0.3:
- return '下轨低位'
- elif position < 0.45:
- return '下轨中位'
- elif position < 0.55:
- return '中轨'
- elif position < 0.7:
- return '上轨中位'
- elif position < 0.9:
- return '上轨高位'
- else:
- return '上轨极高位'
- def _classify_rsi(self, rsi):
- """分类RSI状态"""
- if rsi < 20:
- return '极度超卖'
- elif rsi < 30:
- return '超卖'
- elif rsi < 40:
- return '偏弱'
- elif rsi < 50:
- return '中性偏弱'
- elif rsi < 60:
- return '中性偏强'
- elif rsi < 70:
- return '偏强'
- elif rsi < 80:
- return '超买'
- else:
- return '极度超买'
- def _analyze_by_market_regime(self):
- """按市场状态分析"""
- print("\n【分析1】按市场状态统计")
- print("-" * 60)
- stats = self._calculate_stats(self.enriched_trades, '市场状态')
- print(stats.to_string(index=False))
- # 找出舒适区
- comfort_zones = stats[(stats['胜率'] > 50) & (stats['平均盈亏'] > 0)]
- if len(comfort_zones) > 0:
- print("\n✅ 识别到的舒适区市场状态:")
- for _, row in comfort_zones.iterrows():
- print(f" • {row['市场状态']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元")
- def _analyze_by_volatility(self):
- """按波动率分析"""
- print("\n【分析2】按波动率水平统计")
- print("-" * 60)
- stats = self._calculate_stats(self.enriched_trades, '波动率水平')
- print(stats.to_string(index=False))
- # 排序找出最佳波动率区间
- stats_sorted = stats.sort_values('平均盈亏', ascending=False)
- print(f"\n📊 波动率舒适区排序:")
- for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1):
- print(f" {i}. {row['波动率水平']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元")
- def _analyze_by_trend(self):
- """按趋势状态分析"""
- print("\n【分析3】按趋势状态统计")
- print("-" * 60)
- # 短期趋势
- stats_short = self._calculate_stats(self.enriched_trades, '趋势短期')
- print("短期趋势:")
- print(stats_short.to_string(index=False))
- # 中期趋势
- stats_mid = self._calculate_stats(self.enriched_trades, '趋势中期')
- print("\n中期趋势:")
- print(stats_mid.to_string(index=False))
- def _analyze_by_bb_position(self):
- """按布林带位置分析"""
- print("\n【分析4】按布林带位置统计")
- print("-" * 60)
- stats = self._calculate_stats(self.enriched_trades, '布林带区域')
- print(stats.to_string(index=False))
- # 找出最佳入场区域
- stats_sorted = stats.sort_values('平均盈亏', ascending=False)
- print(f"\n📊 最佳入场区域排序:")
- for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1):
- print(f" {i}. {row['布林带区域']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元")
- def _analyze_by_rsi(self):
- """按RSI状态分析"""
- print("\n【分析5】按RSI状态统计")
- print("-" * 60)
- stats = self._calculate_stats(self.enriched_trades, 'RSI区域')
- print(stats.to_string(index=False))
- # 找出最佳RSI区域
- stats_sorted = stats.sort_values('平均盈亏', ascending=False)
- print(f"\n📊 最佳RSI入场区域排序:")
- for i, (_, row) in enumerate(stats_sorted.head(3).iterrows(), 1):
- print(f" {i}. {row['RSI区域']}: 胜率{row['胜率']:.1f}%, 平均盈亏{row['平均盈亏']:+,.0f}元")
- def _calculate_stats(self, df, groupby_col):
- """计算分组统计"""
- stats = []
- for group_name, group in df.groupby(groupby_col):
- if len(group) < 3: # 样本太少跳过
- continue
- total_trades = len(group)
- win_trades = len(group[group['盈亏金额'] > 0])
- win_rate = win_trades / total_trades * 100
- total_pnl = group['盈亏金额'].sum()
- avg_pnl = group['盈亏金额'].mean()
- stats.append({
- groupby_col: group_name,
- '交易次数': total_trades,
- '胜率': win_rate,
- '总盈亏': total_pnl,
- '平均盈亏': avg_pnl,
- '盈亏比': abs(group[group['盈亏金额'] > 0]['盈亏金额'].mean() /
- group[group['盈亏金额'] < 0]['盈亏金额'].mean()) if len(group[group['盈亏金额'] < 0]) > 0 else 0
- })
- return pd.DataFrame(stats).sort_values('平均盈亏', ascending=False)
- def _find_best_combinations(self):
- """寻找最佳组合条件"""
- print("\n【分析6】最佳组合条件识别")
- print("-" * 60)
- df = self.enriched_trades
- # 定义可能的舒适区组合
- combinations = [
- {
- 'name': '下跌超卖反弹',
- 'condition': (df['趋势中期'] == '下跌') & (df['RSI区域'].isin(['超卖', '极度超卖'])) & (df['布林带区域'].isin(['下轨极低位', '下轨低位']))
- },
- {
- 'name': '震荡下轨做多',
- 'condition': (df['市场状态'].str.contains('震荡')) & (df['布林带区域'].isin(['下轨极低位', '下轨低位']))
- },
- {
- 'name': '低波动趋势上涨',
- 'condition': (df['趋势中期'] == '上涨') & (df['波动率水平'].isin(['低', '极低']))
- },
- {
- 'name': '放量突破中轨',
- 'condition': (df['布林带区域'] == '中轨') & (df['成交量分位'] > 0.6) & (df['趋势短期'] == '上涨')
- },
- {
- 'name': '强趋势低波回调',
- 'condition': (df['市场状态'] == '强趋势低波') & (df['布林带区域'].isin(['下轨中位', '下轨低位']))
- },
- {
- 'name': 'RSI超买区域',
- 'condition': df['RSI区域'].isin(['超买', '极度超买'])
- },
- {
- 'name': 'RSI超卖区域',
- 'condition': df['RSI区域'].isin(['超卖', '极度超卖'])
- },
- {
- 'name': '极低波动率环境',
- 'condition': df['波动率水平'] == '极低'
- },
- {
- 'name': '极高波动率环境',
- 'condition': df['波动率水平'] == '极高'
- }
- ]
- results = []
- for combo in combinations:
- filtered = df[combo['condition']]
- if len(filtered) >= 3:
- win_rate = (filtered['盈亏金额'] > 0).sum() / len(filtered) * 100
- avg_pnl = filtered['盈亏金额'].mean()
- total_pnl = filtered['盈亏金额'].sum()
- results.append({
- '组合名称': combo['name'],
- '交易次数': len(filtered),
- '胜率': win_rate,
- '平均盈亏': avg_pnl,
- '总盈亏': total_pnl,
- '舒适评分': win_rate * 0.5 + (avg_pnl / 1000) * 0.5 # 自定义评分
- })
- results_df = pd.DataFrame(results).sort_values('舒适评分', ascending=False)
- print("\n组合条件表现排名:")
- print(results_df.to_string(index=False))
- # 标记前3名为舒适区
- print("\n" + "="*60)
- print("🎯 策略舒适区标记 (Top 3)")
- print("="*60)
- for i, (_, row) in enumerate(results_df.head(3).iterrows(), 1):
- print(f"\n舒适区 #{i}: {row['组合名称']}")
- print(f" 交易次数: {row['交易次数']}笔")
- print(f" 胜率: {row['胜率']:.1f}%")
- print(f" 平均盈亏: {row['平均盈亏']:+,.0f}元")
- print(f" 总盈亏: {row['总盈亏']:+,.0f}元")
- print(f" 舒适评分: {row['舒适评分']:.2f}")
- # 标记警告区(表现差的)
- print("\n" + "="*60)
- print("⚠️ 策略危险区标记 (Bottom 3)")
- print("="*60)
- for i, (_, row) in enumerate(results_df.tail(3).iterrows(), 1):
- print(f"\n危险区 #{i}: {row['组合名称']}")
- print(f" 交易次数: {row['交易次数']}笔")
- print(f" 胜率: {row['胜率']:.1f}%")
- print(f" 平均盈亏: {row['平均盈亏']:+,.0f}元")
- def export_comfort_zones(self, filename='comfort_zones.json'):
- """导出舒适区配置"""
- import json
- comfort_zones = {
- 'best_conditions': [
- {
- 'name': '下跌超卖反弹',
- 'filters': {
- 'trend_mid': '下跌',
- 'rsi_zone': ['超卖', '极度超卖'],
- 'bb_zone': ['下轨极低位', '下轨低位']
- },
- 'description': '中期下跌趋势中,RSI超卖且价格触及布林带下轨'
- },
- {
- 'name': '强趋势低波回调',
- 'filters': {
- 'market_regime': '强趋势低波',
- 'bb_zone': ['下轨中位', '下轨低位']
- },
- 'description': '强趋势低波动环境下,价格回调至布林带下轨区域'
- }
- ],
- 'avoid_conditions': [
- {
- 'name': '极高波动率',
- 'filters': {'volatility_level': '极高'},
- 'description': '极高波动率环境下避免交易'
- }
- ]
- }
- with open(filename, 'w', encoding='utf-8') as f:
- json.dump(comfort_zones, f, ensure_ascii=False, indent=2)
- print(f"\n✅ 舒适区配置已导出: {filename}")
- def main():
- """主程序 - 运行舒适区分析"""
- print("="*80)
- print("CYB50 T+1 策略舒适区分析器")
- print("寻找策略表现较好的市场环境")
- print("="*80)
- # 1. 加载数据
- print("\n【数据加载】")
- df = pd.read_csv('cyb50_30min_2023_to_20260325.csv')
- df['DateTime'] = pd.to_datetime(df['DateTime'])
- df.set_index('DateTime', inplace=True)
- df.sort_index(inplace=True)
- if 'Open' not in df.columns and 'o' in df.columns:
- df.rename(columns={'o': 'Open', 'h': 'High', 'l': 'Low', 'c': 'Close', 'v': 'Volume'}, inplace=True)
- df['Returns'] = df['Close'].pct_change()
- df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
- df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open'] # 添加缺失的列
- df.ffill(inplace=True)
- df.dropna(inplace=True)
- print(f"数据区间: {df.index[0]} ~ {df.index[-1]}")
- print(f"数据条数: {len(df)}")
- # 2. 运行原策略获取交易记录
- print("\n【运行原策略】")
- config_manager = ConfigManager('config.json')
- fetcher = IntradayDataFetcher(config_manager)
- data_with_indicators = fetcher.calculate_intraday_indicators(df)
- signal_generator = DualDirectionSignalGenerator()
- signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
- initial_capital = 1000000
- executor = DualDirectionExecutor(initial_capital=initial_capital)
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
- # 3. 应用T+1转换
- long_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
- t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital)
- print(f"\nT+1交易记录: {len(t1_trades)}笔")
- # 4. 市场环境分析
- print("\n【市场环境特征计算】")
- market_analyzer = MarketEnvironmentAnalyzer(data_with_indicators)
- # 5. 舒适区分析
- comfort_analyzer = ComfortZoneAnalyzer(t1_trades, market_analyzer)
- enriched_trades = comfort_analyzer.analyze()
- # 6. 导出结果
- comfort_analyzer.export_comfort_zones()
- # 7. 导出详细交易记录
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- output_file = f't1_trades_with_environment_{timestamp}.csv'
- enriched_trades.to_csv(output_file, index=False, encoding='utf-8-sig')
- print(f"✅ 详细交易记录已导出: {output_file}")
- print("\n" + "="*80)
- print("舒适区分析完成!")
- print("="*80)
- if __name__ == "__main__":
- main()
|