|
|
@@ -0,0 +1,302 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+趋势质量评估器 (Trend Quality Evaluator)
|
|
|
+多因子评分模型:0-100分制,≥60分触发交易
|
|
|
+
|
|
|
+因子权重:
|
|
|
+- ADX趋势强度: 30%
|
|
|
+- 均线斜率: 25%
|
|
|
+- 波动率收缩: 20%
|
|
|
+- 多时间框架共振: 15%
|
|
|
+- 成交量确认: 10%
|
|
|
+"""
|
|
|
+
|
|
|
+import numpy as np
|
|
|
+import pandas as pd
|
|
|
+import baostock as bs
|
|
|
+from dataclasses import dataclass
|
|
|
+from typing import Optional, Tuple
|
|
|
+import warnings
|
|
|
+warnings.filterwarnings('ignore')
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class TrendQualityScore:
|
|
|
+ """趋势质量评分结果"""
|
|
|
+ total_score: float # 总分 0-100
|
|
|
+ adx_score: float # ADX得分 0-30
|
|
|
+ ma_slope_score: float # 均线斜率得分 0-25
|
|
|
+ volatility_score: float # 波动率得分 0-20
|
|
|
+ timeframe_score: float # 时间框架得分 0-15
|
|
|
+ volume_score: float # 成交量得分 0-10
|
|
|
+
|
|
|
+ is_tradeable: bool # 是否可交易 (>=60分)
|
|
|
+
|
|
|
+ adx_value: float # 原始ADX值
|
|
|
+ ma_slope: float # 均线斜率
|
|
|
+ volatility_ratio: float # 波动率比率
|
|
|
+ volume_ratio: float # 成交量比率
|
|
|
+
|
|
|
+
|
|
|
+class TrendQualityEvaluator:
|
|
|
+ """趋势质量评估器"""
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ self.weights = {
|
|
|
+ 'adx': 30,
|
|
|
+ 'ma_slope': 25,
|
|
|
+ 'volatility': 20,
|
|
|
+ 'timeframe': 15,
|
|
|
+ 'volume': 10
|
|
|
+ }
|
|
|
+
|
|
|
+ def calculate_adx(self, df: pd.DataFrame, period: int = 14) -> pd.Series:
|
|
|
+ """计算ADX趋势强度指标"""
|
|
|
+ high, low, close = df['high'], df['low'], df['close']
|
|
|
+
|
|
|
+ # +DM和-DM
|
|
|
+ plus_dm = high.diff()
|
|
|
+ minus_dm = low.diff().abs()
|
|
|
+
|
|
|
+ plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0)
|
|
|
+ minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0)
|
|
|
+
|
|
|
+ # 真实波幅 TR
|
|
|
+ tr1 = high - low
|
|
|
+ tr2 = (high - close.shift()).abs()
|
|
|
+ tr3 = (low - close.shift()).abs()
|
|
|
+ tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
|
|
|
+
|
|
|
+ # ATR
|
|
|
+ atr = tr.rolling(period).mean()
|
|
|
+
|
|
|
+ # +DI和-DI
|
|
|
+ plus_di = 100 * (plus_dm.rolling(period).mean() / atr)
|
|
|
+ minus_di = 100 * (minus_dm.rolling(period).mean() / atr)
|
|
|
+
|
|
|
+ # DX和ADX
|
|
|
+ dx = (abs(plus_di - minus_di) / (plus_di + minus_di + 1e-10)) * 100
|
|
|
+ adx = dx.rolling(period).mean()
|
|
|
+
|
|
|
+ return adx
|
|
|
+
|
|
|
+ def evaluate(self, df: pd.DataFrame, df_weekly: Optional[pd.DataFrame] = None) -> TrendQualityScore:
|
|
|
+ """
|
|
|
+ 评估趋势质量
|
|
|
+
|
|
|
+ Args:
|
|
|
+ df: 日线数据 DataFrame (需要包含 open, high, low, close, volume)
|
|
|
+ df_weekly: 周线数据 (可选,用于多时间框架共振)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ TrendQualityScore: 评分结果
|
|
|
+ """
|
|
|
+ latest = df.iloc[-1]
|
|
|
+
|
|
|
+ # 1. ADX趋势强度 (30分) - 阈值: ADX > 25
|
|
|
+ adx = self.calculate_adx(df, 14)
|
|
|
+ latest_adx = adx.iloc[-1]
|
|
|
+ # 评分: ADX 0-50映射到0-30分,>25得满分
|
|
|
+ if latest_adx >= 25:
|
|
|
+ adx_score = 30
|
|
|
+ else:
|
|
|
+ adx_score = min(30, latest_adx * 30 / 25)
|
|
|
+
|
|
|
+ # 2. 均线斜率 (25分) - 阈值: MA20/MA20[5] > 1.002
|
|
|
+ ma20 = df['close'].rolling(20).mean()
|
|
|
+ ma20_current = ma20.iloc[-1]
|
|
|
+ ma20_5days_ago = ma20.iloc[-5] if len(ma20) >= 5 else ma20_current
|
|
|
+ ma_slope = ma20_current / ma20_5days_ago if ma20_5days_ago > 0 else 1
|
|
|
+ # 评分: 斜率 > 1.002得满分,线性递减
|
|
|
+ if ma_slope >= 1.005:
|
|
|
+ ma_slope_score = 25
|
|
|
+ elif ma_slope >= 1.002:
|
|
|
+ ma_slope_score = 25 * (ma_slope - 1.002) / (1.005 - 1.002) + 15
|
|
|
+ elif ma_slope >= 1.0:
|
|
|
+ ma_slope_score = 15 * (ma_slope - 1.0) / 0.002
|
|
|
+ else:
|
|
|
+ ma_slope_score = 0
|
|
|
+
|
|
|
+ # 3. 波动率收缩 (20分) - 阈值: ATR(14)/ATR(50) < 0.8
|
|
|
+ atr14 = self._calculate_atr(df, 14)
|
|
|
+ atr50 = self._calculate_atr(df, 50)
|
|
|
+ volatility_ratio = atr14 / atr50 if atr50 > 0 else 1
|
|
|
+ # 评分: 比率 < 0.8得满分,越小越好
|
|
|
+ if volatility_ratio <= 0.6:
|
|
|
+ volatility_score = 20
|
|
|
+ elif volatility_ratio <= 0.8:
|
|
|
+ volatility_score = 20 - 10 * (volatility_ratio - 0.6) / 0.2
|
|
|
+ elif volatility_ratio <= 1.0:
|
|
|
+ volatility_score = 10 - 10 * (volatility_ratio - 0.8) / 0.2
|
|
|
+ else:
|
|
|
+ volatility_score = 0
|
|
|
+
|
|
|
+ # 4. 多时间框架共振 (15分) - 日线突破+周线方向一致
|
|
|
+ timeframe_score = 0
|
|
|
+ if df_weekly is not None and len(df_weekly) >= 5:
|
|
|
+ # 日线突破: 收盘价 > MA20
|
|
|
+ daily_breakout = latest['close'] > ma20_current
|
|
|
+ # 周线方向: 周线MA5 > MA10
|
|
|
+ weekly_ma5 = df_weekly['close'].rolling(5).mean().iloc[-1]
|
|
|
+ weekly_ma10 = df_weekly['close'].rolling(10).mean().iloc[-1]
|
|
|
+ weekly_aligned = weekly_ma5 > weekly_ma10
|
|
|
+
|
|
|
+ if daily_breakout and weekly_aligned:
|
|
|
+ timeframe_score = 15
|
|
|
+ elif daily_breakout or weekly_aligned:
|
|
|
+ timeframe_score = 7.5
|
|
|
+ else:
|
|
|
+ # 无周线数据时,仅看日线突破
|
|
|
+ daily_breakout = latest['close'] > ma20_current
|
|
|
+ timeframe_score = 15 if daily_breakout else 0
|
|
|
+
|
|
|
+ # 5. 成交量确认 (10分) - 突破当日成交量 > 20日均量1.5倍
|
|
|
+ volume_ma20 = df['volume'].rolling(20).mean().iloc[-1]
|
|
|
+ volume_ratio = latest['volume'] / volume_ma20 if volume_ma20 > 0 else 1
|
|
|
+ # 评分: >1.5倍得满分
|
|
|
+ if volume_ratio >= 2.0:
|
|
|
+ volume_score = 10
|
|
|
+ elif volume_ratio >= 1.5:
|
|
|
+ volume_score = 10 - 5 * (2.0 - volume_ratio) / 0.5
|
|
|
+ elif volume_ratio >= 1.0:
|
|
|
+ volume_score = 5 - 5 * (1.5 - volume_ratio) / 0.5
|
|
|
+ else:
|
|
|
+ volume_score = 0
|
|
|
+
|
|
|
+ # 计算总分
|
|
|
+ total_score = adx_score + ma_slope_score + volatility_score + timeframe_score + volume_score
|
|
|
+
|
|
|
+ return TrendQualityScore(
|
|
|
+ total_score=round(total_score, 1),
|
|
|
+ adx_score=round(adx_score, 1),
|
|
|
+ ma_slope_score=round(ma_slope_score, 1),
|
|
|
+ volatility_score=round(volatility_score, 1),
|
|
|
+ timeframe_score=round(timeframe_score, 1),
|
|
|
+ volume_score=round(volume_score, 1),
|
|
|
+ is_tradeable=total_score >= 60,
|
|
|
+ adx_value=round(latest_adx, 2),
|
|
|
+ ma_slope=round(ma_slope, 4),
|
|
|
+ volatility_ratio=round(volatility_ratio, 3),
|
|
|
+ volume_ratio=round(volume_ratio, 2)
|
|
|
+ )
|
|
|
+
|
|
|
+ def _calculate_atr(self, df: pd.DataFrame, period: int) -> float:
|
|
|
+ """计算ATR"""
|
|
|
+ high, low, close = df['high'], df['low'], df['close']
|
|
|
+ tr1 = high - low
|
|
|
+ tr2 = (high - close.shift()).abs()
|
|
|
+ tr3 = (low - close.shift()).abs()
|
|
|
+ tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
|
|
|
+ return tr.rolling(period).mean().iloc[-1]
|
|
|
+
|
|
|
+
|
|
|
+def fetch_stock_data(symbol: str, start_date: str, end_date: str, frequency: str = "d") -> Optional[pd.DataFrame]:
|
|
|
+ """获取股票数据"""
|
|
|
+ try:
|
|
|
+ bs.login()
|
|
|
+
|
|
|
+ if symbol.startswith('6'):
|
|
|
+ code = f"sh.{symbol}"
|
|
|
+ elif symbol.startswith('0') or symbol.startswith('3'):
|
|
|
+ code = f"sz.{symbol}"
|
|
|
+ else:
|
|
|
+ code = symbol
|
|
|
+
|
|
|
+ rs = bs.query_history_k_data_plus(
|
|
|
+ code,
|
|
|
+ "date,open,high,low,close,volume",
|
|
|
+ start_date=start_date,
|
|
|
+ end_date=end_date,
|
|
|
+ frequency=frequency,
|
|
|
+ adjustflag="3"
|
|
|
+ )
|
|
|
+
|
|
|
+ data = []
|
|
|
+ while rs.error_code == '0' and rs.next():
|
|
|
+ row = rs.get_row_data()
|
|
|
+ data.append({
|
|
|
+ 'date': row[0],
|
|
|
+ 'open': float(row[1]),
|
|
|
+ 'high': float(row[2]),
|
|
|
+ 'low': float(row[3]),
|
|
|
+ 'close': float(row[4]),
|
|
|
+ 'volume': int(float(row[5]))
|
|
|
+ })
|
|
|
+
|
|
|
+ bs.logout()
|
|
|
+
|
|
|
+ if not data:
|
|
|
+ return None
|
|
|
+
|
|
|
+ df = pd.DataFrame(data)
|
|
|
+ df['date'] = pd.to_datetime(df['date'])
|
|
|
+ df = df.set_index('date').sort_index()
|
|
|
+ return df
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ print(f"数据获取失败: {e}")
|
|
|
+ return None
|
|
|
+
|
|
|
+
|
|
|
+def main():
|
|
|
+ """主函数 - 示例用法"""
|
|
|
+ print("="*70)
|
|
|
+ print("趋势质量评估器 (Trend Quality Evaluator)")
|
|
|
+ print("="*70)
|
|
|
+
|
|
|
+ # 示例: 评估创业板50
|
|
|
+ symbol = "399673" # 创业板50
|
|
|
+
|
|
|
+ print(f"\n评估标的: 创业板50 ({symbol})")
|
|
|
+ print("-"*70)
|
|
|
+
|
|
|
+ # 获取日线数据
|
|
|
+ df_daily = fetch_stock_data(symbol, "2024-01-01", "2026-12-31", "d")
|
|
|
+ if df_daily is None:
|
|
|
+ print("数据获取失败")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 获取周线数据(用于多时间框架共振)
|
|
|
+ df_weekly = fetch_stock_data(symbol, "2023-01-01", "2026-12-31", "w")
|
|
|
+
|
|
|
+ # 评估趋势质量
|
|
|
+ evaluator = TrendQualityEvaluator()
|
|
|
+ score = evaluator.evaluate(df_daily, df_weekly)
|
|
|
+
|
|
|
+ # 打印结果
|
|
|
+ print(f"\n📊 评估日期: {df_daily.index[-1].strftime('%Y-%m-%d')}")
|
|
|
+ print(f"📈 当前价格: {df_daily['close'].iloc[-1]:.2f}")
|
|
|
+ print()
|
|
|
+ print("="*50)
|
|
|
+ print("评分详情 (满分100分)")
|
|
|
+ print("="*50)
|
|
|
+ print(f"{'1. ADX趋势强度 (30分):':<25} {score.adx_score:>6.1f}分 (ADX={score.adx_value:.2f})")
|
|
|
+ print(f"{'2. 均线斜率 (25分):':<25} {score.ma_slope_score:>6.1f}分 (斜率={score.ma_slope:.4f})")
|
|
|
+ print(f"{'3. 波动率收缩 (20分):':<25} {score.volatility_score:>6.1f}分 (ATR比={score.volatility_ratio:.3f})")
|
|
|
+ print(f"{'4. 多时间框架共振 (15分):':<25} {score.timeframe_score:>6.1f}分")
|
|
|
+ print(f"{'5. 成交量确认 (10分):':<25} {score.volume_score:>6.1f}分 (量比={score.volume_ratio:.2f}x)")
|
|
|
+ print("-"*50)
|
|
|
+ print(f"{'总分:':<25} {score.total_score:>6.1f}分")
|
|
|
+ print("="*50)
|
|
|
+
|
|
|
+ # 交易建议
|
|
|
+ print(f"\n🎯 交易建议:")
|
|
|
+ if score.is_tradeable:
|
|
|
+ print(f" ✅ 趋势质量良好 (≥60分),建议交易")
|
|
|
+ if score.total_score >= 80:
|
|
|
+ print(f" 💎 优秀趋势!建议重仓")
|
|
|
+ elif score.total_score >= 70:
|
|
|
+ print(f" ⭐ 良好趋势!建议中等仓位")
|
|
|
+ else:
|
|
|
+ print(f" 📌 及格趋势!建议轻仓试探")
|
|
|
+ else:
|
|
|
+ print(f" ❌ 趋势质量不足 (<60分),建议观望")
|
|
|
+ if score.total_score < 40:
|
|
|
+ print(f" ⚠️ 趋势混乱,避免交易")
|
|
|
+
|
|
|
+ print("\n" + "="*70)
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ main()
|