#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 创业板50指数 - 全策略年度收益对比(真实数据) 对比策略:趋势跟踪、双均线、动量、多因子、RSI """ import pandas as pd import numpy as np import warnings warnings.filterwarnings('ignore') print("="*90) print("创业板50指数 - 全策略年度收益对比(真实数据)") print("="*90) # ==================== 1. 加载真实数据 ==================== def load_real_data(): df = pd.read_csv('cyb50_baostock.csv') df['date'] = pd.to_datetime(df['date']) df = df.set_index('date').sort_index() for col in ['open', 'high', 'low', 'close', 'volume']: df[col] = pd.to_numeric(df[col], errors='coerce') return df # ==================== 2. 策略定义 ==================== # 策略1: 趋势跟踪 def strategy_trend(data, pos): close = data['close'].values high = data['high'].values low = data['low'].values if len(close) < 60: return 0, "INIT" ma10 = np.mean(close[-10:]) ma30 = np.mean(close[-30:]) ret10 = (close[-1] / close[-10] - 1) if len(close) >= 10 else 0 high_20 = np.max(high[-20:]) low_20 = np.min(low[-20:]) curr = close[-1] buy = (curr > ma10 > ma30) and (curr >= high_20 * 0.995) and (ret10 > 0.02) sell = (curr < ma30) or (curr <= low_20 * 1.005) if buy and pos == 0: return 1.0, "ENTRY" elif sell and pos > 0: return 0.0, "EXIT" return pos, "HOLD" if pos > 0 else "EMPTY" # 策略2: 双均线 def strategy_ma_cross(data, pos): close = data['close'].values if len(close) < 60: return 0, "INIT" ma20 = np.mean(close[-20:]) ma60 = np.mean(close[-60:]) curr = close[-1] if curr > ma20 > ma60: return 1.0, "BULL" elif curr < ma60: return 0.0, "BEAR" return pos, "HOLD" # 策略3: 动量 def strategy_momentum(data, pos): close = data['close'] if len(close) < 60: return 0, "INIT" ma5 = close.rolling(5).mean().iloc[-1] ma20 = close.rolling(20).mean().iloc[-1] ma60 = close.rolling(60).mean().iloc[-1] momentum = (close.iloc[-1] / close.iloc[-10] - 1) * 100 trend_strong = (close.iloc[-1] > ma5) and (ma5 > ma20) and (ma20 > ma60) trend_weak = (close.iloc[-1] < ma5) and (ma5 < ma20) if trend_strong and momentum > 2: return 1.0, "STRONG_UP" elif trend_strong and momentum > 0: return 0.8, "UP" elif trend_weak or momentum < -3: return 0.0, "DOWN" return 0.5, "OSCILLATE" # 策略4: 多因子 def strategy_multifactor(data, pos): c = data['close'] h = data['high'] l = data['low'] if len(c) < 60: return 0, "INIT" # 趋势因子 ma5 = c.rolling(5).mean() ma20 = c.rolling(20).mean() ma60 = c.rolling(60).mean() trend_score = 0 if c.iloc[-1] > ma5.iloc[-1]: trend_score += 1 if ma5.iloc[-1] > ma20.iloc[-1]: trend_score += 1 if ma20.iloc[-1] > ma60.iloc[-1]: trend_score += 1 trend_score = trend_score / 3 # 动量因子 ret20 = (c.iloc[-1] / c.iloc[-20] - 1) if len(c) >= 20 else 0 mom_score = np.clip((ret20 + 0.2) / 0.4, 0, 1) # 波动率因子 atr = pd.concat([h-l, (h-c.shift(1)).abs(), (l-c.shift(1)).abs()], axis=1).max(axis=1) atr_mean = atr.rolling(20).mean().iloc[-1] vol_pct = atr_mean / c.iloc[-1] vol_score = 1 - np.clip((vol_pct - 0.015) / 0.025, 0, 1) # 突破因子 high_20 = h.rolling(20).max() breakout = 1 if c.iloc[-1] >= high_20.iloc[-1] * 0.99 else 0 # 综合得分 total_score = trend_score * 0.35 + mom_score * 0.25 + vol_score * 0.25 + breakout * 0.15 if total_score > 0.7: return 1.0, "STRONG" elif total_score > 0.5: return 0.6, "MEDIUM" elif total_score > 0.3: return 0.3, "WEAK" return 0.0, "EMPTY" # 策略5: RSI def strategy_rsi(data, pos): close = data['close'] if len(close) < 20: return 0, "INIT" delta = close.diff() gain = (delta.where(delta > 0, 0)).rolling(window=14).mean() loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean() rs = gain / loss rsi = 100 - (100 / (1 + rs)) curr_rsi = rsi.iloc[-1] if pd.isna(curr_rsi): return 0, "INIT" if curr_rsi < 30: return 1.0, "OVERSOLD" elif curr_rsi > 70: return 0.0, "OVERBOUGHT" return pos, "HOLD" # ==================== 3. 回测引擎 ==================== def backtest_yearly(data, strategy_func, year, warmup=60): """年度回测""" year_data = data[data.index.year == year].copy() if len(year_data) < warmup + 5: return None nav = 1.0 position = 0 for i in range(warmup, len(year_data)): curr_data = data.loc[:year_data.index[i]] new_pos, state = strategy_func(curr_data, position) if i > warmup: daily_ret = year_data['close'].iloc[i] / year_data['close'].iloc[i-1] - 1 nav *= (1 + daily_ret * position) position = new_pos if new_pos is not None else position return (nav - 1) * 100 # 返回百分比收益 # ==================== 4. 主程序 ==================== def main(): # 加载数据 data = load_real_data() strategies = [ ("趋势跟踪", strategy_trend), ("双均线", strategy_ma_cross), ("动量", strategy_momentum), ("多因子", strategy_multifactor), ("RSI", strategy_rsi), ] years = list(range(2018, 2026)) # 收集结果 results = {name: {} for name, _ in strategies} results["买入持有"] = {} print("\n开始年度回测...") print("-"*90) for year in years: year_data = data[data.index.year == year] if len(year_data) == 0: continue # 买入持有收益 start_price = year_data['close'].iloc[60] # 考虑warmup end_price = year_data['close'].iloc[-1] buyhold_ret = (end_price / start_price - 1) * 100 results["买入持有"][year] = buyhold_ret # 各策略收益 for name, strategy_func in strategies: ret = backtest_yearly(data, strategy_func, year) results[name][year] = ret if ret is not None else 0 # 打印结果 print("\n" + "="*90) print("年度收益对比表 (%)") print("="*90) # 表头 header = f"{'策略':<10}" for year in years: header += f" | {year:>8}" header += f" | {'平均':>8} | {'跑赢年数':>8}" print(header) print("-"*90) # 各策略 all_strategies = list(results.keys()) for name in all_strategies: row = f"{name:<10}" returns = [] win_count = 0 for year in years: if year in results[name]: ret = results[name][year] returns.append(ret) row += f" | {ret:>+7.1f}" # 判断是否跑赢指数 if name != "买入持有" and year in results["买入持有"]: if ret > results["买入持有"][year]: win_count += 1 else: row += f" | {'--':>8}" avg_ret = np.mean(returns) if returns else 0 row += f" | {avg_ret:>+7.1f}" if name != "买入持有": row += f" | {win_count:>6}/8" else: row += f" | {'--':>8}" print(row) print("-"*90) # 超额收益统计 print("\n" + "="*90) print("超额收益对比 (策略 - 买入持有)") print("="*90) header = f"{'策略':<10}" for year in years: header += f" | {year:>8}" header += f" | {'平均超额':>8}" print(header) print("-"*90) for name in all_strategies: if name == "买入持有": continue row = f"{name:<10}" excess_returns = [] for year in years: if year in results[name] and year in results["买入持有"]: excess = results[name][year] - results["买入持有"][year] excess_returns.append(excess) marker = "⭐" if excess > 10 else "✓" if excess > 0 else "" row += f" | {excess:>+7.1f}{marker}" else: row += f" | {'--':>8}" avg_excess = np.mean(excess_returns) if excess_returns else 0 row += f" | {avg_excess:>+7.1f}" print(row) print("-"*90) # 汇总评价 print("\n" + "="*90) print("策略评价") print("="*90) for name in all_strategies: if name == "买入持有": continue returns = [results[name][y] for y in years if y in results[name]] excess_list = [results[name][y] - results["买入持有"][y] for y in years if y in results[name] and y in results["买入持有"]] win_years = sum(1 for e in excess_list if e > 0) avg_ret = np.mean(returns) avg_excess = np.mean(excess_list) print(f"\n【{name}】") print(f" 年均收益: {avg_ret:+.1f}%") print(f" 年均超额: {avg_excess:+.1f}%") print(f" 跑赢年数: {win_years}/8 ({win_years/8*100:.0f}%)") if avg_excess > 10: print(f" 评价: 优秀 ⭐⭐⭐") elif avg_excess > 5: print(f" 评价: 良好 ⭐⭐") elif avg_excess > 0: print(f" 评价: 一般 ⭐") else: print(f" 评价: 不佳") print("\n" + "="*90) if __name__ == "__main__": main()