| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50 T+1 高级算法深挖 - 第四层
- 探索: 动量细节、机器学习特征、市场情绪、资金流向、参数敏感度
- """
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- import warnings
- import sys
- warnings.filterwarnings('ignore')
- # Redirect stdout
- from io import StringIO
- old_stdout = sys.stdout
- sys.stdout = StringIO()
- from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor
- from t1_converter import simulate_t1_trades
- def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'):
- df = pd.read_csv(csv_file)
- df['DateTime'] = pd.to_datetime(df['DateTime'])
- df.set_index('DateTime', inplace=True)
- df.sort_index(inplace=True)
- if 'Open' not in df.columns and 'o' in df.columns:
- df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True)
- for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
- if col in df.columns:
- df[col] = pd.to_numeric(df[col], errors='coerce')
- df['Returns'] = df['Close'].pct_change()
- df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
- df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']
- df.ffill(inplace=True)
- df.dropna(inplace=True)
- return df
- # 运行回测
- initial_capital = 1000000
- raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv')
- config_manager = ConfigManager('config.json')
- fetcher = IntradayDataFetcher(config_manager)
- data_with_indicators = fetcher.calculate_intraday_indicators(raw_data)
- signal_generator = DualDirectionSignalGenerator()
- signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
- executor = DualDirectionExecutor(initial_capital=initial_capital)
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
- long_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
- t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital)
- # Restore stdout
- sys.stdout = old_stdout
- print('='*80)
- print('创业板50 T+1 高级算法深挖')
- print('='*80)
- # 准备数据
- t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间'])
- t1_trades['平仓时间'] = pd.to_datetime(t1_trades['平仓时间'])
- t1_trades['是否盈利'] = t1_trades['盈亏金额'] > 0
- t1_trades['开仓年份'] = t1_trades['开仓时间'].dt.year
- t1_trades['开仓小时'] = t1_trades['开仓时间'].dt.hour
- # ========== 深挖1: 动量指标精细化分析 ==========
- print('\n' + '='*80)
- print('【深挖1】动量指标精细化分析')
- print('='*80)
- # 计算更详细的动量指标
- def calculate_advanced_momentum(df):
- """计算高级动量指标"""
- # RSI多种周期
- for period in [6, 14, 21]:
- delta = df['Close'].diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
- rs = gain / loss
- df[f'RSI_{period}'] = 100 - (100 / (1 + rs))
- # MACD多种参数
- for fast, slow, signal in [(12, 26, 9), (8, 21, 5), (5, 35, 5)]:
- exp1 = df['Close'].ewm(span=fast, adjust=False).mean()
- exp2 = df['Close'].ewm(span=slow, adjust=False).mean()
- df[f'MACD_{fast}_{slow}'] = exp1 - exp2
- df[f'MACD_Signal_{fast}_{slow}'] = df[f'MACD_{fast}_{slow}'].ewm(span=signal, adjust=False).mean()
- df[f'MACD_Hist_{fast}_{slow}'] = df[f'MACD_{fast}_{slow}'] - df[f'MACD_Signal_{fast}_{slow}']
- # 价格动量 (不同周期)
- for period in [3, 5, 10, 20]:
- df[f'Momentum_{period}'] = (df['Close'] / df['Close'].shift(period) - 1) * 100
- # 成交量动量
- df['Volume_MA5'] = df['Volume'].rolling(5).mean()
- df['Volume_MA20'] = df['Volume'].rolling(20).mean()
- df['Volume_Ratio'] = df['Volume'] / df['Volume_MA20']
- return df
- data_advanced = calculate_advanced_momentum(data_with_indicators.copy())
- # 合并高级动量数据
- for idx, row in t1_trades.iterrows():
- try:
- mask = data_advanced.index <= row['开仓时间']
- if mask.any():
- current_data = data_advanced.loc[data_advanced.index[mask][-1]]
- t1_trades.loc[idx, 'RSI_6'] = current_data.get('RSI_6', 50)
- t1_trades.loc[idx, 'RSI_14'] = current_data.get('RSI_14', 50)
- t1_trades.loc[idx, 'RSI_21'] = current_data.get('RSI_21', 50)
- t1_trades.loc[idx, 'Momentum_3'] = current_data.get('Momentum_3', 0)
- t1_trades.loc[idx, 'Momentum_5'] = current_data.get('Momentum_5', 0)
- t1_trades.loc[idx, 'Momentum_10'] = current_data.get('Momentum_10', 0)
- t1_trades.loc[idx, 'Volume_Ratio'] = current_data.get('Volume_Ratio', 1)
- except:
- pass
- # RSI精细化分析
- print('\nRSI_14精细化分析:')
- t1_trades['RSI_Level'] = pd.cut(t1_trades['RSI_14'],
- bins=[0, 30, 40, 50, 60, 70, 100],
- labels=['超卖(<30)', '偏弱(30-40)', '中性偏弱(40-50)', '中性偏强(50-60)', '偏强(60-70)', '超买(>70)'])
- rsi_analysis = t1_trades.groupby('RSI_Level', observed=False).agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- rsi_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- rsi_analysis['胜率'] = (rsi_analysis['盈利次数'] / rsi_analysis['交易次数'] * 100).round(1)
- print(rsi_analysis.to_string())
- # 最佳动量组合搜索
- print('\n最佳动量指标组合搜索:')
- momentum_results = []
- for rsi_low in [30, 35, 40, 45]:
- for rsi_high in [55, 60, 65, 70]:
- for mom_min in [-2, -1, 0, 1]:
- mask = (t1_trades['RSI_14'] >= rsi_low) & \
- (t1_trades['RSI_14'] <= rsi_high) & \
- (t1_trades['Momentum_5'] >= mom_min) & \
- (t1_trades['T+1调整'] != '是(T0→T1)')
- filtered = t1_trades[mask]
- if len(filtered) >= 10:
- win_rate = (filtered['盈亏金额'] > 0).mean() * 100
- total_pnl = filtered['盈亏金额'].sum()
- avg_pnl = filtered['盈亏金额'].mean()
- momentum_results.append({
- 'RSI范围': f'{rsi_low}-{rsi_high}',
- '动量≥': mom_min,
- '交易数': len(filtered),
- '胜率': win_rate,
- '总盈亏': total_pnl,
- '平均盈亏': avg_pnl
- })
- mom_df = pd.DataFrame(momentum_results)
- if len(mom_df) > 0:
- print('\n动量组合TOP10 (按总盈亏):')
- print(mom_df.nlargest(10, '总盈亏').to_string(index=False))
- # ========== 深挖2: 成交量与价格关系 ==========
- print('\n' + '='*80)
- print('【深挖2】量价关系分析')
- print('='*80)
- # 量价关系分类
- def classify_price_volume(row):
- """分类量价关系"""
- price_change = row.get('Momentum_3', 0)
- volume_ratio = row.get('Volume_Ratio', 1)
- if price_change > 1 and volume_ratio > 1.2:
- return '价涨量增(健康)'
- elif price_change > 1 and volume_ratio < 0.8:
- return '价涨量缩(背离)'
- elif price_change < -1 and volume_ratio > 1.2:
- return '价跌量增(恐慌)'
- elif price_change < -1 and volume_ratio < 0.8:
- return '价跌量缩(疲软)'
- else:
- return '震荡整理'
- t1_trades['Price_Volume_Pattern'] = t1_trades.apply(classify_price_volume, axis=1)
- pv_analysis = t1_trades.groupby('Price_Volume_Pattern').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- pv_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- pv_analysis['胜率'] = (pv_analysis['盈利次数'] / pv_analysis['交易次数'] * 100).round(1)
- print('\n量价关系分析:')
- print(pv_analysis.to_string())
- # ========== 深挖3: 市场环境分类 ==========
- print('\n' + '='*80)
- print('【深挖3】市场环境自适应策略')
- print('='*80)
- # 定义市场环境
- def classify_market_environment(row):
- """分类市场环境"""
- trend = row.get('Trend_Score', 0)
- vol = row.get('Vol_Regime', '正常')
- rsi = row.get('RSI_14', 50)
- if trend >= 2 and vol != '高波动' and rsi > 50:
- return '牛市趋势'
- elif trend <= -2 and vol != '高波动' and rsi < 50:
- return '熊市趋势'
- elif vol == '高波动':
- return '高波动震荡'
- elif abs(trend) <= 1:
- return '横盘震荡'
- else:
- return '趋势转折'
- t1_trades['Market_Environment'] = t1_trades.apply(classify_market_environment, axis=1)
- env_analysis = t1_trades.groupby('Market_Environment').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- env_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- env_analysis['胜率'] = (env_analysis['盈利次数'] / env_analysis['交易次数'] * 100).round(1)
- print('\n不同市场环境表现:')
- print(env_analysis.to_string())
- # 各环境下的最优策略
- print('\n各市场环境下的最优过滤条件:')
- for env in t1_trades['Market_Environment'].unique():
- if pd.isna(env):
- continue
- env_data = t1_trades[t1_trades['Market_Environment'] == env]
- # 测试不同条件
- best_condition = None
- best_pnl = -999999
- # 条件1: 仅非T+1
- c1 = env_data[env_data['T+1调整'] != '是(T0→T1)']
- if len(c1) > 5 and c1['盈亏金额'].sum() > best_pnl:
- best_pnl = c1['盈亏金额'].sum()
- best_condition = ('非T+1', len(c1), (c1['盈亏金额']>0).mean()*100, c1['盈亏金额'].sum())
- # 条件2: RSI过滤
- c2 = env_data[(env_data['RSI_14'] >= 40) & (env_data['RSI_14'] <= 60)]
- if len(c2) > 5 and c2['盈亏金额'].sum() > best_pnl:
- best_pnl = c2['盈亏金额'].sum()
- best_condition = ('RSI 40-60', len(c2), (c2['盈亏金额']>0).mean()*100, c2['盈亏金额'].sum())
- # 条件3: 动量过滤
- c3 = env_data[env_data['Momentum_5'] >= 0]
- if len(c3) > 5 and c3['盈亏金额'].sum() > best_pnl:
- best_pnl = c3['盈亏金额'].sum()
- best_condition = ('动量≥0', len(c3), (c3['盈亏金额']>0).mean()*100, c3['盈亏金额'].sum())
- if best_condition:
- print(f' {env}: {best_condition[0]} -> {best_condition[1]}笔, 胜率{best_condition[2]:.1f}%, 盈亏{best_condition[3]:+,.0f}元')
- # ========== 深挖4: 参数敏感度分析 ==========
- print('\n' + '='*80)
- print('【深挖4】参数敏感度与稳健性分析')
- print('='*80)
- # 止损止盈参数敏感度
- print('\n止损止盈参数敏感度分析:')
- stop_loss_range = [0.5, 0.8, 1.0, 1.2, 1.5]
- take_profit_range = [1.0, 1.5, 2.0, 2.5, 3.0]
- sensitivity_results = []
- for sl in stop_loss_range:
- for tp in take_profit_range:
- # 模拟不同止损止盈下的盈亏
- simulated_pnl = []
- for idx, row in t1_trades.iterrows():
- actual_pnl_pct = row['盈亏金额'] / initial_capital * 100
- # 如果原交易是止损出局且亏损接近-0.8%
- if -1.0 < actual_pnl_pct < -0.6:
- # 假设放宽止损后可能盈利
- if row['Momentum_5'] > 0: # 有动量支撑
- simulated_pnl.append(abs(row['盈亏金额']) * 0.5) # 假设能挽回50%
- else:
- simulated_pnl.append(row['盈亏金额'] * (sl / 0.8))
- # 如果原交易是止盈出局
- elif actual_pnl_pct > 1.5:
- # 收紧止盈可能减少盈利
- simulated_pnl.append(row['盈亏金额'] * (tp / 2.0))
- else:
- simulated_pnl.append(row['盈亏金额'])
- total_pnl = sum(simulated_pnl)
- sensitivity_results.append({
- '止损': sl,
- '止盈': tp,
- '总盈亏': total_pnl,
- '原盈亏': t1_trades['盈亏金额'].sum()
- })
- sens_df = pd.DataFrame(sensitivity_results)
- print('\n止损止盈参数TOP10组合:')
- print(sens_df.nlargest(10, '总盈亏')[['止损', '止盈', '总盈亏']].to_string(index=False))
- # ========== 深挖5: 交易频率优化 ==========
- print('\n' + '='*80)
- print('【深挖5】交易频率与机会成本分析')
- print('='*80)
- # 分析交易密度
- trade_density = t1_trades.groupby(t1_trades['开仓时间'].dt.date).size()
- print(f'\n交易密度统计:')
- print(f' 平均每日交易: {trade_density.mean():.2f}笔')
- print(f' 最大单日交易: {trade_density.max()}笔')
- print(f' 有交易日占比: {(trade_density > 0).mean()*100:.1f}%')
- # 单日多笔交易的影响
- daily_pnl = t1_trades.groupby(t1_trades['开仓时间'].dt.date)['盈亏金额'].sum()
- daily_count = t1_trades.groupby(t1_trades['开仓时间'].dt.date).size()
- daily_analysis = pd.DataFrame({
- '交易次数': daily_count,
- '日盈亏': daily_pnl
- })
- print('\n单日交易次数与盈亏关系:')
- single_trade_days = daily_analysis[daily_analysis['交易次数'] == 1]
- multi_trade_days = daily_analysis[daily_analysis['交易次数'] > 1]
- print(f' 单日1笔: {len(single_trade_days)}天, 平均日盈亏{single_trade_days["日盈亏"].mean():+,.0f}元')
- print(f' 单日多笔: {len(multi_trade_days)}天, 平均日盈亏{multi_trade_days["日盈亏"].mean():+,.0f}元')
- # 最优每日交易限制
- print('\n不同每日交易上限的效果:')
- for max_daily in [1, 2, 3, 5]:
- # 模拟限制每日交易次数
- limited_trades = []
- for date, group in t1_trades.groupby(t1_trades['开仓时间'].dt.date):
- limited_trades.extend(group.head(max_daily).to_dict('records'))
- if limited_trades:
- limited_df = pd.DataFrame(limited_trades)
- total_pnl = limited_df['盈亏金额'].sum()
- win_rate = (limited_df['盈亏金额'] > 0).mean() * 100
- print(f' 每日最多{max_daily}笔: {len(limited_df)}笔, 胜率{win_rate:.1f}%, 总盈亏{total_pnl:+,.0f}元')
- # ========== 深挖6: 高级复合策略 ==========
- print('\n' + '='*80)
- print('【深挖6】高级复合策略 - 动态自适应')
- print('='*80)
- def advanced_filter(row):
- """高级复合过滤逻辑"""
- score = 0
- reasons = []
- # 动量条件 (核心)
- if row.get('Momentum_5', 0) >= 0:
- score += 3
- reasons.append('动量OK')
- # RSI条件
- rsi = row.get('RSI_14', 50)
- if 40 <= rsi <= 60:
- score += 2
- reasons.append('RSI适中')
- elif rsi > 60:
- score += 1
- reasons.append('RSI偏强')
- # 量价关系
- if row.get('Price_Volume_Pattern') == '价涨量增(健康)':
- score += 2
- reasons.append('量价健康')
- # T+1保护
- if row.get('T+1调整') != '是(T0→T1)':
- score += 2
- reasons.append('非T+1')
- # 时间过滤
- if row.get('开仓小时') != 13:
- score += 1
- reasons.append('时间OK')
- return score, ','.join(reasons)
- t1_trades['Advanced_Score'] = 0
- t1_trades['Advanced_Reasons'] = ''
- for idx, row in t1_trades.iterrows():
- score, reasons = advanced_filter(row)
- t1_trades.loc[idx, 'Advanced_Score'] = score
- t1_trades.loc[idx, 'Advanced_Reasons'] = reasons
- # 高级策略分析
- advanced_analysis = t1_trades.groupby('Advanced_Score').agg({
- '盈亏金额': ['count', 'sum', 'mean'],
- '是否盈利': 'sum'
- }).round(2)
- advanced_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数']
- advanced_analysis['胜率'] = (advanced_analysis['盈利次数'] / advanced_analysis['交易次数'] * 100).round(1)
- print('\n高级复合评分分析:')
- print(advanced_analysis.to_string())
- # 最优阈值
- print('\n不同阈值下的策略表现:')
- for threshold in [5, 6, 7, 8]:
- filtered = t1_trades[t1_trades['Advanced_Score'] >= threshold]
- if len(filtered) > 0:
- win_rate = (filtered['盈亏金额'] > 0).mean() * 100
- total_pnl = filtered['盈亏金额'].sum()
- avg_pnl = filtered['盈亏金额'].mean()
- print(f' 评分≥{threshold}: {len(filtered)}笔, 胜率{win_rate:.1f}%, 总盈亏{total_pnl:+,.0f}元, 平均{avg_pnl:+,.0f}元')
- # ========== 最终推荐 ==========
- print('\n' + '='*80)
- print('【最终推荐】经过深度挖掘的最优策略')
- print('='*80)
- # 找出最佳单一条件
- best_conditions = []
- # 最佳动量条件
- mom_best = t1_trades[(t1_trades['Momentum_5'] >= 0) & (t1_trades['T+1调整'] != '是(T0→T1)')]
- if len(mom_best) > 0:
- best_conditions.append(('动量≥0 + 非T+1', len(mom_best),
- (mom_best['盈亏金额']>0).mean()*100,
- mom_best['盈亏金额'].sum()))
- # 最佳RSI条件
- rsi_best = t1_trades[(t1_trades['RSI_14'] >= 45) & (t1_trades['RSI_14'] <= 55) &
- (t1_trades['T+1调整'] != '是(T0→T1)')]
- if len(rsi_best) > 0:
- best_conditions.append(('RSI 45-55 + 非T+1', len(rsi_best),
- (rsi_best['盈亏金额']>0).mean()*100,
- rsi_best['盈亏金额'].sum()))
- # 最佳量价条件
- pv_best = t1_trades[(t1_trades['Price_Volume_Pattern'] == '价涨量增(健康)') &
- (t1_trades['T+1调整'] != '是(T0→T1)')]
- if len(pv_best) > 0:
- best_conditions.append(('量价健康 + 非T+1', len(pv_best),
- (pv_best['盈亏金额']>0).mean()*100,
- pv_best['盈亏金额'].sum()))
- # 高级评分条件
- adv_best = t1_trades[(t1_trades['Advanced_Score'] >= 6) &
- (t1_trades['T+1调整'] != '是(T0→T1)')]
- if len(adv_best) > 0:
- best_conditions.append(('高级评分≥6 + 非T+1', len(adv_best),
- (adv_best['盈亏金额']>0).mean()*100,
- adv_best['盈亏金额'].sum()))
- print('\n各最优条件对比:')
- print(f"{'条件组合':<25} {'交易次数':>8} {'胜率':>8} {'总盈亏':>12}")
- print('-' * 60)
- for name, count, win_rate, pnl in best_conditions:
- print(f'{name:<25} {count:>8} {win_rate:>7.1f}% {pnl:>+11,.0f}')
- # 终极推荐
- ultimate = t1_trades[
- (t1_trades['Momentum_5'] >= 0) &
- (t1_trades['RSI_14'] >= 40) &
- (t1_trades['RSI_14'] <= 65) &
- (t1_trades['Price_Volume_Pattern'].isin(['价涨量增(健康)', '震荡整理'])) &
- (t1_trades['T+1调整'] != '是(T0→T1)') &
- (t1_trades['开仓小时'] != 13)
- ]
- print(f'\n【终极推荐策略】:')
- print(f' 条件: 动量≥0 + RSI 40-65 + 量价健康/震荡 + 非T+1 + 避开13点')
- print(f' 交易次数: {len(ultimate)}')
- if len(ultimate) > 0:
- print(f' 胜率: {(ultimate["盈亏金额"]>0).mean()*100:.1f}%')
- print(f' 总盈亏: {ultimate["盈亏金额"].sum():+,.0f}元')
- print(f' 平均盈亏: {ultimate["盈亏金额"].mean():+,.0f}元')
- print('\n' + '='*80)
- print('深挖分析完成')
- print('='*80)
|