| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 趋势质量评估器 - 参数优化测试
- 测试不同参数组合的回测表现,寻找最优配置
- """
import itertools
import sys
import warnings
from dataclasses import dataclass
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd

# Make the evaluator project directory importable before importing from it.
sys.path.insert(0, '/root/.openclaw/workspace/trend-quality-evaluator')
from trend_quality_evaluator import fetch_stock_data  # noqa: E402

warnings.filterwarnings('ignore')
print("="*70)
print("趋势质量评估器 - 参数优化测试")
print("="*70)

# Load the price history for symbol 399673 from 2017-01-01 through 2026-03-06.
df = fetch_stock_data("399673", "2017-01-01", "2026-03-06", "d")
# An empty frame is as unusable as a missing one: df.index[0] below would
# raise IndexError, so treat both cases as a failed fetch.
if df is None or len(df) == 0:
    print("数据获取失败")
    # sys.exit() is always available, whereas the bare exit() helper is only
    # injected by the site module (absent under `python -S` or frozen builds).
    sys.exit(1)

print(f"\n✓ 数据获取成功: {len(df)}条")
print(f" 日期范围: {df.index[0].date()} ~ {df.index[-1].date()}")
# Search space for the sweep: candidate values for every factor's threshold
# and for the number of points (weight) that factor contributes.
param_grid = dict(
    adx_threshold=[20, 25, 30],                 # ADX threshold
    adx_weight=[25, 30, 35],                    # ADX weight
    ma_slope_threshold=[1.001, 1.002, 1.003],   # moving-average slope threshold
    ma_slope_weight=[20, 25, 30],               # moving-average slope weight
    volatility_threshold=[0.7, 0.8, 0.9],       # volatility-ratio threshold
    volatility_weight=[15, 20, 25],             # volatility weight
    volume_threshold=[1.3, 1.5, 1.8],           # volume threshold
    volume_weight=[8, 10, 12],                  # volume weight
)
# The total weight of a configuration must be 100.
@dataclass
class StrategyConfig:
    """Thresholds and scoring weights that define one strategy configuration."""

    adx_threshold: float
    adx_weight: int
    ma_slope_threshold: float
    ma_slope_weight: int
    volatility_threshold: float
    volatility_weight: int
    volume_threshold: float
    volume_weight: int
    timeframe_weight: int = 15  # fixed at 15 points

    @property
    def total_weight(self) -> int:
        """Sum of the five factor weights."""
        factor_weights = (
            self.adx_weight,
            self.ma_slope_weight,
            self.volatility_weight,
            self.volume_weight,
            self.timeframe_weight,
        )
        return sum(factor_weights)

    @property
    def trade_threshold(self) -> int:
        """Score threshold, fixed at 60 points."""
        return 60
def calculate_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Precompute every technical indicator used by the scoring step.

    Args:
        df: Price history with ``high``, ``low``, ``close`` and ``volume``
            columns. The frame itself is not modified.

    Returns:
        A copy of ``df`` with these columns added: ``adx``, ``ma20``,
        ``ma20_slope``, ``atr14``, ``atr50``, ``atr_ratio``, ``volume_ma20``,
        ``volume_ratio`` and ``price_above_ma20``. Rows inside a rolling
        warm-up window hold NaN for the affected indicator.
    """
    data = df.copy()
    high, low, close = data['high'], data['low'], data['close']

    # Directional movement. A down-move is the previous low minus the current
    # low, so it is only positive when the low actually falls; taking the
    # absolute value would count a rising low as downward movement. Both
    # series are derived from the raw moves so neither test sees a value the
    # other has already zeroed.
    up_move = high.diff()
    down_move = -low.diff()
    plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    # True range: the widest of today's range and the two gaps to the prior close.
    prev_close = close.shift()
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    # Smoothing here is a plain 14-bar rolling mean.
    atr14 = tr.rolling(14).mean()
    plus_di = 100 * (plus_dm.rolling(14).mean() / atr14)
    minus_di = 100 * (minus_dm.rolling(14).mean() / atr14)
    # The small epsilon keeps DX finite when both DI values are zero.
    dx = ((plus_di - minus_di).abs() / (plus_di + minus_di + 1e-10)) * 100
    data['adx'] = dx.rolling(14).mean()

    # MA20 slope, expressed as the ratio of MA20 to its value five bars earlier.
    data['ma20'] = close.rolling(20).mean()
    data['ma20_slope'] = data['ma20'] / data['ma20'].shift(5)

    # Short-term versus long-term average true range.
    data['atr14'] = atr14
    data['atr50'] = tr.rolling(50).mean()
    data['atr_ratio'] = data['atr14'] / data['atr50']

    # Volume relative to its 20-bar average.
    data['volume_ma20'] = data['volume'].rolling(20).mean()
    data['volume_ratio'] = data['volume'] / data['volume_ma20']

    # Whether the close sits above MA20.
    data['price_above_ma20'] = close > data['ma20']

    return data
