#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 趋势质量评估器 (Trend Quality Evaluator) 多因子评分模型:0-100分制,≥60分触发交易 因子权重: - ADX趋势强度: 30% - 均线斜率: 25% - 波动率收缩: 20% - 多时间框架共振: 15% - 成交量确认: 10% """ import numpy as np import pandas as pd import baostock as bs from dataclasses import dataclass from typing import Optional, Tuple import warnings warnings.filterwarnings('ignore') @dataclass class TrendQualityScore: """趋势质量评分结果""" total_score: float # 总分 0-100 adx_score: float # ADX得分 0-30 ma_slope_score: float # 均线斜率得分 0-25 volatility_score: float # 波动率得分 0-20 timeframe_score: float # 时间框架得分 0-15 volume_score: float # 成交量得分 0-10 is_tradeable: bool # 是否可交易 (>=60分) adx_value: float # 原始ADX值 ma_slope: float # 均线斜率 volatility_ratio: float # 波动率比率 volume_ratio: float # 成交量比率 class TrendQualityEvaluator: """趋势质量评估器""" def __init__(self): self.weights = { 'adx': 30, 'ma_slope': 25, 'volatility': 20, 'timeframe': 15, 'volume': 10 } def calculate_adx(self, df: pd.DataFrame, period: int = 14) -> pd.Series: """计算ADX趋势强度指标""" high, low, close = df['high'], df['low'], df['close'] # +DM和-DM plus_dm = high.diff() minus_dm = low.diff().abs() plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0) minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0) # 真实波幅 TR tr1 = high - low tr2 = (high - close.shift()).abs() tr3 = (low - close.shift()).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) # ATR atr = tr.rolling(period).mean() # +DI和-DI plus_di = 100 * (plus_dm.rolling(period).mean() / atr) minus_di = 100 * (minus_dm.rolling(period).mean() / atr) # DX和ADX dx = (abs(plus_di - minus_di) / (plus_di + minus_di + 1e-10)) * 100 adx = dx.rolling(period).mean() return adx def evaluate(self, df: pd.DataFrame, df_weekly: Optional[pd.DataFrame] = None) -> TrendQualityScore: """ 评估趋势质量 Args: df: 日线数据 DataFrame (需要包含 open, high, low, close, volume) df_weekly: 周线数据 (可选,用于多时间框架共振) Returns: TrendQualityScore: 评分结果 """ latest = df.iloc[-1] # 1. ADX趋势强度 (30分) - 阈值: ADX > 25 adx = self.calculate_adx(df, 14) latest_adx = adx.iloc[-1] # 评分: ADX 0-50映射到0-30分,>25得满分 if latest_adx >= 25: adx_score = 30 else: adx_score = min(30, latest_adx * 30 / 25) # 2. 均线斜率 (25分) - 阈值: MA20/MA20[5] > 1.002 ma20 = df['close'].rolling(20).mean() ma20_current = ma20.iloc[-1] ma20_5days_ago = ma20.iloc[-5] if len(ma20) >= 5 else ma20_current ma_slope = ma20_current / ma20_5days_ago if ma20_5days_ago > 0 else 1 # 评分: 斜率 > 1.002得满分,线性递减 if ma_slope >= 1.005: ma_slope_score = 25 elif ma_slope >= 1.002: ma_slope_score = 25 * (ma_slope - 1.002) / (1.005 - 1.002) + 15 elif ma_slope >= 1.0: ma_slope_score = 15 * (ma_slope - 1.0) / 0.002 else: ma_slope_score = 0 # 3. 波动率收缩 (20分) - 阈值: ATR(14)/ATR(50) < 0.8 atr14 = self._calculate_atr(df, 14) atr50 = self._calculate_atr(df, 50) volatility_ratio = atr14 / atr50 if atr50 > 0 else 1 # 评分: 比率 < 0.8得满分,越小越好 if volatility_ratio <= 0.6: volatility_score = 20 elif volatility_ratio <= 0.8: volatility_score = 20 - 10 * (volatility_ratio - 0.6) / 0.2 elif volatility_ratio <= 1.0: volatility_score = 10 - 10 * (volatility_ratio - 0.8) / 0.2 else: volatility_score = 0 # 4. 多时间框架共振 (15分) - 日线突破+周线方向一致 timeframe_score = 0 if df_weekly is not None and len(df_weekly) >= 5: # 日线突破: 收盘价 > MA20 daily_breakout = latest['close'] > ma20_current # 周线方向: 周线MA5 > MA10 weekly_ma5 = df_weekly['close'].rolling(5).mean().iloc[-1] weekly_ma10 = df_weekly['close'].rolling(10).mean().iloc[-1] weekly_aligned = weekly_ma5 > weekly_ma10 if daily_breakout and weekly_aligned: timeframe_score = 15 elif daily_breakout or weekly_aligned: timeframe_score = 7.5 else: # 无周线数据时,仅看日线突破 daily_breakout = latest['close'] > ma20_current timeframe_score = 15 if daily_breakout else 0 # 5. 成交量确认 (10分) - 突破当日成交量 > 20日均量1.5倍 volume_ma20 = df['volume'].rolling(20).mean().iloc[-1] volume_ratio = latest['volume'] / volume_ma20 if volume_ma20 > 0 else 1 # 评分: >1.5倍得满分 if volume_ratio >= 2.0: volume_score = 10 elif volume_ratio >= 1.5: volume_score = 10 - 5 * (2.0 - volume_ratio) / 0.5 elif volume_ratio >= 1.0: volume_score = 5 - 5 * (1.5 - volume_ratio) / 0.5 else: volume_score = 0 # 计算总分 total_score = adx_score + ma_slope_score + volatility_score + timeframe_score + volume_score return TrendQualityScore( total_score=round(total_score, 1), adx_score=round(adx_score, 1), ma_slope_score=round(ma_slope_score, 1), volatility_score=round(volatility_score, 1), timeframe_score=round(timeframe_score, 1), volume_score=round(volume_score, 1), is_tradeable=total_score >= 60, adx_value=round(latest_adx, 2), ma_slope=round(ma_slope, 4), volatility_ratio=round(volatility_ratio, 3), volume_ratio=round(volume_ratio, 2) ) def _calculate_atr(self, df: pd.DataFrame, period: int) -> float: """计算ATR""" high, low, close = df['high'], df['low'], df['close'] tr1 = high - low tr2 = (high - close.shift()).abs() tr3 = (low - close.shift()).abs() tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1) return tr.rolling(period).mean().iloc[-1] def fetch_stock_data(symbol: str, start_date: str, end_date: str, frequency: str = "d") -> Optional[pd.DataFrame]: """获取股票数据""" try: bs.login() if symbol.startswith('6'): code = f"sh.{symbol}" elif symbol.startswith('0') or symbol.startswith('3'): code = f"sz.{symbol}" else: code = symbol rs = bs.query_history_k_data_plus( code, "date,open,high,low,close,volume", start_date=start_date, end_date=end_date, frequency=frequency, adjustflag="3" ) data = [] while rs.error_code == '0' and rs.next(): row = rs.get_row_data() data.append({ 'date': row[0], 'open': float(row[1]), 'high': float(row[2]), 'low': float(row[3]), 'close': float(row[4]), 'volume': int(float(row[5])) }) bs.logout() if not data: return None df = pd.DataFrame(data) df['date'] = pd.to_datetime(df['date']) df = df.set_index('date').sort_index() return df except Exception as e: print(f"数据获取失败: {e}") return None def main(): """主函数 - 示例用法""" print("="*70) print("趋势质量评估器 (Trend Quality Evaluator)") print("="*70) # 示例: 评估创业板50 symbol = "399673" # 创业板50 print(f"\n评估标的: 创业板50 ({symbol})") print("-"*70) # 获取日线数据 df_daily = fetch_stock_data(symbol, "2024-01-01", "2026-12-31", "d") if df_daily is None: print("数据获取失败") return # 获取周线数据(用于多时间框架共振) df_weekly = fetch_stock_data(symbol, "2023-01-01", "2026-12-31", "w") # 评估趋势质量 evaluator = TrendQualityEvaluator() score = evaluator.evaluate(df_daily, df_weekly) # 打印结果 print(f"\n📊 评估日期: {df_daily.index[-1].strftime('%Y-%m-%d')}") print(f"📈 当前价格: {df_daily['close'].iloc[-1]:.2f}") print() print("="*50) print("评分详情 (满分100分)") print("="*50) print(f"{'1. ADX趋势强度 (30分):':<25} {score.adx_score:>6.1f}分 (ADX={score.adx_value:.2f})") print(f"{'2. 均线斜率 (25分):':<25} {score.ma_slope_score:>6.1f}分 (斜率={score.ma_slope:.4f})") print(f"{'3. 波动率收缩 (20分):':<25} {score.volatility_score:>6.1f}分 (ATR比={score.volatility_ratio:.3f})") print(f"{'4. 多时间框架共振 (15分):':<25} {score.timeframe_score:>6.1f}分") print(f"{'5. 成交量确认 (10分):':<25} {score.volume_score:>6.1f}分 (量比={score.volume_ratio:.2f}x)") print("-"*50) print(f"{'总分:':<25} {score.total_score:>6.1f}分") print("="*50) # 交易建议 print(f"\n🎯 交易建议:") if score.is_tradeable: print(f" ✅ 趋势质量良好 (≥60分),建议交易") if score.total_score >= 80: print(f" 💎 优秀趋势!建议重仓") elif score.total_score >= 70: print(f" ⭐ 良好趋势!建议中等仓位") else: print(f" 📌 及格趋势!建议轻仓试探") else: print(f" ❌ 趋势质量不足 (<60分),建议观望") if score.total_score < 40: print(f" ⚠️ 趋势混乱,避免交易") print("\n" + "="*70) if __name__ == "__main__": main()