|
@@ -0,0 +1,331 @@
|
|
|
|
|
+#!/usr/bin/env python3
|
|
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
|
|
+"""
|
|
|
|
|
+创业板50指数 - 全策略年度收益对比(真实数据)
|
|
|
|
|
+对比策略:趋势跟踪、双均线、动量、多因子、RSI
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import pandas as pd
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+import warnings
|
|
|
|
|
+warnings.filterwarnings('ignore')
|
|
|
|
|
+
|
|
|
|
|
+print("="*90)
|
|
|
|
|
+print("创业板50指数 - 全策略年度收益对比(真实数据)")
|
|
|
|
|
+print("="*90)
|
|
|
|
|
+
|
|
|
|
|
+# ==================== 1. 加载真实数据 ====================
|
|
|
|
|
+def load_real_data():
|
|
|
|
|
+ df = pd.read_csv('cyb50_baostock.csv')
|
|
|
|
|
+ df['date'] = pd.to_datetime(df['date'])
|
|
|
|
|
+ df = df.set_index('date').sort_index()
|
|
|
|
|
+ for col in ['open', 'high', 'low', 'close', 'volume']:
|
|
|
|
|
+ df[col] = pd.to_numeric(df[col], errors='coerce')
|
|
|
|
|
+ return df
|
|
|
|
|
+
|
|
|
|
|
+# ==================== 2. 策略定义 ====================
|
|
|
|
|
+
|
|
|
|
|
+# 策略1: 趋势跟踪
|
|
|
|
|
+def strategy_trend(data, pos):
|
|
|
|
|
+ close = data['close'].values
|
|
|
|
|
+ high = data['high'].values
|
|
|
|
|
+ low = data['low'].values
|
|
|
|
|
+ if len(close) < 60:
|
|
|
|
|
+ return 0, "INIT"
|
|
|
|
|
+
|
|
|
|
|
+ ma10 = np.mean(close[-10:])
|
|
|
|
|
+ ma30 = np.mean(close[-30:])
|
|
|
|
|
+ ret10 = (close[-1] / close[-10] - 1) if len(close) >= 10 else 0
|
|
|
|
|
+ high_20 = np.max(high[-20:])
|
|
|
|
|
+ low_20 = np.min(low[-20:])
|
|
|
|
|
+ curr = close[-1]
|
|
|
|
|
+
|
|
|
|
|
+ buy = (curr > ma10 > ma30) and (curr >= high_20 * 0.995) and (ret10 > 0.02)
|
|
|
|
|
+ sell = (curr < ma30) or (curr <= low_20 * 1.005)
|
|
|
|
|
+
|
|
|
|
|
+ if buy and pos == 0:
|
|
|
|
|
+ return 1.0, "ENTRY"
|
|
|
|
|
+ elif sell and pos > 0:
|
|
|
|
|
+ return 0.0, "EXIT"
|
|
|
|
|
+ return pos, "HOLD" if pos > 0 else "EMPTY"
|
|
|
|
|
+
|
|
|
|
|
+# 策略2: 双均线
|
|
|
|
|
+def strategy_ma_cross(data, pos):
|
|
|
|
|
+ close = data['close'].values
|
|
|
|
|
+ if len(close) < 60:
|
|
|
|
|
+ return 0, "INIT"
|
|
|
|
|
+
|
|
|
|
|
+ ma20 = np.mean(close[-20:])
|
|
|
|
|
+ ma60 = np.mean(close[-60:])
|
|
|
|
|
+ curr = close[-1]
|
|
|
|
|
+
|
|
|
|
|
+ if curr > ma20 > ma60:
|
|
|
|
|
+ return 1.0, "BULL"
|
|
|
|
|
+ elif curr < ma60:
|
|
|
|
|
+ return 0.0, "BEAR"
|
|
|
|
|
+ return pos, "HOLD"
|
|
|
|
|
+
|
|
|
|
|
+# 策略3: 动量
|
|
|
|
|
+def strategy_momentum(data, pos):
|
|
|
|
|
+ close = data['close']
|
|
|
|
|
+ if len(close) < 60:
|
|
|
|
|
+ return 0, "INIT"
|
|
|
|
|
+
|
|
|
|
|
+ ma5 = close.rolling(5).mean().iloc[-1]
|
|
|
|
|
+ ma20 = close.rolling(20).mean().iloc[-1]
|
|
|
|
|
+ ma60 = close.rolling(60).mean().iloc[-1]
|
|
|
|
|
+
|
|
|
|
|
+ momentum = (close.iloc[-1] / close.iloc[-10] - 1) * 100
|
|
|
|
|
+
|
|
|
|
|
+ trend_strong = (close.iloc[-1] > ma5) and (ma5 > ma20) and (ma20 > ma60)
|
|
|
|
|
+ trend_weak = (close.iloc[-1] < ma5) and (ma5 < ma20)
|
|
|
|
|
+
|
|
|
|
|
+ if trend_strong and momentum > 2:
|
|
|
|
|
+ return 1.0, "STRONG_UP"
|
|
|
|
|
+ elif trend_strong and momentum > 0:
|
|
|
|
|
+ return 0.8, "UP"
|
|
|
|
|
+ elif trend_weak or momentum < -3:
|
|
|
|
|
+ return 0.0, "DOWN"
|
|
|
|
|
+ return 0.5, "OSCILLATE"
|
|
|
|
|
+
|
|
|
|
|
+# 策略4: 多因子
|
|
|
|
|
+def strategy_multifactor(data, pos):
|
|
|
|
|
+ c = data['close']
|
|
|
|
|
+ h = data['high']
|
|
|
|
|
+ l = data['low']
|
|
|
|
|
+
|
|
|
|
|
+ if len(c) < 60:
|
|
|
|
|
+ return 0, "INIT"
|
|
|
|
|
+
|
|
|
|
|
+ # 趋势因子
|
|
|
|
|
+ ma5 = c.rolling(5).mean()
|
|
|
|
|
+ ma20 = c.rolling(20).mean()
|
|
|
|
|
+ ma60 = c.rolling(60).mean()
|
|
|
|
|
+
|
|
|
|
|
+ trend_score = 0
|
|
|
|
|
+ if c.iloc[-1] > ma5.iloc[-1]: trend_score += 1
|
|
|
|
|
+ if ma5.iloc[-1] > ma20.iloc[-1]: trend_score += 1
|
|
|
|
|
+ if ma20.iloc[-1] > ma60.iloc[-1]: trend_score += 1
|
|
|
|
|
+ trend_score = trend_score / 3
|
|
|
|
|
+
|
|
|
|
|
+ # 动量因子
|
|
|
|
|
+ ret20 = (c.iloc[-1] / c.iloc[-20] - 1) if len(c) >= 20 else 0
|
|
|
|
|
+ mom_score = np.clip((ret20 + 0.2) / 0.4, 0, 1)
|
|
|
|
|
+
|
|
|
|
|
+ # 波动率因子
|
|
|
|
|
+ atr = pd.concat([h-l, (h-c.shift(1)).abs(), (l-c.shift(1)).abs()], axis=1).max(axis=1)
|
|
|
|
|
+ atr_mean = atr.rolling(20).mean().iloc[-1]
|
|
|
|
|
+ vol_pct = atr_mean / c.iloc[-1]
|
|
|
|
|
+ vol_score = 1 - np.clip((vol_pct - 0.015) / 0.025, 0, 1)
|
|
|
|
|
+
|
|
|
|
|
+ # 突破因子
|
|
|
|
|
+ high_20 = h.rolling(20).max()
|
|
|
|
|
+ breakout = 1 if c.iloc[-1] >= high_20.iloc[-1] * 0.99 else 0
|
|
|
|
|
+
|
|
|
|
|
+ # 综合得分
|
|
|
|
|
+ total_score = trend_score * 0.35 + mom_score * 0.25 + vol_score * 0.25 + breakout * 0.15
|
|
|
|
|
+
|
|
|
|
|
+ if total_score > 0.7:
|
|
|
|
|
+ return 1.0, "STRONG"
|
|
|
|
|
+ elif total_score > 0.5:
|
|
|
|
|
+ return 0.6, "MEDIUM"
|
|
|
|
|
+ elif total_score > 0.3:
|
|
|
|
|
+ return 0.3, "WEAK"
|
|
|
|
|
+ return 0.0, "EMPTY"
|
|
|
|
|
+
|
|
|
|
|
+# 策略5: RSI
|
|
|
|
|
+def strategy_rsi(data, pos):
|
|
|
|
|
+ close = data['close']
|
|
|
|
|
+ if len(close) < 20:
|
|
|
|
|
+ return 0, "INIT"
|
|
|
|
|
+
|
|
|
|
|
+ delta = close.diff()
|
|
|
|
|
+ gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
|
|
|
|
|
+ loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
|
|
|
|
|
+ rs = gain / loss
|
|
|
|
|
+ rsi = 100 - (100 / (1 + rs))
|
|
|
|
|
+
|
|
|
|
|
+ curr_rsi = rsi.iloc[-1]
|
|
|
|
|
+
|
|
|
|
|
+ if pd.isna(curr_rsi):
|
|
|
|
|
+ return 0, "INIT"
|
|
|
|
|
+
|
|
|
|
|
+ if curr_rsi < 30:
|
|
|
|
|
+ return 1.0, "OVERSOLD"
|
|
|
|
|
+ elif curr_rsi > 70:
|
|
|
|
|
+ return 0.0, "OVERBOUGHT"
|
|
|
|
|
+ return pos, "HOLD"
|
|
|
|
|
+
|
|
|
|
|
+# ==================== 3. 回测引擎 ====================
|
|
|
|
|
+def backtest_yearly(data, strategy_func, year, warmup=60):
|
|
|
|
|
+ """年度回测"""
|
|
|
|
|
+ year_data = data[data.index.year == year].copy()
|
|
|
|
|
+ if len(year_data) < warmup + 5:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+ nav = 1.0
|
|
|
|
|
+ position = 0
|
|
|
|
|
+
|
|
|
|
|
+ for i in range(warmup, len(year_data)):
|
|
|
|
|
+ curr_data = data.loc[:year_data.index[i]]
|
|
|
|
|
+ new_pos, state = strategy_func(curr_data, position)
|
|
|
|
|
+
|
|
|
|
|
+ if i > warmup:
|
|
|
|
|
+ daily_ret = year_data['close'].iloc[i] / year_data['close'].iloc[i-1] - 1
|
|
|
|
|
+ nav *= (1 + daily_ret * position)
|
|
|
|
|
+
|
|
|
|
|
+ position = new_pos if new_pos is not None else position
|
|
|
|
|
+
|
|
|
|
|
+ return (nav - 1) * 100 # 返回百分比收益
|
|
|
|
|
+
|
|
|
|
|
+# ==================== 4. 主程序 ====================
|
|
|
|
|
+def main():
|
|
|
|
|
+ # 加载数据
|
|
|
|
|
+ data = load_real_data()
|
|
|
|
|
+
|
|
|
|
|
+ strategies = [
|
|
|
|
|
+ ("趋势跟踪", strategy_trend),
|
|
|
|
|
+ ("双均线", strategy_ma_cross),
|
|
|
|
|
+ ("动量", strategy_momentum),
|
|
|
|
|
+ ("多因子", strategy_multifactor),
|
|
|
|
|
+ ("RSI", strategy_rsi),
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ years = list(range(2018, 2026))
|
|
|
|
|
+
|
|
|
|
|
+ # 收集结果
|
|
|
|
|
+ results = {name: {} for name, _ in strategies}
|
|
|
|
|
+ results["买入持有"] = {}
|
|
|
|
|
+
|
|
|
|
|
+ print("\n开始年度回测...")
|
|
|
|
|
+ print("-"*90)
|
|
|
|
|
+
|
|
|
|
|
+ for year in years:
|
|
|
|
|
+ year_data = data[data.index.year == year]
|
|
|
|
|
+ if len(year_data) == 0:
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ # 买入持有收益
|
|
|
|
|
+ start_price = year_data['close'].iloc[60] # 考虑warmup
|
|
|
|
|
+ end_price = year_data['close'].iloc[-1]
|
|
|
|
|
+ buyhold_ret = (end_price / start_price - 1) * 100
|
|
|
|
|
+ results["买入持有"][year] = buyhold_ret
|
|
|
|
|
+
|
|
|
|
|
+ # 各策略收益
|
|
|
|
|
+ for name, strategy_func in strategies:
|
|
|
|
|
+ ret = backtest_yearly(data, strategy_func, year)
|
|
|
|
|
+ results[name][year] = ret if ret is not None else 0
|
|
|
|
|
+
|
|
|
|
|
+ # 打印结果
|
|
|
|
|
+ print("\n" + "="*90)
|
|
|
|
|
+ print("年度收益对比表 (%)")
|
|
|
|
|
+ print("="*90)
|
|
|
|
|
+
|
|
|
|
|
+ # 表头
|
|
|
|
|
+ header = f"{'策略':<10}"
|
|
|
|
|
+ for year in years:
|
|
|
|
|
+ header += f" | {year:>8}"
|
|
|
|
|
+ header += f" | {'平均':>8} | {'跑赢年数':>8}"
|
|
|
|
|
+ print(header)
|
|
|
|
|
+ print("-"*90)
|
|
|
|
|
+
|
|
|
|
|
+ # 各策略
|
|
|
|
|
+ all_strategies = list(results.keys())
|
|
|
|
|
+ for name in all_strategies:
|
|
|
|
|
+ row = f"{name:<10}"
|
|
|
|
|
+ returns = []
|
|
|
|
|
+ win_count = 0
|
|
|
|
|
+
|
|
|
|
|
+ for year in years:
|
|
|
|
|
+ if year in results[name]:
|
|
|
|
|
+ ret = results[name][year]
|
|
|
|
|
+ returns.append(ret)
|
|
|
|
|
+ row += f" | {ret:>+7.1f}"
|
|
|
|
|
+
|
|
|
|
|
+ # 判断是否跑赢指数
|
|
|
|
|
+ if name != "买入持有" and year in results["买入持有"]:
|
|
|
|
|
+ if ret > results["买入持有"][year]:
|
|
|
|
|
+ win_count += 1
|
|
|
|
|
+ else:
|
|
|
|
|
+ row += f" | {'--':>8}"
|
|
|
|
|
+
|
|
|
|
|
+ avg_ret = np.mean(returns) if returns else 0
|
|
|
|
|
+ row += f" | {avg_ret:>+7.1f}"
|
|
|
|
|
+
|
|
|
|
|
+ if name != "买入持有":
|
|
|
|
|
+ row += f" | {win_count:>6}/8"
|
|
|
|
|
+ else:
|
|
|
|
|
+ row += f" | {'--':>8}"
|
|
|
|
|
+
|
|
|
|
|
+ print(row)
|
|
|
|
|
+
|
|
|
|
|
+ print("-"*90)
|
|
|
|
|
+
|
|
|
|
|
+ # 超额收益统计
|
|
|
|
|
+ print("\n" + "="*90)
|
|
|
|
|
+ print("超额收益对比 (策略 - 买入持有)")
|
|
|
|
|
+ print("="*90)
|
|
|
|
|
+
|
|
|
|
|
+ header = f"{'策略':<10}"
|
|
|
|
|
+ for year in years:
|
|
|
|
|
+ header += f" | {year:>8}"
|
|
|
|
|
+ header += f" | {'平均超额':>8}"
|
|
|
|
|
+ print(header)
|
|
|
|
|
+ print("-"*90)
|
|
|
|
|
+
|
|
|
|
|
+ for name in all_strategies:
|
|
|
|
|
+ if name == "买入持有":
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ row = f"{name:<10}"
|
|
|
|
|
+ excess_returns = []
|
|
|
|
|
+
|
|
|
|
|
+ for year in years:
|
|
|
|
|
+ if year in results[name] and year in results["买入持有"]:
|
|
|
|
|
+ excess = results[name][year] - results["买入持有"][year]
|
|
|
|
|
+ excess_returns.append(excess)
|
|
|
|
|
+ marker = "⭐" if excess > 10 else "✓" if excess > 0 else ""
|
|
|
|
|
+ row += f" | {excess:>+7.1f}{marker}"
|
|
|
|
|
+ else:
|
|
|
|
|
+ row += f" | {'--':>8}"
|
|
|
|
|
+
|
|
|
|
|
+ avg_excess = np.mean(excess_returns) if excess_returns else 0
|
|
|
|
|
+ row += f" | {avg_excess:>+7.1f}"
|
|
|
|
|
+ print(row)
|
|
|
|
|
+
|
|
|
|
|
+ print("-"*90)
|
|
|
|
|
+
|
|
|
|
|
+ # 汇总评价
|
|
|
|
|
+ print("\n" + "="*90)
|
|
|
|
|
+ print("策略评价")
|
|
|
|
|
+ print("="*90)
|
|
|
|
|
+
|
|
|
|
|
+ for name in all_strategies:
|
|
|
|
|
+ if name == "买入持有":
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ returns = [results[name][y] for y in years if y in results[name]]
|
|
|
|
|
+ excess_list = [results[name][y] - results["买入持有"][y] for y in years if y in results[name] and y in results["买入持有"]]
|
|
|
|
|
+
|
|
|
|
|
+ win_years = sum(1 for e in excess_list if e > 0)
|
|
|
|
|
+ avg_ret = np.mean(returns)
|
|
|
|
|
+ avg_excess = np.mean(excess_list)
|
|
|
|
|
+
|
|
|
|
|
+ print(f"\n【{name}】")
|
|
|
|
|
+ print(f" 年均收益: {avg_ret:+.1f}%")
|
|
|
|
|
+ print(f" 年均超额: {avg_excess:+.1f}%")
|
|
|
|
|
+ print(f" 跑赢年数: {win_years}/8 ({win_years/8*100:.0f}%)")
|
|
|
|
|
+
|
|
|
|
|
+ if avg_excess > 10:
|
|
|
|
|
+ print(f" 评价: 优秀 ⭐⭐⭐")
|
|
|
|
|
+ elif avg_excess > 5:
|
|
|
|
|
+ print(f" 评价: 良好 ⭐⭐")
|
|
|
|
|
+ elif avg_excess > 0:
|
|
|
|
|
+ print(f" 评价: 一般 ⭐")
|
|
|
|
|
+ else:
|
|
|
|
|
+ print(f" 评价: 不佳")
|
|
|
|
|
+
|
|
|
|
|
+ print("\n" + "="*90)
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+ main()
|