def _tiered_score(values, weight, full_cut, partial_cut, floor_cut,
                  higher_is_better=True):
    """Award 100% / 70% / 30% / 0% of ``weight`` by the best tier each value reaches."""
    weight = float(weight)
    if higher_is_better:
        tiers = [values >= full_cut, values >= partial_cut, values >= floor_cut]
    else:
        tiers = [values <= full_cut, values <= partial_cut, values <= floor_cut]
    # np.select keeps the first matching tier; NaN matches none and scores 0.
    return np.select(tiers, [weight, weight * 0.7, weight * 0.3], default=0.0)


def _mean_or_zero(series):
    """Return the mean of ``series``, or 0 when it has no values."""
    return series.mean() if len(series) > 0 else 0


def evaluate_strategy(data: pd.DataFrame, config: "StrategyConfig",
                      warmup: int = 60) -> Optional[dict]:
    """Score every bar under ``config`` and summarise forward returns.

    Args:
        data: Indicator frame with ``close``, ``adx``, ``ma20_slope``,
            ``atr_ratio``, ``volume_ratio`` and ``price_above_ma20`` columns.
        config: Object exposing the threshold/weight attributes of
            ``StrategyConfig`` plus ``trade_threshold``.
        warmup: Number of leading rows to skip so the rolling indicators have
            enough history. Defaults to 60.

    Returns:
        ``None`` when no row is left after the warm-up or when no row reaches
        ``config.trade_threshold``. Otherwise a dict with the keys ``config``,
        ``total_days``, ``tradeable_days``, ``tradeable_pct``, ``avg_score``,
        ``return_5d``, ``return_10d``, ``return_20d``, ``win_rate_20d``,
        ``sharpe_20d``, ``excess_return_20d`` and ``scores_df`` (the per-day
        score table, indexed by ``date``). Returns are in percent.
    """
    scored = data.iloc[warmup:]
    if scored.empty:
        # Too little history to score anything.
        return None

    adx = scored['adx'].to_numpy(dtype=float)
    ma_slope = scored['ma20_slope'].to_numpy(dtype=float)
    atr_ratio = scored['atr_ratio'].to_numpy(dtype=float)
    volume_ratio = scored['volume_ratio'].to_numpy(dtype=float)
    above_ma20 = scored['price_above_ma20'].to_numpy(dtype=bool)

    # 1. ADX: full weight at or above the threshold, proportional below it.
    adx_score = np.where(
        adx >= config.adx_threshold,
        float(config.adx_weight),
        config.adx_weight * (adx / config.adx_threshold),
    )

    # 2. MA slope. The slope is a ratio close to 1.0, so the top tier is set at
    #    1.5x the threshold's excess over 1.0; multiplying the ratio itself by
    #    1.5 would demand a ~50% MA move in five bars and never trigger.
    slope_excess = config.ma_slope_threshold - 1.0
    ma_score = _tiered_score(
        ma_slope, config.ma_slope_weight,
        1.0 + slope_excess * 1.5, config.ma_slope_threshold, 1.0,
    )

    # 3. Volatility: a lower ATR ratio scores higher.
    vol_score = _tiered_score(
        atr_ratio, config.volatility_weight,
        config.volatility_threshold * 0.75, config.volatility_threshold, 1.0,
        higher_is_better=False,
    )

    # 4. Volume: a higher volume ratio scores higher.
    volume_score = _tiered_score(
        volume_ratio, config.volume_weight,
        config.volume_threshold * 1.3, config.volume_threshold, 1.0,
    )

    # 5. Timeframe (simplified): all-or-nothing on the close being above MA20.
    tf_score = np.where(above_ma20, config.timeframe_weight, 0)

    total_score = adx_score + ma_score + vol_score + volume_score + tf_score

    scores_df = pd.DataFrame(
        {
            'close': scored['close'].to_numpy(),
            'total_score': total_score,
            'is_tradeable': total_score >= config.trade_threshold,
            'adx': adx,
            'ma20_slope': ma_slope,
            'atr_ratio': atr_ratio,
            'volume_ratio': volume_ratio,
        },
        # rename() returns a new Index, so the caller's index keeps its name.
        index=scored.index.rename('date'),
    )

    # Forward returns in percent, aligned to the day the score was produced.
    for horizon in (5, 10, 20):
        scores_df[f'future_{horizon}d_return'] = (
            scores_df['close'].pct_change(horizon).shift(-horizon) * 100
        )

    t_mask = scores_df['is_tradeable']
    total_days = len(scores_df)
    tradeable_days = int(t_mask.sum())
    if tradeable_days == 0:
        return None

    returns_5d = scores_df.loc[t_mask, 'future_5d_return'].dropna()
    returns_10d = scores_df.loc[t_mask, 'future_10d_return'].dropna()
    returns_20d = scores_df.loc[t_mask, 'future_20d_return'].dropna()
    baseline_20d = scores_df.loc[~t_mask, 'future_20d_return'].dropna()

    mean_20d = _mean_or_zero(returns_20d)
    std_20d = returns_20d.std()
    has_20d = len(returns_20d) > 0

    # Without any non-tradeable day there is nothing to compare against; report
    # 0 rather than NaN so downstream ranking stays well defined.
    if has_20d and len(baseline_20d) > 0:
        excess_20d = mean_20d - baseline_20d.mean()
    else:
        excess_20d = 0.0

    return {
        'config': config,
        'total_days': total_days,
        'tradeable_days': tradeable_days,
        'tradeable_pct': tradeable_days / total_days * 100,
        'avg_score': scores_df['total_score'].mean(),
        'return_5d': _mean_or_zero(returns_5d),
        'return_10d': _mean_or_zero(returns_10d),
        'return_20d': mean_20d,
        'win_rate_20d': (returns_20d > 0).mean() * 100 if has_20d else 0,
        'sharpe_20d': mean_20d / std_20d if has_20d and std_20d > 0 else 0,
        'excess_return_20d': excess_20d,
        'scores_df': scores_df,
    }
