import pandas as pd import numpy as np import itertools import json import time from datetime import datetime, timedelta from typing import Dict, List, Tuple, Any from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock import warnings import sys import io warnings.filterwarnings('ignore') # 导入原有策略模块 from cyb50_30min_dual_direction import ( ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor, validate_dual_direction_results ) class ParameterOptimizer: """参数优化器 - 网格搜索最优参数组合""" def __init__(self, config_file='config.json', max_workers=4): self.config_manager = ConfigManager(config_file) self.optimization_results = [] self.best_params = None self.best_score = None self.max_workers = max_workers self.results_lock = Lock() # 线程锁,保护共享数据 def define_parameter_grid(self) -> Dict[str, List]: """定义参数搜索网格 - 精简优化版本""" parameter_grid = { # 核心交易参数(重点优化) 'position_size_pct': [0.7, 0.85, 1.0], 'stop_loss_pct': [0.006, 0.008, 0.010, 0.012], 'take_profit_pct': [0.012, 0.015, 0.018, 0.022], 'max_hold_bars': [14, 16, 18, 22], 'min_signal_strength': [3, 4, 5], # RSI参数(精简) 'rsi_oversold': [25, 30], 'rsi_overbought': [70, 75], # KDJ参数(精简) 'kdj_oversold': [20], 'kdj_overbought': [80], # 成交量参数(精简) 'volume_ratio_threshold': [1.2], # 布林带参数(精简) 'bb_upper_threshold': [0.995], 'bb_lower_threshold': [1.005], # MACD参数(固定) 'macd_fast': [12], 'macd_slow': [26], # 连续涨跌参数(精简) 'consecutive_bars': [4], 'consecutive_change_pct': [0.015] } return parameter_grid def generate_parameter_combinations(self, parameter_grid: Dict[str, List]) -> List[Dict[str, Any]]: """生成所有参数组合""" param_names = list(parameter_grid.keys()) param_values = list(parameter_grid.values()) total_combinations = 1 for values in param_values: total_combinations *= len(values) print(f"参数网格总组合数: {total_combinations:,}") print(f"参数维度: {len(param_names)}") print("\n参数搜索范围:") for name, values in parameter_grid.items(): print(f" {name}: {values}") # 生成所有组合 combinations = [] for combination in itertools.product(*param_values): params = dict(zip(param_names, combination)) combinations.append(params) return combinations def calculate_performance_score(self, trades_df: pd.DataFrame, results_df: pd.DataFrame, initial_capital: float) -> Dict[str, float]: """计算综合性能得分""" if len(trades_df) == 0: return { 'total_return': -100, 'sharpe_ratio': -999, 'max_drawdown': -100, 'win_rate': 0, 'profit_factor': 0, 'total_trades': 0, 'composite_score': -9999 } # 基础指标 final_capital = results_df['net_value'].iloc[-1] total_return = (final_capital - initial_capital) / initial_capital * 100 # 夏普比率计算 returns = results_df['net_value'].pct_change().dropna() sharpe_ratio = returns.mean() / returns.std() * np.sqrt(252 * 16) if returns.std() > 0 else 0 # 最大回撤 cumulative_returns = (1 + returns).cumprod() running_max = cumulative_returns.expanding().max() drawdown = (cumulative_returns - running_max) / running_max max_drawdown = drawdown.min() * 100 # 胜率 winning_trades = trades_df[trades_df['盈亏金额'] > 0] losing_trades = trades_df[trades_df['盈亏金额'] < 0] win_rate = len(winning_trades) / len(trades_df) * 100 if len(trades_df) > 0 else 0 # 盈亏比 avg_win = winning_trades['盈亏金额'].mean() if len(winning_trades) > 0 else 0 avg_loss = abs(losing_trades['盈亏金额'].mean()) if len(losing_trades) > 0 else 1 profit_factor = avg_win / avg_loss if avg_loss > 0 else 0 # 综合得分计算 (可自定义权重) composite_score = ( total_return * 0.3 + # 30%权重收益率 sharpe_ratio * 10 + # 夏普比率权重 max_drawdown * 0.2 + # 20%权重回撤控制 win_rate * 0.2 + # 20%权重胜率 profit_factor * 5 # 盈亏比权重 ) return { 'total_return': total_return, 'sharpe_ratio': sharpe_ratio, 'max_drawdown': max_drawdown, 'win_rate': win_rate, 'profit_factor': profit_factor, 'total_trades': len(trades_df), 'composite_score': composite_score } def run_single_backtest(self, params: Dict[str, Any], backtest_data: pd.DataFrame, initial_capital: float) -> Tuple[bool, Dict, Any]: """运行单次回测 - 静默模式(使用预加载数据)""" try: # 创建自定义信号生成器和执行器 signal_gen = CustomSignalGenerator(params, silent_mode=True) executor = CustomExecutor(params, initial_capital, silent_mode=True) if len(backtest_data) < 50: return False, {'error': '数据不足'}, None, None # 生成信号 signals_df = signal_gen.generate_dual_direction_signals(backtest_data) # 执行交易 results_df, trades_df = executor.execute_dual_direction_trades(signals_df) # 计算性能指标 performance = self.calculate_performance_score(trades_df, results_df, initial_capital) return True, performance, trades_df, results_df except Exception as e: return False, {'error': str(e)}, None, None def run_single_backtest_thread(self, params: Dict[str, Any], backtest_data: pd.DataFrame, initial_capital: float) -> Tuple[bool, Dict, Any]: """线程安全的单次回测(使用预加载数据)""" return self.run_single_backtest(params, backtest_data, initial_capital) def run_optimization(self, max_iterations: int = None, time_limit: int = 3600): """运行参数优化 - 多线程版本""" print("=" * 80) print("创业板50策略参数优化 - 多线程网格搜索") print("=" * 80) # 获取回测配置 BACKTEST_START_DATE = self.config_manager.get('strategy', 'backtest_start_date', "2025-10-01") backtest_end_config = self.config_manager.get('strategy', 'backtest_end_date', "now") BACKTEST_END_DATE = datetime.now().strftime('%Y-%m-%d') if backtest_end_config.lower() == "now" else backtest_end_config INITIAL_CAPITAL = self.config_manager.get('strategy', 'initial_capital', 1000000) start_date = datetime.strptime(BACKTEST_START_DATE, "%Y-%m-%d") end_date = datetime.strptime(BACKTEST_END_DATE, "%Y-%m-%d").replace(hour=23, minute=59, second=59) print(f"回测期间: {BACKTEST_START_DATE} 至 {BACKTEST_END_DATE}") print(f"初始资金: {INITIAL_CAPITAL:,}元") print(f"并发线程数: {self.max_workers}") # 预加载数据(只加载一次) print(f"\n预加载回测数据...") # 重定向标准输出来控制数据加载时的混乱输出 old_stdout = sys.stdout sys.stdout = io.StringIO() try: fetcher = IntradayDataFetcher(self.config_manager) prewamp_days = self.config_manager.get('strategy', 'prewamp_days', 30) data_start_date = start_date - timedelta(days=prewamp_days) full_data = fetcher.fetch_30min_data(start_date=data_start_date, end_date=end_date) full_data = fetcher.calculate_intraday_indicators(full_data) backtest_data = full_data[(full_data.index >= start_date) & (full_data.index <= end_date)].copy() finally: # 恢复标准输出 sys.stdout = old_stdout print(f"数据预加载完成: {len(backtest_data)}条数据") print(f"数据范围: {backtest_data.index[0]} 到 {backtest_data.index[-1]}") # 生成参数组合 parameter_grid = self.define_parameter_grid() all_combinations = self.generate_parameter_combinations(parameter_grid) # 限制迭代次数 if max_iterations and max_iterations < len(all_combinations): import random all_combinations = random.sample(all_combinations, max_iterations) print(f"随机选择 {max_iterations} 个组合进行测试") # 运行优化 start_time = time.time() successful_tests = 0 failed_tests = 0 last_progress_time = start_time print(f"\n开始多线程网格搜索...") print(f"时间限制: {time_limit}秒 ({time_limit/3600:.1f}小时)" if time_limit else "无时间限制") print(f"预期总时间: {len(all_combinations)/self.max_workers * 2:.0f}-{len(all_combinations)/self.max_workers * 5:.0f}分钟") # 使用线程池执行 with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # 提交所有任务 future_to_params = {} for i, params in enumerate(all_combinations): future = executor.submit( self.run_single_backtest_thread, params, backtest_data, INITIAL_CAPITAL ) future_to_params[future] = (i, params) # 控制提交速度,避免内存占用过大 if len(future_to_params) >= 100: # 最多同时处理100个任务 # 等待一些任务完成 completed_futures = [] for future in as_completed(list(future_to_params.keys())): completed_futures.append(future) if len(completed_futures) >= 50: # 完成一半后继续提交 break # 处理完成的任务 for future in completed_futures: i, params = future_to_params[future] try: success, performance, trades_df, results_df = future.result() with self.results_lock: # 使用锁保护共享数据 if success: successful_tests += 1 result = { 'params': params.copy(), 'performance': performance, 'trades_count': len(trades_df) if trades_df is not None else 0 } self.optimization_results.append(result) # 更新最优参数 if self.best_score is None or performance['composite_score'] > self.best_score: self.best_score = performance['composite_score'] self.best_params = params.copy() self.best_performance = performance print(f" 🎯 发现更优参数! 综合得分: {performance['composite_score']:.2f}") else: failed_tests += 1 # 增加进度打印频率 total_tests = successful_tests + failed_tests current_time = time.time() if total_tests % 5 == 0 or (current_time - last_progress_time) > 30: # 每5个测试或每30秒打印一次 elapsed = current_time - start_time progress = total_tests / len(all_combinations) * 100 avg_time_per_test = elapsed / total_tests remaining_tests = len(all_combinations) - total_tests eta_seconds = remaining_tests * avg_time_per_test / self.max_workers print(f"进度: {total_tests}/{len(all_combinations)} ({progress:.1f}%) | " f"成功: {successful_tests} 失败: {failed_tests} | " f"已用时: {elapsed/60:.1f}分钟 | " f"预计剩余: {eta_seconds/60:.1f}分钟") last_progress_time = current_time # 检查时间限制 if time_limit and (current_time - start_time) > time_limit: print(f"\n⏰ 达到时间限制 {time_limit/3600:.1f}小时,停止优化") executor.shutdown(wait=False) break except Exception as e: with self.results_lock: failed_tests += 1 print(f"回测异常: {e}") # 移除已处理的future del future_to_params[future] # 处理剩余的任务 for future in as_completed(future_to_params.keys()): i, params = future_to_params[future] try: success, performance, trades_df, results_df = future.result() with self.results_lock: if success: successful_tests += 1 result = { 'params': params.copy(), 'performance': performance, 'trades_count': len(trades_df) if trades_df is not None else 0 } self.optimization_results.append(result) if self.best_score is None or performance['composite_score'] > self.best_score: self.best_score = performance['composite_score'] self.best_params = params.copy() self.best_performance = performance print(f" 🎯 发现更优参数! 综合得分: {performance['composite_score']:.2f}") else: failed_tests += 1 # 增加进度打印 total_tests = successful_tests + failed_tests current_time = time.time() if total_tests % 5 == 0 or (current_time - last_progress_time) > 30: elapsed = current_time - start_time progress = total_tests / len(all_combinations) * 100 avg_time_per_test = elapsed / total_tests remaining_tests = len(all_combinations) - total_tests eta_seconds = remaining_tests * avg_time_per_test / self.max_workers print(f"进度: {total_tests}/{len(all_combinations)} ({progress:.1f}%) | " f"成功: {successful_tests} 失败: {failed_tests} | " f"已用时: {elapsed/60:.1f}分钟 | " f"预计剩余: {eta_seconds/60:.1f}分钟") last_progress_time = current_time except Exception as e: with self.results_lock: failed_tests += 1 print(f"回测异常: {e}") elapsed_time = time.time() - start_time # 输出优化结果 print(f"\n优化完成!") print(f"总测试次数: {len(all_combinations)}") print(f"成功测试: {successful_tests}") print(f"失败测试: {failed_tests}") print(f"总用时: {elapsed_time:.1f}秒") print(f"平均每次测试: {elapsed_time/len(all_combinations):.2f}秒") print(f"线程并发效果: 理论加速比 {self.max_workers}x,实际加速比 {elapsed_time/(elapsed_time/self.max_workers):.1f}x") # 分析结果 self.analyze_results() def analyze_results(self): """分析优化结果""" if not self.optimization_results: print("没有成功的优化结果") return print("\n" + "=" * 80) print("优化结果分析") print("=" * 80) # 转换为DataFrame便于分析 results_data = [] for result in self.optimization_results: row = result['params'].copy() row.update(result['performance']) results_data.append(row) results_df = pd.DataFrame(results_data) # 排序找到最优参数 top_results = results_df.nlargest(10, 'composite_score') print(f"\n🏆 TOP 10 最优参数组合:") print("-" * 80) for i, (_, row) in enumerate(top_results.iterrows(), 1): print(f"\n第{i}名 (综合得分: {row['composite_score']:.2f})") print(f" 收益率: {row['total_return']:.2f}%") print(f" 夏普比率: {row['sharpe_ratio']:.2f}") print(f" 最大回撤: {row['max_drawdown']:.2f}%") print(f" 胜率: {row['win_rate']:.1f}%") print(f" 盈亏比: {row['profit_factor']:.2f}") print(f" 交易次数: {int(row['total_trades'])}") print(f" 关键参数:") print(f" 仓位比例: {row['position_size_pct']:.1f}") print(f" 止损: {row['stop_loss_pct']*100:.1f}%") print(f" 止盈: {row['take_profit_pct']*100:.1f}%") print(f" 最大持仓: {int(row['max_hold_bars'])}周期") print(f" 信号强度: {int(row['min_signal_strength'])}") print(f" RSI超卖/超买: {int(row['rsi_oversold'])}/{int(row['rsi_overbought'])}") print(f" KDJ超卖/超买: {int(row['kdj_oversold'])}/{int(row['kdj_overbought'])}") # 参数敏感性分析 print(f"\n📊 参数敏感性分析:") print("-" * 80) key_params = ['position_size_pct', 'stop_loss_pct', 'take_profit_pct', 'max_hold_bars'] for param in key_params: if param in results_df.columns: param_analysis = results_df.groupby(param)['composite_score'].agg(['mean', 'std', 'min', 'max']) print(f"\n{param}:") for value, stats in param_analysis.iterrows(): print(f" {value}: 平均得分 {stats['mean']:.2f} (±{stats['std']:.2f})") # 保存结果 self.save_optimization_results(results_df) def save_optimization_results(self, results_df: pd.DataFrame): """保存优化结果""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_file = f'cyb50_optimization_results_{timestamp}.csv' # 按综合得分排序 results_df = results_df.sort_values('composite_score', ascending=False) results_df.to_csv(output_file, index=False, encoding='utf-8-sig') print(f"\n💾 优化结果已保存到: {output_file}") # 保存最优参数到配置文件 if self.best_params: best_params_file = 'best_parameters.json' with open(best_params_file, 'w', encoding='utf-8') as f: json.dump({ 'best_params': self.best_params, 'best_performance': self.best_performance, 'optimization_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') }, f, indent=2, ensure_ascii=False) print(f"💾 最优参数已保存到: {best_params_file}") class CustomSignalGenerator(DualDirectionSignalGenerator): """自定义信号生成器 - 支持动态参数""" def __init__(self, params: Dict[str, Any], silent_mode=False): super().__init__() self.params = params self.silent_mode = silent_mode def _calculate_long_signals(self, current_bar, df, i): """计算做多信号强度 - 使用自定义参数""" long_score = 0 long_signals = [] # RSI超卖做多 rsi_oversold = self.params.get('rsi_oversold', 30) if current_bar['RSI'] < rsi_oversold: long_score += 2 long_signals.append(f"RSI超卖(<{rsi_oversold})") elif current_bar['RSI'] < rsi_oversold + 5: long_score += 1 long_signals.append("RSI偏弱") # KDJ超卖做多 kdj_oversold = self.params.get('kdj_oversold', 20) if current_bar['K'] < kdj_oversold and current_bar['D'] < kdj_oversold: long_score += 2 long_signals.append(f"KDJ超卖(<{kdj_oversold})") elif current_bar['J'] < 0: long_score += 1 long_signals.append("KDJ极端超卖") # MACD金叉 macd_fast = self.params.get('macd_fast', 12) macd_slow = self.params.get('macd_slow', 26) if current_bar['MACD_hist'] > 0 and df.iloc[i-1]['MACD_hist'] <= 0: long_score += 2 long_signals.append("MACD金叉") elif current_bar['MACD_hist'] > df.iloc[i-1]['MACD_hist']: long_score += 1 long_signals.append("MACD改善") # 价格触及布林带下轨 bb_lower_threshold = self.params.get('bb_lower_threshold', 1.005) if current_bar['Close'] <= current_bar['BB_lower'] * bb_lower_threshold: long_score += 2 long_signals.append("触及下轨") elif current_bar['Close'] <= current_bar['BB_lower'] * (bb_lower_threshold + 0.005): long_score += 1 long_signals.append("接近下轨") # 连续下跌后的反转 consecutive_bars = self.params.get('consecutive_bars', 4) consecutive_change_pct = self.params.get('consecutive_change_pct', 0.015) recent_returns = df.iloc[i-6:i]['Returns'] if recent_returns.min() < -consecutive_change_pct: consecutive_decline = sum(recent_returns < 0) if consecutive_decline >= consecutive_bars: long_score += 2 long_signals.append("连续下跌反转") # 成交量配合 volume_ratio_threshold = self.params.get('volume_ratio_threshold', 1.2) if current_bar['Volume_Ratio'] > volume_ratio_threshold: long_score += 1 long_signals.append("放量配合") # 当日开盘价格关系 try: daily_high = df[df.index.date == df.index[i].date()]['High'].max() daily_low = df[df.index.date == df.index[i].date()]['Low'].min() daily_range = daily_high - daily_low if daily_range > 0: position_in_day = (current_bar['Close'] - daily_low) / daily_range if position_in_day < 0.3: long_score += 1 long_signals.append("日内低位") except: pass # MA趋势过滤 if current_bar['MA6'] < current_bar['MA12'] < current_bar['MA24']: long_score -= 1 long_signals.append("MA下降趋势惩罚") elif current_bar['MA6'] > current_bar['MA12']: long_score += 1 long_signals.append("MA短期上行") return long_score, long_signals def _calculate_short_signals(self, current_bar, df, i): """计算做空信号强度 - 使用自定义参数""" short_score = 0 short_signals = [] # RSI超买做空 rsi_overbought = self.params.get('rsi_overbought', 70) if current_bar['RSI'] > rsi_overbought: short_score += 2 short_signals.append(f"RSI超买(>{rsi_overbought})") elif current_bar['RSI'] > rsi_overbought - 5: short_score += 1 short_signals.append("RSI偏强") # KDJ超买做空 kdj_overbought = self.params.get('kdj_overbought', 80) if current_bar['K'] > kdj_overbought and current_bar['D'] > kdj_overbought: short_score += 2 short_signals.append(f"KDJ超买(>{kdj_overbought})") elif current_bar['J'] > 100: short_score += 1 short_signals.append("KDJ极端超买") # MACD死叉 if current_bar['MACD_hist'] < 0 and df.iloc[i-1]['MACD_hist'] >= 0: short_score += 2 short_signals.append("MACD死叉") elif current_bar['MACD_hist'] < df.iloc[i-1]['MACD_hist']: short_score += 1 short_signals.append("MACD恶化") # 价格触及布林带上轨 bb_upper_threshold = self.params.get('bb_upper_threshold', 0.995) if current_bar['Close'] >= current_bar['BB_upper'] * bb_upper_threshold: short_score += 2 short_signals.append("触及上轨") elif current_bar['Close'] >= current_bar['BB_upper'] * (bb_upper_threshold - 0.005): short_score += 1 short_signals.append("接近上轨") # 连续上涨后的反转 consecutive_bars = self.params.get('consecutive_bars', 4) consecutive_change_pct = self.params.get('consecutive_change_pct', 0.015) recent_returns = df.iloc[i-6:i]['Returns'] if recent_returns.max() > consecutive_change_pct: consecutive_rise = sum(recent_returns > 0) if consecutive_rise >= consecutive_bars: short_score += 2 short_signals.append("连续上涨反转") # 成交量配合 volume_ratio_threshold = self.params.get('volume_ratio_threshold', 1.2) if current_bar['Volume_Ratio'] > volume_ratio_threshold: short_score += 1 short_signals.append("放量配合") # 当日开盘价格关系 try: daily_high = df[df.index.date == df.index[i].date()]['High'].max() daily_low = df[df.index.date == df.index[i].date()]['Low'].min() daily_range = daily_high - daily_low if daily_range > 0: position_in_day = (current_bar['Close'] - daily_low) / daily_range if position_in_day > 0.7: short_score += 1 short_signals.append("日内高位") except: pass # MA趋势过滤 if current_bar['MA6'] > current_bar['MA12'] > current_bar['MA24']: short_score -= 1 short_signals.append("MA上升趋势惩罚") elif current_bar['MA6'] < current_bar['MA12']: short_score += 1 short_signals.append("MA短期下行") return short_score, short_signals def generate_dual_direction_signals(self, data: pd.DataFrame) -> pd.DataFrame: """生成多空双向信号 - 静默模式""" if not self.silent_mode: print("正在生成多空双向信号...") signals = [] df = data.copy() for i in range(24, len(df)): # 至少需要12小时(24个30分钟)的历史数据 current_bar = df.iloc[i] current_time = df.index[i] # 跳过不适合交易的时间段 if hasattr(current_time, 'hour'): # 有小时信息的30分钟数据 hour = current_time.hour if hour < 9 or hour > 15: # 只在交易时间内 continue # 生成基础信号数据 signal = { 'DateTime': str(current_time), 'Open': current_bar['Open'], 'High': current_bar['High'], 'Low': current_bar['Low'], 'Close': current_bar['Close'], 'Volume': current_bar['Volume'], 'RSI': current_bar['RSI'], 'MACD': current_bar['MACD'], 'MACD_hist': current_bar['MACD_hist'], 'K': current_bar['K'], 'D': current_bar['D'], 'J': current_bar['J'], 'ATR_Pct': current_bar['ATR_Pct'], 'Volume_Ratio': current_bar['Volume_Ratio'], 'Price_Momentum': current_bar['Price_Momentum'], 'Close_Open_Pct': current_bar['Close_Open_Pct'] } # 计算做多信号强度 long_score, long_signals = self._calculate_long_signals(current_bar, df, i) # 计算做空信号强度 short_score, short_signals = self._calculate_short_signals(current_bar, df, i) # 设置信号分数和描述 signal['Long_Score'] = long_score signal['Long_Signals'] = ', '.join(long_signals) if long_signals else '' signal['Short_Score'] = short_score signal['Short_Signals'] = ', '.join(short_signals) if short_signals else '' # 决定最终信号方向和强度 final_signal = 0 signal_type = '' # 信号优先级和冲突处理 if long_score >= 4 and short_score >= 4: # 两个方向都达到阈值,选择信号强度更高的 if long_score > short_score: final_signal = 1 signal_type = f'做多翻转(强度{long_score} vs {short_score})' self.long_signal_count += 1 elif short_score > long_score: final_signal = -1 signal_type = f'做空反转(强度{short_score} vs {long_score})' self.short_signal_count += 1 else: # 强度相等时,根据当前价格位置决定 bb_position = (current_bar['Close'] - current_bar['BB_lower']) / (current_bar['BB_upper'] - current_bar['BB_lower']) if bb_position < 0.3: # 偏向下轨,优先做多 final_signal = 1 signal_type = f'做多翻转(位置优先)' self.long_signal_count += 1 elif bb_position > 0.7: # 偏向上轨,优先做空 final_signal = -1 signal_type = f'做空反转(位置优先)' self.short_signal_count += 1 else: # 中间位置,暂不开仓 final_signal = 0 signal_type = '信号冲突(强度相等)' elif long_score >= 4: final_signal = 1 signal_type = '做多翻转' self.long_signal_count += 1 elif short_score >= 4: final_signal = -1 signal_type = '做空反转' self.short_signal_count += 1 self.total_signal_count = self.long_signal_count + self.short_signal_count signal['Signal'] = final_signal signal['Signal_Type'] = signal_type signals.append(signal) signals_df = pd.DataFrame(signals) if len(signals_df) > 0: signals_df.set_index('DateTime', inplace=True) if not self.silent_mode: print(f"多空双向信号生成完成") print(f"做多信号: {self.long_signal_count}个") print(f"做空信号: {self.short_signal_count}个") print(f"总信号: {self.total_signal_count}个") if len(signals_df) > 0: print(f"信号密度: {self.total_signal_count/len(signals_df)*100:.2f}%") return signals_df class CustomExecutor(DualDirectionExecutor): """自定义交易执行器 - 支持动态参数""" def __init__(self, params: Dict[str, Any], initial_capital: float = 1000000, silent_mode=False): super().__init__(initial_capital) # 更新参数 self.params['position_size_pct'] = params.get('position_size_pct', 1.0) self.params['stop_loss_pct'] = params.get('stop_loss_pct', 0.008) self.params['take_profit_pct'] = params.get('take_profit_pct', 0.02) self.params['max_hold_bars'] = params.get('max_hold_bars', 16) self.params['min_signal_strength'] = params.get('min_signal_strength', 4) self.silent_mode = silent_mode def execute_dual_direction_trades(self, signals_df: pd.DataFrame) -> tuple: """执行多空双向交易 - 静默模式""" if not self.silent_mode: print("正在执行多空双向交易...") # 复制原有逻辑,但移除详细打印 df = signals_df.copy() # 初始化 trades = [] capital = self.initial_capital # 持仓状态 long_position = 0 # 做多持仓数量 short_position = 0 # 做空持仓数量 long_entry_price = 0 # 做多开仓价 short_entry_price = 0 # 做空开仓价 long_entry_time = None # 做多开仓时间 short_entry_time = None # 做空开仓时间 long_holding_bars = 0 # 做多持仓周期 short_holding_bars = 0 # 做空持仓周期 long_entry_signals = '' # 做多入场信号 short_entry_signals = '' # 做空入场信号 # 添加资金列 df = df.copy() df['capital'] = capital df['long_position'] = 0 df['short_position'] = 0 df['net_value'] = capital for i in range(len(df)): current_time = df.index[i] current_bar = df.iloc[i] price = current_bar['Close'] # 更新当前净值 current_value = capital if long_position > 0: current_value += long_position * price if short_position < 0: # 做空盈亏 short_pnl = (short_entry_price - price) * abs(short_position) margin_held = abs(short_position) * short_entry_price current_value += margin_held + short_pnl df.iloc[i, df.columns.get_loc('net_value')] = current_value # 开仓逻辑 - 只在无持仓时开仓 if long_position == 0 and short_position == 0: # 做多开仓 if current_bar['Signal'] == 1: position_size = int((capital * self.params['position_size_pct']) / price) if position_size > 0: cost = position_size * price * (1 + self.params['commission_rate'] + self.params['slippage_rate']) if cost <= capital: long_position = position_size long_entry_price = price long_entry_time = current_time long_entry_signals = current_bar.get('Long_Signals', '') long_holding_bars = 0 capital -= cost # 计算预计止损止盈价格 long_stop_loss_price = long_entry_price * (1 - self.params['stop_loss_pct']) long_take_profit_price = long_entry_price * (1 + self.params['take_profit_pct']) df.iloc[i, df.columns.get_loc('long_position')] = long_position # 做空开仓 elif current_bar['Signal'] == -1: position_value = capital * self.params['position_size_pct'] position_size = int(position_value / price) if position_size > 0: margin_required = position_size * price commission = position_size * price * (self.params['commission_rate'] + self.params['slippage_rate']) total_cost = margin_required + commission if total_cost <= capital: short_position = -position_size short_entry_price = price short_entry_time = current_time short_entry_signals = current_bar.get('Short_Signals', '') short_holding_bars = 0 capital -= total_cost # 计算预计止损止盈价格 short_stop_loss_price = short_entry_price * (1 + self.params['stop_loss_pct']) short_take_profit_price = short_entry_price * (1 - self.params['take_profit_pct']) df.iloc[i, df.columns.get_loc('short_position')] = short_position # 平仓逻辑 - 做多平仓 elif long_position > 0: long_holding_bars += 1 # 计算止损止盈价格 stop_loss = long_entry_price * (1 - self.params['stop_loss_pct']) take_profit = long_entry_price * (1 + self.params['take_profit_pct']) exit_signal = False exit_reason = '' exit_price = price # 止损 if price <= stop_loss: exit_signal = True loss_pct = (long_entry_price - stop_loss) / long_entry_price * 100 exit_reason = f"做多止损触发(价格{price:.2f}跌破止损线{stop_loss:.2f},亏损{loss_pct:.2f}%)" exit_price = price # 止盈 elif price >= take_profit: exit_signal = True profit_pct = (price - long_entry_price) / long_entry_price * 100 exit_reason = f"做多止盈触发(价格{price:.2f}突破止盈线{take_profit:.2f},盈利{profit_pct:.2f}%)" exit_price = price # 最大持仓时间 elif long_holding_bars >= self.params['max_hold_bars']: exit_signal = True current_pnl_pct = (price - long_entry_price) / long_entry_price * 100 exit_reason = f"做多时间止损(持仓{long_holding_bars}周期达上限{self.params['max_hold_bars']}周期,当前盈亏{current_pnl_pct:+.2f}%)" # 做多信号消失 elif current_bar['RSI'] > 70: exit_signal = True current_pnl_pct = (price - long_entry_price) / long_entry_price * 100 exit_reason = f"做多RSI超买平仓(RSI={current_bar['RSI']:.1f}超买,信号消失,当前盈亏{current_pnl_pct:+.2f}%)" # 执行平仓 if exit_signal: # 计算盈亏 gross_pnl = (exit_price - long_entry_price) * long_position open_cost = long_position * long_entry_price * (self.params['commission_rate'] + self.params['slippage_rate']) close_revenue = long_position * exit_price close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate']) pnl = gross_pnl - open_cost - close_cost # 更新资金 capital += close_revenue - close_cost # 记录交易 trade = { '交易方向': '做多', '开仓时间': long_entry_time, '平仓时间': current_time, '开仓价格': long_entry_price, '平仓价格': exit_price, '仓位': long_position, '盈亏金额': pnl, '盈亏百分比': (exit_price - long_entry_price) / long_entry_price * 100, '退出原因': exit_reason, '持仓周期数': long_holding_bars, '持仓小时数': long_holding_bars * 0.5, '入场信号': long_entry_signals, '平仓时资金': capital, '开仓市值': long_position * long_entry_price, '预计止损价格': long_stop_loss_price, '预计止盈价格': long_take_profit_price } trades.append(trade) # 重置做多持仓 long_position = 0 long_entry_price = 0 long_entry_time = None long_holding_bars = 0 # 平仓逻辑 - 做空平仓 elif short_position < 0: short_holding_bars += 1 # 计算止损止盈价格(做空逻辑相反) stop_loss_price = short_entry_price * (1 + self.params['stop_loss_pct']) # 价格上涨止损 take_profit_price = short_entry_price * (1 - self.params['take_profit_pct']) # 价格下跌止盈 exit_signal = False exit_reason = '' exit_price = price # 止损(价格上涨) if price >= stop_loss_price: exit_signal = True loss_pct = (stop_loss_price - short_entry_price) / short_entry_price * 100 exit_reason = f"做空止损触发(价格{price:.2f}突破止损线{stop_loss_price:.2f},亏损{loss_pct:.2f}%)" exit_price = price # 止盈(价格下跌) elif price <= take_profit_price: exit_signal = True profit_pct = (short_entry_price - price) / short_entry_price * 100 exit_reason = f"做空止盈触发(价格{price:.2f}跌破止盈线{take_profit_price:.2f},盈利{profit_pct:.2f}%)" exit_price = price # 最大持仓时间 elif short_holding_bars >= self.params['max_hold_bars']: exit_signal = True current_pnl_pct = (short_entry_price - price) / short_entry_price * 100 exit_reason = f"做空时间止损(持仓{short_holding_bars}周期达上限{self.params['max_hold_bars']}周期,当前盈亏{current_pnl_pct:+.2f}%)" # 做空信号消失 elif current_bar['RSI'] < 30: exit_signal = True current_pnl_pct = (short_entry_price - price) / short_entry_price * 100 exit_reason = f"做空RSI超卖平仓(RSI={current_bar['RSI']:.1f}超卖,信号消失,当前盈亏{current_pnl_pct:+.2f}%)" # 执行平仓 if exit_signal: # 计算盈亏 gross_pnl = (short_entry_price - exit_price) * abs(short_position) open_commission = abs(short_position) * short_entry_price * (self.params['commission_rate'] + self.params['slippage_rate']) close_commission = abs(short_position) * exit_price * (self.params['commission_rate'] + self.params['slippage_rate']) net_pnl = gross_pnl - close_commission # 更新资金(返还保证金 + 净盈亏) margin_returned = abs(short_position) * short_entry_price capital += margin_returned + net_pnl # 记录交易 trade = { '交易方向': '做空', '开仓时间': short_entry_time, '平仓时间': current_time, '开仓价格': short_entry_price, '平仓价格': exit_price, '仓位': abs(short_position), '盈亏金额': net_pnl, '盈亏百分比': (short_entry_price - exit_price) / short_entry_price * 100, '退出原因': exit_reason, '持仓周期数': short_holding_bars, '持仓小时数': short_holding_bars * 0.5, '入场信号': short_entry_signals, '平仓时资金': capital, '开仓市值': abs(short_position) * short_entry_price, '保证金返还': margin_returned, '预计止损价格': short_stop_loss_price, '预计止盈价格': short_take_profit_price } trades.append(trade) # 重置做空持仓 short_position = 0 short_entry_price = 0 short_entry_time = None short_holding_bars = 0 # 更新资金和持仓状态 df.iloc[i, df.columns.get_loc('capital')] = capital df.iloc[i, df.columns.get_loc('long_position')] = long_position df.iloc[i, df.columns.get_loc('short_position')] = short_position # 强制平仓剩余持仓 - 做多 if long_position > 0: final_price = df.iloc[-1]['Close'] gross_pnl = (final_price - long_entry_price) * long_position open_cost = long_position * long_entry_price * (self.params['commission_rate'] + self.params['slippage_rate']) close_revenue = long_position * final_price close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate']) pnl = gross_pnl - open_cost - close_cost capital += close_revenue - close_cost trade = { '交易方向': '做多', '开仓时间': long_entry_time, '平仓时间': df.index[-1], '开仓价格': long_entry_price, '平仓价格': final_price, '仓位': long_position, '盈亏金额': pnl, '盈亏百分比': (final_price - long_entry_price) / long_entry_price * 100, '退出原因': f'做多强制平仓(回测结束,持仓{long_holding_bars}周期,最终价格{final_price:.2f},盈亏{(final_price - long_entry_price) / long_entry_price * 100:+.2f}%)', '持仓周期数': long_holding_bars, '持仓小时数': long_holding_bars * 0.5, '入场信号': long_entry_signals, '平仓时资金': capital, '开仓市值': long_position * long_entry_price, '预计止损价格': long_stop_loss_price, '预计止盈价格': long_take_profit_price } trades.append(trade) # 做空持仓强制平仓 if short_position < 0: final_price = df.iloc[-1]['Close'] gross_pnl = (short_entry_price - final_price) * abs(short_position) close_commission = abs(short_position) * final_price * (self.params['commission_rate'] + self.params['slippage_rate']) net_pnl = gross_pnl - close_commission margin_returned = abs(short_position) * short_entry_price capital += margin_returned + net_pnl trade = { '交易方向': '做空', '开仓时间': short_entry_time, '平仓时间': df.index[-1], '开仓价格': short_entry_price, '平仓价格': final_price, '仓位': abs(short_position), '盈亏金额': net_pnl, '盈亏百分比': (short_entry_price - final_price) / short_entry_price * 100, '退出原因': f'做空强制平仓(回测结束,持仓{short_holding_bars}周期,最终价格{final_price:.2f},盈亏{(short_entry_price - final_price) / short_entry_price * 100:+.2f}%)', '持仓周期数': short_holding_bars, '持仓小时数': short_holding_bars * 0.5, '入场信号': short_entry_signals, '平仓时资金': capital, '开仓市值': abs(short_position) * short_entry_price, '保证金返还': margin_returned, '预计止损价格': short_stop_loss_price, '预计止盈价格': short_take_profit_price } trades.append(trade) trades_df = pd.DataFrame(trades) if len(trades_df) > 0: # 统一时间格式 for col in trades_df.columns: if '时间' in col: trades_df[col] = pd.to_datetime(trades_df[col]) trades_df = trades_df.sort_values('开仓时间') if not self.silent_mode: print(f"多空双向交易执行完成,共{len(trades_df)}笔交易") return df, trades_df def main(): """主程序 - 运行参数优化""" # 创建优化器 - 设置4个并发线程 optimizer = ParameterOptimizer('config.json', max_workers=4) # 运行优化 (可设置最大迭代次数和时间限制) # max_iterations: 限制测试次数,None表示测试所有组合 # time_limit: 时间限制(秒),None表示无限制 optimizer.run_optimization( max_iterations=None, # 测试所有组合 (现在只有1,152个组合) time_limit=86400 # 24小时 (24*60*60=86400秒) ) print(f"\n参数优化完成!") if __name__ == "__main__": main()