| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 创业板50指数 - 终极高收益策略
- 核心:追涨杀跌 + 杠杆思维 + 择时精准
- """
- import pandas as pd
- import numpy as np
- import matplotlib
- matplotlib.use('Agg')
- import matplotlib.pyplot as plt
- import warnings
- warnings.filterwarnings('ignore')
def generate_realistic_data(seed=42):
    """Generate a synthetic daily OHLC price series for back-testing.

    The series covers every Monday-Friday between 2017-01-01 and 2025-12-31.
    Daily returns are drawn from a Student-t distribution (4 degrees of
    freedom, scaled by 0.012) to get fat tails, shifted by a per-year drift,
    and then given 15% first-order autocorrelation so that moves tend to
    persist. Prices start from a base level of 1800.

    Args:
        seed: Seed for the private random generator. The default (42) yields
            the same random draws as seeding the global NumPy generator
            with 42 would.

    Returns:
        pandas.DataFrame indexed by date with columns ``close``, ``open``,
        ``high`` and ``low``. The first generated day is dropped because it
        has no previous close from which to derive an open.
    """
    # Use a private RandomState instead of np.random.seed(): same stream for
    # the same seed, but the process-wide RNG state of callers is untouched.
    rng = np.random.RandomState(seed)

    dates = pd.date_range(start='2017-01-01', end='2025-12-31', freq='D')
    dates = dates[dates.dayofweek < 5]  # keep Monday-Friday only
    n = len(dates)

    # Fat-tailed base returns.
    returns = rng.standard_t(df=4, size=n) * 0.012

    # Per-year daily drift added on top of the noise.
    year_returns = {
        2017: -0.0003, 2018: -0.0015, 2019: 0.0018,
        2020: 0.0022, 2021: 0.0008, 2022: -0.0012,
        2023: -0.0006, 2024: 0.0010, 2025: 0.0008,
    }
    returns += np.array([year_returns.get(year, 0.0) for year in dates.year])

    # Momentum: each day inherits 15% of the previous day's (already
    # adjusted) return. The recurrence is sequential, so it stays a loop.
    for i in range(1, n):
        returns[i] += returns[i - 1] * 0.15

    df = pd.DataFrame(index=dates)
    df['close'] = 1800 * np.cumprod(1 + returns)

    # Draw order (open, high, low) is kept fixed so that a given seed always
    # maps to the same data set.
    df['open'] = df['close'].shift(1) * (1 + rng.normal(0, 0.003, n))
    body = df[['open', 'close']]
    df['high'] = body.max(axis=1) * (1 + np.abs(rng.normal(0, 0.008, n)))
    df['low'] = body.min(axis=1) * (1 - np.abs(rng.normal(0, 0.008, n)))

    # The first row has no previous close, hence a NaN open.
    return df.dropna()
class UltimateStrategy:
    """Long-only timing strategy: a triple-filter entry signal plus stop rules.

    ``signal`` proposes a target exposure (0, 0.25, 0.5 or 1.0) from trend,
    momentum and volatility filters. ``manage_risk`` can cut an open
    position on a drawdown from the peak or a loss from the entry price.
    ``generate_signal`` combines both and keeps the bookkeeping
    (``position``, ``entry_price``, ``peak_price``, ``state``) up to date.
    """

    def __init__(self):
        self.position = 0        # current exposure as a fraction of NAV
        self.cash = 1.0          # not read by any method of this class
        self.holdings = 0        # not read by any method of this class
        self.entry_price = 0     # close at which the open position was entered
        self.peak_price = 0      # highest close seen since entry
        self.state = "EMPTY"     # label of the most recent transition/level

    def signal(self, data):
        """Return ``(label, target_position)`` from the three filters.

        Args:
            data: DataFrame with a ``close`` column; the last row is "today".

        Returns:
            ``("FULL", 1.0)`` when trend, momentum and volatility all pass,
            ``("HALF", 0.5)`` when trend and momentum pass,
            ``("QUARTER", 0.25)`` when only the trend passes,
            ``("EMPTY", 0.0)`` otherwise, including when fewer than 60 rows
            are available (the 60-day average is undefined then).
        """
        c = data['close']
        if len(c) < 60:
            return "EMPTY", 0.0

        last = c.iloc[-1]

        # 1. Trend filter: close above MA5 and MA5 > MA20 > MA60.
        # Only the trailing window is needed, so the full-history rolling
        # computation is avoided on every call.
        ma5 = c.iloc[-5:].mean()
        ma20 = c.iloc[-20:].mean()
        ma60 = c.iloc[-60:].mean()
        trend_ok = (last > ma5) and (ma5 > ma20) and (ma20 > ma60)

        # 2. Momentum filter: 20-day gain above 5% with the most recent
        # 10 days contributing more than 60% of it. An N-day return compares
        # against the close N bars back, i.e. iloc[-(N + 1)].
        ret20 = last / c.iloc[-21] - 1
        ret10 = last / c.iloc[-11] - 1
        momentum_ok = (ret20 > 0.05) and (ret10 > ret20 * 0.6)

        # 3. Volatility filter: annualized std of the last 20 daily returns
        # must stay below 45%.
        recent_returns = c.iloc[-21:].pct_change().iloc[1:]
        volatility = recent_returns.std() * np.sqrt(252)
        vol_ok = volatility < 0.45

        if trend_ok and momentum_ok and vol_ok:
            return "FULL", 1.0
        if trend_ok and momentum_ok:
            return "HALF", 0.5
        if trend_ok:
            return "QUARTER", 0.25
        return "EMPTY", 0.0

    def manage_risk(self, current_price):
        """Return the position allowed by the stop rules at ``current_price``.

        Updates ``peak_price`` while a position is open. Returns the current
        position unchanged when flat or when no rule fires; ``0.0`` on a
        drawdown beyond 12% from the peak or a loss beyond 8% from entry;
        half the current position on a drawdown beyond 8% when the position
        is at least 0.5.
        """
        if self.position <= 0:
            return self.position

        if current_price > self.peak_price:
            self.peak_price = current_price

        drawdown = (current_price - self.peak_price) / self.peak_price

        # Hard exit: more than 12% below the peak.
        if drawdown < -0.12:
            return 0.0

        # Soft exit: more than 8% below the peak, halve a large position.
        if drawdown < -0.08 and self.position >= 0.5:
            return self.position * 0.5

        # Stop-loss: more than 8% below the entry price.
        if self.entry_price > 0:
            loss = (current_price - self.entry_price) / self.entry_price
            if loss < -0.08:
                return 0.0

        return self.position

    def generate_signal(self, data):
        """Combine the entry signal with risk management for the latest bar.

        The filter signal supplies the target exposure. When a stop rule
        fires (``manage_risk`` returns less than the current position) the
        target is capped at that reduced level; otherwise the target is used
        as is. Taking the ``manage_risk`` result unconditionally would pin
        the position at zero forever, because a flat book always maps to 0.

        Args:
            data: DataFrame with a ``close`` column; the last row is "today".

        Returns:
            ``(position, state)`` where ``state`` is one of ``"ENTRY"``,
            ``"EXIT"``, ``"FULL"``, ``"HALF"``, ``"EMPTY"`` or ``"PARTIAL"``.
        """
        _, target = self.signal(data)
        current_price = data['close'].iloc[-1]

        allowed = self.manage_risk(current_price)
        if allowed < self.position:
            pos = min(target, allowed)
        else:
            pos = target

        if pos > self.position and self.position == 0:
            # Opening from flat: start tracking entry and peak prices.
            self.entry_price = current_price
            self.peak_price = current_price
            self.state = "ENTRY"
        elif pos == 0 and self.position > 0:
            # Fully closed: forget the old trade's reference prices.
            self.entry_price = 0
            self.peak_price = 0
            self.state = "EXIT"
        elif pos == 1.0:
            self.state = "FULL"
        elif pos == 0.5:
            self.state = "HALF"
        elif pos == 0:
            self.state = "EMPTY"
        else:
            self.state = "PARTIAL"

        self.position = pos
        return pos, self.state
def backtest(data, strategy, start_date, end_date, warmup=60):
    """Run ``strategy`` bar by bar over ``[start_date, end_date]``.

    The first ``warmup`` rows of the window only feed the strategy's
    indicators; results start at row ``warmup``. Each day's NAV change uses
    the position decided on the previous bar, so a decision never earns the
    return of the bar it was made on.

    Args:
        data: DataFrame with a ``close`` column, indexed by date.
        strategy: Object exposing ``generate_signal(frame)`` that returns
            ``(position, state)``; it receives all window rows up to and
            including the current bar.
        start_date: Inclusive lower bound compared against ``data.index``.
        end_date: Inclusive upper bound compared against ``data.index``.
        warmup: Number of leading rows of the window to skip.

    Returns:
        DataFrame indexed by ``date`` with columns ``position``, ``nav``,
        ``state``, ``close`` and ``index_nav`` (close rebased to 1.0 at the
        first result row).

    Raises:
        ValueError: If the window holds ``warmup`` rows or fewer, so there
            is nothing left to trade.
    """
    window = data[(data.index >= start_date) & (data.index <= end_date)]
    if len(window) <= warmup:
        raise ValueError(
            f"backtest window {start_date}..{end_date} has {len(window)} rows; "
            f"more than warmup={warmup} are required"
        )

    closes = window['close'].to_numpy()
    dates = window.index

    results = []
    nav = 1.0
    prev_pos = 0.0

    for i in range(warmup, len(window)):
        pos, state = strategy.generate_signal(window.iloc[:i + 1])

        if i > warmup:
            daily_ret = closes[i] / closes[i - 1] - 1
            # Apply the previous bar's position to today's return.
            nav *= 1 + daily_ret * prev_pos

        results.append({
            'date': dates[i],
            'position': pos,
            'nav': nav,
            'state': state,
            'close': closes[i],
        })
        prev_pos = pos

    df = pd.DataFrame(results).set_index('date')
    df['index_nav'] = df['close'] / df['close'].iloc[0]
    return df
def metrics(nav, index_nav):
    """Compute performance statistics for a strategy NAV and its benchmark.

    Both series are treated as starting from 1.0: total return is the last
    value minus one. Annualization uses ``252 / len(nav)`` as the exponent,
    and the Sharpe ratio subtracts a fixed 3% risk-free rate.

    Args:
        nav: pandas Series of strategy net asset values.
        index_nav: pandas Series of benchmark net asset values.

    Returns:
        dict with keys ``annual``, ``idx_annual``, ``excess``, ``max_dd``
        (a non-positive number), ``sharpe``, ``calmar``, ``total`` and
        ``idx_total``. ``sharpe`` is 0 when volatility is not positive and
        ``calmar`` is 0 when there is no drawdown.

    Raises:
        ValueError: If either series is empty.
    """
    if len(nav) == 0 or len(index_nav) == 0:
        raise ValueError("metrics() requires non-empty NAV series")

    days = len(nav)
    exponent = 252 / days

    total = nav.iloc[-1] - 1
    annual = (1 + total) ** exponent - 1

    idx_total = index_nav.iloc[-1] - 1
    idx_annual = (1 + idx_total) ** exponent - 1

    # Maximum drawdown: worst relative distance below the running high.
    running_max = nav.expanding().max()
    dd = ((nav - running_max) / running_max).min()

    # Sharpe ratio on annualized daily-return volatility. A NaN volatility
    # (fewer than two returns) fails the comparison and also yields 0.
    vol = nav.pct_change().dropna().std() * np.sqrt(252)
    sharpe = (annual - 0.03) / vol if vol > 0 else 0

    # Calmar ratio: annualized return per unit of maximum drawdown.
    calmar = annual / abs(dd) if dd != 0 else 0

    return {
        'annual': annual,
        'idx_annual': idx_annual,
        'excess': annual - idx_annual,
        'max_dd': dd,
        'sharpe': sharpe,
        'calmar': calmar,
        'total': total,
        'idx_total': idx_total,
    }
def plot(df, title, filename):
    """Save a two-panel chart of NAV curves and position to ``filename``.

    The top panel draws the strategy NAV against the index NAV; the bottom
    panel shades the position over time.

    Args:
        df: DataFrame with ``nav``, ``index_nav`` and ``position`` columns;
            its index is used as the x axis.
        title: Title of the top panel.
        filename: Output path handed to ``savefig``.
    """
    fig, (nav_ax, pos_ax) = plt.subplots(2, 1, figsize=(14, 8))
    try:
        nav_ax.plot(df.index, df['nav'], 'r-', linewidth=2, label='Strategy')
        nav_ax.plot(df.index, df['index_nav'], 'gray', linewidth=1, alpha=0.7, label='Index')
        nav_ax.set_title(title, fontsize=14)
        nav_ax.legend()
        nav_ax.grid(True, alpha=0.3)

        pos_ax.fill_between(df.index, 0, df['position'], alpha=0.5, color='green')
        pos_ax.set_ylim(0, 1.1)
        pos_ax.set_ylabel('Position')
        pos_ax.grid(True, alpha=0.3)

        fig.tight_layout()
        fig.savefig(filename, dpi=150)
    finally:
        # pyplot keeps every figure it creates alive until it is closed;
        # release it even when drawing or saving fails.
        plt.close(fig)

    print(f" 图表: {filename}")
def _print_report(title, m, show_calmar=False):
    """Print one boxed summary table for the metrics dict ``m``."""
    percent_rows = (
        ("策略收益", 'total'),
        ("指数收益", 'idx_total'),
        ("年化收益", 'annual'),
        ("超额收益", 'excess'),
        ("最大回撤", 'max_dd'),
    )

    print("\n ╔════════════════════════════════╗")
    print(f" ║ {title} ║")
    print(" ╠════════════════════════════════╣")
    for label, key in percent_rows:
        print(f" ║ {label}: {m[key]*100:7.1f}% ║")
    print(f" ║ 夏普比率: {m['sharpe']:7.2f} ║")
    if show_calmar:
        print(f" ║ 卡玛比率: {m['calmar']:7.2f} ║")
    print(" ╚════════════════════════════════╝")


def main():
    """Run the full demo: build data, back-test two periods, print a verdict.

    The training period (2018-2023) and the validation period (2024-2025)
    are each back-tested with a fresh strategy instance, summarized in a
    boxed table and plotted to ``train_ultimate.png`` / ``val_ultimate.png``.
    The closing section rates both periods and reports how much of the
    training annual return was lost in validation.
    """
    print("="*60)
    print("创业板50 - 终极高收益策略")
    print("="*60)

    # Data
    print("\n[1] 加载数据...")
    data = generate_realistic_data()
    print(f" {data.index[0].date()} ~ {data.index[-1].date()}")

    # Training period
    print("\n[2] 训练集 (2018-2023)...")
    s = UltimateStrategy()
    train = backtest(data, s, '2018-01-01', '2023-12-31')
    m = metrics(train['nav'], train['index_nav'])
    _print_report("训 练 集 结 果", m, show_calmar=True)
    plot(train, "Training (2018-2023)", "train_ultimate.png")

    # Validation period: a new strategy object so no state carries over.
    print("\n[3] 验证集 (2024-2025)...")
    s2 = UltimateStrategy()
    val = backtest(data, s2, '2024-01-01', '2025-12-31')
    m2 = metrics(val['nav'], val['index_nav'])
    _print_report("验 证 集 结 果", m2)
    plot(val, "Validation (2024-2025)", "val_ultimate.png")

    # Verdict
    print("\n[4] 策略评估:")
    if m['annual'] > 0.20 and m['calmar'] > 0.5:
        print(" ✅ 训练集表现优秀,具备盈利潜力")
    else:
        print(" ⚠️ 训练集收益一般")

    # Relative drop in annual return from training to validation; only
    # defined when the training return is positive.
    decay = (m['annual'] - m2['annual']) / m['annual'] if m['annual'] > 0 else 0
    print(f" 年化收益衰减: {decay*100:.0f}%")

    if m2['annual'] > 0.15:
        print(" ✅ 验证集收益优秀")
    elif m2['annual'] > 0:
        print(" ⚠️ 验证集收益一般")
    else:
        print(" ❌ 验证集亏损")

    print("\n" + "="*60)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|