| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50指数 - 全策略年度收益对比(真实数据)
- 对比策略:趋势跟踪、双均线、动量、多因子、RSI
- """
- import pandas as pd
- import numpy as np
- import warnings
- warnings.filterwarnings('ignore')
- print("="*90)
- print("创业板50指数 - 全策略年度收益对比(真实数据)")
- print("="*90)
- # ==================== 1. 加载真实数据 ====================
- def load_real_data():
- df = pd.read_csv('cyb50_baostock.csv')
- df['date'] = pd.to_datetime(df['date'])
- df = df.set_index('date').sort_index()
- for col in ['open', 'high', 'low', 'close', 'volume']:
- df[col] = pd.to_numeric(df[col], errors='coerce')
- return df
- # ==================== 2. 策略定义 ====================
- # 策略1: 趋势跟踪
- def strategy_trend(data, pos):
- close = data['close'].values
- high = data['high'].values
- low = data['low'].values
- if len(close) < 60:
- return 0, "INIT"
-
- ma10 = np.mean(close[-10:])
- ma30 = np.mean(close[-30:])
- ret10 = (close[-1] / close[-10] - 1) if len(close) >= 10 else 0
- high_20 = np.max(high[-20:])
- low_20 = np.min(low[-20:])
- curr = close[-1]
-
- buy = (curr > ma10 > ma30) and (curr >= high_20 * 0.995) and (ret10 > 0.02)
- sell = (curr < ma30) or (curr <= low_20 * 1.005)
-
- if buy and pos == 0:
- return 1.0, "ENTRY"
- elif sell and pos > 0:
- return 0.0, "EXIT"
- return pos, "HOLD" if pos > 0 else "EMPTY"
- # 策略2: 双均线
- def strategy_ma_cross(data, pos):
- close = data['close'].values
- if len(close) < 60:
- return 0, "INIT"
-
- ma20 = np.mean(close[-20:])
- ma60 = np.mean(close[-60:])
- curr = close[-1]
-
- if curr > ma20 > ma60:
- return 1.0, "BULL"
- elif curr < ma60:
- return 0.0, "BEAR"
- return pos, "HOLD"
- # 策略3: 动量
- def strategy_momentum(data, pos):
- close = data['close']
- if len(close) < 60:
- return 0, "INIT"
-
- ma5 = close.rolling(5).mean().iloc[-1]
- ma20 = close.rolling(20).mean().iloc[-1]
- ma60 = close.rolling(60).mean().iloc[-1]
-
- momentum = (close.iloc[-1] / close.iloc[-10] - 1) * 100
-
- trend_strong = (close.iloc[-1] > ma5) and (ma5 > ma20) and (ma20 > ma60)
- trend_weak = (close.iloc[-1] < ma5) and (ma5 < ma20)
-
- if trend_strong and momentum > 2:
- return 1.0, "STRONG_UP"
- elif trend_strong and momentum > 0:
- return 0.8, "UP"
- elif trend_weak or momentum < -3:
- return 0.0, "DOWN"
- return 0.5, "OSCILLATE"
- # 策略4: 多因子
- def strategy_multifactor(data, pos):
- c = data['close']
- h = data['high']
- l = data['low']
-
- if len(c) < 60:
- return 0, "INIT"
-
- # 趋势因子
- ma5 = c.rolling(5).mean()
- ma20 = c.rolling(20).mean()
- ma60 = c.rolling(60).mean()
-
- trend_score = 0
- if c.iloc[-1] > ma5.iloc[-1]: trend_score += 1
- if ma5.iloc[-1] > ma20.iloc[-1]: trend_score += 1
- if ma20.iloc[-1] > ma60.iloc[-1]: trend_score += 1
- trend_score = trend_score / 3
-
- # 动量因子
- ret20 = (c.iloc[-1] / c.iloc[-20] - 1) if len(c) >= 20 else 0
- mom_score = np.clip((ret20 + 0.2) / 0.4, 0, 1)
-
- # 波动率因子
- atr = pd.concat([h-l, (h-c.shift(1)).abs(), (l-c.shift(1)).abs()], axis=1).max(axis=1)
- atr_mean = atr.rolling(20).mean().iloc[-1]
- vol_pct = atr_mean / c.iloc[-1]
- vol_score = 1 - np.clip((vol_pct - 0.015) / 0.025, 0, 1)
-
- # 突破因子
- high_20 = h.rolling(20).max()
- breakout = 1 if c.iloc[-1] >= high_20.iloc[-1] * 0.99 else 0
-
- # 综合得分
- total_score = trend_score * 0.35 + mom_score * 0.25 + vol_score * 0.25 + breakout * 0.15
-
- if total_score > 0.7:
- return 1.0, "STRONG"
- elif total_score > 0.5:
- return 0.6, "MEDIUM"
- elif total_score > 0.3:
- return 0.3, "WEAK"
- return 0.0, "EMPTY"
- # 策略5: RSI
- def strategy_rsi(data, pos):
- close = data['close']
- if len(close) < 20:
- return 0, "INIT"
-
- delta = close.diff()
- gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
- loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
- rs = gain / loss
- rsi = 100 - (100 / (1 + rs))
-
- curr_rsi = rsi.iloc[-1]
-
- if pd.isna(curr_rsi):
- return 0, "INIT"
-
- if curr_rsi < 30:
- return 1.0, "OVERSOLD"
- elif curr_rsi > 70:
- return 0.0, "OVERBOUGHT"
- return pos, "HOLD"
- # ==================== 3. 回测引擎 ====================
- def backtest_yearly(data, strategy_func, year, warmup=60):
- """年度回测"""
- year_data = data[data.index.year == year].copy()
- if len(year_data) < warmup + 5:
- return None
-
- nav = 1.0
- position = 0
-
- for i in range(warmup, len(year_data)):
- curr_data = data.loc[:year_data.index[i]]
- new_pos, state = strategy_func(curr_data, position)
-
- if i > warmup:
- daily_ret = year_data['close'].iloc[i] / year_data['close'].iloc[i-1] - 1
- nav *= (1 + daily_ret * position)
-
- position = new_pos if new_pos is not None else position
-
- return (nav - 1) * 100 # 返回百分比收益
- # ==================== 4. 主程序 ====================
- def main():
- # 加载数据
- data = load_real_data()
-
- strategies = [
- ("趋势跟踪", strategy_trend),
- ("双均线", strategy_ma_cross),
- ("动量", strategy_momentum),
- ("多因子", strategy_multifactor),
- ("RSI", strategy_rsi),
- ]
-
- years = list(range(2018, 2026))
-
- # 收集结果
- results = {name: {} for name, _ in strategies}
- results["买入持有"] = {}
-
- print("\n开始年度回测...")
- print("-"*90)
-
- for year in years:
- year_data = data[data.index.year == year]
- if len(year_data) == 0:
- continue
-
- # 买入持有收益
- start_price = year_data['close'].iloc[60] # 考虑warmup
- end_price = year_data['close'].iloc[-1]
- buyhold_ret = (end_price / start_price - 1) * 100
- results["买入持有"][year] = buyhold_ret
-
- # 各策略收益
- for name, strategy_func in strategies:
- ret = backtest_yearly(data, strategy_func, year)
- results[name][year] = ret if ret is not None else 0
-
- # 打印结果
- print("\n" + "="*90)
- print("年度收益对比表 (%)")
- print("="*90)
-
- # 表头
- header = f"{'策略':<10}"
- for year in years:
- header += f" | {year:>8}"
- header += f" | {'平均':>8} | {'跑赢年数':>8}"
- print(header)
- print("-"*90)
-
- # 各策略
- all_strategies = list(results.keys())
- for name in all_strategies:
- row = f"{name:<10}"
- returns = []
- win_count = 0
-
- for year in years:
- if year in results[name]:
- ret = results[name][year]
- returns.append(ret)
- row += f" | {ret:>+7.1f}"
-
- # 判断是否跑赢指数
- if name != "买入持有" and year in results["买入持有"]:
- if ret > results["买入持有"][year]:
- win_count += 1
- else:
- row += f" | {'--':>8}"
-
- avg_ret = np.mean(returns) if returns else 0
- row += f" | {avg_ret:>+7.1f}"
-
- if name != "买入持有":
- row += f" | {win_count:>6}/8"
- else:
- row += f" | {'--':>8}"
-
- print(row)
-
- print("-"*90)
-
- # 超额收益统计
- print("\n" + "="*90)
- print("超额收益对比 (策略 - 买入持有)")
- print("="*90)
-
- header = f"{'策略':<10}"
- for year in years:
- header += f" | {year:>8}"
- header += f" | {'平均超额':>8}"
- print(header)
- print("-"*90)
-
- for name in all_strategies:
- if name == "买入持有":
- continue
-
- row = f"{name:<10}"
- excess_returns = []
-
- for year in years:
- if year in results[name] and year in results["买入持有"]:
- excess = results[name][year] - results["买入持有"][year]
- excess_returns.append(excess)
- marker = "⭐" if excess > 10 else "✓" if excess > 0 else ""
- row += f" | {excess:>+7.1f}{marker}"
- else:
- row += f" | {'--':>8}"
-
- avg_excess = np.mean(excess_returns) if excess_returns else 0
- row += f" | {avg_excess:>+7.1f}"
- print(row)
-
- print("-"*90)
-
- # 汇总评价
- print("\n" + "="*90)
- print("策略评价")
- print("="*90)
-
- for name in all_strategies:
- if name == "买入持有":
- continue
-
- returns = [results[name][y] for y in years if y in results[name]]
- excess_list = [results[name][y] - results["买入持有"][y] for y in years if y in results[name] and y in results["买入持有"]]
-
- win_years = sum(1 for e in excess_list if e > 0)
- avg_ret = np.mean(returns)
- avg_excess = np.mean(excess_list)
-
- print(f"\n【{name}】")
- print(f" 年均收益: {avg_ret:+.1f}%")
- print(f" 年均超额: {avg_excess:+.1f}%")
- print(f" 跑赢年数: {win_years}/8 ({win_years/8*100:.0f}%)")
-
- if avg_excess > 10:
- print(f" 评价: 优秀 ⭐⭐⭐")
- elif avg_excess > 5:
- print(f" 评价: 良好 ⭐⭐")
- elif avg_excess > 0:
- print(f" 评价: 一般 ⭐")
- else:
- print(f" 评价: 不佳")
-
- print("\n" + "="*90)
- if __name__ == "__main__":
- main()
|