#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ChiNext 50 index - unified backtest of all strategies on real market data.

Data source: cyb50_baostock.csv (real data, 2017-2025)
"""

import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

print("="*80)
print("创业板50指数 - 全策略真实数据回测验证")
print("="*80)


# ==================== 1. Load real data ====================

def load_real_data(path='cyb50_baostock.csv'):
    """Load the daily bar CSV and print a validation / statistics report.

    Args:
        path: CSV file to read. It must contain a ``date`` column plus
            ``open``, ``high``, ``low``, ``close`` and ``volume``.
            Defaults to the file this script was written for.

    Returns:
        A DataFrame indexed by ``date`` in ascending order, with the five
        price/volume columns coerced to numeric (unparseable cells become
        NaN and are reported, not dropped).

    Raises:
        ValueError: if a required column is missing or the file has no rows.
    """
    numeric_cols = ['open', 'high', 'low', 'close', 'volume']

    df = pd.read_csv(path)

    # Fail early with a readable message instead of a bare KeyError /
    # IndexError further down (e.g. at df.index[0] on an empty file).
    missing = [col for col in ['date'] + numeric_cols if col not in df.columns]
    if missing:
        raise ValueError(f"{path}: missing required column(s): {missing}")
    if df.empty:
        raise ValueError(f"{path}: file contains no data rows")

    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date').sort_index()

    # Convert data types; values that cannot be parsed become NaN.
    for col in numeric_cols:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    print("\n【数据验证】")
    print(f" ✅ 真实数据源: {path}")
    print(f" ✅ 数据区间: {df.index[0].date()} ~ {df.index[-1].date()}")
    print(f" ✅ 总交易日: {len(df)} 天")
    print(f" ✅ 价格范围: {df['close'].min():.0f} ~ {df['close'].max():.0f}")

    # Data completeness check: total NaN cells across every column.
    null_count = df.isnull().sum().sum()
    if null_count > 0:
        print(f" ⚠️ 空值数量: {null_count}")
    else:
        print(" ✅ 数据完整性: 无空值")

    # Summary statistics of close-to-close daily returns.
    returns = df['close'].pct_change().dropna()
    print("\n【数据统计特征】")
    print(f" 日收益均值: {returns.mean()*100:.4f}%")
    print(f" 日收益标准差: {returns.std()*100:.2f}%")
    # Annualised figures scale the daily values by 252 (and sqrt(252)).
    print(f" 年化收益: {returns.mean()*252*100:.1f}%")
    print(f" 年化波动: {returns.std()*np.sqrt(252)*100:.1f}%")
    print(f" 最大单日涨幅: {returns.max()*100:.2f}%")
    print(f" 最大单日跌幅: {returns.min()*100:.2f}%")

    return df
# ==================== 2. Backtest engine ====================

def backtest_engine(data, strategy_func, start_date, end_date, warmup=60, strategy_name="Strategy"):
    """Run one strategy over a date window and return its daily NAV curve.

    Args:
        data: DataFrame indexed by date; must contain a ``close`` column
            plus whatever columns ``strategy_func`` reads.
        strategy_func: callable ``(history, current_position)`` returning
            ``(target_position, state_label)``. ``history`` is the window
            from its first row up to and including the current bar.
        start_date: inclusive lower bound of the window.
        end_date: inclusive upper bound of the window.
        warmup: number of leading rows of the window that are skipped
            before the first signal is evaluated.
        strategy_name: label used in diagnostic messages.

    Returns:
        A DataFrame indexed by date with columns ``pos``, ``nav``,
        ``state``, ``close`` and ``index_nav`` (close rebased to 1.0 on the
        first evaluated bar), or ``None`` when the window is empty or has
        no rows beyond the warm-up period.
    """
    window = data[(data.index >= start_date) & (data.index <= end_date)].copy()

    # A single None (not a tuple) so callers can test ``result is None``.
    if len(window) == 0:
        print(f" ❌ {strategy_name}: 无数据")
        return None
    # With no bar left after warm-up the result frame would have no 'date'
    # column and set_index() below would raise.
    if len(window) <= warmup:
        print(f" ❌ {strategy_name}: 数据不足 ({len(window)} 行 <= 预热期 {warmup})")
        return None

    close = window['close']
    rows = []
    nav = 1.0
    position = 0

    for i in range(warmup, len(window)):
        history = window.iloc[:i+1]

        try:
            target_pos, state = strategy_func(history, position)
        except Exception as e:
            # Deliberate best-effort: a failing strategy goes flat for the
            # bar instead of aborting the whole backtest.
            print(f" ⚠️ 策略错误: {e}")
            target_pos, state = 0, "ERROR"

        # Today's return is earned on the position decided on the previous
        # bar, so a signal never profits from the bar that produced it.
        if i > warmup:
            daily_ret = close.iloc[i] / close.iloc[i-1] - 1
            nav *= 1 + daily_ret * position

        rows.append({
            'date': window.index[i],
            'pos': target_pos,
            'nav': nav,
            'state': state,
            'close': close.iloc[i]
        })

        position = target_pos

    df = pd.DataFrame(rows).set_index('date')
    df['index_nav'] = df['close'] / df['close'].iloc[0]

    return df


def calc_metrics(nav, index_nav):
    """Compute performance metrics for a strategy NAV against the index NAV.

    Args:
        nav: Series of strategy net asset values.
        index_nav: Series of benchmark net asset values over the same bars.

    Returns:
        Dict with keys ``annual``, ``idx_annual``, ``excess``, ``max_dd``,
        ``sharpe``, ``calmar``, ``win_rate``, ``total``, ``idx_total``,
        ``volatility`` and ``days``.
    """
    s_ret = nav.pct_change().dropna()

    # Total return is taken as last NAV minus 1, and annualised with
    # 252 / len(nav) as the exponent.
    total = nav.iloc[-1] - 1
    days = len(nav)
    annual = (1 + total) ** (252 / days) - 1 if days > 0 else 0

    idx_total = index_nav.iloc[-1] - 1
    idx_annual = (1 + idx_total) ** (252 / days) - 1 if days > 0 else 0

    # Drawdown relative to the running peak; the most negative value wins.
    running_max = nav.expanding().max()
    max_dd = ((nav - running_max) / running_max).min()

    vol = s_ret.std() * np.sqrt(252)
    # 0.03 is subtracted from the annual return - presumably a 3% risk-free
    # rate; confirm with the strategy owner.
    sharpe = (annual - 0.03) / vol if vol > 0 else 0
    calmar = annual / abs(max_dd) if max_dd != 0 else 0

    # Share of bars with a strictly positive strategy return.
    win_rate = (s_ret > 0).mean()

    return {
        'annual': annual,
        'idx_annual': idx_annual,
        'excess': annual - idx_annual,
        'max_dd': max_dd,
        'sharpe': sharpe,
        'calmar': calmar,
        'win_rate': win_rate,
        'total': total,
        'idx_total': idx_total,
        'volatility': vol,
        'days': days
    }


def calc_decay(train_annual, val_annual):
    """Return the train-to-validation drop in annual return, in percent.

    The drop is measured relative to the magnitude of the training return,
    so a positive value always means validation did worse than training,
    even when the training return itself is negative. Returns 0 when the
    training return is exactly 0.
    """
    if train_annual == 0:
        return 0
    return (train_annual - val_annual) / abs(train_annual) * 100


def plot_results(results, title, filename):
    """Save a three-panel chart (NAV, position, drawdown) and return its path.

    Args:
        results: DataFrame with ``nav``, ``index_nav`` and ``pos`` columns,
            as produced by ``backtest_engine``.
        title: title of the top panel.
        filename: output image path.

    Returns:
        ``filename``, unchanged.
    """
    fig, axes = plt.subplots(3, 1, figsize=(14, 10))

    try:
        # Panel 1: strategy NAV against the index NAV.
        axes[0].plot(results.index, results['nav'], 'r-', lw=2, label='Strategy')
        axes[0].plot(results.index, results['index_nav'], 'gray', lw=1, alpha=0.7, label='CYB50 Index')
        axes[0].set_title(title, fontsize=14)
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)

        # Panel 2: position over time.
        axes[1].fill_between(results.index, 0, results['pos'], alpha=0.5, color='green')
        axes[1].set_ylim(0, 1.1)
        axes[1].set_ylabel('Position')
        axes[1].grid(True, alpha=0.3)

        # Panel 3: drawdown from the running NAV peak.
        running_max = results['nav'].expanding().max()
        drawdown = (results['nav'] - running_max) / running_max
        axes[2].fill_between(results.index, drawdown, 0, alpha=0.3, color='red')
        axes[2].set_ylabel('Drawdown')
        axes[2].set_xlabel('Date')
        axes[2].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig(filename, dpi=150)
    finally:
        # pyplot keeps every figure alive until it is closed; this function
        # is called twice per strategy, so release the figure each time.
        plt.close(fig)

    return filename


# ==================== 3. Strategy definitions ====================

# Strategy 1: trend following (from cyb50_real_backtest.py)
def strategy_trend(data, current_pos):
    """Trend strategy: moving averages + breakout entry + 20-bar-low exit.

    Returns ``(target_position, state)``. Enters fully when flat and the
    buy condition holds, exits fully when invested and the sell condition
    holds, otherwise keeps ``current_pos``.
    """
    close = data['close'].values
    high = data['high'].values
    low = data['low'].values

    if len(close) < 60:
        return 0, "INIT"

    ma10 = np.mean(close[-10:])
    ma30 = np.mean(close[-30:])
    # Change between the latest close and the close 9 bars earlier.
    ret10 = close[-1] / close[-10] - 1

    high_20 = np.max(high[-20:])
    low_20 = np.min(low[-20:])
    curr = close[-1]

    # Buy: price above both averages in bullish order, within 0.5% of the
    # 20-bar high, and up more than 2% over the lookback.
    buy_signal = (curr > ma10 > ma30) and (curr >= high_20 * 0.995) and (ret10 > 0.02)

    # Sell: price below the 30-bar average or within 0.5% of the 20-bar low.
    sell_signal = (curr < ma30) or (curr <= low_20 * 1.005)

    if buy_signal and current_pos == 0:
        return 1.0, "ENTRY"
    elif sell_signal and current_pos > 0:
        return 0.0, "EXIT"
    else:
        return current_pos, "HOLD" if current_pos > 0 else "EMPTY"


# Strategy 2: dual moving average (from cyb50_simple.py)
def strategy_ma_cross(data, current_pos):
    """Dual moving-average strategy on the 20- and 60-bar averages.

    Returns ``(target_position, state)``: fully long when
    close > MA20 > MA60, flat when close < MA60, otherwise unchanged.
    """
    close = data['close'].values

    if len(close) < 60:
        return 0, "INIT"

    ma20 = np.mean(close[-20:])
    ma60 = np.mean(close[-60:])
    curr = close[-1]

    if curr > ma20 > ma60:
        return 1.0, "BULL"
    elif curr < ma60:
        return 0.0, "BEAR"
    else:
        return current_pos, "HOLD"


# Strategy 3: momentum (from cyb50_high_perf.py)
def strategy_momentum(data, current_pos):
    """Momentum strategy: moving-average alignment plus recent momentum.

    Returns ``(target_position, state)``. The target depends only on the
    current bar's indicators; ``current_pos`` is not consulted.
    """
    close = data['close']

    if len(close) < 60:
        return 0, "INIT"

    ma5 = close.rolling(5).mean().iloc[-1]
    ma20 = close.rolling(20).mean().iloc[-1]
    ma60 = close.rolling(60).mean().iloc[-1]

    # Percent change between the latest close and the close 9 bars earlier.
    momentum = (close.iloc[-1] / close.iloc[-10] - 1) * 100

    trend_strong = (close.iloc[-1] > ma5) and (ma5 > ma20) and (ma20 > ma60)
    trend_weak = (close.iloc[-1] < ma5) and (ma5 < ma20)

    if trend_strong and momentum > 2:
        return 1.0, "STRONG_UP"
    elif trend_strong and momentum > 0:
        return 0.8, "UP"
    elif trend_weak or momentum < -3:
        return 0.0, "DOWN"
    else:
        return 0.5, "OSCILLATE"


# Strategy 4: multi-factor (from cyb50_multifactor.py)
def strategy_multifactor(data, current_pos):
    """Multi-factor strategy: trend + momentum + volatility + breakout.

    Each factor is scored in [0, 1]; the weighted sum (0.35 / 0.25 / 0.25 /
    0.15) is mapped to a position of 1.0, 0.6, 0.3 or 0.0. ``current_pos``
    is not consulted. Returns ``(target_position, state)``.
    """
    c = data['close']
    h = data['high']
    low = data['low']

    if len(c) < 60:
        return 0, "INIT"

    # Trend factor: one third for each bullish ordering that holds.
    ma5 = c.rolling(5).mean()
    ma20 = c.rolling(20).mean()
    ma60 = c.rolling(60).mean()

    trend_score = 0
    if c.iloc[-1] > ma5.iloc[-1]:
        trend_score += 1
    if ma5.iloc[-1] > ma20.iloc[-1]:
        trend_score += 1
    if ma20.iloc[-1] > ma60.iloc[-1]:
        trend_score += 1
    trend_score = trend_score / 3

    # Momentum factor: recent return mapped linearly from [-20%, +20%]
    # onto [0, 1].
    ret20 = c.iloc[-1] / c.iloc[-20] - 1
    mom_score = np.clip((ret20 + 0.2) / 0.4, 0, 1)

    # Volatility factor: 20-bar mean of the true range, relative to price.
    # 1.5% or less scores 1, 4% or more scores 0.
    prev_close = c.shift(1)
    true_range = pd.concat([h - low, (h - prev_close).abs(), (low - prev_close).abs()], axis=1).max(axis=1)
    atr_mean = true_range.rolling(20).mean().iloc[-1]
    vol_pct = atr_mean / c.iloc[-1]
    vol_score = 1 - np.clip((vol_pct - 0.015) / 0.025, 0, 1)

    # Breakout factor: close within 1% of the 20-bar high.
    high_20 = h.rolling(20).max()
    breakout = 1 if c.iloc[-1] >= high_20.iloc[-1] * 0.99 else 0

    # Composite score.
    total_score = trend_score * 0.35 + mom_score * 0.25 + vol_score * 0.25 + breakout * 0.15

    if total_score > 0.7:
        return 1.0, "STRONG"
    elif total_score > 0.5:
        return 0.6, "MEDIUM"
    elif total_score > 0.3:
        return 0.3, "WEAK"
    else:
        return 0.0, "EMPTY"


# Strategy 5: RSI
def strategy_rsi(data, current_pos):
    """RSI strategy on a 14-bar simple-average RSI.

    Returns ``(target_position, state)``: fully long below 30, flat above
    70, otherwise unchanged. Returns ``(0, "INIT")`` while there are fewer
    than 20 bars or the RSI is NaN.
    """
    close = data['close']

    if len(close) < 20:
        return 0, "INIT"

    delta = close.diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    # A zero average loss gives rs = inf and therefore RSI = 100.
    rs = gain / loss
    rsi = 100 - (100 / (1 + rs))

    curr_rsi = rsi.iloc[-1]

    if pd.isna(curr_rsi):
        return 0, "INIT"

    if curr_rsi < 30:
        return 1.0, "OVERSOLD"
    elif curr_rsi > 70:
        return 0.0, "OVERBOUGHT"
    else:
        return current_pos, "HOLD"


# ==================== 4. Main program ====================

def main():
    """Backtest every strategy on the training and validation windows.

    Prints per-strategy metrics, saves one chart per strategy and window,
    then prints a comparison table and the strategy with the highest
    validation-window annual return.
    """
    # Load real data.
    data = load_real_data()

    # Backtest windows.
    train_start, train_end = '2018-01-01', '2023-12-31'
    val_start, val_end = '2024-01-01', '2025-12-31'

    strategies = [
        ("趋势跟踪策略", strategy_trend),
        ("双均线策略", strategy_ma_cross),
        ("动量策略", strategy_momentum),
        ("多因子策略", strategy_multifactor),
        ("RSI策略", strategy_rsi),
    ]

    all_results = []

    print("\n" + "="*80)
    print("开始回测 - 全部使用真实数据")
    print("="*80)

    for name, strategy_func in strategies:
        print(f"\n【{name}】")

        # Training window.
        train_res = backtest_engine(data, strategy_func, train_start, train_end, strategy_name=name)
        if train_res is None:
            continue

        # Validation window; the engine has already printed the reason
        # when it returns None.
        val_res = backtest_engine(data, strategy_func, val_start, val_end, strategy_name=name)
        if val_res is None:
            continue

        train_m = calc_metrics(train_res['nav'], train_res['index_nav'])
        val_m = calc_metrics(val_res['nav'], val_res['index_nav'])

        # Print results.
        print(f" 训练集 (2018-2023):")
        print(f" 年化收益: {train_m['annual']*100:7.2f}% | 指数: {train_m['idx_annual']*100:7.2f}% | 超额: {train_m['excess']*100:7.2f}%")
        print(f" 最大回撤: {train_m['max_dd']*100:7.2f}% | 夏普: {train_m['sharpe']:5.2f} | 胜率: {train_m['win_rate']*100:5.1f}%")

        print(f" 验证集 (2024-2025):")
        print(f" 年化收益: {val_m['annual']*100:7.2f}% | 指数: {val_m['idx_annual']*100:7.2f}% | 超额: {val_m['excess']*100:7.2f}%")
        print(f" 最大回撤: {val_m['max_dd']*100:7.2f}% | 夏普: {val_m['sharpe']:5.2f}")

        # Overfitting check: how much of the training return was lost on
        # the validation window.
        decay = calc_decay(train_m['annual'], val_m['annual'])
        status = "✅" if decay < 50 else "⚠️"
        print(f" 衰减率: {decay:.1f}% {status}")

        # Save charts.
        plot_results(train_res, f"{name} - Training", f"train_{name.replace(' ', '_')}.png")
        plot_results(val_res, f"{name} - Validation", f"val_{name.replace(' ', '_')}.png")

        all_results.append({
            'name': name,
            'train': train_m,
            'val': val_m,
            'decay': decay
        })

    # Summary comparison.
    print("\n" + "="*80)
    print("策略对比汇总(真实数据)")
    print("="*80)
    print(f"{'策略':<12} {'训练年化':>10} {'验证年化':>10} {'训练回撤':>10} {'验证回撤':>10} {'衰减':>8} {'评价':>6}")
    print("-"*80)

    for r in all_results:
        t, v = r['train'], r['val']
        if t['annual'] > 0.1 and v['annual'] > 0 and r['decay'] < 50:
            eval_status = "✅"
        elif v['annual'] > 0:
            eval_status = "⚠️"
        else:
            eval_status = "❌"
        print(f"{r['name']:<12} {t['annual']*100:>9.1f}% {v['annual']*100:>9.1f}% {t['max_dd']*100:>9.1f}% {v['max_dd']*100:>9.1f}% {r['decay']:>7.0f}% {eval_status:>6}")

    if all_results:
        # Rank on the validation return itself, so the least-bad strategy
        # is chosen when every strategy lost money.
        best = max(all_results, key=lambda x: x['val']['annual'])
        print(f"\n🏆 验证集表现最佳: {best['name']}")
        print(f" 验证集年化: {best['val']['annual']*100:.2f}%")
        print(f" 超额收益: {best['val']['excess']*100:.2f}%")
    else:
        print("\n ❌ 没有可用的回测结果")

    print("\n" + "="*80)
    print("✅ 所有策略已使用真实数据验证完成")
    print("="*80)


if __name__ == "__main__":
    main()