| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102 |
- import pandas as pd
- import numpy as np
- import itertools
- import json
- import time
- from datetime import datetime, timedelta
- from typing import Dict, List, Tuple, Any
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from threading import Lock
- import warnings
- import sys
- import io
- warnings.filterwarnings('ignore')
- # 导入原有策略模块
- from cyb50_30min_dual_direction import (
- ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator,
- DualDirectionExecutor, validate_dual_direction_results
- )
- class ParameterOptimizer:
- """参数优化器 - 网格搜索最优参数组合"""
-
- def __init__(self, config_file='config.json', max_workers=4):
- self.config_manager = ConfigManager(config_file)
- self.optimization_results = []
- self.best_params = None
- self.best_score = None
- self.max_workers = max_workers
- self.results_lock = Lock() # 线程锁,保护共享数据
-
- def define_parameter_grid(self) -> Dict[str, List]:
- """定义参数搜索网格 - 精简优化版本"""
-
- parameter_grid = {
- # 核心交易参数(重点优化)
- 'position_size_pct': [0.7, 0.85, 1.0],
- 'stop_loss_pct': [0.006, 0.008, 0.010, 0.012],
- 'take_profit_pct': [0.012, 0.015, 0.018, 0.022],
- 'max_hold_bars': [14, 16, 18, 22],
- 'min_signal_strength': [3, 4, 5],
-
- # RSI参数(精简)
- 'rsi_oversold': [25, 30],
- 'rsi_overbought': [70, 75],
-
- # KDJ参数(精简)
- 'kdj_oversold': [20],
- 'kdj_overbought': [80],
-
- # 成交量参数(精简)
- 'volume_ratio_threshold': [1.2],
-
- # 布林带参数(精简)
- 'bb_upper_threshold': [0.995],
- 'bb_lower_threshold': [1.005],
-
- # MACD参数(固定)
- 'macd_fast': [12],
- 'macd_slow': [26],
-
- # 连续涨跌参数(精简)
- 'consecutive_bars': [4],
- 'consecutive_change_pct': [0.015]
- }
-
- return parameter_grid
-
- def generate_parameter_combinations(self, parameter_grid: Dict[str, List]) -> List[Dict[str, Any]]:
- """生成所有参数组合"""
- param_names = list(parameter_grid.keys())
- param_values = list(parameter_grid.values())
-
- total_combinations = 1
- for values in param_values:
- total_combinations *= len(values)
-
- print(f"参数网格总组合数: {total_combinations:,}")
- print(f"参数维度: {len(param_names)}")
- print("\n参数搜索范围:")
- for name, values in parameter_grid.items():
- print(f" {name}: {values}")
-
- # 生成所有组合
- combinations = []
- for combination in itertools.product(*param_values):
- params = dict(zip(param_names, combination))
- combinations.append(params)
-
- return combinations
-
- def calculate_performance_score(self, trades_df: pd.DataFrame, results_df: pd.DataFrame,
- initial_capital: float) -> Dict[str, float]:
- """计算综合性能得分"""
- if len(trades_df) == 0:
- return {
- 'total_return': -100,
- 'sharpe_ratio': -999,
- 'max_drawdown': -100,
- 'win_rate': 0,
- 'profit_factor': 0,
- 'total_trades': 0,
- 'composite_score': -9999
- }
-
- # 基础指标
- final_capital = results_df['net_value'].iloc[-1]
- total_return = (final_capital - initial_capital) / initial_capital * 100
-
- # 夏普比率计算
- returns = results_df['net_value'].pct_change().dropna()
- sharpe_ratio = returns.mean() / returns.std() * np.sqrt(252 * 16) if returns.std() > 0 else 0
-
- # 最大回撤
- cumulative_returns = (1 + returns).cumprod()
- running_max = cumulative_returns.expanding().max()
- drawdown = (cumulative_returns - running_max) / running_max
- max_drawdown = drawdown.min() * 100
-
- # 胜率
- winning_trades = trades_df[trades_df['盈亏金额'] > 0]
- losing_trades = trades_df[trades_df['盈亏金额'] < 0]
- win_rate = len(winning_trades) / len(trades_df) * 100 if len(trades_df) > 0 else 0
-
- # 盈亏比
- avg_win = winning_trades['盈亏金额'].mean() if len(winning_trades) > 0 else 0
- avg_loss = abs(losing_trades['盈亏金额'].mean()) if len(losing_trades) > 0 else 1
- profit_factor = avg_win / avg_loss if avg_loss > 0 else 0
-
- # 综合得分计算 (可自定义权重)
- composite_score = (
- total_return * 0.3 + # 30%权重收益率
- sharpe_ratio * 10 + # 夏普比率权重
- max_drawdown * 0.2 + # 20%权重回撤控制
- win_rate * 0.2 + # 20%权重胜率
- profit_factor * 5 # 盈亏比权重
- )
-
- return {
- 'total_return': total_return,
- 'sharpe_ratio': sharpe_ratio,
- 'max_drawdown': max_drawdown,
- 'win_rate': win_rate,
- 'profit_factor': profit_factor,
- 'total_trades': len(trades_df),
- 'composite_score': composite_score
- }
-
- def run_single_backtest(self, params: Dict[str, Any], backtest_data: pd.DataFrame,
- initial_capital: float) -> Tuple[bool, Dict, Any]:
- """运行单次回测 - 静默模式(使用预加载数据)"""
- try:
- # 创建自定义信号生成器和执行器
- signal_gen = CustomSignalGenerator(params, silent_mode=True)
- executor = CustomExecutor(params, initial_capital, silent_mode=True)
-
- if len(backtest_data) < 50:
- return False, {'error': '数据不足'}, None, None
-
- # 生成信号
- signals_df = signal_gen.generate_dual_direction_signals(backtest_data)
-
- # 执行交易
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
-
- # 计算性能指标
- performance = self.calculate_performance_score(trades_df, results_df, initial_capital)
-
- return True, performance, trades_df, results_df
-
- except Exception as e:
- return False, {'error': str(e)}, None, None
-
- def run_single_backtest_thread(self, params: Dict[str, Any], backtest_data: pd.DataFrame,
- initial_capital: float) -> Tuple[bool, Dict, Any]:
- """线程安全的单次回测(使用预加载数据)"""
- return self.run_single_backtest(params, backtest_data, initial_capital)
-
- def run_optimization(self, max_iterations: int = None, time_limit: int = 3600):
- """运行参数优化 - 多线程版本"""
- print("=" * 80)
- print("创业板50策略参数优化 - 多线程网格搜索")
- print("=" * 80)
-
- # 获取回测配置
- BACKTEST_START_DATE = self.config_manager.get('strategy', 'backtest_start_date', "2025-10-01")
- backtest_end_config = self.config_manager.get('strategy', 'backtest_end_date', "now")
- BACKTEST_END_DATE = datetime.now().strftime('%Y-%m-%d') if backtest_end_config.lower() == "now" else backtest_end_config
- INITIAL_CAPITAL = self.config_manager.get('strategy', 'initial_capital', 1000000)
-
- start_date = datetime.strptime(BACKTEST_START_DATE, "%Y-%m-%d")
- end_date = datetime.strptime(BACKTEST_END_DATE, "%Y-%m-%d").replace(hour=23, minute=59, second=59)
-
- print(f"回测期间: {BACKTEST_START_DATE} 至 {BACKTEST_END_DATE}")
- print(f"初始资金: {INITIAL_CAPITAL:,}元")
- print(f"并发线程数: {self.max_workers}")
-
- # 预加载数据(只加载一次)
- print(f"\n预加载回测数据...")
-
- # 重定向标准输出来控制数据加载时的混乱输出
- old_stdout = sys.stdout
- sys.stdout = io.StringIO()
-
- try:
- fetcher = IntradayDataFetcher(self.config_manager)
- prewamp_days = self.config_manager.get('strategy', 'prewamp_days', 30)
- data_start_date = start_date - timedelta(days=prewamp_days)
-
- full_data = fetcher.fetch_30min_data(start_date=data_start_date, end_date=end_date)
- full_data = fetcher.calculate_intraday_indicators(full_data)
- backtest_data = full_data[(full_data.index >= start_date) & (full_data.index <= end_date)].copy()
- finally:
- # 恢复标准输出
- sys.stdout = old_stdout
-
- print(f"数据预加载完成: {len(backtest_data)}条数据")
- print(f"数据范围: {backtest_data.index[0]} 到 {backtest_data.index[-1]}")
-
- # 生成参数组合
- parameter_grid = self.define_parameter_grid()
- all_combinations = self.generate_parameter_combinations(parameter_grid)
-
- # 限制迭代次数
- if max_iterations and max_iterations < len(all_combinations):
- import random
- all_combinations = random.sample(all_combinations, max_iterations)
- print(f"随机选择 {max_iterations} 个组合进行测试")
-
- # 运行优化
- start_time = time.time()
- successful_tests = 0
- failed_tests = 0
- last_progress_time = start_time
-
- print(f"\n开始多线程网格搜索...")
- print(f"时间限制: {time_limit}秒 ({time_limit/3600:.1f}小时)" if time_limit else "无时间限制")
- print(f"预期总时间: {len(all_combinations)/self.max_workers * 2:.0f}-{len(all_combinations)/self.max_workers * 5:.0f}分钟")
-
- # 使用线程池执行
- with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
- # 提交所有任务
- future_to_params = {}
- for i, params in enumerate(all_combinations):
- future = executor.submit(
- self.run_single_backtest_thread,
- params, backtest_data, INITIAL_CAPITAL
- )
- future_to_params[future] = (i, params)
-
- # 控制提交速度,避免内存占用过大
- if len(future_to_params) >= 100: # 最多同时处理100个任务
- # 等待一些任务完成
- completed_futures = []
- for future in as_completed(list(future_to_params.keys())):
- completed_futures.append(future)
- if len(completed_futures) >= 50: # 完成一半后继续提交
- break
-
- # 处理完成的任务
- for future in completed_futures:
- i, params = future_to_params[future]
- try:
- success, performance, trades_df, results_df = future.result()
-
- with self.results_lock: # 使用锁保护共享数据
- if success:
- successful_tests += 1
- result = {
- 'params': params.copy(),
- 'performance': performance,
- 'trades_count': len(trades_df) if trades_df is not None else 0
- }
- self.optimization_results.append(result)
-
- # 更新最优参数
- if self.best_score is None or performance['composite_score'] > self.best_score:
- self.best_score = performance['composite_score']
- self.best_params = params.copy()
- self.best_performance = performance
- print(f" 🎯 发现更优参数! 综合得分: {performance['composite_score']:.2f}")
- else:
- failed_tests += 1
-
- # 增加进度打印频率
- total_tests = successful_tests + failed_tests
- current_time = time.time()
- if total_tests % 5 == 0 or (current_time - last_progress_time) > 30: # 每5个测试或每30秒打印一次
- elapsed = current_time - start_time
- progress = total_tests / len(all_combinations) * 100
- avg_time_per_test = elapsed / total_tests
- remaining_tests = len(all_combinations) - total_tests
- eta_seconds = remaining_tests * avg_time_per_test / self.max_workers
-
- print(f"进度: {total_tests}/{len(all_combinations)} ({progress:.1f}%) | "
- f"成功: {successful_tests} 失败: {failed_tests} | "
- f"已用时: {elapsed/60:.1f}分钟 | "
- f"预计剩余: {eta_seconds/60:.1f}分钟")
- last_progress_time = current_time
-
- # 检查时间限制
- if time_limit and (current_time - start_time) > time_limit:
- print(f"\n⏰ 达到时间限制 {time_limit/3600:.1f}小时,停止优化")
- executor.shutdown(wait=False)
- break
-
- except Exception as e:
- with self.results_lock:
- failed_tests += 1
- print(f"回测异常: {e}")
-
- # 移除已处理的future
- del future_to_params[future]
-
- # 处理剩余的任务
- for future in as_completed(future_to_params.keys()):
- i, params = future_to_params[future]
- try:
- success, performance, trades_df, results_df = future.result()
-
- with self.results_lock:
- if success:
- successful_tests += 1
- result = {
- 'params': params.copy(),
- 'performance': performance,
- 'trades_count': len(trades_df) if trades_df is not None else 0
- }
- self.optimization_results.append(result)
-
- if self.best_score is None or performance['composite_score'] > self.best_score:
- self.best_score = performance['composite_score']
- self.best_params = params.copy()
- self.best_performance = performance
- print(f" 🎯 发现更优参数! 综合得分: {performance['composite_score']:.2f}")
- else:
- failed_tests += 1
-
- # 增加进度打印
- total_tests = successful_tests + failed_tests
- current_time = time.time()
- if total_tests % 5 == 0 or (current_time - last_progress_time) > 30:
- elapsed = current_time - start_time
- progress = total_tests / len(all_combinations) * 100
- avg_time_per_test = elapsed / total_tests
- remaining_tests = len(all_combinations) - total_tests
- eta_seconds = remaining_tests * avg_time_per_test / self.max_workers
-
- print(f"进度: {total_tests}/{len(all_combinations)} ({progress:.1f}%) | "
- f"成功: {successful_tests} 失败: {failed_tests} | "
- f"已用时: {elapsed/60:.1f}分钟 | "
- f"预计剩余: {eta_seconds/60:.1f}分钟")
- last_progress_time = current_time
-
- except Exception as e:
- with self.results_lock:
- failed_tests += 1
- print(f"回测异常: {e}")
-
- elapsed_time = time.time() - start_time
-
- # 输出优化结果
- print(f"\n优化完成!")
- print(f"总测试次数: {len(all_combinations)}")
- print(f"成功测试: {successful_tests}")
- print(f"失败测试: {failed_tests}")
- print(f"总用时: {elapsed_time:.1f}秒")
- print(f"平均每次测试: {elapsed_time/len(all_combinations):.2f}秒")
- print(f"线程并发效果: 理论加速比 {self.max_workers}x,实际加速比 {elapsed_time/(elapsed_time/self.max_workers):.1f}x")
-
- # 分析结果
- self.analyze_results()
-
- def analyze_results(self):
- """分析优化结果"""
- if not self.optimization_results:
- print("没有成功的优化结果")
- return
-
- print("\n" + "=" * 80)
- print("优化结果分析")
- print("=" * 80)
-
- # 转换为DataFrame便于分析
- results_data = []
- for result in self.optimization_results:
- row = result['params'].copy()
- row.update(result['performance'])
- results_data.append(row)
-
- results_df = pd.DataFrame(results_data)
-
- # 排序找到最优参数
- top_results = results_df.nlargest(10, 'composite_score')
-
- print(f"\n🏆 TOP 10 最优参数组合:")
- print("-" * 80)
- for i, (_, row) in enumerate(top_results.iterrows(), 1):
- print(f"\n第{i}名 (综合得分: {row['composite_score']:.2f})")
- print(f" 收益率: {row['total_return']:.2f}%")
- print(f" 夏普比率: {row['sharpe_ratio']:.2f}")
- print(f" 最大回撤: {row['max_drawdown']:.2f}%")
- print(f" 胜率: {row['win_rate']:.1f}%")
- print(f" 盈亏比: {row['profit_factor']:.2f}")
- print(f" 交易次数: {int(row['total_trades'])}")
- print(f" 关键参数:")
- print(f" 仓位比例: {row['position_size_pct']:.1f}")
- print(f" 止损: {row['stop_loss_pct']*100:.1f}%")
- print(f" 止盈: {row['take_profit_pct']*100:.1f}%")
- print(f" 最大持仓: {int(row['max_hold_bars'])}周期")
- print(f" 信号强度: {int(row['min_signal_strength'])}")
- print(f" RSI超卖/超买: {int(row['rsi_oversold'])}/{int(row['rsi_overbought'])}")
- print(f" KDJ超卖/超买: {int(row['kdj_oversold'])}/{int(row['kdj_overbought'])}")
-
- # 参数敏感性分析
- print(f"\n📊 参数敏感性分析:")
- print("-" * 80)
-
- key_params = ['position_size_pct', 'stop_loss_pct', 'take_profit_pct', 'max_hold_bars']
- for param in key_params:
- if param in results_df.columns:
- param_analysis = results_df.groupby(param)['composite_score'].agg(['mean', 'std', 'min', 'max'])
- print(f"\n{param}:")
- for value, stats in param_analysis.iterrows():
- print(f" {value}: 平均得分 {stats['mean']:.2f} (±{stats['std']:.2f})")
-
- # 保存结果
- self.save_optimization_results(results_df)
-
- def save_optimization_results(self, results_df: pd.DataFrame):
- """保存优化结果"""
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
- output_file = f'cyb50_optimization_results_{timestamp}.csv'
-
- # 按综合得分排序
- results_df = results_df.sort_values('composite_score', ascending=False)
- results_df.to_csv(output_file, index=False, encoding='utf-8-sig')
-
- print(f"\n💾 优化结果已保存到: {output_file}")
-
- # 保存最优参数到配置文件
- if self.best_params:
- best_params_file = 'best_parameters.json'
- with open(best_params_file, 'w', encoding='utf-8') as f:
- json.dump({
- 'best_params': self.best_params,
- 'best_performance': self.best_performance,
- 'optimization_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- }, f, indent=2, ensure_ascii=False)
-
- print(f"💾 最优参数已保存到: {best_params_file}")
- class CustomSignalGenerator(DualDirectionSignalGenerator):
- """自定义信号生成器 - 支持动态参数"""
-
- def __init__(self, params: Dict[str, Any], silent_mode=False):
- super().__init__()
- self.params = params
- self.silent_mode = silent_mode
-
- def _calculate_long_signals(self, current_bar, df, i):
- """计算做多信号强度 - 使用自定义参数"""
- long_score = 0
- long_signals = []
-
- # RSI超卖做多
- rsi_oversold = self.params.get('rsi_oversold', 30)
- if current_bar['RSI'] < rsi_oversold:
- long_score += 2
- long_signals.append(f"RSI超卖(<{rsi_oversold})")
- elif current_bar['RSI'] < rsi_oversold + 5:
- long_score += 1
- long_signals.append("RSI偏弱")
-
- # KDJ超卖做多
- kdj_oversold = self.params.get('kdj_oversold', 20)
- if current_bar['K'] < kdj_oversold and current_bar['D'] < kdj_oversold:
- long_score += 2
- long_signals.append(f"KDJ超卖(<{kdj_oversold})")
- elif current_bar['J'] < 0:
- long_score += 1
- long_signals.append("KDJ极端超卖")
-
- # MACD金叉
- macd_fast = self.params.get('macd_fast', 12)
- macd_slow = self.params.get('macd_slow', 26)
- if current_bar['MACD_hist'] > 0 and df.iloc[i-1]['MACD_hist'] <= 0:
- long_score += 2
- long_signals.append("MACD金叉")
- elif current_bar['MACD_hist'] > df.iloc[i-1]['MACD_hist']:
- long_score += 1
- long_signals.append("MACD改善")
-
- # 价格触及布林带下轨
- bb_lower_threshold = self.params.get('bb_lower_threshold', 1.005)
- if current_bar['Close'] <= current_bar['BB_lower'] * bb_lower_threshold:
- long_score += 2
- long_signals.append("触及下轨")
- elif current_bar['Close'] <= current_bar['BB_lower'] * (bb_lower_threshold + 0.005):
- long_score += 1
- long_signals.append("接近下轨")
-
- # 连续下跌后的反转
- consecutive_bars = self.params.get('consecutive_bars', 4)
- consecutive_change_pct = self.params.get('consecutive_change_pct', 0.015)
- recent_returns = df.iloc[i-6:i]['Returns']
- if recent_returns.min() < -consecutive_change_pct:
- consecutive_decline = sum(recent_returns < 0)
- if consecutive_decline >= consecutive_bars:
- long_score += 2
- long_signals.append("连续下跌反转")
-
- # 成交量配合
- volume_ratio_threshold = self.params.get('volume_ratio_threshold', 1.2)
- if current_bar['Volume_Ratio'] > volume_ratio_threshold:
- long_score += 1
- long_signals.append("放量配合")
-
- # 当日开盘价格关系
- try:
- daily_high = df[df.index.date == df.index[i].date()]['High'].max()
- daily_low = df[df.index.date == df.index[i].date()]['Low'].min()
- daily_range = daily_high - daily_low
-
- if daily_range > 0:
- position_in_day = (current_bar['Close'] - daily_low) / daily_range
- if position_in_day < 0.3:
- long_score += 1
- long_signals.append("日内低位")
- except:
- pass
- # MA趋势过滤
- if current_bar['MA6'] < current_bar['MA12'] < current_bar['MA24']:
- long_score -= 1
- long_signals.append("MA下降趋势惩罚")
- elif current_bar['MA6'] > current_bar['MA12']:
- long_score += 1
- long_signals.append("MA短期上行")
- return long_score, long_signals
- def _calculate_short_signals(self, current_bar, df, i):
- """计算做空信号强度 - 使用自定义参数"""
- short_score = 0
- short_signals = []
-
- # RSI超买做空
- rsi_overbought = self.params.get('rsi_overbought', 70)
- if current_bar['RSI'] > rsi_overbought:
- short_score += 2
- short_signals.append(f"RSI超买(>{rsi_overbought})")
- elif current_bar['RSI'] > rsi_overbought - 5:
- short_score += 1
- short_signals.append("RSI偏强")
-
- # KDJ超买做空
- kdj_overbought = self.params.get('kdj_overbought', 80)
- if current_bar['K'] > kdj_overbought and current_bar['D'] > kdj_overbought:
- short_score += 2
- short_signals.append(f"KDJ超买(>{kdj_overbought})")
- elif current_bar['J'] > 100:
- short_score += 1
- short_signals.append("KDJ极端超买")
-
- # MACD死叉
- if current_bar['MACD_hist'] < 0 and df.iloc[i-1]['MACD_hist'] >= 0:
- short_score += 2
- short_signals.append("MACD死叉")
- elif current_bar['MACD_hist'] < df.iloc[i-1]['MACD_hist']:
- short_score += 1
- short_signals.append("MACD恶化")
-
- # 价格触及布林带上轨
- bb_upper_threshold = self.params.get('bb_upper_threshold', 0.995)
- if current_bar['Close'] >= current_bar['BB_upper'] * bb_upper_threshold:
- short_score += 2
- short_signals.append("触及上轨")
- elif current_bar['Close'] >= current_bar['BB_upper'] * (bb_upper_threshold - 0.005):
- short_score += 1
- short_signals.append("接近上轨")
-
- # 连续上涨后的反转
- consecutive_bars = self.params.get('consecutive_bars', 4)
- consecutive_change_pct = self.params.get('consecutive_change_pct', 0.015)
- recent_returns = df.iloc[i-6:i]['Returns']
- if recent_returns.max() > consecutive_change_pct:
- consecutive_rise = sum(recent_returns > 0)
- if consecutive_rise >= consecutive_bars:
- short_score += 2
- short_signals.append("连续上涨反转")
-
- # 成交量配合
- volume_ratio_threshold = self.params.get('volume_ratio_threshold', 1.2)
- if current_bar['Volume_Ratio'] > volume_ratio_threshold:
- short_score += 1
- short_signals.append("放量配合")
-
- # 当日开盘价格关系
- try:
- daily_high = df[df.index.date == df.index[i].date()]['High'].max()
- daily_low = df[df.index.date == df.index[i].date()]['Low'].min()
- daily_range = daily_high - daily_low
-
- if daily_range > 0:
- position_in_day = (current_bar['Close'] - daily_low) / daily_range
- if position_in_day > 0.7:
- short_score += 1
- short_signals.append("日内高位")
- except:
- pass
- # MA趋势过滤
- if current_bar['MA6'] > current_bar['MA12'] > current_bar['MA24']:
- short_score -= 1
- short_signals.append("MA上升趋势惩罚")
- elif current_bar['MA6'] < current_bar['MA12']:
- short_score += 1
- short_signals.append("MA短期下行")
- return short_score, short_signals
- def generate_dual_direction_signals(self, data: pd.DataFrame) -> pd.DataFrame:
- """生成多空双向信号 - 静默模式"""
- if not self.silent_mode:
- print("正在生成多空双向信号...")
-
- signals = []
- df = data.copy()
-
- for i in range(24, len(df)): # 至少需要12小时(24个30分钟)的历史数据
- current_bar = df.iloc[i]
- current_time = df.index[i]
-
- # 跳过不适合交易的时间段
- if hasattr(current_time, 'hour'): # 有小时信息的30分钟数据
- hour = current_time.hour
- if hour < 9 or hour > 15: # 只在交易时间内
- continue
-
- # 生成基础信号数据
- signal = {
- 'DateTime': str(current_time),
- 'Open': current_bar['Open'],
- 'High': current_bar['High'],
- 'Low': current_bar['Low'],
- 'Close': current_bar['Close'],
- 'Volume': current_bar['Volume'],
- 'RSI': current_bar['RSI'],
- 'MACD': current_bar['MACD'],
- 'MACD_hist': current_bar['MACD_hist'],
- 'K': current_bar['K'],
- 'D': current_bar['D'],
- 'J': current_bar['J'],
- 'ATR_Pct': current_bar['ATR_Pct'],
- 'Volume_Ratio': current_bar['Volume_Ratio'],
- 'Price_Momentum': current_bar['Price_Momentum'],
- 'Close_Open_Pct': current_bar['Close_Open_Pct']
- }
-
- # 计算做多信号强度
- long_score, long_signals = self._calculate_long_signals(current_bar, df, i)
-
- # 计算做空信号强度
- short_score, short_signals = self._calculate_short_signals(current_bar, df, i)
-
- # 设置信号分数和描述
- signal['Long_Score'] = long_score
- signal['Long_Signals'] = ', '.join(long_signals) if long_signals else ''
- signal['Short_Score'] = short_score
- signal['Short_Signals'] = ', '.join(short_signals) if short_signals else ''
-
- # 决定最终信号方向和强度
- final_signal = 0
- signal_type = ''
-
- # 信号优先级和冲突处理
- if long_score >= 4 and short_score >= 4:
- # 两个方向都达到阈值,选择信号强度更高的
- if long_score > short_score:
- final_signal = 1
- signal_type = f'做多翻转(强度{long_score} vs {short_score})'
- self.long_signal_count += 1
- elif short_score > long_score:
- final_signal = -1
- signal_type = f'做空反转(强度{short_score} vs {long_score})'
- self.short_signal_count += 1
- else:
- # 强度相等时,根据当前价格位置决定
- bb_position = (current_bar['Close'] - current_bar['BB_lower']) / (current_bar['BB_upper'] - current_bar['BB_lower'])
- if bb_position < 0.3: # 偏向下轨,优先做多
- final_signal = 1
- signal_type = f'做多翻转(位置优先)'
- self.long_signal_count += 1
- elif bb_position > 0.7: # 偏向上轨,优先做空
- final_signal = -1
- signal_type = f'做空反转(位置优先)'
- self.short_signal_count += 1
- else:
- # 中间位置,暂不开仓
- final_signal = 0
- signal_type = '信号冲突(强度相等)'
-
- elif long_score >= 4:
- final_signal = 1
- signal_type = '做多翻转'
- self.long_signal_count += 1
-
- elif short_score >= 4:
- final_signal = -1
- signal_type = '做空反转'
- self.short_signal_count += 1
-
- self.total_signal_count = self.long_signal_count + self.short_signal_count
-
- signal['Signal'] = final_signal
- signal['Signal_Type'] = signal_type
-
- signals.append(signal)
-
- signals_df = pd.DataFrame(signals)
-
- if len(signals_df) > 0:
- signals_df.set_index('DateTime', inplace=True)
-
- if not self.silent_mode:
- print(f"多空双向信号生成完成")
- print(f"做多信号: {self.long_signal_count}个")
- print(f"做空信号: {self.short_signal_count}个")
- print(f"总信号: {self.total_signal_count}个")
- if len(signals_df) > 0:
- print(f"信号密度: {self.total_signal_count/len(signals_df)*100:.2f}%")
-
- return signals_df
- class CustomExecutor(DualDirectionExecutor):
- """自定义交易执行器 - 支持动态参数"""
-
- def __init__(self, params: Dict[str, Any], initial_capital: float = 1000000, silent_mode=False):
- super().__init__(initial_capital)
- # 更新参数
- self.params['position_size_pct'] = params.get('position_size_pct', 1.0)
- self.params['stop_loss_pct'] = params.get('stop_loss_pct', 0.008)
- self.params['take_profit_pct'] = params.get('take_profit_pct', 0.02)
- self.params['max_hold_bars'] = params.get('max_hold_bars', 16)
- self.params['min_signal_strength'] = params.get('min_signal_strength', 4)
- self.silent_mode = silent_mode
-
- def execute_dual_direction_trades(self, signals_df: pd.DataFrame) -> tuple:
- """执行多空双向交易 - 静默模式"""
- if not self.silent_mode:
- print("正在执行多空双向交易...")
-
- # 复制原有逻辑,但移除详细打印
- df = signals_df.copy()
-
- # 初始化
- trades = []
- capital = self.initial_capital
-
- # 持仓状态
- long_position = 0 # 做多持仓数量
- short_position = 0 # 做空持仓数量
- long_entry_price = 0 # 做多开仓价
- short_entry_price = 0 # 做空开仓价
- long_entry_time = None # 做多开仓时间
- short_entry_time = None # 做空开仓时间
- long_holding_bars = 0 # 做多持仓周期
- short_holding_bars = 0 # 做空持仓周期
- long_entry_signals = '' # 做多入场信号
- short_entry_signals = '' # 做空入场信号
-
- # 添加资金列
- df = df.copy()
- df['capital'] = capital
- df['long_position'] = 0
- df['short_position'] = 0
- df['net_value'] = capital
-
- for i in range(len(df)):
- current_time = df.index[i]
- current_bar = df.iloc[i]
- price = current_bar['Close']
-
- # 更新当前净值
- current_value = capital
- if long_position > 0:
- current_value += long_position * price
- if short_position < 0:
- # 做空盈亏
- short_pnl = (short_entry_price - price) * abs(short_position)
- margin_held = abs(short_position) * short_entry_price
- current_value += margin_held + short_pnl
-
- df.iloc[i, df.columns.get_loc('net_value')] = current_value
-
- # 开仓逻辑 - 只在无持仓时开仓
- if long_position == 0 and short_position == 0:
- # 做多开仓
- if current_bar['Signal'] == 1:
- position_size = int((capital * self.params['position_size_pct']) / price)
- if position_size > 0:
- cost = position_size * price * (1 + self.params['commission_rate'] + self.params['slippage_rate'])
-
- if cost <= capital:
- long_position = position_size
- long_entry_price = price
- long_entry_time = current_time
- long_entry_signals = current_bar.get('Long_Signals', '')
- long_holding_bars = 0
- capital -= cost
- # 计算预计止损止盈价格
- long_stop_loss_price = long_entry_price * (1 - self.params['stop_loss_pct'])
- long_take_profit_price = long_entry_price * (1 + self.params['take_profit_pct'])
- df.iloc[i, df.columns.get_loc('long_position')] = long_position
- # 做空开仓
- elif current_bar['Signal'] == -1:
- position_value = capital * self.params['position_size_pct']
- position_size = int(position_value / price)
- if position_size > 0:
- margin_required = position_size * price
- commission = position_size * price * (self.params['commission_rate'] + self.params['slippage_rate'])
- total_cost = margin_required + commission
- if total_cost <= capital:
- short_position = -position_size
- short_entry_price = price
- short_entry_time = current_time
- short_entry_signals = current_bar.get('Short_Signals', '')
- short_holding_bars = 0
- capital -= total_cost
-
- # 计算预计止损止盈价格
- short_stop_loss_price = short_entry_price * (1 + self.params['stop_loss_pct'])
- short_take_profit_price = short_entry_price * (1 - self.params['take_profit_pct'])
-
- df.iloc[i, df.columns.get_loc('short_position')] = short_position
-
- # 平仓逻辑 - 做多平仓
- elif long_position > 0:
- long_holding_bars += 1
- # 计算止损止盈价格
- stop_loss = long_entry_price * (1 - self.params['stop_loss_pct'])
- take_profit = long_entry_price * (1 + self.params['take_profit_pct'])
- exit_signal = False
- exit_reason = ''
- exit_price = price
- # 止损
- if price <= stop_loss:
- exit_signal = True
- loss_pct = (long_entry_price - stop_loss) / long_entry_price * 100
- exit_reason = f"做多止损触发(价格{price:.2f}跌破止损线{stop_loss:.2f},亏损{loss_pct:.2f}%)"
- exit_price = price
- # 止盈
- elif price >= take_profit:
- exit_signal = True
- profit_pct = (price - long_entry_price) / long_entry_price * 100
- exit_reason = f"做多止盈触发(价格{price:.2f}突破止盈线{take_profit:.2f},盈利{profit_pct:.2f}%)"
- exit_price = price
-
- # 最大持仓时间
- elif long_holding_bars >= self.params['max_hold_bars']:
- exit_signal = True
- current_pnl_pct = (price - long_entry_price) / long_entry_price * 100
- exit_reason = f"做多时间止损(持仓{long_holding_bars}周期达上限{self.params['max_hold_bars']}周期,当前盈亏{current_pnl_pct:+.2f}%)"
-
- # 做多信号消失
- elif current_bar['RSI'] > 70:
- exit_signal = True
- current_pnl_pct = (price - long_entry_price) / long_entry_price * 100
- exit_reason = f"做多RSI超买平仓(RSI={current_bar['RSI']:.1f}超买,信号消失,当前盈亏{current_pnl_pct:+.2f}%)"
-
- # 执行平仓
- if exit_signal:
- # 计算盈亏
- gross_pnl = (exit_price - long_entry_price) * long_position
- open_cost = long_position * long_entry_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- close_revenue = long_position * exit_price
- close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate'])
- pnl = gross_pnl - open_cost - close_cost
-
- # 更新资金
- capital += close_revenue - close_cost
-
- # 记录交易
- trade = {
- '交易方向': '做多',
- '开仓时间': long_entry_time,
- '平仓时间': current_time,
- '开仓价格': long_entry_price,
- '平仓价格': exit_price,
- '仓位': long_position,
- '盈亏金额': pnl,
- '盈亏百分比': (exit_price - long_entry_price) / long_entry_price * 100,
- '退出原因': exit_reason,
- '持仓周期数': long_holding_bars,
- '持仓小时数': long_holding_bars * 0.5,
- '入场信号': long_entry_signals,
- '平仓时资金': capital,
- '开仓市值': long_position * long_entry_price,
- '预计止损价格': long_stop_loss_price,
- '预计止盈价格': long_take_profit_price
- }
- trades.append(trade)
-
- # 重置做多持仓
- long_position = 0
- long_entry_price = 0
- long_entry_time = None
- long_holding_bars = 0
-
- # 平仓逻辑 - 做空平仓
- elif short_position < 0:
- short_holding_bars += 1
- # 计算止损止盈价格(做空逻辑相反)
- stop_loss_price = short_entry_price * (1 + self.params['stop_loss_pct']) # 价格上涨止损
- take_profit_price = short_entry_price * (1 - self.params['take_profit_pct']) # 价格下跌止盈
- exit_signal = False
- exit_reason = ''
- exit_price = price
- # 止损(价格上涨)
- if price >= stop_loss_price:
- exit_signal = True
- loss_pct = (stop_loss_price - short_entry_price) / short_entry_price * 100
- exit_reason = f"做空止损触发(价格{price:.2f}突破止损线{stop_loss_price:.2f},亏损{loss_pct:.2f}%)"
- exit_price = price
- # 止盈(价格下跌)
- elif price <= take_profit_price:
- exit_signal = True
- profit_pct = (short_entry_price - price) / short_entry_price * 100
- exit_reason = f"做空止盈触发(价格{price:.2f}跌破止盈线{take_profit_price:.2f},盈利{profit_pct:.2f}%)"
- exit_price = price
-
- # 最大持仓时间
- elif short_holding_bars >= self.params['max_hold_bars']:
- exit_signal = True
- current_pnl_pct = (short_entry_price - price) / short_entry_price * 100
- exit_reason = f"做空时间止损(持仓{short_holding_bars}周期达上限{self.params['max_hold_bars']}周期,当前盈亏{current_pnl_pct:+.2f}%)"
-
- # 做空信号消失
- elif current_bar['RSI'] < 30:
- exit_signal = True
- current_pnl_pct = (short_entry_price - price) / short_entry_price * 100
- exit_reason = f"做空RSI超卖平仓(RSI={current_bar['RSI']:.1f}超卖,信号消失,当前盈亏{current_pnl_pct:+.2f}%)"
-
- # 执行平仓
- if exit_signal:
- # 计算盈亏
- gross_pnl = (short_entry_price - exit_price) * abs(short_position)
- open_commission = abs(short_position) * short_entry_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- close_commission = abs(short_position) * exit_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- net_pnl = gross_pnl - close_commission
-
- # 更新资金(返还保证金 + 净盈亏)
- margin_returned = abs(short_position) * short_entry_price
- capital += margin_returned + net_pnl
-
- # 记录交易
- trade = {
- '交易方向': '做空',
- '开仓时间': short_entry_time,
- '平仓时间': current_time,
- '开仓价格': short_entry_price,
- '平仓价格': exit_price,
- '仓位': abs(short_position),
- '盈亏金额': net_pnl,
- '盈亏百分比': (short_entry_price - exit_price) / short_entry_price * 100,
- '退出原因': exit_reason,
- '持仓周期数': short_holding_bars,
- '持仓小时数': short_holding_bars * 0.5,
- '入场信号': short_entry_signals,
- '平仓时资金': capital,
- '开仓市值': abs(short_position) * short_entry_price,
- '保证金返还': margin_returned,
- '预计止损价格': short_stop_loss_price,
- '预计止盈价格': short_take_profit_price
- }
- trades.append(trade)
-
- # 重置做空持仓
- short_position = 0
- short_entry_price = 0
- short_entry_time = None
- short_holding_bars = 0
-
- # 更新资金和持仓状态
- df.iloc[i, df.columns.get_loc('capital')] = capital
- df.iloc[i, df.columns.get_loc('long_position')] = long_position
- df.iloc[i, df.columns.get_loc('short_position')] = short_position
-
- # 强制平仓剩余持仓 - 做多
- if long_position > 0:
- final_price = df.iloc[-1]['Close']
-
- gross_pnl = (final_price - long_entry_price) * long_position
- open_cost = long_position * long_entry_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- close_revenue = long_position * final_price
- close_cost = close_revenue * (self.params['commission_rate'] + self.params['slippage_rate'])
- pnl = gross_pnl - open_cost - close_cost
-
- capital += close_revenue - close_cost
-
- trade = {
- '交易方向': '做多',
- '开仓时间': long_entry_time,
- '平仓时间': df.index[-1],
- '开仓价格': long_entry_price,
- '平仓价格': final_price,
- '仓位': long_position,
- '盈亏金额': pnl,
- '盈亏百分比': (final_price - long_entry_price) / long_entry_price * 100,
- '退出原因': f'做多强制平仓(回测结束,持仓{long_holding_bars}周期,最终价格{final_price:.2f},盈亏{(final_price - long_entry_price) / long_entry_price * 100:+.2f}%)',
- '持仓周期数': long_holding_bars,
- '持仓小时数': long_holding_bars * 0.5,
- '入场信号': long_entry_signals,
- '平仓时资金': capital,
- '开仓市值': long_position * long_entry_price,
- '预计止损价格': long_stop_loss_price,
- '预计止盈价格': long_take_profit_price
- }
- trades.append(trade)
-
- # 做空持仓强制平仓
- if short_position < 0:
- final_price = df.iloc[-1]['Close']
-
- gross_pnl = (short_entry_price - final_price) * abs(short_position)
- close_commission = abs(short_position) * final_price * (self.params['commission_rate'] + self.params['slippage_rate'])
- net_pnl = gross_pnl - close_commission
-
- margin_returned = abs(short_position) * short_entry_price
- capital += margin_returned + net_pnl
-
- trade = {
- '交易方向': '做空',
- '开仓时间': short_entry_time,
- '平仓时间': df.index[-1],
- '开仓价格': short_entry_price,
- '平仓价格': final_price,
- '仓位': abs(short_position),
- '盈亏金额': net_pnl,
- '盈亏百分比': (short_entry_price - final_price) / short_entry_price * 100,
- '退出原因': f'做空强制平仓(回测结束,持仓{short_holding_bars}周期,最终价格{final_price:.2f},盈亏{(short_entry_price - final_price) / short_entry_price * 100:+.2f}%)',
- '持仓周期数': short_holding_bars,
- '持仓小时数': short_holding_bars * 0.5,
- '入场信号': short_entry_signals,
- '平仓时资金': capital,
- '开仓市值': abs(short_position) * short_entry_price,
- '保证金返还': margin_returned,
- '预计止损价格': short_stop_loss_price,
- '预计止盈价格': short_take_profit_price
- }
- trades.append(trade)
-
- trades_df = pd.DataFrame(trades)
-
- if len(trades_df) > 0:
- # 统一时间格式
- for col in trades_df.columns:
- if '时间' in col:
- trades_df[col] = pd.to_datetime(trades_df[col])
- trades_df = trades_df.sort_values('开仓时间')
-
- if not self.silent_mode:
- print(f"多空双向交易执行完成,共{len(trades_df)}笔交易")
-
- return df, trades_df
- def main():
- """主程序 - 运行参数优化"""
-
- # 创建优化器 - 设置4个并发线程
- optimizer = ParameterOptimizer('config.json', max_workers=4)
-
- # 运行优化 (可设置最大迭代次数和时间限制)
- # max_iterations: 限制测试次数,None表示测试所有组合
- # time_limit: 时间限制(秒),None表示无限制
- optimizer.run_optimization(
- max_iterations=None, # 测试所有组合 (现在只有1,152个组合)
- time_limit=86400 # 24小时 (24*60*60=86400秒)
- )
-
- print(f"\n参数优化完成!")
- if __name__ == "__main__":
- main()
|