| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50 T+1 高级优化策略 - 简化版
- 聚焦: 移动止损、分批止盈效果验证
- """
- import pandas as pd
- import numpy as np
- from datetime import datetime
- import warnings
- warnings.filterwarnings('ignore')
- from cyb50_30min_dual_direction import ConfigManager, IntradayDataFetcher, DualDirectionSignalGenerator, DualDirectionExecutor
- from t1_converter import simulate_t1_trades
- def load_local_data(csv_file='cyb50_30min_2023_to_20260325.csv'):
- df = pd.read_csv(csv_file)
- df['DateTime'] = pd.to_datetime(df['DateTime'])
- df.set_index('DateTime', inplace=True)
- df.sort_index(inplace=True)
- if 'Open' not in df.columns and 'o' in df.columns:
- df.rename(columns={'o':'Open','h':'High','l':'Low','c':'Close','v':'Volume','a':'Amount'}, inplace=True)
- for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
- if col in df.columns:
- df[col] = pd.to_numeric(df[col], errors='coerce')
- df['Returns'] = df['Close'].pct_change()
- df.ffill(inplace=True)
- df.dropna(inplace=True)
- return df
- def calculate_indicators(df):
- """计算指标"""
- # RSI
- delta = df['Close'].diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
- rs = gain / loss
- df['RSI_14'] = 100 - (100 / (1 + rs))
- # 动量
- df['Momentum_5'] = (df['Close'] / df['Close'].shift(5) - 1) * 100
- # 均线
- df['EMA_5'] = df['Close'].ewm(span=5, adjust=False).mean()
- df['EMA_20'] = df['Close'].ewm(span=20, adjust=False).mean()
- df['Trend_Score'] = 0
- df.loc[df['Close'] > df['EMA_5'], 'Trend_Score'] += 1
- df.loc[df['Close'] > df['EMA_20'], 'Trend_Score'] += 1
- # 添加Close_Open_Pct(原策略需要)
- df['Close_Open_Pct'] = (df['Close'] - df['Open']) / df['Open']
- df.dropna(inplace=True)
- return df
- # 加载数据
- print("="*60)
- print("创业板50 T+1 高级优化策略 - 简化版")
- print("="*60)
- raw_data = load_local_data('cyb50_30min_2023_to_20260325.csv')
- data = calculate_indicators(raw_data)
- print(f"\n📊 数据: {len(data)}条K线")
- # 获取原策略交易
- config_manager = ConfigManager('config.json')
- fetcher = IntradayDataFetcher(config_manager)
- signal_generator = DualDirectionSignalGenerator()
- executor = DualDirectionExecutor(initial_capital=1000000)
- data_with_indicators = fetcher.calculate_intraday_indicators(data)
- signals_df = signal_generator.generate_dual_direction_signals(data_with_indicators)
- results_df, trades_df = executor.execute_dual_direction_trades(signals_df)
- long_trades = trades_df[trades_df['交易方向'] == '做多'].copy()
- t1_trades = simulate_t1_trades(data_with_indicators, long_trades, 1000000)
- print(f"\n📈 原策略交易: {len(t1_trades)}笔")
- # 准备数据
- t1_trades['开仓时间'] = pd.to_datetime(t1_trades['开仓时间'])
- t1_trades['平仓时间'] = pd.to_datetime(t1_trades['平仓时间'])
- t1_trades['是否盈利'] = t1_trades['盈亏金额'] > 0
- # 计算期间高低点(用于移动止损)
- for idx, row in t1_trades.iterrows():
- mask = (data.index >= row['开仓时间']) & (data.index <= row['平仓时间'])
- if mask.any():
- period_data = data.loc[mask]
- t1_trades.loc[idx, '期间最高'] = period_data['High'].max()
- t1_trades.loc[idx, '期间最低'] = period_data['Low'].min()
- else:
- t1_trades.loc[idx, '期间最高'] = row['平仓价格']
- t1_trades.loc[idx, '期间最低'] = row['平仓价格']
- # 策略对比
- print("\n" + "="*60)
- print("📊 策略对比分析")
- print("="*60)
- strategies = []
- # 策略1: 原策略
- original_pnl = t1_trades['盈亏金额'].sum()
- original_winrate = (t1_trades['盈亏金额'] > 0).mean() * 100
- strategies.append({
- '策略': '原策略(止损0.8%止盈2%)',
- '交易数': len(t1_trades),
- '胜率': f"{original_winrate:.1f}%",
- '总盈亏': f"{original_pnl:+.0f}",
- '改善': '-'
- })
- # 策略2: 固定止损0.8%止盈2.5%
- def simulate_fixed_sl_tp(trades, sl, tp, position=0.7):
- total = 0
- for _, row in trades.iterrows():
- entry = row['开仓价格']
- exit_p = row['平仓价格']
- pnl_pct = (exit_p - entry) / entry
- if pnl_pct <= -sl:
- actual = -sl
- elif pnl_pct >= tp:
- actual = tp
- else:
- actual = pnl_pct
- total += actual * 1000000 * position
- return total
- pnl_v1 = simulate_fixed_sl_tp(t1_trades, 0.008, 0.025)
- strategies.append({
- '策略': '优化V1(固定SL0.8% TP2.5%)',
- '交易数': len(t1_trades),
- '胜率': f"{original_winrate:.1f}%",
- '总盈亏': f"{pnl_v1:+.0f}",
- '改善': f"{pnl_v1 - original_pnl:+.0f}"
- })
- # 策略3: 移动止损
- def simulate_trailing_stop(trades, sl, tp, trail_activate, trail_dist, position=0.7):
- total = 0
- wins = 0
- losses = 0
- for _, row in trades.iterrows():
- entry = row['开仓价格']
- exit_p = row['平仓价格']
- high = row['期间最高']
- pnl_pct = (exit_p - entry) / entry
- max_profit_pct = (high - entry) / entry
- # 硬止损
- if pnl_pct <= -sl:
- actual = -sl
- losses += 1
- # 移动止损激活
- elif max_profit_pct >= trail_activate:
- trail_stop_price = high * (1 - trail_dist)
- trail_stop_pct = (trail_stop_price - entry) / entry
- if pnl_pct <= trail_stop_pct:
- actual = trail_stop_pct
- else:
- actual = pnl_pct
- if actual > 0:
- wins += 1
- else:
- losses += 1
- # 目标止盈
- elif pnl_pct >= tp:
- actual = tp
- wins += 1
- else:
- actual = pnl_pct
- if actual > 0:
- wins += 1
- else:
- losses += 1
- total += actual * 1000000 * position
- win_rate = wins / (wins + losses) * 100 if (wins + losses) > 0 else 0
- return total, win_rate
- pnl_v2, wr_v2 = simulate_trailing_stop(t1_trades, 0.008, 0.025, 0.01, 0.005)
- strategies.append({
- '策略': '优化V2(移动止损)',
- '交易数': len(t1_trades),
- '胜率': f"{wr_v2:.1f}%",
- '总盈亏': f"{pnl_v2:+.0f}",
- '改善': f"{pnl_v2 - original_pnl:+.0f}"
- })
- # 策略4: 分批止盈
- def simulate_partial_exit(trades, sl, tp1, tp2, ratio, position=0.7):
- total = 0
- for _, row in trades.iterrows():
- entry = row['开仓价格']
- exit_p = row['平仓价格']
- high = row['期间最高']
- max_profit_pct = (high - entry) / entry
- # 第一批止盈
- if max_profit_pct >= tp1:
- # 假设50%仓位在tp1止盈,剩余按实际或tp2
- if max_profit_pct >= tp2:
- actual = tp1 * ratio + tp2 * (1 - ratio)
- else:
- actual_pnl = (exit_p - entry) / entry
- actual = tp1 * ratio + actual_pnl * (1 - ratio)
- elif (exit_p - entry) / entry <= -sl:
- actual = -sl
- else:
- actual = (exit_p - entry) / entry
- total += actual * 1000000 * position
- return total
- pnl_v3 = simulate_partial_exit(t1_trades, 0.008, 0.015, 0.03, 0.5)
- strategies.append({
- '策略': '优化V3(分批止盈 1.5%+3%)',
- '交易数': len(t1_trades),
- '胜率': f"{original_winrate:.1f}%",
- '总盈亏': f"{pnl_v3:+.0f}",
- '改善': f"{pnl_v3 - original_pnl:+.0f}"
- })
- # 策略5: 最优组合
- def simulate_optimal(trades, position=0.7):
- total = 0
- for _, row in trades.iterrows():
- entry = row['开仓价格']
- exit_p = row['平仓价格']
- high = row['期间最高']
- rsi = 50
- # 获取RSI
- mask = data.index <= row['开仓时间']
- if mask.any():
- try:
- rsi = data.loc[data.index[mask][-1], 'RSI_14']
- except:
- pass
- # 只在RSI 35-55交易
- if not (35 <= rsi <= 55):
- continue
- pnl_pct = (exit_p - entry) / entry
- max_profit_pct = (high - entry) / entry
- # 移动止损 + 分批止盈组合
- if pnl_pct <= -0.008:
- actual = -0.008
- elif max_profit_pct >= 0.015:
- # 部分止盈
- trail_stop = max_profit_pct - 0.005
- if pnl_pct <= trail_stop and trail_stop > 0.01:
- actual = 0.015 * 0.5 + trail_stop * 0.5
- elif pnl_pct >= 0.03:
- actual = 0.015 * 0.5 + 0.03 * 0.5
- else:
- actual = pnl_pct
- elif pnl_pct >= 0.025:
- actual = 0.025
- else:
- actual = pnl_pct
- total += actual * 1000000 * position
- return total
- pnl_v4 = simulate_optimal(t1_trades)
- # 统计优化V4的交易数
- count_v4 = 0
- for _, row in t1_trades.iterrows():
- mask = data.index <= row['开仓时间']
- if mask.any():
- try:
- rsi = data.loc[data.index[mask][-1], 'RSI_14']
- if 35 <= rsi <= 55:
- count_v4 += 1
- except:
- pass
- strategies.append({
- '策略': '优化V4(RSI35-55+移动+分批)',
- '交易数': count_v4,
- '胜率': '-',
- '总盈亏': f"{pnl_v4:+.0f}",
- '改善': f"{pnl_v4 - original_pnl:+.0f}"
- })
- # 打印结果
- results_df = pd.DataFrame(strategies)
- print("\n" + results_df.to_string(index=False))
- # 最优参数推荐
- print("\n" + "="*60)
- print("🏆 最优策略推荐")
- print("="*60)
- best_pnl = max([original_pnl, pnl_v1, pnl_v2, pnl_v3, pnl_v4])
- if best_pnl == pnl_v2:
- print("【推荐】优化V2: 移动止损策略")
- print(" 参数: 止损0.8%, 止盈2.5%, 移动激活1%, 移动距离0.5%")
- print(f" 预期收益: {pnl_v2:+.0f}元")
- elif best_pnl == pnl_v4:
- print("【推荐】优化V4: 综合策略")
- print(" 参数: RSI35-55过滤 + 移动止损 + 分批止盈")
- print(f" 预期收益: {pnl_v4:+.0f}元")
- elif best_pnl == pnl_v3:
- print("【推荐】优化V3: 分批止盈策略")
- print(" 参数: 第一批1.5%止盈50%, 第二批3%止盈剩余")
- print(f" 预期收益: {pnl_v3:+.0f}元")
- else:
- print("【推荐】优化V1: 固定止盈2.5%")
- print(f" 预期收益: {pnl_v1:+.0f}元")
- print("\n" + "="*60)
- print("✅ 分析完成")
- print("="*60)
|