#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50 T+1 高级优化策略 - 简化版 聚焦: 移动止损、分批止盈效果验证 """ import pandas as pd import numpy as np from datetime import datetime import warnings warnings.filterwarnings('ignore') from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor from t1_converter import simulate_t1_trades def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'): df = pd.read_csv(csv_file) df['DateTime'] = pd.to_datetime(df['DateTime']) df.set_index('DateTime', inplace=True) df.sort_index(inplace=True) if 'Open' not in df.columns and 'o' in df.columns: df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True) for col in ['Open', 'High', 'Low', 'Close', 'Volume']: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') df['Returns'] = df['Close'].pct_change() df.ffill(inplace=True) df.dropna(inplace=True) return df def calculate_indicators(df): """计算指标""" # RSI delta = df['Close'].diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss df['RSI_14'] = 100 - (100 / (1 + rs)) # 动量 df['Momentum_5'] = (df['Close'] / df['Close'].shift(5) - 1) * 100 # 均线 df['EMA_5'] = df['Close'].ewm(span=5, adjust=False).mean() df['EMA_20'] = df['Close'].ewm(span=20, adjust=False).mean() df['Trend_Score'] = 0 df.loc[df['Close'] > df['EMA_5'], 'Trend_Score'] += 1 df.loc[df['Close'] > df['EMA_20'], 'Trend_Score'] += 1 # 添加Close_Open_Pct(原策略需要) df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open'] df.dropna(inplace=True) return df # 加载数据 print("="*60) print("创业板50 T+1 高级优化策略 - 简化版") print("="*60) raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv') data = calculate_indicators(raw_data) print(f"\n📊 数据: {len(data)}条K线") # 获取原策略交易 config_manager = ConfigManager('config.json') fetcher = IntradayDataFetcher(config_manager) signal_generator = DualDirectionSignalGenerator() executor = DualDirectionExecutor(initial_capital=1000000) data_with_indicators = fetcher.calculate_intraday_indicators(data) signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators) results_df, trades_df = executor.execute_dual_direction_trades(signals_df) long_trades = trades_df[trades_df['交易方向'] == '做多'].copy() t1_trades = simulate_t1_trades(data_with_indicators, long_trades, 1000000) print(f"\n📈 原策略交易: {len(t1_trades)}笔") # 准备数据 t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间']) t1_trades['平仓时间'] = pd.to_datetime(t1_trades['平仓时间']) t1_trades['是否盈利'] = t1_trades['盈亏金额'] > 0 # 计算期间高低点(用于移动止损) for idx, row in t1_trades.iterrows(): mask = (data.index >= row['开仓时间']) & (data.index <= row['平仓时间']) if mask.any(): period_data = data.loc[mask] t1_trades.loc[idx, '期间最高'] = period_data['High'].max() t1_trades.loc[idx, '期间最低'] = period_data['Low'].min() else: t1_trades.loc[idx, '期间最高'] = row['平仓价格'] t1_trades.loc[idx, '期间最低'] = row['平仓价格'] # 策略对比 print("\n" + "="*60) print("📊 策略对比分析") print("="*60) strategies = [] # 策略1: 原策略 original_pnl = t1_trades['盈亏金额'].sum() original_winrate = (t1_trades['盈亏金额'] > 0).mean() * 100 strategies.append({ '策略': '原策略(止损0.8%止盈2%)', '交易数': len(t1_trades), '胜率': f"{original_winrate:.1f}%", '总盈亏': f"{original_pnl:+.0f}", '改善': '-' }) # 策略2: 固定止损0.8%止盈2.5% def simulate_fixed_sl_tp(trades, sl, tp, position=0.7): total = 0 for _, row in trades.iterrows(): entry = row['开仓价格'] exit_p = row['平仓价格'] pnl_pct = (exit_p - entry) / entry if pnl_pct <= -sl: actual = -sl elif pnl_pct >= tp: actual = tp else: actual = pnl_pct total += actual * 1000000 * position return total pnl_v1 = simulate_fixed_sl_tp(t1_trades, 0.008, 0.025) strategies.append({ '策略': '优化V1(固定SL0.8% TP2.5%)', '交易数': len(t1_trades), '胜率': f"{original_winrate:.1f}%", '总盈亏': f"{pnl_v1:+.0f}", '改善': f"{pnl_v1 - original_pnl:+.0f}" }) # 策略3: 移动止损 def simulate_trailing_stop(trades, sl, tp, trail_activate, trail_dist, position=0.7): total = 0 wins = 0 losses = 0 for _, row in trades.iterrows(): entry = row['开仓价格'] exit_p = row['平仓价格'] high = row['期间最高'] pnl_pct = (exit_p - entry) / entry max_profit_pct = (high - entry) / entry # 硬止损 if pnl_pct <= -sl: actual = -sl losses += 1 # 移动止损激活 elif max_profit_pct >= trail_activate: trail_stop_price = high * (1 - trail_dist) trail_stop_pct = (trail_stop_price - entry) / entry if pnl_pct <= trail_stop_pct: actual = trail_stop_pct else: actual = pnl_pct if actual > 0: wins += 1 else: losses += 1 # 目标止盈 elif pnl_pct >= tp: actual = tp wins += 1 else: actual = pnl_pct if actual > 0: wins += 1 else: losses += 1 total += actual * 1000000 * position win_rate = wins / (wins + losses) * 100 if (wins + losses) > 0 else 0 return total, win_rate pnl_v2, wr_v2 = simulate_trailing_stop(t1_trades, 0.008, 0.025, 0.01, 0.005) strategies.append({ '策略': '优化V2(移动止损)', '交易数': len(t1_trades), '胜率': f"{wr_v2:.1f}%", '总盈亏': f"{pnl_v2:+.0f}", '改善': f"{pnl_v2 - original_pnl:+.0f}" }) # 策略4: 分批止盈 def simulate_partial_exit(trades, sl, tp1, tp2, ratio, position=0.7): total = 0 for _, row in trades.iterrows(): entry = row['开仓价格'] exit_p = row['平仓价格'] high = row['期间最高'] max_profit_pct = (high - entry) / entry # 第一批止盈 if max_profit_pct >= tp1: # 假设50%仓位在tp1止盈,剩余按实际或tp2 if max_profit_pct >= tp2: actual = tp1 * ratio + tp2 * (1 - ratio) else: actual_pnl = (exit_p - entry) / entry actual = tp1 * ratio + actual_pnl * (1 - ratio) elif (exit_p - entry) / entry <= -sl: actual = -sl else: actual = (exit_p - entry) / entry total += actual * 1000000 * position return total pnl_v3 = simulate_partial_exit(t1_trades, 0.008, 0.015, 0.03, 0.5) strategies.append({ '策略': '优化V3(分批止盈 1.5%+3%)', '交易数': len(t1_trades), '胜率': f"{original_winrate:.1f}%", '总盈亏': f"{pnl_v3:+.0f}", '改善': f"{pnl_v3 - original_pnl:+.0f}" }) # 策略5: 最优组合 def simulate_optimal(trades, position=0.7): total = 0 for _, row in trades.iterrows(): entry = row['开仓价格'] exit_p = row['平仓价格'] high = row['期间最高'] rsi = 50 # 获取RSI mask = data.index <= row['开仓时间'] if mask.any(): try: rsi = data.loc[data.index[mask][-1], 'RSI_14'] except: pass # 只在RSI 35-55交易 if not (35 <= rsi <= 55): continue pnl_pct = (exit_p - entry) / entry max_profit_pct = (high - entry) / entry # 移动止损 + 分批止盈组合 if pnl_pct <= -0.008: actual = -0.008 elif max_profit_pct >= 0.015: # 部分止盈 trail_stop = max_profit_pct - 0.005 if pnl_pct <= trail_stop and trail_stop > 0.01: actual = 0.015 * 0.5 + trail_stop * 0.5 elif pnl_pct >= 0.03: actual = 0.015 * 0.5 + 0.03 * 0.5 else: actual = pnl_pct elif pnl_pct >= 0.025: actual = 0.025 else: actual = pnl_pct total += actual * 1000000 * position return total pnl_v4 = simulate_optimal(t1_trades) # 统计优化V4的交易数 count_v4 = 0 for _, row in t1_trades.iterrows(): mask = data.index <= row['开仓时间'] if mask.any(): try: rsi = data.loc[data.index[mask][-1], 'RSI_14'] if 35 <= rsi <= 55: count_v4 += 1 except: pass strategies.append({ '策略': '优化V4(RSI35-55+移动+分批)', '交易数': count_v4, '胜率': '-', '总盈亏': f"{pnl_v4:+.0f}", '改善': f"{pnl_v4 - original_pnl:+.0f}" }) # 打印结果 results_df = pd.DataFrame(strategies) print("\n" + results_df.to_string(index=False)) # 最优参数推荐 print("\n" + "="*60) print("🏆 最优策略推荐") print("="*60) best_pnl = max([original_pnl, pnl_v1, pnl_v2, pnl_v3, pnl_v4]) if best_pnl == pnl_v2: print("【推荐】优化V2: 移动止损策略") print(" 参数: 止损0.8%, 止盈2.5%, 移动激活1%, 移动距离0.5%") print(f" 预期收益: {pnl_v2:+.0f}元") elif best_pnl == pnl_v4: print("【推荐】优化V4: 综合策略") print(" 参数: RSI35-55过滤 + 移动止损 + 分批止盈") print(f" 预期收益: {pnl_v4:+.0f}元") elif best_pnl == pnl_v3: print("【推荐】优化V3: 分批止盈策略") print(" 参数: 第一批1.5%止盈50%, 第二批3%止盈剩余") print(f" 预期收益: {pnl_v3:+.0f}元") else: print("【推荐】优化V1: 固定止盈2.5%") print(f" 预期收益: {pnl_v1:+.0f}元") print("\n" + "="*60) print("✅ 分析完成") print("="*60)