| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50指数 - 全策略真实数据统一回测验证
- 数据源:cyb50_baostock.csv (真实数据,2017-2025)
- """
- import pandas as pd
- import numpy as np
- import matplotlib
- matplotlib.use('Agg')
- import matplotlib.pyplot as plt
- import warnings
- warnings.filterwarnings('ignore')
# Startup banner; emitted at import time, before any function runs.
print("=" * 80, "创业板50指数 - 全策略真实数据回测验证", "=" * 80, sep="\n")
- # ==================== 1. 加载真实数据 ====================
def load_real_data(path='cyb50_baostock.csv'):
    """Load the daily price history from a CSV file and print a summary.

    Args:
        path: CSV file to read. It must provide a ``date`` column plus
            ``open``, ``high``, ``low``, ``close`` and ``volume``. Defaults
            to the file the script has always used, so existing callers
            are unaffected.

    Returns:
        A DataFrame indexed by ascending date whose price/volume columns
        have been coerced to numeric (unparseable cells become NaN).

    Raises:
        ValueError: if the file contains no data rows.
    """
    df = pd.read_csv(path)
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date').sort_index()

    # Coerce instead of raising so that a few bad cells surface as NaN and
    # get reported by the completeness check below.
    for col in ['open', 'high', 'low', 'close', 'volume']:
        df[col] = pd.to_numeric(df[col], errors='coerce')

    # Fail with a clear message here; otherwise df.index[0] below would
    # raise an opaque IndexError.
    if df.empty:
        raise ValueError(f"no data rows found in {path}")

    print("\n【数据验证】")
    print(f" ✅ 真实数据源: {path}")
    print(f" ✅ 数据区间: {df.index[0].date()} ~ {df.index[-1].date()}")
    print(f" ✅ 总交易日: {len(df)} 天")
    print(f" ✅ 价格范围: {df['close'].min():.0f} ~ {df['close'].max():.0f}")

    # Completeness check: count NaNs across every column.
    null_count = df.isnull().sum().sum()
    if null_count > 0:
        print(f" ⚠️ 空值数量: {null_count}")
    else:
        print(" ✅ 数据完整性: 无空值")

    # Descriptive statistics of simple daily close-to-close returns.
    # Annualisation uses 252 periods per year.
    returns = df['close'].pct_change().dropna()
    print("\n【数据统计特征】")
    print(f" 日收益均值: {returns.mean()*100:.4f}%")
    print(f" 日收益标准差: {returns.std()*100:.2f}%")
    print(f" 年化收益: {returns.mean()*252*100:.1f}%")
    print(f" 年化波动: {returns.std()*np.sqrt(252)*100:.1f}%")
    print(f" 最大单日涨幅: {returns.max()*100:.2f}%")
    print(f" 最大单日跌幅: {returns.min()*100:.2f}%")

    return df
- # ==================== 2. 回测引擎 ====================
def backtest_engine(data, strategy_func, start_date, end_date, warmup=60, strategy_name="Strategy"):
    """Run one strategy bar by bar over a date window and track its NAV.

    Args:
        data: Price DataFrame indexed by date; must contain a ``close``
            column (strategies may read further columns).
        strategy_func: Callable ``(history, current_position)`` returning
            ``(target_position, state_label)``. ``history`` is the window
            up to and including the current bar.
        start_date: Inclusive lower bound of the window.
        end_date: Inclusive upper bound of the window.
        warmup: Number of leading bars *inside the window* that are only
            used as history; no result rows are produced for them.
        strategy_name: Label used in diagnostic messages.

    Returns:
        A DataFrame indexed by date with columns ``pos``, ``nav``,
        ``state``, ``close`` and ``index_nav`` (close rebased to 1.0 at the
        first result row), or ``None`` when the window is empty or has no
        bars beyond the warm-up period.
    """
    window = data[(data.index >= start_date) & (data.index <= end_date)].copy()

    # Always signal "nothing to backtest" with a single None so callers can
    # rely on `result is None`.
    if len(window) == 0:
        print(f" ❌ {strategy_name}: 无数据")
        return None
    if len(window) <= warmup:
        # Without this guard the result list stays empty and
        # set_index('date') below raises KeyError.
        print(f" ❌ {strategy_name}: 数据不足 (需要超过 {warmup} 个交易日)")
        return None

    closes = window['close']
    records = []
    nav = 1.0
    position = 0

    for i in range(warmup, len(window)):
        history = window.iloc[:i + 1]

        # Deliberate best effort: a failing strategy is reported and treated
        # as "flat" for this bar instead of aborting the whole run.
        try:
            target_pos, state = strategy_func(history, position)
        except Exception as e:
            print(f" ⚠️ 策略错误: {e}")
            target_pos, state = 0, "ERROR"

        # Today's return is earned with the position decided on the previous
        # bar, so the signal never sees the return it is applied to.
        if i > warmup:
            daily_ret = closes.iloc[i] / closes.iloc[i - 1] - 1
            nav *= 1 + daily_ret * position

        records.append({
            'date': window.index[i],
            'pos': target_pos,
            'nav': nav,
            'state': state,
            'close': closes.iloc[i]
        })

        position = target_pos

    df = pd.DataFrame(records).set_index('date')
    df['index_nav'] = df['close'] / df['close'].iloc[0]
    return df
def calc_metrics(nav, index_nav):
    """Compute performance statistics for a strategy NAV curve.

    Args:
        nav: Series of strategy net asset values, one per bar.
        index_nav: Series of benchmark net asset values over the same bars.

    Returns:
        A dict with keys ``annual``, ``idx_annual``, ``excess``, ``max_dd``,
        ``sharpe``, ``calmar``, ``win_rate``, ``total``, ``idx_total``,
        ``volatility`` and ``days``. Returns are annualised with 252 periods
        per year and the Sharpe ratio uses a fixed 3% risk-free rate.
        Degenerate inputs (empty or single-point curves) yield zeros rather
        than raising or leaking NaN.
    """
    days = len(nav)
    if days == 0:
        # nav.iloc[-1] would raise IndexError before any `days > 0` guard
        # could take effect, so handle the empty curve up front.
        return {
            'annual': 0.0, 'idx_annual': 0.0,
            'excess': 0.0, 'max_dd': 0.0,
            'sharpe': 0.0, 'calmar': 0.0,
            'win_rate': 0.0, 'total': 0.0, 'idx_total': 0.0,
            'volatility': 0.0, 'days': 0
        }

    s_ret = nav.pct_change().dropna()

    total = nav.iloc[-1] - 1
    annual = (1 + total) ** (252 / days) - 1

    idx_total = index_nav.iloc[-1] - 1
    idx_annual = (1 + idx_total) ** (252 / days) - 1

    # Drawdown is measured against the running peak of the NAV curve.
    running_max = nav.expanding().max()
    max_dd = ((nav - running_max) / running_max).min()

    # The sample standard deviation needs at least two returns; with fewer
    # it is NaN, which would leak into the report.
    vol = s_ret.std() * np.sqrt(252) if len(s_ret) >= 2 else 0.0
    sharpe = (annual - 0.03) / vol if vol > 0 else 0
    calmar = annual / abs(max_dd) if max_dd != 0 else 0
    win_rate = (s_ret > 0).mean() if len(s_ret) else 0.0

    return {
        'annual': annual, 'idx_annual': idx_annual,
        'excess': annual - idx_annual, 'max_dd': max_dd,
        'sharpe': sharpe, 'calmar': calmar,
        'win_rate': win_rate, 'total': total, 'idx_total': idx_total,
        'volatility': vol, 'days': days
    }
def plot_results(results, title, filename):
    """Save a three-panel chart (NAV, position, drawdown) to an image file.

    Args:
        results: Backtest DataFrame indexed by date with ``nav``,
            ``index_nav`` and ``pos`` columns.
        title: Title shown above the top panel.
        filename: Output path handed to ``savefig``.

    Returns:
        ``filename``, unchanged.
    """
    fig, axes = plt.subplots(3, 1, figsize=(14, 10))
    try:
        nav_ax, pos_ax, dd_ax = axes

        # Panel 1: strategy NAV against the benchmark.
        nav_ax.plot(results.index, results['nav'], 'r-', lw=2, label='Strategy')
        nav_ax.plot(results.index, results['index_nav'], 'gray', lw=1, alpha=0.7, label='CYB50 Index')
        nav_ax.set_title(title, fontsize=14)
        nav_ax.legend()
        nav_ax.grid(True, alpha=0.3)

        # Panel 2: position held on each bar.
        pos_ax.fill_between(results.index, 0, results['pos'], alpha=0.5, color='green')
        pos_ax.set_ylim(0, 1.1)
        pos_ax.set_ylabel('Position')
        pos_ax.grid(True, alpha=0.3)

        # Panel 3: drawdown relative to the running NAV peak.
        running_max = results['nav'].expanding().max()
        drawdown = (results['nav'] - running_max) / running_max
        dd_ax.fill_between(results.index, drawdown, 0, alpha=0.3, color='red')
        dd_ax.set_ylabel('Drawdown')
        dd_ax.set_xlabel('Date')
        dd_ax.grid(True, alpha=0.3)

        fig.tight_layout()
        fig.savefig(filename, dpi=150)
    finally:
        # pyplot keeps every figure alive until it is closed; this function
        # is called repeatedly, so release the figure even if drawing fails.
        plt.close(fig)
    return filename
- # ==================== 3. 策略定义 ====================
- # 策略1: 趋势跟踪策略 (来自 cyb50_real_backtest.py)
def strategy_trend(data, current_pos):
    """Trend strategy: moving-average alignment plus a 20-bar breakout.

    Args:
        data: History up to the current bar with ``close``, ``high`` and
            ``low`` columns.
        current_pos: Position currently held (0 means flat).

    Returns:
        ``(target_position, state)``. Fewer than 60 bars gives
        ``(0, "INIT")``; otherwise the state is ``"ENTRY"``, ``"EXIT"``,
        ``"HOLD"`` or ``"EMPTY"``.
    """
    closes = data['close'].values
    highs = data['high'].values
    lows = data['low'].values

    if len(closes) < 60:
        return 0, "INIT"

    last = closes[-1]
    fast_ma = np.mean(closes[-10:])
    slow_ma = np.mean(closes[-30:])
    # At least 60 bars are guaranteed here, so closes[-10] always exists.
    recent_gain = last / closes[-10] - 1
    ceiling_20 = np.max(highs[-20:])
    floor_20 = np.min(lows[-20:])

    # Entry: price above both averages, within 0.5% of the 20-bar high and
    # up more than 2% against the close ten entries back.
    wants_in = (last > fast_ma > slow_ma) and (last >= ceiling_20 * 0.995) and (recent_gain > 0.02)
    # Exit: price under the slow average or within 0.5% of the 20-bar low.
    wants_out = (last < slow_ma) or (last <= floor_20 * 1.005)

    if wants_in and current_pos == 0:
        return 1.0, "ENTRY"
    if wants_out and current_pos > 0:
        return 0.0, "EXIT"

    label = "HOLD" if current_pos > 0 else "EMPTY"
    return current_pos, label
- # 策略2: 双均线策略 (来自 cyb50_simple.py)
def strategy_ma_cross(data, current_pos):
    """Moving-average regime strategy on the 20- and 60-bar averages.

    Args:
        data: History up to the current bar with a ``close`` column.
        current_pos: Position currently held; returned unchanged when
            neither regime applies.

    Returns:
        ``(target_position, state)`` where state is ``"INIT"`` (fewer than
        60 bars), ``"BULL"``, ``"BEAR"`` or ``"HOLD"``.
    """
    prices = data['close'].values
    if len(prices) < 60:
        return 0, "INIT"

    last = prices[-1]
    ma_short = np.mean(prices[-20:])
    ma_long = np.mean(prices[-60:])

    # The bear and bull conditions cannot both hold, so test order is free.
    if last < ma_long:
        return 0.0, "BEAR"
    if last > ma_short and ma_short > ma_long:
        return 1.0, "BULL"
    return current_pos, "HOLD"
- # 策略3: 动量策略 (来自 cyb50_high_perf.py)
def strategy_momentum(data, current_pos):
    """Momentum strategy: moving-average stack combined with recent change.

    Args:
        data: History up to the current bar with a ``close`` column.
        current_pos: Accepted for interface compatibility; not used.

    Returns:
        ``(target_position, state)``: ``(0, "INIT")`` with fewer than 60
        bars, otherwise ``(1.0, "STRONG_UP")``, ``(0.8, "UP")``,
        ``(0.0, "DOWN")`` or ``(0.5, "OSCILLATE")``.
    """
    prices = data['close']
    if len(prices) < 60:
        return 0, "INIT"

    last = prices.iloc[-1]
    ma_fast, ma_mid, ma_slow = (
        prices.rolling(span).mean().iloc[-1] for span in (5, 20, 60)
    )

    # Percentage change against the close ten entries from the end.
    pct_move = (last / prices.iloc[-10] - 1) * 100

    uptrend = last > ma_fast > ma_mid > ma_slow
    downtrend = last < ma_fast < ma_mid

    if uptrend and pct_move > 0:
        # More than 2% marks the accelerating case.
        if pct_move > 2:
            return 1.0, "STRONG_UP"
        return 0.8, "UP"
    if downtrend or pct_move < -3:
        return 0.0, "DOWN"
    return 0.5, "OSCILLATE"
- # 策略4: 多因子策略 (来自 cyb50_multifactor.py)
def strategy_multifactor(data, current_pos):
    """Multi-factor strategy blending trend, momentum, volatility and breakout.

    Args:
        data: History up to the current bar with ``close``, ``high`` and
            ``low`` columns.
        current_pos: Accepted for interface compatibility; not used.

    Returns:
        ``(target_position, state)``: ``(0, "INIT")`` with fewer than 60
        bars, otherwise a position of 1.0 / 0.6 / 0.3 / 0.0 labelled
        ``"STRONG"`` / ``"MEDIUM"`` / ``"WEAK"`` / ``"EMPTY"`` according to
        the weighted score.
    """
    closes = data['close']
    highs = data['high']
    lows = data['low']

    if len(closes) < 60:
        return 0, "INIT"

    last = closes.iloc[-1]

    # Trend factor: fraction of the three moving-average checks that pass.
    ma_fast = closes.rolling(5).mean().iloc[-1]
    ma_mid = closes.rolling(20).mean().iloc[-1]
    ma_slow = closes.rolling(60).mean().iloc[-1]
    checks_passed = sum([last > ma_fast, ma_fast > ma_mid, ma_mid > ma_slow])
    trend_score = checks_passed / 3

    # Momentum factor: change against the close 20 entries from the end,
    # mapped from [-20%, +20%] onto [0, 1]. At least 60 bars are guaranteed.
    ret20 = last / closes.iloc[-20] - 1
    mom_score = np.clip((ret20 + 0.2) / 0.4, 0, 1)

    # Volatility factor: 20-bar mean true range relative to price; a lower
    # range scores higher (1.5% or less -> 1, 4% or more -> 0).
    prev_close = closes.shift(1)
    true_range = pd.concat(
        [highs - lows, (highs - prev_close).abs(), (lows - prev_close).abs()],
        axis=1,
    ).max(axis=1)
    range_mean = true_range.rolling(20).mean().iloc[-1]
    vol_pct = range_mean / last
    vol_score = 1 - np.clip((vol_pct - 0.015) / 0.025, 0, 1)

    # Breakout factor: close within 1% of the 20-bar high.
    peak_20 = highs.rolling(20).max().iloc[-1]
    breakout = 1 if last >= peak_20 * 0.99 else 0

    total_score = trend_score * 0.35 + mom_score * 0.25 + vol_score * 0.25 + breakout * 0.15

    # Score bands, checked from strongest to weakest.
    for floor, weight, label in ((0.7, 1.0, "STRONG"), (0.5, 0.6, "MEDIUM"), (0.3, 0.3, "WEAK")):
        if total_score > floor:
            return weight, label
    return 0.0, "EMPTY"
- # 策略5: RSI策略
def strategy_rsi(data, current_pos):
    """RSI strategy: go long when oversold, go flat when overbought.

    Args:
        data: History up to the current bar with a ``close`` column.
        current_pos: Position currently held; returned unchanged while the
            RSI sits between the two thresholds.

    Returns:
        ``(target_position, state)``: ``(0, "INIT")`` with fewer than 20
        bars or an undefined RSI, ``(1.0, "OVERSOLD")`` below 30,
        ``(0.0, "OVERBOUGHT")`` above 70, else ``(current_pos, "HOLD")``.
    """
    prices = data['close']
    if len(prices) < 20:
        return 0, "INIT"

    change = prices.diff()
    # 14-bar simple averages of upward and downward moves.
    avg_up = (change.where(change > 0, 0)).rolling(window=14).mean()
    avg_down = (-change.where(change < 0, 0)).rolling(window=14).mean()
    strength = avg_up / avg_down
    latest = (100 - (100 / (1 + strength))).iloc[-1]

    # 0/0 (no movement in the window) leaves the RSI undefined.
    if pd.isna(latest):
        return 0, "INIT"
    if latest < 30:
        return 1.0, "OVERSOLD"
    if latest > 70:
        return 0.0, "OVERBOUGHT"
    return current_pos, "HOLD"
- # ==================== 4. 主程序 ====================
def main():
    """Backtest every strategy on the training and validation windows.

    Loads the price data, runs each strategy over 2018-2023 (training) and
    2024-2025 (validation), prints per-strategy metrics and a comparison
    table, saves one chart per strategy and window, and reports the
    strategy with the best validation return. Strategies for which either
    window yields no backtest result are skipped.
    """
    data = load_real_data()

    # Backtest windows.
    train_start, train_end = '2018-01-01', '2023-12-31'
    val_start, val_end = '2024-01-01', '2025-12-31'

    strategies = [
        ("趋势跟踪策略", strategy_trend),
        ("双均线策略", strategy_ma_cross),
        ("动量策略", strategy_momentum),
        ("多因子策略", strategy_multifactor),
        ("RSI策略", strategy_rsi),
    ]

    all_results = []

    print("\n" + "="*80)
    print("开始回测 - 全部使用真实数据")
    print("="*80)

    for name, strategy_func in strategies:
        print(f"\n【{name}】")

        # Training window. Anything other than a DataFrame means the engine
        # had nothing to backtest (it prints the reason itself).
        train_res = backtest_engine(data, strategy_func, train_start, train_end, strategy_name=name)
        if not isinstance(train_res, pd.DataFrame):
            continue
        train_m = calc_metrics(train_res['nav'], train_res['index_nav'])

        # Validation window. It needs the same check: the data may simply
        # not reach this period.
        val_res = backtest_engine(data, strategy_func, val_start, val_end, strategy_name=name)
        if not isinstance(val_res, pd.DataFrame):
            continue
        val_m = calc_metrics(val_res['nav'], val_res['index_nav'])

        print(" 训练集 (2018-2023):")
        print(f" 年化收益: {train_m['annual']*100:7.2f}% | 指数: {train_m['idx_annual']*100:7.2f}% | 超额: {train_m['excess']*100:7.2f}%")
        print(f" 最大回撤: {train_m['max_dd']*100:7.2f}% | 夏普: {train_m['sharpe']:5.2f} | 胜率: {train_m['win_rate']*100:5.1f}%")

        print(" 验证集 (2024-2025):")
        print(f" 年化收益: {val_m['annual']*100:7.2f}% | 指数: {val_m['idx_annual']*100:7.2f}% | 超额: {val_m['excess']*100:7.2f}%")
        print(f" 最大回撤: {val_m['max_dd']*100:7.2f}% | 夏普: {val_m['sharpe']:5.2f}")

        # Overfitting check: relative drop in annual return from training to
        # validation. Dividing by the magnitude keeps "positive = got worse"
        # true even when the training return itself is negative.
        if train_m['annual'] != 0:
            decay = (train_m['annual'] - val_m['annual']) / abs(train_m['annual']) * 100
        else:
            decay = 0
        status = "✅" if decay < 50 else "⚠️"
        print(f" 衰减率: {decay:.1f}% {status}")

        # Save the charts.
        plot_results(train_res, f"{name} - Training", f"train_{name.replace(' ', '_')}.png")
        plot_results(val_res, f"{name} - Validation", f"val_{name.replace(' ', '_')}.png")

        all_results.append({
            'name': name,
            'train': train_m,
            'val': val_m,
            'decay': decay
        })

    # max() below raises ValueError on an empty list, so stop early when no
    # strategy produced results.
    if not all_results:
        print("\n⚠️ 没有可用的回测结果")
        return

    # Comparison table.
    print("\n" + "="*80)
    print("策略对比汇总(真实数据)")
    print("="*80)
    print(f"{'策略':<12} {'训练年化':>10} {'验证年化':>10} {'训练回撤':>10} {'验证回撤':>10} {'衰减':>8} {'评价':>6}")
    print("-"*80)

    for r in all_results:
        t, v = r['train'], r['val']
        if t['annual'] > 0.1 and v['annual'] > 0 and r['decay'] < 50:
            eval_status = "✅"
        elif v['annual'] > 0:
            eval_status = "⚠️"
        else:
            eval_status = "❌"
        print(f"{r['name']:<12} {t['annual']*100:>9.1f}% {v['annual']*100:>9.1f}% {t['max_dd']*100:>9.1f}% {v['max_dd']*100:>9.1f}% {r['decay']:>7.0f}% {eval_status:>6}")

    # Best strategy by validation return; non-positive returns all rank
    # equally at the bottom.
    best = max(all_results, key=lambda x: x['val']['annual'] if x['val']['annual'] > 0 else -999)
    print(f"\n🏆 验证集表现最佳: {best['name']}")
    print(f" 验证集年化: {best['val']['annual']*100:.2f}%")
    print(f" 超额收益: {best['val']['excess']*100:.2f}%")

    print("\n" + "="*80)
    print("✅ 所有策略已使用真实数据验证完成")
    print("="*80)


if __name__ == "__main__":
    main()
|