# Compute the indicators once; every configuration reuses the same frame.
print("\n预计算技术指标...")
data = calculate_indicators(df)
print("✓ 指标计算完成")

# Enumerate the grid, keeping only configurations whose weights sum to 100.
print("\n生成参数组合...")
grid_fields = (
    'adx_threshold',
    'adx_weight',
    'ma_slope_threshold',
    'ma_slope_weight',
    'volatility_threshold',
    'volatility_weight',
    'volume_threshold',
    'volume_weight',
)
# The product varies the last field fastest, so the listing order of the
# fields fixes the order of the resulting configurations.
grid_combos = itertools.product(*(param_grid[field] for field in grid_fields))
candidate_configs = (
    StrategyConfig(**dict(zip(grid_fields, combo))) for combo in grid_combos
)
valid_configs = [
    candidate for candidate in candidate_configs if candidate.total_weight == 100
]
print(f"✓ 有效参数组合: {len(valid_configs)}个")
# Run the backtest for every valid configuration.
total_configs = len(valid_configs)
print(f"\n开始回测 ({total_configs}个组合)...")
print("-" * 70)

all_results = []
for done, config in enumerate(valid_configs, start=1):
    result = evaluate_strategy(data, config)
    # Configurations that yield no result are skipped.
    if result:
        all_results.append(result)

    # Report progress after every 50th configuration.
    if done % 50 == 0:
        print(f" 进度: {done}/{total_configs} ({done / total_configs * 100:.1f}%)")

print(f"\n✓ 回测完成: {len(all_results)}个有效结果")
# Rank the results and pick the best configurations.
print("\n" + "="*70)
print("参数优化结果")
print("="*70)

# With no usable result, sorted(...)[0] and max(...) below would raise
# IndexError / ValueError, so stop with an explicit message instead.
if not all_results:
    print("没有有效的回测结果,无法选出最佳配置")
    sys.exit(1)


def _nan_last(value):
    """Rank NaN below every real number when searching for a maximum."""
    return float('-inf') if np.isnan(value) else value


def _share_of_best(value, best_value):
    """Return ``value / best_value``, or 0 when that ratio is undefined."""
    # `not best_value > 0` also covers a NaN best value.
    if np.isnan(value) or not best_value > 0:
        return 0
    return value / best_value


