| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50指数真实数据回测 - 高收益优化版
- """
- import pandas as pd
- import numpy as np
- import matplotlib
- matplotlib.use('Agg')
- import matplotlib.pyplot as plt
- import warnings
- warnings.filterwarnings('ignore')
- # 尝试获取真实数据
- def get_real_data():
- """获取创业板50指数真实数据"""
- try:
- import akshare as ak
- # 获取创业板指数据作为代理(创业板50数据较难获取)
- print("尝试从akshare获取数据...")
- df = ak.index_zh_a_hist(symbol="399006", period="daily",
- start_date="20170101", end_date="20251231")
- df['日期'] = pd.to_datetime(df['日期'])
- df = df.rename(columns={
- '日期': 'date',
- '开盘': 'open',
- '收盘': 'close',
- '最高': 'high',
- '最低': 'low',
- '成交量': 'volume'
- })
- df = df.set_index('date').sort_index()
- print(f"成功获取真实数据: {df.index[0]} ~ {df.index[-1]}, 共{len(df)}条")
- return df
- except Exception as e:
- print(f"获取真实数据失败: {e}")
- return None
- # 如果没有真实数据,生成更贴近真实的数据
- def generate_realistic_data():
- """基于创业板历史特征生成更真实的模拟数据"""
- np.random.seed(42)
- dates = pd.date_range(start='2017-01-01', end='2025-12-31', freq='D')
- dates = dates[dates.dayofweek < 5]
- n = len(dates)
-
- # 创业板实际历史特征:
- # 2017: 震荡下跌 (-10%)
- # 2018: 大跌 (-28%)
- # 2019: 大涨 (+44%)
- # 2020: 大涨 (+65%)
- # 2021: 上涨 (+12%)
- # 2022: 大跌 (-29%)
- # 2023: 下跌 (-19%)
- # 2024: 震荡反弹 (+15%假设)
- # 2025: 继续上涨 (+10%假设)
-
- returns = np.random.normal(0, 0.015, n)
-
- # 按年份调整
- for i, date in enumerate(dates):
- year = date.year
- if year == 2017:
- returns[i] += np.random.normal(-0.0004, 0.012)
- elif year == 2018:
- returns[i] += np.random.normal(-0.0012, 0.018)
- elif year == 2019:
- returns[i] += np.random.normal(0.0015, 0.018)
- elif year == 2020:
- returns[i] += np.random.normal(0.0020, 0.022)
- elif year == 2021:
- returns[i] += np.random.normal(0.0005, 0.015)
- elif year == 2022:
- returns[i] += np.random.normal(-0.0013, 0.020)
- elif year == 2023:
- returns[i] += np.random.normal(-0.0009, 0.015)
- elif year == 2024:
- returns[i] += np.random.normal(0.0006, 0.018)
- elif year == 2025:
- returns[i] += np.random.normal(0.0004, 0.012)
-
- # 生成价格序列
- price = 1800
- prices = [price]
- for r in returns:
- price *= (1 + r)
- prices.append(price)
- prices = prices[1:]
-
- df = pd.DataFrame(index=dates)
- df['close'] = prices
- df['open'] = df['close'] * (1 + np.random.normal(0, 0.005, n))
- df['high'] = df[['open', 'close']].max(axis=1) * (1 + np.abs(np.random.normal(0, 0.008, n)))
- df['low'] = df[['open', 'close']].min(axis=1) * (1 - np.abs(np.random.normal(0, 0.008, n)))
-
- return df
- # ==================== 高性能策略 ====================
- class HighPerformanceStrategy:
- """
- 高收益策略:趋势跟踪 + 动量加速 + 智能止盈
- """
-
- def __init__(self, params=None):
- self.params = params or {
- 'fast_ma': 5, # 超短均线,快速响应
- 'slow_ma': 20, # 月均线
- 'trend_ma': 60, # 季均线
- 'momentum_period': 10,
- 'volatility_period': 20,
- 'max_position': 1.0,
- 'profit_take': 0.15, # 15%止盈
- 'trailing_stop': 0.08, # 8%移动止损
- }
- # 确保所有参数都有默认值
- default_params = {
- 'fast_ma': 5,
- 'slow_ma': 20,
- 'trend_ma': 60,
- 'momentum_period': 10,
- 'volatility_period': 20,
- 'max_position': 1.0,
- 'profit_take': 0.15,
- 'trailing_stop': 0.08,
- }
- if params:
- for key, val in default_params.items():
- if key not in params:
- self.params[key] = val
- self.position = 0
- self.entry_price = None
- self.max_price = None
-
- def generate_signal(self, data):
- """生成交易信号"""
- close = data['close']
- p = self.params
-
- # 计算指标
- ma_fast = close.rolling(p['fast_ma']).mean().iloc[-1]
- ma_slow = close.rolling(p['slow_ma']).mean().iloc[-1]
- ma_trend = close.rolling(p['trend_ma']).mean().iloc[-1]
-
- # 动量
- momentum = (close.iloc[-1] / close.iloc[-p['momentum_period']] - 1) * 100
-
- # 波动率
- returns = close.pct_change()
- vol = returns.rolling(p['volatility_period']).std().iloc[-1] * np.sqrt(252) * 100
-
- curr_price = close.iloc[-1]
-
- # 趋势强度
- trend_strong = (curr_price > ma_fast) and (ma_fast > ma_slow) and (ma_slow > ma_trend)
- trend_weak = (curr_price < ma_fast) and (ma_fast < ma_slow)
-
- # 信号生成
- if trend_strong and momentum > 2:
- # 强势上涨,满仓
- target_pos = p['max_position']
- state = "STRONG_UP"
- elif trend_strong and momentum > 0:
- # 趋势向上但动量一般,80%仓位
- target_pos = p['max_position'] * 0.8
- state = "UP"
- elif trend_weak or momentum < -3:
- # 趋势转弱,空仓
- target_pos = 0
- state = "DOWN"
- else:
- # 震荡,50%仓位
- target_pos = p['max_position'] * 0.5
- state = "OSCILLATE"
-
- # 移动止盈
- if self.position > 0 and self.max_price:
- current_return = (curr_price - self.entry_price) / self.entry_price
-
- # 更新最高价
- if curr_price > self.max_price:
- self.max_price = curr_price
-
- # 移动止损:从最高点回撤8%离场
- drawdown_from_peak = (curr_price - self.max_price) / self.max_price
- if drawdown_from_peak < -p['trailing_stop']:
- target_pos = 0
- state = "TRAILING_STOP"
-
- # 固定止盈15%
- elif current_return > p['profit_take']:
- target_pos = 0.5 # 减半仓,锁定利润
- state = "PROFIT_TAKE"
-
- # 更新状态
- if target_pos > 0 and self.position == 0:
- self.entry_price = curr_price
- self.max_price = curr_price
- elif target_pos == 0:
- self.entry_price = None
- self.max_price = None
-
- self.position = target_pos
- return target_pos, state
- # ==================== 回测引擎 ====================
- def backtest(data, strategy, start_date=None, end_date=None, warmup=60):
- """回测引擎"""
- if start_date:
- data = data[data.index >= start_date]
- if end_date:
- data = data[data.index <= end_date]
-
- results = []
- nav = 1.0
-
- for i in range(warmup, len(data)):
- curr_data = data.iloc[:i+1]
- position, state = strategy.generate_signal(curr_data)
-
- if i > warmup:
- daily_return = data['close'].iloc[i] / data['close'].iloc[i-1] - 1
- strategy_return = daily_return * results[-1]['position'] if results else 0
- nav *= (1 + strategy_return)
-
- results.append({
- 'date': data.index[i],
- 'position': position,
- 'nav': nav,
- 'state': state,
- 'close': data['close'].iloc[i]
- })
-
- df = pd.DataFrame(results).set_index('date')
- df['index_nav'] = df['close'] / df['close'].iloc[0]
-
- metrics = calculate_metrics(df['nav'], df['index_nav'])
- return df, metrics
- def calculate_metrics(strategy_nav, index_nav):
- """计算绩效指标"""
- s_returns = strategy_nav.pct_change().dropna()
-
- total_return = strategy_nav.iloc[-1] - 1
- days = len(strategy_nav)
- annual_return = (1 + total_return) ** (252 / days) - 1
-
- index_return = index_nav.iloc[-1] - 1
- index_annual = (1 + index_return) ** (252 / days) - 1
-
- running_max = strategy_nav.expanding().max()
- max_dd = ((strategy_nav - running_max) / running_max).min()
-
- volatility = s_returns.std() * np.sqrt(252)
- sharpe = (annual_return - 0.03) / volatility if volatility > 0 else 0
- calmar = annual_return / abs(max_dd) if max_dd != 0 else 0
- win_rate = (s_returns > 0).mean()
-
- return {
- 'annual_return': annual_return,
- 'index_annual': index_annual,
- 'excess_annual': annual_return - index_annual,
- 'max_drawdown': max_dd,
- 'sharpe': sharpe,
- 'calmar': calmar,
- 'win_rate': win_rate,
- 'total_return': total_return,
- 'index_return': index_return
- }
- def plot_results(results, title, filename):
- """绘制回测图表"""
- fig, axes = plt.subplots(3, 1, figsize=(14, 10))
-
- ax1 = axes[0]
- ax1.plot(results.index, results['nav'], label='Strategy', linewidth=2, color='red')
- ax1.plot(results.index, results['index_nav'], label='Index', linewidth=1, color='gray', alpha=0.7)
- ax1.set_title(f'{title}', fontsize=14)
- ax1.set_ylabel('NAV')
- ax1.legend()
- ax1.grid(True, alpha=0.3)
-
- ax2 = axes[1]
- colors = {'STRONG_UP': 'green', 'UP': 'lightgreen', 'DOWN': 'red',
- 'OSCILLATE': 'yellow', 'TRAILING_STOP': 'orange', 'PROFIT_TAKE': 'blue'}
- pos_colors = [colors.get(s, 'gray') for s in results['state']]
- ax2.fill_between(results.index, 0, results['position'], alpha=0.5, color='green')
- ax2.set_ylabel('Position')
- ax2.set_ylim(0, 1.1)
- ax2.grid(True, alpha=0.3)
-
- ax3 = axes[2]
- running_max = results['nav'].expanding().max()
- drawdown = (results['nav'] - running_max) / running_max
- ax3.fill_between(results.index, drawdown, 0, alpha=0.3, color='red')
- ax3.set_ylabel('Drawdown')
- ax3.set_xlabel('Date')
- ax3.grid(True, alpha=0.3)
-
- plt.tight_layout()
- plt.savefig(filename, dpi=150, bbox_inches='tight')
- print(f" 图表已保存: {filename}")
- # ==================== 主程序 ====================
- def main():
- print("="*70)
- print("创业板50指数高收益策略回测")
- print("="*70)
-
- # 获取数据
- print("\n[1] 获取数据...")
- data = get_real_data()
- if data is None:
- print("使用高仿真模拟数据(基于历史特征)...")
- data = generate_realistic_data()
-
- print(f" 数据区间: {data.index[0].date()} ~ {data.index[-1].date()}")
-
- # 训练阶段
- print("\n[2] 训练阶段 (2018-2023) - 优化参数...")
-
- # 测试多组参数,找最优
- best_params = None
- best_score = -999
-
- test_configs = [
- {'fast_ma': 5, 'slow_ma': 20, 'profit_take': 0.15, 'trailing_stop': 0.08},
- {'fast_ma': 3, 'slow_ma': 15, 'profit_take': 0.12, 'trailing_stop': 0.06},
- {'fast_ma': 10, 'slow_ma': 30, 'profit_take': 0.20, 'trailing_stop': 0.10},
- ]
-
- for cfg in test_configs:
- strategy = HighPerformanceStrategy(cfg)
- results, metrics = backtest(data, strategy, start_date='2018-01-01', end_date='2023-12-31')
-
- # 评分:收益优先
- score = metrics['annual_return'] * 0.5 + metrics['calmar'] * 0.3 + metrics['sharpe'] * 0.2
-
- print(f"\n 参数: {cfg}")
- print(f" 年化: {metrics['annual_return']*100:.1f}%, 回撤: {metrics['max_drawdown']*100:.1f}%, 评分: {score:.2f}")
-
- if score > best_score and metrics['max_drawdown'] > -0.40:
- best_score = score
- best_params = cfg
-
- print(f"\n 最优参数: {best_params}")
-
- # 用最优参数重新回测训练集
- strategy = HighPerformanceStrategy(best_params)
- train_results, train_metrics = backtest(data, strategy, start_date='2018-01-01', end_date='2023-12-31')
-
- print(f"\n 训练集最终表现:")
- print(f" ┌─────────────────────────────────────┐")
- print(f" │ 策略年化收益: {train_metrics['annual_return']*100:>8.2f}% │")
- print(f" │ 指数年化收益: {train_metrics['index_annual']*100:>8.2f}% │")
- print(f" │ 超额收益: {train_metrics['excess_annual']*100:>8.2f}% │")
- print(f" │ 最大回撤: {train_metrics['max_drawdown']*100:>8.2f}% │")
- print(f" │ 夏普比率: {train_metrics['sharpe']:>8.2f} │")
- print(f" │ 卡玛比率: {train_metrics['calmar']:>8.2f} │")
- print(f" │ 胜率: {train_metrics['win_rate']*100:>8.1f}% │")
- print(f" └─────────────────────────────────────┘")
-
- plot_results(train_results, "Training Set (2018-2023)", "train_high_perf.png")
-
- # 验证阶段
- print(f"\n[3] 验证阶段 (2024-2025) - 样本外测试...")
- strategy_val = HighPerformanceStrategy(best_params)
- val_results, val_metrics = backtest(data, strategy_val, start_date='2024-01-01', end_date='2025-12-31')
-
- print(f"\n 验证集最终表现:")
- print(f" ┌─────────────────────────────────────┐")
- print(f" │ 策略年化收益: {val_metrics['annual_return']*100:>8.2f}% │")
- print(f" │ 指数年化收益: {val_metrics['index_annual']*100:>8.2f}% │")
- print(f" │ 超额收益: {val_metrics['excess_annual']*100:>8.2f}% │")
- print(f" │ 最大回撤: {val_metrics['max_drawdown']*100:>8.2f}% │")
- print(f" │ 夏普比率: {val_metrics['sharpe']:>8.2f} │")
- print(f" │ 卡玛比率: {val_metrics['calmar']:>8.2f} │")
- print(f" └─────────────────────────────────────┘")
-
- plot_results(val_results, "Validation Set (2024-2025)", "val_high_perf.png")
-
- # 过拟合检测
- print(f"\n[4] 过拟合检测:")
- return_decay = (train_metrics['annual_return'] - val_metrics['annual_return']) / train_metrics['annual_return'] if train_metrics['annual_return'] != 0 else 0
- print(f" 年化收益衰减: {return_decay*100:.1f}%")
- if return_decay > 0.5:
- print(" ⚠️ 策略在验证集表现下降明显")
- else:
- print(" ✓ 策略稳健性良好")
-
- print("\n" + "="*70)
- print("回测完成!")
- print("="*70)
- if __name__ == "__main__":
- main()
|