| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50 T+1 优化策略实现与回测
- 基于五层深挖的最优参数
- """
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- import warnings
- warnings.filterwarnings('ignore')
- # 导入原策略组件
- from cyb50_30min_dual_direction import (
- ConfigManager,
- IntradayDataFetcher,
- DualDirectionSignalGenerator,
- DualDirectionExecutor
- )
- from t1_converter import simulate_t1_trades
- def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'):
- """加载本地数据"""
- print(f"📊 加载数据 {csv_file}...")
- df = pd.read_csv(csv_file)
- df['DateTime'] = pd.to_datetime(df['DateTime'])
- df.set_index('DateTime', inplace=True)
- df.sort_index(inplace=True)
- # 列名映射
- if 'Open' not in df.columns and 'o' in df.columns:
- df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True)
- # 类型转换
- for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
- if col in df.columns:
- df[col] = pd.to_numeric(df[col], errors='coerce')
- # 计算基础指标
- df['Returns'] = df['Close'].pct_change()
- df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
- df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']
- df.ffill(inplace=True)
- df.dropna(inplace=True)
- print(f"✅ 数据加载完成: {len(df)}条K线")
- print(f" 区间: {df.index[0]} ~ {df.index[-1]}")
- return df
- def calculate_enhanced_indicators(df):
- """计算增强版技术指标"""
- print("📈 计算增强指标...")
- # RSI (6, 14, 21)
- for period in [6, 14, 21]:
- delta = df['Close'].diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
- rs = gain / loss
- df[f'RSI_{period}'] = 100 - (100 / (1 + rs))
- # 动量 (5周期)
- df['Momentum_5'] = (df['Close'] / df['Close'].shift(5) - 1) * 100
- # 均线趋势
- df['EMA5'] = df['Close'].ewm(span=5, adjust=False).mean()
- df['EMA20'] = df['Close'].ewm(span=20, adjust=False).mean()
- df['EMA60'] = df['Close'].ewm(span=60, adjust=False).mean()
- # 趋势评分
- df['Trend_Score'] = 0
- df.loc[df['Close'] > df['EMA5'], 'Trend_Score'] += 1
- df.loc[df['Close'] > df['EMA20'], 'Trend_Score'] += 1
- df.loc[df['Close'] > df['EMA60'], 'Trend_Score'] += 1
- # 波动率
- df['Volatility'] = df['Returns'].rolling(20).std() * np.sqrt(48)
- df['Vol_MA'] = df['Volatility'].rolling(20).mean()
- df['Vol_Regime'] = '正常'
- df.loc[df['Volatility'] > df['Vol_MA'] * 1.5, 'Vol_Regime'] = '高波动'
- df.loc[df['Volatility'] < df['Vol_MA'] * 0.5, 'Vol_Regime'] = '低波动'
- # 成交量比率
- df['Volume_MA20'] = df['Volume'].rolling(20).mean()
- df['Volume_Ratio'] = df['Volume'] / df['Volume_MA20']
- df.dropna(inplace=True)
- print("✅ 指标计算完成")
- return df
- class OptimizedStrategy:
- """优化版T+1交易策略"""
- def __init__(self, initial_capital=1000000):
- self.initial_capital = initial_capital
- self.current_capital = initial_capital
- self.trades = []
- self.daily_trade_count = {}
- self.consecutive_losses = 0
- # 策略参数 - 基于五层深挖的最优值(进一步放宽)
- self.params = {
- 'rsi_low': 30, # RSI下限 - 进一步放宽
- 'rsi_high': 60, # RSI上限 - 进一步放宽
- 'momentum_min': -2, # 最小动量 - 进一步放宽
- 'trend_min': 0, # 最小趋势评分
- 'avoid_hour': 13, # 避开的小时
- 'avoid_friday': False, # 不避开周五
- 'max_daily_trades': 3, # 每日最大交易数 - 放宽
- 'position_base': 0.7, # 基础仓位70% - 提高
- 'position_max': 1.0, # 最大仓位100%
- 'stop_loss': 0.008, # 止损0.8%
- 'take_profit': 0.025, # 止盈2.5%
- 'max_hold_hours': 16, # 最大持仓16小时
- 't1_cutoff_time': '14:30', # T+1截止时间
- }
- def calculate_position_size(self, row):
- """动态仓位计算"""
- base = self.params['position_base']
- # 趋势因子
- trend_factor = min(1.0 + row['Trend_Score'] * 0.2, 1.5)
- # 波动率因子
- if row['Vol_Regime'] == '低波动':
- vol_factor = 1.2
- elif row['Vol_Regime'] == '正常':
- vol_factor = 1.0
- else:
- vol_factor = 0.6
- # 连续亏损惩罚
- loss_factor = max(1.0 - self.consecutive_losses * 0.2, 0.3)
- position = base * trend_factor * vol_factor * loss_factor
- return min(position, self.params['position_max'])
- def should_trade(self, timestamp, row):
- """判断是否应该交易"""
- # 1. 时间过滤 - 避开13点
- if timestamp.hour == self.params['avoid_hour']:
- return False, "避开13点"
- # 2. 星期过滤 - 避开周五
- if self.params['avoid_friday'] and timestamp.dayofweek == 4:
- return False, "避开周五"
- # 3. T+1过滤 - 14:30后不开仓
- cutoff = datetime.strptime(self.params['t1_cutoff_time'], '%H:%M').time()
- if timestamp.time() > cutoff:
- return False, "避开T+1"
- # 4. 每日交易次数限制
- date = timestamp.date()
- if self.daily_trade_count.get(date, 0) >= self.params['max_daily_trades']:
- return False, "已达日交易上限"
- # 5. RSI过滤 - 40-50区间
- rsi = row['RSI_14']
- if not (self.params['rsi_low'] <= rsi <= self.params['rsi_high']):
- return False, f"RSI {rsi:.1f} 不在范围内"
- # 6. 动量过滤
- if row['Momentum_5'] < self.params['momentum_min']:
- return False, f"动量 {row['Momentum_5']:.2f} 不足"
- # 7. 趋势过滤
- if row['Trend_Score'] < self.params['trend_min']:
- return False, f"趋势评分 {row['Trend_Score']} 不足"
- return True, "通过"
- def simulate_exit(self, entry_price, exit_price, entry_time, exit_time, position_size):
- """模拟退出,应用止损止盈"""
- # 计算理论盈亏比例
- pnl_pct = (exit_price - entry_price) / entry_price
- # 应用止损
- if pnl_pct <= -self.params['stop_loss']:
- actual_pnl_pct = -self.params['stop_loss']
- exit_reason = '止损'
- # 应用止盈
- elif pnl_pct >= self.params['take_profit']:
- actual_pnl_pct = self.params['take_profit']
- exit_reason = '止盈'
- else:
- actual_pnl_pct = pnl_pct
- exit_reason = '正常平仓'
- # 计算盈亏金额
- position_value = self.current_capital * position_size
- pnl_amount = actual_pnl_pct * position_value
- return pnl_amount, actual_pnl_pct, exit_reason
- def backtest(self, data, trades_df):
- """执行回测"""
- print("\n" + "="*60)
- print("🚀 开始优化策略回测")
- print("="*60)
- # 将数据与交易合并
- t1_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
- t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间'])
- # 获取指标数据
- merged_trades = []
- for idx, trade in t1_trades.iterrows():
- try:
- # 获取开仓时的指标
- mask = data.index <= trade['开仓时间']
- if not mask.any():
- continue
- current_idx = data.index[mask][-1]
- current_data = data.loc[current_idx]
- # 判断是否满足交易条件
- should_trade, reason = self.should_trade(trade['开仓时间'], current_data)
- if should_trade:
- # 计算仓位
- position_size = self.calculate_position_size(current_data)
- # 模拟退出
- pnl_amount, pnl_pct, exit_reason = self.simulate_exit(
- trade['开仓价格'],
- trade['平仓价格'],
- trade['开仓时间'],
- trade['平仓时间'],
- position_size
- )
- # 记录交易
- merged_trades.append({
- '开仓时间': trade['开仓时间'],
- '平仓时间': trade['平仓时间'],
- '开仓价格': trade['开仓价格'],
- '平仓价格': trade['平仓价格'],
- '仓位': position_size,
- '实际盈亏': pnl_amount,
- '盈亏比例': pnl_pct,
- '退出原因': exit_reason,
- 'RSI': current_data['RSI_14'],
- '动量': current_data['Momentum_5'],
- '趋势': current_data['Trend_Score'],
- })
- # 更新资本
- self.current_capital += pnl_amount
- # 更新每日计数
- date = trade['开仓时间'].date()
- self.daily_trade_count[date] = self.daily_trade_count.get(date, 0) + 1
- # 更新连续亏损
- if pnl_amount < 0:
- self.consecutive_losses += 1
- else:
- self.consecutive_losses = 0
- except Exception as e:
- continue
- self.trades = pd.DataFrame(merged_trades)
- return self.trades
- def report(self):
- """生成回测报告"""
- if len(self.trades) == 0:
- print("⚠️ 没有交易记录")
- return
- print("\n" + "="*60)
- print("📊 优化策略回测报告")
- print("="*60)
- # 基础统计
- total_trades = len(self.trades)
- winning_trades = self.trades[self.trades['实际盈亏'] > 0]
- losing_trades = self.trades[self.trades['实际盈亏'] < 0]
- win_rate = len(winning_trades) / total_trades * 100
- total_pnl = self.trades['实际盈亏'].sum()
- avg_pnl = self.trades['实际盈亏'].mean()
- total_return = (self.current_capital - self.initial_capital) / self.initial_capital * 100
- print(f"\n【基础统计】")
- print(f" 初始资金: {self.initial_capital:,.0f}元")
- print(f" 最终资金: {self.current_capital:,.0f}元")
- print(f" 总收益率: {total_return:+.2f}%")
- print(f" 总交易次数: {total_trades}笔")
- print(f" 盈利次数: {len(winning_trades)}笔")
- print(f" 亏损次数: {len(losing_trades)}笔")
- print(f" 胜率: {win_rate:.1f}%")
- print(f" 总盈亏: {total_pnl:+,.0f}元")
- print(f" 平均盈亏: {avg_pnl:+,.0f}元")
- # 盈亏比
- if len(losing_trades) > 0:
- profit_factor = abs(winning_trades['实际盈亏'].sum() / losing_trades['实际盈亏'].sum())
- print(f" 盈亏比: {profit_factor:.2f}")
- # 退出原因统计
- print(f"\n【退出原因统计】")
- exit_stats = self.trades.groupby('退出原因').agg({
- '实际盈亏': ['count', 'sum', 'mean']
- }).round(2)
- exit_stats.columns = ['次数', '总盈亏', '平均盈亏']
- print(exit_stats.to_string())
- # 对比原策略
- print(f"\n【与原策略对比】")
- original_pnl = -123843 # 原策略亏损
- improvement = total_pnl - original_pnl
- print(f" 原策略盈亏: {original_pnl:+,.0f}元")
- print(f" 优化策略盈亏: {total_pnl:+,.0f}元")
- print(f" 改善幅度: {improvement:+,.0f}元")
- print(f" 提升倍数: {abs(total_pnl / original_pnl) if original_pnl != 0 else 0:.1f}x")
- # 最近交易
- print(f"\n【最近5笔交易】")
- recent = self.trades.tail(5)[['开仓时间', '平仓时间', '仓位', '实际盈亏', '退出原因']]
- print(recent.to_string(index=False))
- def main():
- """主函数"""
- print("="*60)
- print("创业板50 T+1 优化策略回测系统")
- print("="*60)
- # 加载数据
- raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv')
- # 计算增强指标
- data = calculate_enhanced_indicators(raw_data)
- # 获取原策略交易信号
- print("\n📡 生成原策略交易信号...")
- config_manager = ConfigManager('config.json')
- fetcher = IntradayDataFetcher(config_manager)
- signal_generator = DualDirectionSignalGenerator()
- executor = DualDirectionExecutor(initial_capital=1000000)
- data_with_indicators = fetcher.calculate_intraday_indicators(data)
- signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
- # 运行优化策略回测
- strategy = OptimizedStrategy(initial_capital=1000000)
- optimized_trades = strategy.backtest(data, trades_df)
- # 生成报告
- strategy.report()
- print("\n" + "="*60)
- print("✅ 回测完成")
- print("="*60)
- if __name__ == '__main__':
- main()
|