def _print_result(result, show_timeframe=False, show_composite=False):
    """Print the parameters and performance metrics of one backtest result."""
    cfg = result['config']
    print(f" ADX阈值: {cfg.adx_threshold}, 权重: {cfg.adx_weight}")
    print(f" MA斜率阈值: {cfg.ma_slope_threshold}, 权重: {cfg.ma_slope_weight}")
    print(f" 波动率阈值: {cfg.volatility_threshold}, 权重: {cfg.volatility_weight}")
    print(f" 成交量阈值: {cfg.volume_threshold}, 权重: {cfg.volume_weight}")
    if show_timeframe:
        print(f" 时间框架权重: {cfg.timeframe_weight}")
    print(f"\n 绩效指标:")
    print(f" 可交易比例: {result['tradeable_pct']:.1f}%")
    print(f" 5日收益: {result['return_5d']:+.2f}%")
    print(f" 10日收益: {result['return_10d']:+.2f}%")
    print(f" 20日收益: {result['return_20d']:+.2f}%")
    print(f" 超额收益: {result['excess_return_20d']:+.2f}%")
    print(f" 胜率(20日): {result['win_rate_20d']:.1f}%")
    print(f" 夏普比率: {result['sharpe_20d']:.2f}")
    if show_composite:
        print(f" 综合得分: {result['composite_score']:.1f}")


# One ranking per objective; every ordering is descending (best first).
sort_by = [
    ('20日收益', lambda r: _nan_last(r['return_20d']), True),
    ('超额收益', lambda r: _nan_last(r['excess_return_20d']), True),
    ('胜率', lambda r: _nan_last(r['win_rate_20d']), True),
    ('夏普比率', lambda r: _nan_last(r['sharpe_20d']), True),
]
best_configs = {}
for name, key_func, reverse in sort_by:
    sorted_results = sorted(all_results, key=key_func, reverse=reverse)
    best = sorted_results[0]
    best_configs[name] = best

    print(f"\n【最佳配置 - {name}】")
    _print_result(best, show_timeframe=True)

# Best overall configuration, balancing return, excess return, win rate and Sharpe.
print(f"\n【综合最佳配置】")
print("-"*50)

# Normalise each metric by the best value seen; nanmax ignores undefined entries.
max_return = np.nanmax([r['return_20d'] for r in all_results])
max_winrate = np.nanmax([r['win_rate_20d'] for r in all_results])
max_sharpe = np.nanmax([r['sharpe_20d'] for r in all_results])
max_excess = np.nanmax([r['excess_return_20d'] for r in all_results])

for r in all_results:
    # composite = return*0.3 + excess*0.3 + win rate*0.2 + Sharpe*0.2, scaled to 100
    r['composite_score'] = (
        _share_of_best(r['return_20d'], max_return) * 0.3 +
        _share_of_best(r['excess_return_20d'], max_excess) * 0.3 +
        _share_of_best(r['win_rate_20d'], max_winrate) * 0.2 +
        _share_of_best(r['sharpe_20d'], max_sharpe) * 0.2
    ) * 100

best_composite = max(all_results, key=lambda r: r['composite_score'])
_print_result(best_composite, show_composite=True)
# Persist the sweep results.
print("\n" + "="*70)
print("保存结果...")

# Columns copied from each result's config object, then from the result itself.
config_columns = (
    'adx_threshold',
    'adx_weight',
    'ma_slope_threshold',
    'ma_slope_weight',
    'volatility_threshold',
    'volatility_weight',
    'volume_threshold',
    'volume_weight',
)
metric_columns = (
    'tradeable_pct',
    'return_5d',
    'return_10d',
    'return_20d',
    'excess_return_20d',
    'win_rate_20d',
    'sharpe_20d',
    'composite_score',
)
results_summary = [
    {
        **{column: getattr(entry['config'], column) for column in config_columns},
        **{column: entry[column] for column in metric_columns},
    }
    for entry in all_results
]

# One row per configuration, best composite score first.
results_df = pd.DataFrame(results_summary).sort_values('composite_score', ascending=False)
results_df.to_csv('/root/.openclaw/workspace/trend-quality-evaluator/optimization_results.csv', index=False)

# Per-day score table of the best composite configuration.
best_scores = best_composite['scores_df']
best_scores.to_csv('/root/.openclaw/workspace/trend-quality-evaluator/best_config_backtest.csv')

print("✓ 优化结果已保存: optimization_results.csv")
print("✓ 最佳配置回测详情: best_config_backtest.csv")
print("\n" + "="*70)
print("参数优化测试完成!")
print("="*70)
|