#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50 T+1 高级算法深挖 - 第四层 探索: 动量细节、机器学习特征、市场情绪、资金流向、参数敏感度 """ import pandas as pd import numpy as np from datetime import datetime, timedelta import warnings import sys warnings.filterwarnings('ignore') # Redirect stdout from io import StringIO old_stdout = sys.stdout sys.stdout = StringIO() from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor from t1_converter import simulate_t1_trades def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'): df = pd.read_csv(csv_file) df['DateTime'] = pd.to_datetime(df['DateTime']) df.set_index('DateTime', inplace=True) df.sort_index(inplace=True) if 'Open' not in df.columns and 'o' in df.columns: df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True) for col in ['Open', 'High', 'Low', 'Close', 'Volume']: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') df['Returns'] = df['Close'].pct_change() df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1) df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open'] df.ffill(inplace=True) df.dropna(inplace=True) return df # 运行回测 initial_capital = 1000000 raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv') config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) data_with_indicators = fetcher.calculate_intraday_indicators(raw_data) signal_generator = DualDirectionSignalGenerator() signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators) executor = DualDirectionExecutor(initial_capital=initial_capital) results_df, trades_df = executor.execute_dual_direction_trades(signals_df) long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() t1_trades = simulate_t1_trades(data_with_indicators, long_trades, initial_capital) # Restore stdout sys.stdout = old_stdout print('='*80) print('创业板50 T+1 高级算法深挖') print('='*80) # 准备数据 t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间']) t1_trades['平仓时间'] = pd.to_datetime(t1_trades['平仓时间']) t1_trades['是否盈利'] = t1_trades['盈亏金额'] > 0 t1_trades['开仓年份'] = t1_trades['开仓时间'].dt.year t1_trades['开仓小时'] = t1_trades['开仓时间'].dt.hour # ========== 深挖1: 动量指标精细化分析 ========== print('\n' + '='*80) print('【深挖1】动量指标精细化分析') print('='*80) # 计算更详细的动量指标 def calculate_advanced_momentum(df): """计算高级动量指标""" # RSI多种周期 for period in [6, 14, 21]: delta = df['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss df[f'RSI_{period}'] = 100 - (100 / (1 + rs)) # MACD多种参数 for fast, slow, signal in [(12, 26, 9), (8, 21, 5), (5, 35, 5)]: exp1 = df['Close'].ewm(span=fast, adjust=False).mean() exp2 = df['Close'].ewm(span=slow, adjust=False).mean() df[f'MACD_{fast}_{slow}'] = exp1 - exp2 df[f'MACD_Signal_{fast}_{slow}'] = df[f'MACD_{fast}_{slow}'].ewm(span=signal, adjust=False).mean() df[f'MACD_Hist_{fast}_{slow}'] = df[f'MACD_{fast}_{slow}'] - df[f'MACD_Signal_{fast}_{slow}'] # 价格动量 (不同周期) for period in [3, 5, 10, 20]: df[f'Momentum_{period}'] = (df['Close'] / df['Close'].shift(period) - 1) * 100 # 成交量动量 df['Volume_MA5'] = df['Volume'].rolling(5).mean() df['Volume_MA20'] = df['Volume'].rolling(20).mean() df['Volume_Ratio'] = df['Volume'] / df['Volume_MA20'] return df data_advanced = calculate_advanced_momentum(data_with_indicators.copy()) # 合并高级动量数据 for idx, row in t1_trades.iterrows(): try: mask = data_advanced.index <= row['开仓时间'] if mask.any(): current_data = data_advanced.loc[data_advanced.index[mask][-1]] t1_trades.loc[idx, 'RSI_6'] = current_data.get('RSI_6', 50) t1_trades.loc[idx, 'RSI_14'] = current_data.get('RSI_14', 50) t1_trades.loc[idx, 'RSI_21'] = current_data.get('RSI_21', 50) t1_trades.loc[idx, 'Momentum_3'] = current_data.get('Momentum_3', 0) t1_trades.loc[idx, 'Momentum_5'] = current_data.get('Momentum_5', 0) t1_trades.loc[idx, 'Momentum_10'] = current_data.get('Momentum_10', 0) t1_trades.loc[idx, 'Volume_Ratio'] = current_data.get('Volume_Ratio', 1) except: pass # RSI精细化分析 print('\nRSI_14精细化分析:') t1_trades['RSI_Level'] = pd.cut(t1_trades['RSI_14'], bins=[0, 30, 40, 50, 60, 70, 100], labels=['超卖(<30)', '偏弱(30-40)', '中性偏弱(40-50)', '中性偏强(50-60)', '偏强(60-70)', '超买(>70)']) rsi_analysis = t1_trades.groupby('RSI_Level', observed=False).agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) rsi_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] rsi_analysis['胜率'] = (rsi_analysis['盈利次数'] / rsi_analysis['交易次数'] * 100).round(1) print(rsi_analysis.to_string()) # 最佳动量组合搜索 print('\n最佳动量指标组合搜索:') momentum_results = [] for rsi_low in [30, 35, 40, 45]: for rsi_high in [55, 60, 65, 70]: for mom_min in [-2, -1, 0, 1]: mask = (t1_trades['RSI_14'] >= rsi_low) & \ (t1_trades['RSI_14'] <= rsi_high) & \ (t1_trades['Momentum_5'] >= mom_min) & \ (t1_trades['T+1调整'] != '是(T0→T1)') filtered = t1_trades[mask] if len(filtered) >= 10: win_rate = (filtered['盈亏金额'] > 0).mean() * 100 total_pnl = filtered['盈亏金额'].sum() avg_pnl = filtered['盈亏金额'].mean() momentum_results.append({ 'RSI范围': f'{rsi_low}-{rsi_high}', '动量≥': mom_min, '交易数': len(filtered), '胜率': win_rate, '总盈亏': total_pnl, '平均盈亏': avg_pnl }) mom_df = pd.DataFrame(momentum_results) if len(mom_df) > 0: print('\n动量组合TOP10 (按总盈亏):') print(mom_df.nlargest(10, '总盈亏').to_string(index=False)) # ========== 深挖2: 成交量与价格关系 ========== print('\n' + '='*80) print('【深挖2】量价关系分析') print('='*80) # 量价关系分类 def classify_price_volume(row): """分类量价关系""" price_change = row.get('Momentum_3', 0) volume_ratio = row.get('Volume_Ratio', 1) if price_change > 1 and volume_ratio > 1.2: return '价涨量增(健康)' elif price_change > 1 and volume_ratio < 0.8: return '价涨量缩(背离)' elif price_change < -1 and volume_ratio > 1.2: return '价跌量增(恐慌)' elif price_change < -1 and volume_ratio < 0.8: return '价跌量缩(疲软)' else: return '震荡整理' t1_trades['Price_Volume_Pattern'] = t1_trades.apply(classify_price_volume, axis=1) pv_analysis = t1_trades.groupby('Price_Volume_Pattern').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) pv_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] pv_analysis['胜率'] = (pv_analysis['盈利次数'] / pv_analysis['交易次数'] * 100).round(1) print('\n量价关系分析:') print(pv_analysis.to_string()) # ========== 深挖3: 市场环境分类 ========== print('\n' + '='*80) print('【深挖3】市场环境自适应策略') print('='*80) # 定义市场环境 def classify_market_environment(row): """分类市场环境""" trend = row.get('Trend_Score', 0) vol = row.get('Vol_Regime', '正常') rsi = row.get('RSI_14', 50) if trend >= 2 and vol != '高波动' and rsi > 50: return '牛市趋势' elif trend <= -2 and vol != '高波动' and rsi < 50: return '熊市趋势' elif vol == '高波动': return '高波动震荡' elif abs(trend) <= 1: return '横盘震荡' else: return '趋势转折' t1_trades['Market_Environment'] = t1_trades.apply(classify_market_environment, axis=1) env_analysis = t1_trades.groupby('Market_Environment').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) env_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] env_analysis['胜率'] = (env_analysis['盈利次数'] / env_analysis['交易次数'] * 100).round(1) print('\n不同市场环境表现:') print(env_analysis.to_string()) # 各环境下的最优策略 print('\n各市场环境下的最优过滤条件:') for env in t1_trades['Market_Environment'].unique(): if pd.isna(env): continue env_data = t1_trades[t1_trades['Market_Environment'] == env] # 测试不同条件 best_condition = None best_pnl = -999999 # 条件1: 仅非T+1 c1 = env_data[env_data['T+1调整'] != '是(T0→T1)'] if len(c1) > 5 and c1['盈亏金额'].sum() > best_pnl: best_pnl = c1['盈亏金额'].sum() best_condition = ('非T+1', len(c1), (c1['盈亏金额']>0).mean()*100, c1['盈亏金额'].sum()) # 条件2: RSI过滤 c2 = env_data[(env_data['RSI_14'] >= 40) & (env_data['RSI_14'] <= 60)] if len(c2) > 5 and c2['盈亏金额'].sum() > best_pnl: best_pnl = c2['盈亏金额'].sum() best_condition = ('RSI 40-60', len(c2), (c2['盈亏金额']>0).mean()*100, c2['盈亏金额'].sum()) # 条件3: 动量过滤 c3 = env_data[env_data['Momentum_5'] >= 0] if len(c3) > 5 and c3['盈亏金额'].sum() > best_pnl: best_pnl = c3['盈亏金额'].sum() best_condition = ('动量≥0', len(c3), (c3['盈亏金额']>0).mean()*100, c3['盈亏金额'].sum()) if best_condition: print(f' {env}: {best_condition[0]} -> {best_condition[1]}笔, 胜率{best_condition[2]:.1f}%, 盈亏{best_condition[3]:+,.0f}元') # ========== 深挖4: 参数敏感度分析 ========== print('\n' + '='*80) print('【深挖4】参数敏感度与稳健性分析') print('='*80) # 止损止盈参数敏感度 print('\n止损止盈参数敏感度分析:') stop_loss_range = [0.5, 0.8, 1.0, 1.2, 1.5] take_profit_range = [1.0, 1.5, 2.0, 2.5, 3.0] sensitivity_results = [] for sl in stop_loss_range: for tp in take_profit_range: # 模拟不同止损止盈下的盈亏 simulated_pnl = [] for idx, row in t1_trades.iterrows(): actual_pnl_pct = row['盈亏金额'] / initial_capital * 100 # 如果原交易是止损出局且亏损接近-0.8% if -1.0 < actual_pnl_pct < -0.6: # 假设放宽止损后可能盈利 if row['Momentum_5'] > 0: # 有动量支撑 simulated_pnl.append(abs(row['盈亏金额']) * 0.5) # 假设能挽回50% else: simulated_pnl.append(row['盈亏金额'] * (sl / 0.8)) # 如果原交易是止盈出局 elif actual_pnl_pct > 1.5: # 收紧止盈可能减少盈利 simulated_pnl.append(row['盈亏金额'] * (tp / 2.0)) else: simulated_pnl.append(row['盈亏金额']) total_pnl = sum(simulated_pnl) sensitivity_results.append({ '止损': sl, '止盈': tp, '总盈亏': total_pnl, '原盈亏': t1_trades['盈亏金额'].sum() }) sens_df = pd.DataFrame(sensitivity_results) print('\n止损止盈参数TOP10组合:') print(sens_df.nlargest(10, '总盈亏')[['止损', '止盈', '总盈亏']].to_string(index=False)) # ========== 深挖5: 交易频率优化 ========== print('\n' + '='*80) print('【深挖5】交易频率与机会成本分析') print('='*80) # 分析交易密度 trade_density = t1_trades.groupby(t1_trades['开仓时间'].dt.date).size() print(f'\n交易密度统计:') print(f' 平均每日交易: {trade_density.mean():.2f}笔') print(f' 最大单日交易: {trade_density.max()}笔') print(f' 有交易日占比: {(trade_density > 0).mean()*100:.1f}%') # 单日多笔交易的影响 daily_pnl = t1_trades.groupby(t1_trades['开仓时间'].dt.date)['盈亏金额'].sum() daily_count = t1_trades.groupby(t1_trades['开仓时间'].dt.date).size() daily_analysis = pd.DataFrame({ '交易次数': daily_count, '日盈亏': daily_pnl }) print('\n单日交易次数与盈亏关系:') single_trade_days = daily_analysis[daily_analysis['交易次数'] == 1] multi_trade_days = daily_analysis[daily_analysis['交易次数'] > 1] print(f' 单日1笔: {len(single_trade_days)}天, 平均日盈亏{single_trade_days["日盈亏"].mean():+,.0f}元') print(f' 单日多笔: {len(multi_trade_days)}天, 平均日盈亏{multi_trade_days["日盈亏"].mean():+,.0f}元') # 最优每日交易限制 print('\n不同每日交易上限的效果:') for max_daily in [1, 2, 3, 5]: # 模拟限制每日交易次数 limited_trades = [] for date, group in t1_trades.groupby(t1_trades['开仓时间'].dt.date): limited_trades.extend(group.head(max_daily).to_dict('records')) if limited_trades: limited_df = pd.DataFrame(limited_trades) total_pnl = limited_df['盈亏金额'].sum() win_rate = (limited_df['盈亏金额'] > 0).mean() * 100 print(f' 每日最多{max_daily}笔: {len(limited_df)}笔, 胜率{win_rate:.1f}%, 总盈亏{total_pnl:+,.0f}元') # ========== 深挖6: 高级复合策略 ========== print('\n' + '='*80) print('【深挖6】高级复合策略 - 动态自适应') print('='*80) def advanced_filter(row): """高级复合过滤逻辑""" score = 0 reasons = [] # 动量条件 (核心) if row.get('Momentum_5', 0) >= 0: score += 3 reasons.append('动量OK') # RSI条件 rsi = row.get('RSI_14', 50) if 40 <= rsi <= 60: score += 2 reasons.append('RSI适中') elif rsi > 60: score += 1 reasons.append('RSI偏强') # 量价关系 if row.get('Price_Volume_Pattern') == '价涨量增(健康)': score += 2 reasons.append('量价健康') # T+1保护 if row.get('T+1调整') != '是(T0→T1)': score += 2 reasons.append('非T+1') # 时间过滤 if row.get('开仓小时') != 13: score += 1 reasons.append('时间OK') return score, ','.join(reasons) t1_trades['Advanced_Score'] = 0 t1_trades['Advanced_Reasons'] = '' for idx, row in t1_trades.iterrows(): score, reasons = advanced_filter(row) t1_trades.loc[idx, 'Advanced_Score'] = score t1_trades.loc[idx, 'Advanced_Reasons'] = reasons # 高级策略分析 advanced_analysis = t1_trades.groupby('Advanced_Score').agg({ '盈亏金额': ['count', 'sum', 'mean'], '是否盈利': 'sum' }).round(2) advanced_analysis.columns = ['交易次数', '总盈亏', '平均盈亏', '盈利次数'] advanced_analysis['胜率'] = (advanced_analysis['盈利次数'] / advanced_analysis['交易次数'] * 100).round(1) print('\n高级复合评分分析:') print(advanced_analysis.to_string()) # 最优阈值 print('\n不同阈值下的策略表现:') for threshold in [5, 6, 7, 8]: filtered = t1_trades[t1_trades['Advanced_Score'] >= threshold] if len(filtered) > 0: win_rate = (filtered['盈亏金额'] > 0).mean() * 100 total_pnl = filtered['盈亏金额'].sum() avg_pnl = filtered['盈亏金额'].mean() print(f' 评分≥{threshold}: {len(filtered)}笔, 胜率{win_rate:.1f}%, 总盈亏{total_pnl:+,.0f}元, 平均{avg_pnl:+,.0f}元') # ========== 最终推荐 ========== print('\n' + '='*80) print('【最终推荐】经过深度挖掘的最优策略') print('='*80) # 找出最佳单一条件 best_conditions = [] # 最佳动量条件 mom_best = t1_trades[(t1_trades['Momentum_5'] >= 0) & (t1_trades['T+1调整'] != '是(T0→T1)')] if len(mom_best) > 0: best_conditions.append(('动量≥0 + 非T+1', len(mom_best), (mom_best['盈亏金额']>0).mean()*100, mom_best['盈亏金额'].sum())) # 最佳RSI条件 rsi_best = t1_trades[(t1_trades['RSI_14'] >= 45) & (t1_trades['RSI_14'] <= 55) & (t1_trades['T+1调整'] != '是(T0→T1)')] if len(rsi_best) > 0: best_conditions.append(('RSI 45-55 + 非T+1', len(rsi_best), (rsi_best['盈亏金额']>0).mean()*100, rsi_best['盈亏金额'].sum())) # 最佳量价条件 pv_best = t1_trades[(t1_trades['Price_Volume_Pattern'] == '价涨量增(健康)') & (t1_trades['T+1调整'] != '是(T0→T1)')] if len(pv_best) > 0: best_conditions.append(('量价健康 + 非T+1', len(pv_best), (pv_best['盈亏金额']>0).mean()*100, pv_best['盈亏金额'].sum())) # 高级评分条件 adv_best = t1_trades[(t1_trades['Advanced_Score'] >= 6) & (t1_trades['T+1调整'] != '是(T0→T1)')] if len(adv_best) > 0: best_conditions.append(('高级评分≥6 + 非T+1', len(adv_best), (adv_best['盈亏金额']>0).mean()*100, adv_best['盈亏金额'].sum())) print('\n各最优条件对比:') print(f"{'条件组合':<25} {'交易次数':>8} {'胜率':>8} {'总盈亏':>12}") print('-' * 60) for name, count, win_rate, pnl in best_conditions: print(f'{name:<25} {count:>8} {win_rate:>7.1f}% {pnl:>+11,.0f}') # 终极推荐 ultimate = t1_trades[ (t1_trades['Momentum_5'] >= 0) & (t1_trades['RSI_14'] >= 40) & (t1_trades['RSI_14'] <= 65) & (t1_trades['Price_Volume_Pattern'].isin(['价涨量增(健康)', '震荡整理'])) & (t1_trades['T+1调整'] != '是(T0→T1)') & (t1_trades['开仓小时'] != 13) ] print(f'\n【终极推荐策略】:') print(f' 条件: 动量≥0 + RSI 40-65 + 量价健康/震荡 + 非T+1 + 避开13点') print(f' 交易次数: {len(ultimate)}') if len(ultimate) > 0: print(f' 胜率: {(ultimate["盈亏金额"]>0).mean()*100:.1f}%') print(f' 总盈亏: {ultimate["盈亏金额"].sum():+,.0f}元') print(f' 平均盈亏: {ultimate["盈亏金额"].mean():+,.0f}元') print('\n' + '='*80) print('深挖分析完成') print('='*80)