#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50 T+1 优化策略实现与回测 基于五层深挖的最优参数 """ import pandas as pd import numpy as np from datetime import datetime, timedelta import warnings warnings.filterwarnings('ignore') # 导入原策略组件 from cyb50_30min_dual_direction import ( ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor ) from t1_converter import simulate_t1_trades def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'): """加载本地数据""" print(f"📊 加载数据 {csv_file}...") df = pd.read_csv(csv_file) df['DateTime'] = pd.to_datetime(df['DateTime']) df.set_index('DateTime', inplace=True) df.sort_index(inplace=True) # 列名映射 if 'Open' not in df.columns and 'o' in df.columns: df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True) # 类型转换 for col in ['Open', 'High', 'Low', 'Close', 'Volume']: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') # 计算基础指标 df['Returns'] = df['Close'].pct_change() df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1) df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open'] df.ffill(inplace=True) df.dropna(inplace=True) print(f"✅ 数据加载完成: {len(df)}条K线") print(f" 区间: {df.index[0]} ~ {df.index[-1]}") return df def calculate_enhanced_indicators(df): """计算增强版技术指标""" print("📈 计算增强指标...") # RSI (6, 14, 21) for period in [6, 14, 21]: delta = df['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() rs = gain / loss df[f'RSI_{period}'] = 100 - (100 / (1 + rs)) # 动量 (5周期) df['Momentum_5'] = (df['Close'] / df['Close'].shift(5) - 1) * 100 # 均线趋势 df['EMA5'] = df['Close'].ewm(span=5, adjust=False).mean() df['EMA20'] = df['Close'].ewm(span=20, adjust=False).mean() df['EMA60'] = df['Close'].ewm(span=60, adjust=False).mean() # 趋势评分 df['Trend_Score'] = 0 df.loc[df['Close'] > df['EMA5'], 'Trend_Score'] += 1 df.loc[df['Close'] > df['EMA20'], 'Trend_Score'] += 1 df.loc[df['Close'] > df['EMA60'], 'Trend_Score'] += 1 # 波动率 df['Volatility'] = df['Returns'].rolling(20).std() * np.sqrt(48) df['Vol_MA'] = df['Volatility'].rolling(20).mean() df['Vol_Regime'] = '正常' df.loc[df['Volatility'] > df['Vol_MA'] * 1.5, 'Vol_Regime'] = '高波动' df.loc[df['Volatility'] < df['Vol_MA'] * 0.5, 'Vol_Regime'] = '低波动' # 成交量比率 df['Volume_MA20'] = df['Volume'].rolling(20).mean() df['Volume_Ratio'] = df['Volume'] / df['Volume_MA20'] df.dropna(inplace=True) print("✅ 指标计算完成") return df class OptimizedStrategy: """优化版T+1交易策略""" def __init__(self, initial_capital=1000000): self.initial_capital = initial_capital self.current_capital = initial_capital self.trades = [] self.daily_trade_count = {} self.consecutive_losses = 0 # 策略参数 - 基于五层深挖的最优值(进一步放宽) self.params = { 'rsi_low': 30, # RSI下限 - 进一步放宽 'rsi_high': 60, # RSI上限 - 进一步放宽 'momentum_min': -2, # 最小动量 - 进一步放宽 'trend_min': 0, # 最小趋势评分 'avoid_hour': 13, # 避开的小时 'avoid_friday': False, # 不避开周五 'max_daily_trades': 3, # 每日最大交易数 - 放宽 'position_base': 0.7, # 基础仓位70% - 提高 'position_max': 1.0, # 最大仓位100% 'stop_loss': 0.008, # 止损0.8% 'take_profit': 0.025, # 止盈2.5% 'max_hold_hours': 16, # 最大持仓16小时 't1_cutoff_time': '14:30', # T+1截止时间 } def calculate_position_size(self, row): """动态仓位计算""" base = self.params['position_base'] # 趋势因子 trend_factor = min(1.0 + row['Trend_Score'] * 0.2, 1.5) # 波动率因子 if row['Vol_Regime'] == '低波动': vol_factor = 1.2 elif row['Vol_Regime'] == '正常': vol_factor = 1.0 else: vol_factor = 0.6 # 连续亏损惩罚 loss_factor = max(1.0 - self.consecutive_losses * 0.2, 0.3) position = base * trend_factor * vol_factor * loss_factor return min(position, self.params['position_max']) def should_trade(self, timestamp, row): """判断是否应该交易""" # 1. 时间过滤 - 避开13点 if timestamp.hour == self.params['avoid_hour']: return False, "避开13点" # 2. 星期过滤 - 避开周五 if self.params['avoid_friday'] and timestamp.dayofweek == 4: return False, "避开周五" # 3. T+1过滤 - 14:30后不开仓 cutoff = datetime.strptime(self.params['t1_cutoff_time'], '%H:%M').time() if timestamp.time() > cutoff: return False, "避开T+1" # 4. 每日交易次数限制 date = timestamp.date() if self.daily_trade_count.get(date, 0) >= self.params['max_daily_trades']: return False, "已达日交易上限" # 5. RSI过滤 - 40-50区间 rsi = row['RSI_14'] if not (self.params['rsi_low'] <= rsi <= self.params['rsi_high']): return False, f"RSI {rsi:.1f} 不在范围内" # 6. 动量过滤 if row['Momentum_5'] < self.params['momentum_min']: return False, f"动量 {row['Momentum_5']:.2f} 不足" # 7. 趋势过滤 if row['Trend_Score'] < self.params['trend_min']: return False, f"趋势评分 {row['Trend_Score']} 不足" return True, "通过" def simulate_exit(self, entry_price, exit_price, entry_time, exit_time, position_size): """模拟退出,应用止损止盈""" # 计算理论盈亏比例 pnl_pct = (exit_price - entry_price) / entry_price # 应用止损 if pnl_pct <= -self.params['stop_loss']: actual_pnl_pct = -self.params['stop_loss'] exit_reason = '止损' # 应用止盈 elif pnl_pct >= self.params['take_profit']: actual_pnl_pct = self.params['take_profit'] exit_reason = '止盈' else: actual_pnl_pct = pnl_pct exit_reason = '正常平仓' # 计算盈亏金额 position_value = self.current_capital * position_size pnl_amount = actual_pnl_pct * position_value return pnl_amount, actual_pnl_pct, exit_reason def backtest(self, data, trades_df): """执行回测""" print("\n" + "="*60) print("🚀 开始优化策略回测") print("="*60) # 将数据与交易合并 t1_trades = trades_df[trades_df['交易方向'] == '做多'].copy() t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间']) # 获取指标数据 merged_trades = [] for idx, trade in t1_trades.iterrows(): try: # 获取开仓时的指标 mask = data.index <= trade['开仓时间'] if not mask.any(): continue current_idx = data.index[mask][-1] current_data = data.loc[current_idx] # 判断是否满足交易条件 should_trade, reason = self.should_trade(trade['开仓时间'], current_data) if should_trade: # 计算仓位 position_size = self.calculate_position_size(current_data) # 模拟退出 pnl_amount, pnl_pct, exit_reason = self.simulate_exit( trade['开仓价格'], trade['平仓价格'], trade['开仓时间'], trade['平仓时间'], position_size ) # 记录交易 merged_trades.append({ '开仓时间': trade['开仓时间'], '平仓时间': trade['平仓时间'], '开仓价格': trade['开仓价格'], '平仓价格': trade['平仓价格'], '仓位': position_size, '实际盈亏': pnl_amount, '盈亏比例': pnl_pct, '退出原因': exit_reason, 'RSI': current_data['RSI_14'], '动量': current_data['Momentum_5'], '趋势': current_data['Trend_Score'], }) # 更新资本 self.current_capital += pnl_amount # 更新每日计数 date = trade['开仓时间'].date() self.daily_trade_count[date] = self.daily_trade_count.get(date, 0) + 1 # 更新连续亏损 if pnl_amount < 0: self.consecutive_losses += 1 else: self.consecutive_losses = 0 except Exception as e: continue self.trades = pd.DataFrame(merged_trades) return self.trades def report(self): """生成回测报告""" if len(self.trades) == 0: print("⚠️ 没有交易记录") return print("\n" + "="*60) print("📊 优化策略回测报告") print("="*60) # 基础统计 total_trades = len(self.trades) winning_trades = self.trades[self.trades['实际盈亏'] > 0] losing_trades = self.trades[self.trades['实际盈亏'] < 0] win_rate = len(winning_trades) / total_trades * 100 total_pnl = self.trades['实际盈亏'].sum() avg_pnl = self.trades['实际盈亏'].mean() total_return = (self.current_capital - self.initial_capital) / self.initial_capital * 100 print(f"\n【基础统计】") print(f" 初始资金: {self.initial_capital:,.0f}元") print(f" 最终资金: {self.current_capital:,.0f}元") print(f" 总收益率: {total_return:+.2f}%") print(f" 总交易次数: {total_trades}笔") print(f" 盈利次数: {len(winning_trades)}笔") print(f" 亏损次数: {len(losing_trades)}笔") print(f" 胜率: {win_rate:.1f}%") print(f" 总盈亏: {total_pnl:+,.0f}元") print(f" 平均盈亏: {avg_pnl:+,.0f}元") # 盈亏比 if len(losing_trades) > 0: profit_factor = abs(winning_trades['实际盈亏'].sum() / losing_trades['实际盈亏'].sum()) print(f" 盈亏比: {profit_factor:.2f}") # 退出原因统计 print(f"\n【退出原因统计】") exit_stats = self.trades.groupby('退出原因').agg({ '实际盈亏': ['count', 'sum', 'mean'] }).round(2) exit_stats.columns = ['次数', '总盈亏', '平均盈亏'] print(exit_stats.to_string()) # 对比原策略 print(f"\n【与原策略对比】") original_pnl = -123843 # 原策略亏损 improvement = total_pnl - original_pnl print(f" 原策略盈亏: {original_pnl:+,.0f}元") print(f" 优化策略盈亏: {total_pnl:+,.0f}元") print(f" 改善幅度: {improvement:+,.0f}元") print(f" 提升倍数: {abs(total_pnl / original_pnl) if original_pnl != 0 else 0:.1f}x") # 最近交易 print(f"\n【最近5笔交易】") recent = self.trades.tail(5)[['开仓时间', '平仓时间', '仓位', '实际盈亏', '退出原因']] print(recent.to_string(index=False)) def main(): """主函数""" print("="*60) print("创业板50 T+1 优化策略回测系统") print("="*60) # 加载数据 raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv') # 计算增强指标 data = calculate_enhanced_indicators(raw_data) # 获取原策略交易信号 print("\n📡 生成原策略交易信号...") config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) signal_generator = DualDirectionSignalGenerator() executor = DualDirectionExecutor(initial_capital=1000000) data_with_indicators = fetcher.calculate_intraday_indicators(data) signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators) results_df, trades_df = executor.execute_dual_direction_trades(signals_df) # 运行优化策略回测 strategy = OptimizedStrategy(initial_capital=1000000) optimized_trades = strategy.backtest(data, trades_df) # 生成报告 strategy.report() print("\n" + "="*60) print("✅ 回测完成") print("="*60) if __name__ == '__main__': main()