#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ChiNext 50 T+1 advanced optimized strategy V2.

Explores trailing stops, partial profit taking, multi-factor scoring and
market-regime adaptation on top of the trades produced by the original
dual-direction strategy.
"""

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
from itertools import product

warnings.filterwarnings('ignore')

# Components of the original strategy
from cyb50_30min_dual_direction import (
    ConfigManager,
    IntradayDataFetcher,
    DualDirectionSignalGenerator,
    DualDirectionExecutor
)
from t1_converter import simulate_t1_trades

# Entries strictly after this time of day are rejected by
# AdvancedOptimizedStrategy.should_trade (parsed once instead of per call).
T1_ENTRY_CUTOFF = datetime.strptime('14:30', '%H:%M').time()


def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'):
    """Load bar data from a local CSV file.

    The file must contain a ``DateTime`` column, which becomes the sorted
    index.  Short column names (``o/h/l/c/v/a``) are renamed to
    ``Open/High/Low/Close/Volume/Amount`` when ``Open`` is absent.  The
    price/volume columns are coerced to numeric, three derived columns
    (``Returns``, ``High_Low_Pct``, ``Close_Open_Pct``) are added, gaps are
    forward-filled and any remaining rows containing NaN are dropped.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        The prepared DataFrame indexed by ``DateTime``.
    """
    print(f"📊 加载数据 {csv_file}...")
    df = pd.read_csv(csv_file)
    df['DateTime'] = pd.to_datetime(df['DateTime'])
    df.set_index('DateTime', inplace=True)
    df.sort_index(inplace=True)

    if 'Open' not in df.columns and 'o' in df.columns:
        df.rename(
            columns={
                'o': 'Open',
                'h': 'High',
                'l': 'Low',
                'c': 'Close',
                'v': 'Volume',
                'a': 'Amount',
            },
            inplace=True,
        )

    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        if col in df.columns:
            # Unparseable values become NaN and are handled by ffill/dropna.
            df[col] = pd.to_numeric(df[col], errors='coerce')

    df['Returns'] = df['Close'].pct_change()
    df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
    df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']

    df.ffill(inplace=True)
    df.dropna(inplace=True)

    print(f"✅ 数据加载完成: {len(df)}条K线")
    return df


def calculate_advanced_indicators(df):
    """Add the technical-indicator columns used by the strategy.

    The frame is modified in place (columns added, NaN rows dropped) and
    also returned.  It must provide ``Close``, ``Returns`` and ``Volume``.

    Added columns: ``RSI_{6,14,21}``, ``Momentum_{3,5,10}``,
    ``EMA_{5,20,60}``, ``Trend_Score``, ``Volatility``, ``Vol_MA``,
    ``Vol_Regime``, ``Volume_MA20``, ``Volume_Ratio``, ``MACD``,
    ``MACD_Signal``, ``MACD_Hist`` and the ``BB_*`` Bollinger columns.

    Args:
        df: Bar data as produced by ``load_local_data``.

    Returns:
        The same DataFrame with indicator columns added.
    """
    print("📈 计算高级指标...")

    # RSI over several look-backs, using simple rolling means of the gains
    # and losses.  The gain/loss series do not depend on the period, so they
    # are computed once.
    delta = df['Close'].diff()
    gains = delta.where(delta > 0, 0)
    losses = -delta.where(delta < 0, 0)
    for period in [6, 14, 21]:
        gain = gains.rolling(window=period).mean()
        loss = losses.rolling(window=period).mean()
        rs = gain / loss
        df[f'RSI_{period}'] = 100 - (100 / (1 + rs))

    # Momentum: percentage change over `period` bars
    for period in [3, 5, 10]:
        df[f'Momentum_{period}'] = (df['Close'] / df['Close'].shift(period) - 1) * 100

    # Exponential moving averages
    for period in [5, 20, 60]:
        df[f'EMA_{period}'] = df['Close'].ewm(span=period, adjust=False).mean()

    # Trend score: one point for each EMA the close is above (0..3)
    df['Trend_Score'] = 0
    df.loc[df['Close'] > df['EMA_5'], 'Trend_Score'] += 1
    df.loc[df['Close'] > df['EMA_20'], 'Trend_Score'] += 1
    df.loc[df['Close'] > df['EMA_60'], 'Trend_Score'] += 1

    # Volatility: 20-bar standard deviation of returns scaled by sqrt(48).
    # NOTE(review): presumably a bars-per-period scaling factor - confirm
    # that 48 matches the bar frequency of the data.
    df['Volatility'] = df['Returns'].rolling(20).std() * np.sqrt(48)
    df['Vol_MA'] = df['Volatility'].rolling(20).mean()
    df['Vol_Regime'] = '正常'
    df.loc[df['Volatility'] > df['Vol_MA'] * 1.5, 'Vol_Regime'] = '高波动'
    df.loc[df['Volatility'] < df['Vol_MA'] * 0.5, 'Vol_Regime'] = '低波动'

    # Volume relative to its 20-bar average
    df['Volume_MA20'] = df['Volume'].rolling(20).mean()
    df['Volume_Ratio'] = df['Volume'] / df['Volume_MA20']

    # MACD (12, 26, 9)
    exp1 = df['Close'].ewm(span=12, adjust=False).mean()
    exp2 = df['Close'].ewm(span=26, adjust=False).mean()
    df['MACD'] = exp1 - exp2
    df['MACD_Signal'] = df['MACD'].ewm(span=9, adjust=False).mean()
    df['MACD_Hist'] = df['MACD'] - df['MACD_Signal']

    # Bollinger bands (20 bars, 2 standard deviations)
    df['BB_Middle'] = df['Close'].rolling(20).mean()
    df['BB_Std'] = df['Close'].rolling(20).std()
    df['BB_Upper'] = df['BB_Middle'] + 2 * df['BB_Std']
    df['BB_Lower'] = df['BB_Middle'] - 2 * df['BB_Std']
    df['BB_Position'] = (df['Close'] - df['BB_Lower']) / (df['BB_Upper'] - df['BB_Lower'])

    df.dropna(inplace=True)
    return df


class AdvancedOptimizedStrategy:
    """Advanced optimized strategy V2.

    Replays the long trades of the original strategy through additional
    entry filters, factor-based position sizing and an exit model
    (hard stop, partial profit taking, trailing stop, take profit).
    """

    # Defaults for every parameter read by this class.  Parameters passed to
    # __init__ override these key by key.
    # NOTE: 'trend_min' is not read by any method in this file.
    DEFAULT_PARAMS = {
        'rsi_low': 30,
        'rsi_high': 60,
        'momentum_min': -2,
        'trend_min': 0,
        'avoid_hour': 13,
        'max_daily_trades': 3,
        'position_base': 0.7,
        'stop_loss': 0.008,
        'take_profit': 0.025,
        'trailing_stop': True,
        'trailing_activation': 0.01,
        'trailing_distance': 0.005,
        'partial_exit': True,
        'partial_ratio': 0.5,
        'partial_target': 0.015,
    }

    def __init__(self, params=None):
        """Create a strategy with 1,000,000 of starting capital.

        Args:
            params: Optional dict of parameter overrides.  Keys that are
                missing fall back to ``DEFAULT_PARAMS`` so that a partial
                dict no longer raises ``KeyError`` later on.  ``None`` or an
                empty dict selects the defaults.
        """
        self.initial_capital = 1000000
        self.current_capital = 1000000
        self.trades = []
        self.params = {**self.DEFAULT_PARAMS, **(params or {})}

    def calculate_multi_factor_score(self, row):
        """Score a bar with the multi-factor model.

        Starts from 50 points, adds or subtracts points per factor and
        clamps the result to the range 0..100.

        Args:
            row: Mapping/Series with ``RSI_14``, ``Trend_Score``,
                ``Momentum_5``, ``Vol_Regime``, ``MACD``, ``MACD_Hist``,
                ``MACD_Signal`` and ``BB_Position``.

        Returns:
            The clamped score.
        """
        score = 50  # base score

        # RSI factor (-10 .. +20)
        rsi = row['RSI_14']
        if 40 <= rsi <= 50:
            score += 20
        elif 35 <= rsi <= 55:
            score += 15
        elif 30 <= rsi <= 60:
            score += 10
        else:
            score -= 10

        # Trend factor (+5 per trend point)
        trend = row['Trend_Score']
        score += trend * 5

        # Momentum factor (-5 .. +15)
        mom = row['Momentum_5']
        if mom > 2:
            score += 15
        elif mom > 0:
            score += 10
        elif mom > -1:
            score += 5
        else:
            score -= 5

        # Volatility-regime factor (-5 .. +15)
        if row['Vol_Regime'] == '低波动':
            score += 15
        elif row['Vol_Regime'] == '正常':
            score += 10
        else:
            score -= 5

        # MACD factor (0 .. +15)
        if row['MACD_Hist'] > 0 and row['MACD'] > 0:
            score += 15
        elif row['MACD_Hist'] > 0:
            score += 10
        elif row['MACD'] > row['MACD_Signal']:
            score += 5

        # Bollinger-position factor (0 .. +15)
        bb_pos = row['BB_Position']
        if 0.2 <= bb_pos <= 0.4:
            score += 15
        elif 0.4 <= bb_pos <= 0.6:
            score += 10
        elif bb_pos < 0.2:
            score += 5

        return min(max(score, 0), 100)

    def should_trade(self, timestamp, row, daily_count):
        """Decide whether a candidate entry passes the filters.

        Args:
            timestamp: Entry time of the candidate trade.
            row: Indicator row providing ``RSI_14`` and ``Momentum_5``.
            daily_count: Dict mapping a date to the number of trades
                already taken on that date.

        Returns:
            Tuple ``(allowed, reason)``; ``reason`` is a short message
            naming the filter that rejected the trade, or "通过".
        """
        # Time filter: skip the configured hour
        if timestamp.hour == self.params['avoid_hour']:
            return False, f"避开{self.params['avoid_hour']}点"

        # T+1 filter: no entries after the cutoff time
        if timestamp.time() > T1_ENTRY_CUTOFF:
            return False, "避开T+1"

        # Daily trade-count limit
        date = timestamp.date()
        if daily_count.get(date, 0) >= self.params['max_daily_trades']:
            return False, "已达日交易上限"

        # Basic RSI band filter
        rsi = row['RSI_14']
        if not (self.params['rsi_low'] <= rsi <= self.params['rsi_high']):
            return False, f"RSI {rsi:.1f} 不在范围内"

        # Momentum filter
        if row['Momentum_5'] < self.params['momentum_min']:
            return False, f"动量 {row['Momentum_5']:.2f} 不足"

        return True, "通过"

    def simulate_advanced_exit(self, entry_price, exit_price, high_price, low_price,
                               entry_time, exit_time, position_size):
        """Apply the exit model (stops and profit taking) to one trade.

        Rules are evaluated in priority order and the first match wins:

        1. Hard stop: the close-to-close loss reaches ``stop_loss``.
        2. Partial exit: the period high reached ``partial_target``;
           ``partial_ratio`` of the position is booked at the target and
           the rest at the actual exit price.
        3. Trailing stop: the period high reached ``trailing_activation``
           and the exit price is at or below the trailing level.
        4. Take profit: the exit return reaches ``take_profit``.

        The trailing rule only claims the trade when the stop actually
        triggers.  Previously an activated-but-untriggered trailing stop
        shadowed rule 4, so ``take_profit`` never applied while
        ``trailing_stop`` was enabled.

        ``low_price``, ``entry_time``, ``exit_time`` and ``position_size``
        are accepted for interface compatibility but are not used; in
        particular the hard stop looks only at the exit price, not at the
        period low.

        Args:
            entry_price: Entry price of the trade.
            exit_price: Exit price of the original trade.
            high_price: Highest price during the holding period.
            low_price: Lowest price during the holding period (unused).
            entry_time: Entry timestamp (unused).
            exit_time: Exit timestamp (unused).
            position_size: Position fraction (unused).

        Returns:
            Tuple ``(pnl_pct, exit_reason)``.
        """
        p = self.params
        pnl_pct = (exit_price - entry_price) / entry_price

        # Best unrealised profit during the period (never negative)
        if high_price > entry_price:
            max_profit_pct = (high_price - entry_price) / entry_price
        else:
            max_profit_pct = 0

        # 1. Hard stop
        if pnl_pct <= -p['stop_loss']:
            return -p['stop_loss'], '止损'

        # 2. Partial profit taking
        if p['partial_exit'] and max_profit_pct >= p['partial_target']:
            partial_pnl = p['partial_target'] * p['partial_ratio']
            remaining_pnl = pnl_pct * (1 - p['partial_ratio'])
            return partial_pnl + remaining_pnl, '分批止盈'

        # 3. Trailing stop (only when activated AND triggered)
        if p['trailing_stop'] and max_profit_pct >= p['trailing_activation']:
            trailing_stop_price = high_price * (1 - p['trailing_distance'])
            trailing_stop_pct = (trailing_stop_price - entry_price) / entry_price
            if pnl_pct <= trailing_stop_pct:
                return trailing_stop_pct, '移动止损'

        # 4. Fixed take profit
        if pnl_pct >= p['take_profit']:
            return p['take_profit'], '止盈'

        return pnl_pct, '正常平仓'

    def _process_trade(self, data, trade, daily_count):
        """Filter, size and book one original trade; update state on accept."""
        mask = data.index <= trade['开仓时间']
        if not mask.any():
            return

        # Most recent bar at or before the entry time
        current_idx = data.index[mask][-1]
        current_data = data.loc[current_idx]

        should_trade, _reason = self.should_trade(trade['开仓时间'], current_data, daily_count)
        if not should_trade:
            return

        position_size = self.params['position_base']

        # Scale the position by the multi-factor score
        factor_score = self.calculate_multi_factor_score(current_data)
        if factor_score >= 80:
            position_size = min(position_size * 1.2, 1.0)
        elif factor_score < 50:
            position_size = position_size * 0.7

        # High/low over the holding period (simplified handling); fall back
        # to the exit price when no bar lies inside the period.
        exit_mask = (data.index >= trade['开仓时间']) & (data.index <= trade['平仓时间'])
        if exit_mask.any():
            period_data = data.loc[exit_mask]
            high_price = period_data['High'].max()
            low_price = period_data['Low'].min()
        else:
            high_price = trade['平仓价格']
            low_price = trade['平仓价格']

        pnl_pct, exit_reason = self.simulate_advanced_exit(
            trade['开仓价格'], trade['平仓价格'],
            high_price, low_price,
            trade['开仓时间'], trade['平仓时间'],
            position_size
        )

        position_value = self.current_capital * position_size
        pnl_amount = pnl_pct * position_value

        self.trades.append({
            '开仓时间': trade['开仓时间'],
            '平仓时间': trade['平仓时间'],
            '仓位': position_size,
            '因子评分': factor_score,
            '盈亏比例': pnl_pct,
            '实际盈亏': pnl_amount,
            '退出原因': exit_reason,
        })

        self.current_capital += pnl_amount

        date = trade['开仓时间'].date()
        daily_count[date] = daily_count.get(date, 0) + 1

    def backtest(self, data, trades_df):
        """Run the backtest over the long trades of the original strategy.

        Only rows whose ``交易方向`` equals ``'做多'`` are replayed.  Accepted
        trades are appended to ``self.trades`` and compound into
        ``self.current_capital``.  A trade whose processing raises is still
        skipped (best effort), but skipped trades are now counted and
        reported once instead of being dropped silently.

        Args:
            data: Indicator DataFrame indexed by bar time, with ``High`` and
                ``Low`` plus the columns read by the filters and scoring.
            trades_df: Trades of the original strategy with the columns
                ``交易方向``, ``开仓时间``, ``平仓时间``, ``开仓价格`` and
                ``平仓价格``.

        Returns:
            DataFrame of all trades recorded on this instance.
        """
        print("\n🚀 开始高级优化策略回测")

        t1_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
        t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间'])
        t1_trades['平仓时间'] = pd.to_datetime(t1_trades['平仓时间'])

        daily_count = {}
        skipped = 0
        first_error = None

        for _, trade in t1_trades.iterrows():
            try:
                self._process_trade(data, trade, daily_count)
            except Exception as exc:
                # Keep going, but remember that something went wrong.
                skipped += 1
                if first_error is None:
                    first_error = exc

        if skipped:
            print(f"⚠️ {skipped}笔交易处理失败已跳过,首个错误: {first_error!r}")

        return pd.DataFrame(self.trades)

    def report(self):
        """Summarise the recorded trades.

        Returns:
            ``{"error": "无交易记录"}`` when no trade was recorded, otherwise
            a dict with trade count, win rate (%), total PnL, total return
            (%), profit factor and final capital.  When there are no losing
            trades the profit factor is computed against a denominator of 1.
        """
        if not self.trades:
            return {"error": "无交易记录"}

        df = pd.DataFrame(self.trades)

        total_trades = len(df)
        wins = df[df['实际盈亏'] > 0]
        losses = df[df['实际盈亏'] < 0]

        win_rate = len(wins) / total_trades * 100
        total_pnl = df['实际盈亏'].sum()
        total_return = (self.current_capital - self.initial_capital) / self.initial_capital * 100

        profits = wins['实际盈亏'].sum() if len(wins) > 0 else 0
        loss_amount = abs(losses['实际盈亏'].sum()) if len(losses) > 0 else 1
        profit_factor = profits / loss_amount

        return {
            '总交易次数': total_trades,
            '胜率': win_rate,
            '总盈亏': total_pnl,
            '总收益率': total_return,
            '盈亏比': profit_factor,
            '最终资金': self.current_capital,
        }


def grid_search_optimization(data, trades_df):
    """Grid-search the stop / take-profit / trailing parameters.

    Every combination is backtested with the trailing stop enabled and
    partial exits disabled; combinations with fewer than 20 trades are
    discarded.  Three rankings are printed.

    Args:
        data: Indicator DataFrame passed to ``backtest``.
        trades_df: Trades of the original strategy passed to ``backtest``.

    Returns:
        DataFrame with one row per retained combination, or ``None`` when
        no combination qualified.
    """
    print("\n" + "="*60)
    print("🔍 网格搜索最优参数")
    print("="*60)

    # Parameter search space
    param_grid = {
        'stop_loss': [0.005, 0.008, 0.01, 0.015],
        'take_profit': [0.02, 0.025, 0.03, 0.035],
        'trailing_activation': [0.008, 0.01, 0.015],
        'trailing_distance': [0.003, 0.005, 0.008],
    }

    results = []

    # product() iterates in the same order as the equivalent nested loops.
    for sl, tp, ta, td in product(
        param_grid['stop_loss'],
        param_grid['take_profit'],
        param_grid['trailing_activation'],
        param_grid['trailing_distance'],
    ):
        # Entry-filter parameters come from the class defaults.
        params = {
            'stop_loss': sl,
            'take_profit': tp,
            'trailing_stop': True,
            'trailing_activation': ta,
            'trailing_distance': td,
            'partial_exit': False,
        }

        strategy = AdvancedOptimizedStrategy(params)
        strategy.backtest(data, trades_df)
        result = strategy.report()

        if 'error' not in result and result['总交易次数'] >= 20:
            results.append({
                '止损': sl,
                '止盈': tp,
                '移动激活': ta,
                '移动距离': td,
                '交易数': result['总交易次数'],
                '胜率': result['胜率'],
                '总盈亏': result['总盈亏'],
                '盈亏比': result['盈亏比'],
                '收益率': result['总收益率'],
            })

    results_df = pd.DataFrame(results)

    if len(results_df) > 0:
        print("\n【总盈亏TOP10参数组合】")
        top10 = results_df.nlargest(10, '总盈亏')
        print(top10.to_string(index=False))

        print("\n【胜率TOP5参数组合】(至少30笔)")
        winrate_top = results_df[results_df['交易数'] >= 30].nlargest(5, '胜率')
        print(winrate_top.to_string(index=False))

        print("\n【综合评分TOP5】(盈亏比>1.5且胜率>45%)")
        filtered = results_df[(results_df['盈亏比'] > 1.5) & (results_df['胜率'] > 45)]
        if len(filtered) > 0:
            print(filtered.nlargest(5, '总盈亏').to_string(index=False))
        else:
            print("无满足条件的参数组合")

        return results_df

    return None


def compare_strategies(data, trades_df):
    """Backtest a fixed set of exit configurations and print a comparison.

    Args:
        data: Indicator DataFrame passed to ``backtest``.
        trades_df: Trades of the original strategy passed to ``backtest``.
    """
    print("\n" + "="*60)
    print("📊 策略对比分析")
    print("="*60)

    # Exit-parameter overrides per variant; entry-filter parameters come
    # from the class defaults.
    strategies = {
        '原策略(固定SL0.8% TP2%)': {
            'stop_loss': 0.008, 'take_profit': 0.02,
            'trailing_stop': False, 'partial_exit': False
        },
        '优化V1(固定SL0.8% TP2.5%)': {
            'stop_loss': 0.008, 'take_profit': 0.025,
            'trailing_stop': False, 'partial_exit': False
        },
        '优化V2(移动止损)': {
            'stop_loss': 0.008, 'take_profit': 0.025,
            'trailing_stop': True, 'trailing_activation': 0.01, 'trailing_distance': 0.005,
            'partial_exit': False
        },
        '优化V3(分批止盈)': {
            'stop_loss': 0.008, 'take_profit': 0.03,
            'trailing_stop': False,
            'partial_exit': True, 'partial_ratio': 0.5, 'partial_target': 0.015
        },
        '优化V4(移动+分批)': {
            'stop_loss': 0.01, 'take_profit': 0.035,
            'trailing_stop': True, 'trailing_activation': 0.015, 'trailing_distance': 0.008,
            'partial_exit': True, 'partial_ratio': 0.5, 'partial_target': 0.02
        },
    }

    results = []

    for name, params in strategies.items():
        strategy = AdvancedOptimizedStrategy(params)
        strategy.backtest(data, trades_df)
        result = strategy.report()

        if 'error' not in result:
            results.append({
                '策略': name,
                '交易数': result['总交易次数'],
                '胜率': f"{result['胜率']:.1f}%",
                '总盈亏': f"{result['总盈亏']:+.0f}",
                '盈亏比': f"{result['盈亏比']:.2f}",
                '收益率': f"{result['总收益率']:+.2f}%",
            })

    results_df = pd.DataFrame(results)
    print("\n" + results_df.to_string(index=False))


def main():
    """Script entry point: load data, build trades, compare and optimise."""
    print("="*60)
    print("创业板50 T+1 高级优化策略V2")
    print("="*60)

    # Load data
    raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv')
    data = calculate_advanced_indicators(raw_data)

    # Generate the trades of the original strategy
    print("\n📡 生成原策略交易信号...")
    config_manager = ConfigManager('config.json')
    fetcher = IntradayDataFetcher(config_manager)
    signal_generator = DualDirectionSignalGenerator()
    executor = DualDirectionExecutor(initial_capital=1000000)

    data_with_indicators = fetcher.calculate_intraday_indicators(data)
    signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
    _results_df, trades_df = executor.execute_dual_direction_trades(signals_df)

    # Strategy comparison
    compare_strategies(data, trades_df)

    # Grid search for the best parameters
    grid_results = grid_search_optimization(data, trades_df)

    # Show the best parameter set found by the grid search
    if grid_results is not None and len(grid_results) > 0:
        best_params = grid_results.loc[grid_results['总盈亏'].idxmax()]
        print("\n" + "="*60)
        print("🏆 最优参数最终回测")
        print("="*60)
        print(f"止损: {best_params['止损']}")
        print(f"止盈: {best_params['止盈']}")
        print(f"移动激活: {best_params['移动激活']}")
        print(f"移动距离: {best_params['移动距离']}")
        print(f"预期盈亏: {best_params['总盈亏']:,.0f}元")
        print(f"预期收益率: {best_params['收益率']:+.2f}%")

    print("\n" + "="*60)
    print("✅ 分析完成")
    print("="*60)


if __name__ == '__main__':
    main()