| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50 T+1 高级优化策略V2
- 探索: 移动止损、分批止盈、多因子评分、市场环境自适应
- """
- import pandas as pd
- import numpy as np
- from datetime import datetime, timedelta
- import warnings
- from itertools import product
- warnings.filterwarnings('ignore')
- # 导入原策略组件
- from cyb50_30min_dual_direction import (
- ConfigManager,
- IntradayDataFetcher,
- DualDirectionSignalGenerator,
- DualDirectionExecutor
- )
- from t1_converter import simulate_t1_trades
def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'):
    """Load bar data from a CSV file and add basic return features.

    The CSV must contain a ``DateTime`` column, which becomes the sorted
    index. Short column names (``o``/``h``/``l``/``c``/``v``/``a``) are
    renamed to ``Open``/``High``/``Low``/``Close``/``Volume``/``Amount`` when
    ``Open`` is not already present. Price and volume columns are coerced to
    numeric; unparsable cells become NaN.

    Gaps are forward-filled *before* the derived columns are computed, so a
    missing bar is treated as "price unchanged" (zero return) instead of
    inheriting the previous bar's return. Rows that still contain NaN (for
    example the first row, which has no previous close) are dropped.

    Args:
        csv_file: Path of the CSV file to read.

    Returns:
        DataFrame indexed by ``DateTime`` with the added columns ``Returns``,
        ``High_Low_Pct`` and ``Close_Open_Pct``.

    Raises:
        KeyError: If ``DateTime`` or any of the Open/High/Low/Close columns
            is missing.
    """
    print(f"📊 加载数据 {csv_file}...")
    df = pd.read_csv(csv_file)
    df['DateTime'] = pd.to_datetime(df['DateTime'])
    df.set_index('DateTime', inplace=True)
    df.sort_index(inplace=True)

    if 'Open' not in df.columns and 'o' in df.columns:
        df.rename(
            columns={'o': 'Open', 'h': 'High', 'l': 'Low', 'c': 'Close',
                     'v': 'Volume', 'a': 'Amount'},
            inplace=True,
        )

    missing = [col for col in ('Open', 'High', 'Low', 'Close')
               if col not in df.columns]
    if missing:
        raise KeyError(f"CSV is missing required price columns: {missing}")

    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    # Fill gaps in the raw columns first; computing the features on gappy
    # data and filling afterwards would copy stale returns onto missing bars.
    df.ffill(inplace=True)

    # fill_method=None: gaps are already handled above, and this avoids
    # depending on pct_change's version-specific implicit padding.
    df['Returns'] = df['Close'].pct_change(fill_method=None)
    df['High_Low_Pct'] = (df['High'] - df['Low']) / df['Close'].shift(1)
    df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']

    df.dropna(inplace=True)
    print(f"✅ 数据加载完成: {len(df)}条K线")
    return df
def calculate_advanced_indicators(df):
    """Append technical-indicator columns to ``df`` in place and return it.

    Reads the ``Close``, ``Volume`` and ``Returns`` columns and adds, in this
    order: ``RSI_{6,14,21}`` (rolling-mean gains vs. losses),
    ``Momentum_{3,5,10}`` (percent change over N bars), ``EMA_{5,20,60}``,
    ``Trend_Score`` (count of EMAs below the close, 0-3), ``Volatility``,
    ``Vol_MA``, ``Vol_Regime``, ``Volume_MA20``, ``Volume_Ratio``, ``MACD``,
    ``MACD_Signal``, ``MACD_Hist`` and the ``BB_*`` Bollinger columns.

    Rows containing NaN in any column (mainly the indicator warm-up period)
    are dropped from ``df`` itself before it is returned.
    """
    print("📈 计算高级指标...")
    close = df['Close']

    # RSI over several look-back windows; the bar-to-bar change is the same
    # for every window, so it is derived once.
    price_change = close.diff()
    ups = price_change.where(price_change > 0, 0)
    downs = -price_change.where(price_change < 0, 0)
    for window in (6, 14, 21):
        avg_gain = ups.rolling(window=window).mean()
        avg_loss = downs.rolling(window=window).mean()
        relative_strength = avg_gain / avg_loss
        df[f'RSI_{window}'] = 100 - (100 / (1 + relative_strength))

    # Momentum: percent change versus the close N bars ago.
    for lag in (3, 5, 10):
        df[f'Momentum_{lag}'] = (close / close.shift(lag) - 1) * 100

    # Exponential moving averages.
    for span in (5, 20, 60):
        df[f'EMA_{span}'] = close.ewm(span=span, adjust=False).mean()

    # Trend score: one point for each EMA the close is trading above.
    df['Trend_Score'] = 0
    for span in (5, 20, 60):
        df.loc[df['Close'] > df[f'EMA_{span}'], 'Trend_Score'] += 1

    # Volatility: rolling std of returns scaled by sqrt(48), plus a regime
    # label relative to its own 20-bar mean.
    volatility = df['Returns'].rolling(20).std() * np.sqrt(48)
    df['Volatility'] = volatility
    vol_mean = volatility.rolling(20).mean()
    df['Vol_MA'] = vol_mean
    is_high = volatility > vol_mean * 1.5
    is_low = volatility < vol_mean * 0.5
    # The low-volatility label takes precedence, matching assignment order.
    df['Vol_Regime'] = np.where(is_low, '低波动', np.where(is_high, '高波动', '正常'))

    # Volume relative to its 20-bar average.
    volume_mean = df['Volume'].rolling(20).mean()
    df['Volume_MA20'] = volume_mean
    df['Volume_Ratio'] = df['Volume'] / volume_mean

    # MACD (12/26 EMA difference, 9-span signal line, histogram).
    fast_ema = close.ewm(span=12, adjust=False).mean()
    slow_ema = close.ewm(span=26, adjust=False).mean()
    macd_line = fast_ema - slow_ema
    df['MACD'] = macd_line
    signal_line = macd_line.ewm(span=9, adjust=False).mean()
    df['MACD_Signal'] = signal_line
    df['MACD_Hist'] = macd_line - signal_line

    # Bollinger bands (20 bars, 2 standard deviations) and the close's
    # relative position between the lower and upper band.
    band_mid = close.rolling(20).mean()
    band_std = close.rolling(20).std()
    df['BB_Middle'] = band_mid
    df['BB_Std'] = band_std
    band_upper = band_mid + 2 * band_std
    band_lower = band_mid - 2 * band_std
    df['BB_Upper'] = band_upper
    df['BB_Lower'] = band_lower
    df['BB_Position'] = (close - band_lower) / (band_upper - band_lower)

    df.dropna(inplace=True)
    return df
class AdvancedOptimizedStrategy:
    """Advanced optimized strategy V2 (long-only re-simulation of trades).

    Takes trades produced by another strategy, filters them (time of day,
    daily trade cap, RSI band, momentum floor), sizes them with a
    multi-factor score and re-prices the exit with a simplified stop-loss /
    scale-out / trailing-stop / take-profit model.

    Attributes:
        initial_capital: Starting capital.
        current_capital: Capital after all simulated trades so far.
        trades: List of dicts, one per accepted trade.
        skipped_trades: List of ``(trade_index, error_repr)`` for trades that
            raised while being processed in ``backtest``.
        params: Effective parameters (caller overrides on top of defaults).
    """

    # Defaults used for every key the caller does not override.
    DEFAULT_PARAMS = {
        'rsi_low': 30,
        'rsi_high': 60,
        'momentum_min': -2,
        'trend_min': 0,
        'avoid_hour': 13,
        'max_daily_trades': 3,
        'position_base': 0.7,
        'stop_loss': 0.008,
        'take_profit': 0.025,
        'trailing_stop': True,
        'trailing_activation': 0.01,
        'trailing_distance': 0.005,
        'partial_exit': True,
        'partial_ratio': 0.5,
        'partial_target': 0.015,
    }

    # Entries strictly after this time of day are rejected by should_trade.
    # Parsed once here instead of on every call.
    _T1_CUTOFF = datetime.strptime('14:30', '%H:%M').time()

    def __init__(self, params=None):
        """Create a strategy.

        Args:
            params: Optional dict of parameter overrides. Missing keys fall
                back to ``DEFAULT_PARAMS`` so a partial dict never causes a
                ``KeyError`` later on. ``None`` or an empty dict means "all
                defaults".
        """
        self.initial_capital = 1000000
        self.current_capital = 1000000
        self.trades = []
        self.skipped_trades = []
        self.params = {**self.DEFAULT_PARAMS, **(params or {})}

    def calculate_multi_factor_score(self, row):
        """Return a multi-factor score for one bar, clipped to 0-100.

        Starts from a base of 50 and adds/subtracts points per factor.

        Args:
            row: Mapping with ``RSI_14``, ``Trend_Score``, ``Momentum_5``,
                ``Vol_Regime``, ``MACD_Hist``, ``MACD``, ``MACD_Signal`` and
                ``BB_Position``.
        """
        score = 50  # base score

        # RSI factor: +20 / +15 / +10 for progressively wider bands around
        # 45, -10 outside 30-60.
        rsi = row['RSI_14']
        if 40 <= rsi <= 50:
            score += 20
        elif 35 <= rsi <= 55:
            score += 15
        elif 30 <= rsi <= 60:
            score += 10
        else:
            score -= 10

        # Trend factor: 5 points per unit of Trend_Score.
        score += row['Trend_Score'] * 5

        # Momentum factor: +15 / +10 / +5, or -5 at -1 or below.
        mom = row['Momentum_5']
        if mom > 2:
            score += 15
        elif mom > 0:
            score += 10
        elif mom > -1:
            score += 5
        else:
            score -= 5

        # Volatility factor: low +15, normal +10, anything else -5.
        regime = row['Vol_Regime']
        if regime == '低波动':
            score += 15
        elif regime == '正常':
            score += 10
        else:
            score -= 5

        # MACD factor: up to +15, never negative.
        if row['MACD_Hist'] > 0 and row['MACD'] > 0:
            score += 15
        elif row['MACD_Hist'] > 0:
            score += 10
        elif row['MACD'] > row['MACD_Signal']:
            score += 5

        # Bollinger factor: best in the 0.2-0.4 part of the band; positions
        # above 0.6 (or NaN) score nothing.
        bb_pos = row['BB_Position']
        if 0.2 <= bb_pos <= 0.4:
            score += 15
        elif 0.4 <= bb_pos <= 0.6:
            score += 10
        elif bb_pos < 0.2:
            score += 5

        return min(max(score, 0), 100)

    def should_trade(self, timestamp, row, daily_count):
        """Decide whether an entry passes the filters.

        Args:
            timestamp: Entry time; must expose ``hour``, ``time()`` and
                ``date()``.
            row: Mapping with ``RSI_14`` and ``Momentum_5``.
            daily_count: Dict mapping date -> trades already taken that day.

        Returns:
            ``(allowed, reason)`` where ``reason`` is a short message.
        """
        # Time-of-day filter.
        if timestamp.hour == self.params['avoid_hour']:
            return False, "避开13点"

        # T+1 filter: reject entries after the cutoff time.
        if timestamp.time() > self._T1_CUTOFF:
            return False, "避开T+1"

        # Daily trade cap.
        if daily_count.get(timestamp.date(), 0) >= self.params['max_daily_trades']:
            return False, "已达日交易上限"

        # RSI band.
        rsi = row['RSI_14']
        if not (self.params['rsi_low'] <= rsi <= self.params['rsi_high']):
            return False, f"RSI {rsi:.1f} 不在范围内"

        # Momentum floor.
        if row['Momentum_5'] < self.params['momentum_min']:
            return False, f"动量 {row['Momentum_5']:.2f} 不足"

        return True, "通过"

    def simulate_advanced_exit(self, entry_price, exit_price, high_price, low_price,
                               entry_time, exit_time, position_size):
        """Re-price a trade's exit with stop / scale-out / trailing rules.

        Only ``entry_price``, ``exit_price`` and ``high_price`` influence the
        result; the remaining arguments are accepted but not used.

        Returns:
            ``(pnl_pct, exit_reason)``: the adjusted fractional P&L and a
            label for the rule that produced it.
        """
        pnl_pct = (exit_price - entry_price) / entry_price

        # Best unrealised gain during the holding period.
        if high_price > entry_price:
            max_profit_pct = (high_price - entry_price) / entry_price
        else:
            max_profit_pct = 0

        # 1. Hard stop: cap the loss at stop_loss.
        # NOTE(review): this is evaluated on the final exit price only;
        # low_price is never consulted, so an intra-period breach that
        # recovers is not treated as stopped out -- confirm this is intended.
        if pnl_pct <= -self.params['stop_loss']:
            return -self.params['stop_loss'], '止损'

        exit_reason = '正常平仓'
        actual_pnl_pct = pnl_pct

        # 2. Scale-out: part of the position is booked at partial_target,
        #    the remainder at the actual exit.
        if self.params['partial_exit'] and max_profit_pct >= self.params['partial_target']:
            ratio = self.params['partial_ratio']
            actual_pnl_pct = self.params['partial_target'] * ratio + pnl_pct * (1 - ratio)
            exit_reason = '分批止盈'
        # 3. Trailing stop, armed once the best gain reaches the activation
        #    level; it trails the period high by trailing_distance.
        # NOTE(review): because this is an elif chain, the take-profit branch
        # below is skipped whenever the trailing stop is armed, even if the
        # trailing stop itself does not trigger -- confirm this is intended.
        elif self.params['trailing_stop'] and max_profit_pct >= self.params['trailing_activation']:
            trailing_stop_price = high_price * (1 - self.params['trailing_distance'])
            trailing_stop_pct = (trailing_stop_price - entry_price) / entry_price
            if pnl_pct <= trailing_stop_pct:
                actual_pnl_pct = trailing_stop_pct
                exit_reason = '移动止损'
        # 4. Fixed take-profit.
        elif pnl_pct >= self.params['take_profit']:
            actual_pnl_pct = self.params['take_profit']
            exit_reason = '止盈'

        return actual_pnl_pct, exit_reason

    def backtest(self, data, trades_df):
        """Replay long trades through the filters and the exit model.

        Args:
            data: Indicator DataFrame indexed by bar time, with the columns
                read by ``should_trade`` and ``calculate_multi_factor_score``
                plus ``High`` and ``Low``.
            trades_df: DataFrame with columns ``交易方向``, ``开仓时间``,
                ``平仓时间``, ``开仓价格`` and ``平仓价格``; only rows whose
                ``交易方向`` is ``做多`` are used.

        Returns:
            DataFrame built from ``self.trades`` (cumulative across calls).

        A trade that raises while being processed is skipped, but the
        failure is recorded in ``self.skipped_trades`` and summarised on
        stdout instead of being discarded silently.
        """
        print("\n🚀 开始高级优化策略回测")
        t1_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
        t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间'])
        t1_trades['平仓时间'] = pd.to_datetime(t1_trades['平仓时间'])

        daily_count = {}
        skipped_before = len(self.skipped_trades)

        for idx, trade in t1_trades.iterrows():
            try:
                # Most recent bar at or before the entry time.
                mask = data.index <= trade['开仓时间']
                if not mask.any():
                    continue
                current_idx = data.index[mask][-1]
                current_data = data.loc[current_idx]

                allowed, _reason = self.should_trade(
                    trade['开仓时间'], current_data, daily_count)
                if not allowed:
                    continue

                # Size the position by the multi-factor score.
                position_size = self.params['position_base']
                factor_score = self.calculate_multi_factor_score(current_data)
                if factor_score >= 80:
                    position_size = min(position_size * 1.2, 1.0)
                elif factor_score < 50:
                    position_size = position_size * 0.7

                # High/low over the holding period (simplified); fall back to
                # the exit price when no bar lies inside the period.
                exit_mask = ((data.index >= trade['开仓时间'])
                             & (data.index <= trade['平仓时间']))
                if exit_mask.any():
                    period_data = data.loc[exit_mask]
                    high_price = period_data['High'].max()
                    low_price = period_data['Low'].min()
                else:
                    high_price = trade['平仓价格']
                    low_price = trade['平仓价格']

                pnl_pct, exit_reason = self.simulate_advanced_exit(
                    trade['开仓价格'],
                    trade['平仓价格'],
                    high_price,
                    low_price,
                    trade['开仓时间'],
                    trade['平仓时间'],
                    position_size,
                )

                position_value = self.current_capital * position_size
                pnl_amount = pnl_pct * position_value

                self.trades.append({
                    '开仓时间': trade['开仓时间'],
                    '平仓时间': trade['平仓时间'],
                    '仓位': position_size,
                    '因子评分': factor_score,
                    '盈亏比例': pnl_pct,
                    '实际盈亏': pnl_amount,
                    '退出原因': exit_reason,
                })
                self.current_capital += pnl_amount

                date = trade['开仓时间'].date()
                daily_count[date] = daily_count.get(date, 0) + 1
            except Exception as exc:
                # Best effort: one bad trade must not abort the run, but keep
                # a record so systematic failures are visible.
                self.skipped_trades.append((idx, repr(exc)))

        newly_skipped = self.skipped_trades[skipped_before:]
        if newly_skipped:
            print(f"⚠️ 跳过 {len(newly_skipped)} 笔处理失败的交易, "
                  f"首个错误: {newly_skipped[0][1]}")

        return pd.DataFrame(self.trades)

    def report(self):
        """Summarise the recorded trades.

        Returns:
            ``{"error": "无交易记录"}`` when no trade was recorded, otherwise
            a dict with trade count, win rate (percent), total P&L, total
            return (percent), profit factor and final capital. The profit
            factor is gross profit / gross loss; with no losing trades it is
            ``inf`` when there is any profit and ``0.0`` otherwise.
        """
        if len(self.trades) == 0:
            return {"error": "无交易记录"}

        df = pd.DataFrame(self.trades)
        total_trades = len(df)
        wins = df[df['实际盈亏'] > 0]
        losses = df[df['实际盈亏'] < 0]

        win_rate = len(wins) / total_trades * 100
        total_pnl = df['实际盈亏'].sum()
        total_return = ((self.current_capital - self.initial_capital)
                        / self.initial_capital * 100)

        gross_profit = wins['实际盈亏'].sum() if len(wins) > 0 else 0
        gross_loss = abs(losses['实际盈亏'].sum()) if len(losses) > 0 else 0
        if gross_loss > 0:
            profit_factor = gross_profit / gross_loss
        elif gross_profit > 0:
            # No losing trades: the ratio is unbounded, not "profit / 1".
            profit_factor = float('inf')
        else:
            profit_factor = 0.0

        return {
            '总交易次数': total_trades,
            '胜率': win_rate,
            '总盈亏': total_pnl,
            '总收益率': total_return,
            '盈亏比': profit_factor,
            '最终资金': self.current_capital,
        }
def grid_search_optimization(data, trades_df):
    """Grid-search stop-loss, take-profit and trailing-stop parameters.

    Runs one ``AdvancedOptimizedStrategy`` backtest per combination (with
    the scale-out rule disabled), keeps combinations that produced at least
    20 trades, prints three rankings and returns all kept rows.

    Args:
        data: Indicator DataFrame passed through to ``backtest``.
        trades_df: Trade DataFrame passed through to ``backtest``.

    Returns:
        DataFrame with one row per kept combination, or ``None`` when no
        combination qualified.
    """
    separator = "=" * 60
    print("\n" + separator)
    print("🔍 网格搜索最优参数")
    print(separator)

    # Search space.
    stop_losses = [0.005, 0.008, 0.01, 0.015]
    take_profits = [0.02, 0.025, 0.03, 0.035]
    activations = [0.008, 0.01, 0.015]
    distances = [0.003, 0.005, 0.008]

    # Parameters held constant for every combination.
    fixed_params = {
        'rsi_low': 30,
        'rsi_high': 60,
        'momentum_min': -2,
        'trend_min': 0,
        'avoid_hour': 13,
        'max_daily_trades': 3,
        'position_base': 0.7,
        'trailing_stop': True,
        'partial_exit': False,
    }

    kept = []
    # product() iterates in the same order as four nested loops would.
    for sl, tp, ta, td in product(stop_losses, take_profits, activations, distances):
        candidate = dict(
            fixed_params,
            stop_loss=sl,
            take_profit=tp,
            trailing_activation=ta,
            trailing_distance=td,
        )
        runner = AdvancedOptimizedStrategy(candidate)
        runner.backtest(data, trades_df)
        summary = runner.report()

        if 'error' in summary or summary['总交易次数'] < 20:
            continue
        kept.append({
            '止损': sl,
            '止盈': tp,
            '移动激活': ta,
            '移动距离': td,
            '交易数': summary['总交易次数'],
            '胜率': summary['胜率'],
            '总盈亏': summary['总盈亏'],
            '盈亏比': summary['盈亏比'],
            '收益率': summary['总收益率'],
        })

    results_df = pd.DataFrame(kept)
    if len(results_df) == 0:
        return None

    print("\n【总盈亏TOP10参数组合】")
    print(results_df.nlargest(10, '总盈亏').to_string(index=False))

    print("\n【胜率TOP5参数组合】(至少30笔)")
    enough_trades = results_df[results_df['交易数'] >= 30]
    print(enough_trades.nlargest(5, '胜率').to_string(index=False))

    print("\n【综合评分TOP5】(盈亏比>1.5且胜率>45%)")
    qualified = results_df[(results_df['盈亏比'] > 1.5) & (results_df['胜率'] > 45)]
    if len(qualified) > 0:
        print(qualified.nlargest(5, '总盈亏').to_string(index=False))
    else:
        print("无满足条件的参数组合")

    return results_df
def compare_strategies(data, trades_df):
    """Backtest five preset exit configurations and print them side by side.

    Each preset overrides the exit-related parameters on top of a shared set
    of filter/sizing parameters. Presets whose report contains an error are
    left out of the printed table.

    Args:
        data: Indicator DataFrame passed through to ``backtest``.
        trades_df: Trade DataFrame passed through to ``backtest``.
    """
    separator = "=" * 60
    print("\n" + separator)
    print("📊 策略对比分析")
    print(separator)

    # Filter and sizing parameters shared by every preset.
    shared = {
        'rsi_low': 30, 'rsi_high': 60, 'momentum_min': -2,
        'trend_min': 0, 'avoid_hour': 13, 'max_daily_trades': 3,
        'position_base': 0.7,
    }

    presets = {
        '原策略(固定SL0.8% TP2%)': {
            'stop_loss': 0.008, 'take_profit': 0.02,
            'trailing_stop': False, 'partial_exit': False,
        },
        '优化V1(固定SL0.8% TP2.5%)': {
            'stop_loss': 0.008, 'take_profit': 0.025,
            'trailing_stop': False, 'partial_exit': False,
        },
        '优化V2(移动止损)': {
            'stop_loss': 0.008, 'take_profit': 0.025,
            'trailing_stop': True, 'trailing_activation': 0.01,
            'trailing_distance': 0.005, 'partial_exit': False,
        },
        '优化V3(分批止盈)': {
            'stop_loss': 0.008, 'take_profit': 0.03,
            'trailing_stop': False, 'partial_exit': True,
            'partial_ratio': 0.5, 'partial_target': 0.015,
        },
        '优化V4(移动+分批)': {
            'stop_loss': 0.01, 'take_profit': 0.035,
            'trailing_stop': True, 'trailing_activation': 0.015,
            'trailing_distance': 0.008, 'partial_exit': True,
            'partial_ratio': 0.5, 'partial_target': 0.02,
        },
    }

    table_rows = []
    for label, overrides in presets.items():
        runner = AdvancedOptimizedStrategy({**shared, **overrides})
        runner.backtest(data, trades_df)
        summary = runner.report()
        if 'error' in summary:
            continue
        table_rows.append({
            '策略': label,
            '交易数': summary['总交易次数'],
            '胜率': f"{summary['胜率']:.1f}%",
            '总盈亏': f"{summary['总盈亏']:+.0f}",
            '盈亏比': f"{summary['盈亏比']:.2f}",
            '收益率': f"{summary['总收益率']:+.2f}%",
        })

    comparison = pd.DataFrame(table_rows)
    print("\n" + comparison.to_string(index=False))
def main():
    """Script entry point.

    Loads the local bar data, computes indicators, generates the base
    strategy's trades with the imported components, prints the preset
    comparison, runs the grid search and prints the best combination by
    total P&L.
    """
    separator = "=" * 60
    print(separator)
    print("创业板50 T+1 高级优化策略V2")
    print(separator)

    # Load data and compute indicators.
    raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv')
    data = calculate_advanced_indicators(raw_data)

    # Produce the base strategy's trades.
    print("\n📡 生成原策略交易信号...")
    config_manager = ConfigManager('config.json')
    fetcher = IntradayDataFetcher(config_manager)
    signal_generator = DualDirectionSignalGenerator()
    executor = DualDirectionExecutor(initial_capital=1000000)

    data_with_indicators = fetcher.calculate_intraday_indicators(data)
    signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
    # Only the trade list is needed; the first element is not used.
    _unused_results, trades_df = executor.execute_dual_direction_trades(signals_df)

    # Preset comparison.
    compare_strategies(data, trades_df)

    # Grid search.
    grid_results = grid_search_optimization(data, trades_df)

    # Show the best combination by total P&L, if the search produced any.
    if grid_results is not None and len(grid_results) > 0:
        best_params = grid_results.loc[grid_results['总盈亏'].idxmax()]
        print("\n" + separator)
        print("🏆 最优参数最终回测")
        print(separator)
        for label in ('止损', '止盈', '移动激活', '移动距离'):
            print(f"{label}: {best_params[label]}")
        print(f"预期盈亏: {best_params['总盈亏']:,.0f}元")
        print(f"预期收益率: {best_params['收益率']:+.2f}%")

    print("\n" + separator)
    print("✅ 分析完成")
    print(separator)


if __name__ == '__main__':
    main()
|