| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 趋势质量评估器 (Trend Quality Evaluator)
- 多因子评分模型:0-100分制,≥60分触发交易
- 因子权重:
- - ADX趋势强度: 30%
- - 均线斜率: 25%
- - 波动率收缩: 20%
- - 多时间框架共振: 15%
- - 成交量确认: 10%
- """
- import numpy as np
- import pandas as pd
- import baostock as bs
- from dataclasses import dataclass
- from typing import Optional, Tuple
- import warnings
- warnings.filterwarnings('ignore')
- @dataclass
- class TrendQualityScore:
- """趋势质量评分结果"""
- total_score: float # 总分 0-100
- adx_score: float # ADX得分 0-30
- ma_slope_score: float # 均线斜率得分 0-25
- volatility_score: float # 波动率得分 0-20
- timeframe_score: float # 时间框架得分 0-15
- volume_score: float # 成交量得分 0-10
-
- is_tradeable: bool # 是否可交易 (>=60分)
-
- adx_value: float # 原始ADX值
- ma_slope: float # 均线斜率
- volatility_ratio: float # 波动率比率
- volume_ratio: float # 成交量比率
- class TrendQualityEvaluator:
- """趋势质量评估器"""
-
- def __init__(self):
- self.weights = {
- 'adx': 30,
- 'ma_slope': 25,
- 'volatility': 20,
- 'timeframe': 15,
- 'volume': 10
- }
-
- def calculate_adx(self, df: pd.DataFrame, period: int = 14) -> pd.Series:
- """计算ADX趋势强度指标"""
- high, low, close = df['high'], df['low'], df['close']
-
- # +DM和-DM
- plus_dm = high.diff()
- minus_dm = low.diff().abs()
-
- plus_dm = plus_dm.where((plus_dm > minus_dm) & (plus_dm > 0), 0)
- minus_dm = minus_dm.where((minus_dm > plus_dm) & (minus_dm > 0), 0)
-
- # 真实波幅 TR
- tr1 = high - low
- tr2 = (high - close.shift()).abs()
- tr3 = (low - close.shift()).abs()
- tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
-
- # ATR
- atr = tr.rolling(period).mean()
-
- # +DI和-DI
- plus_di = 100 * (plus_dm.rolling(period).mean() / atr)
- minus_di = 100 * (minus_dm.rolling(period).mean() / atr)
-
- # DX和ADX
- dx = (abs(plus_di - minus_di) / (plus_di + minus_di + 1e-10)) * 100
- adx = dx.rolling(period).mean()
-
- return adx
-
- def evaluate(self, df: pd.DataFrame, df_weekly: Optional[pd.DataFrame] = None) -> TrendQualityScore:
- """
- 评估趋势质量
-
- Args:
- df: 日线数据 DataFrame (需要包含 open, high, low, close, volume)
- df_weekly: 周线数据 (可选,用于多时间框架共振)
-
- Returns:
- TrendQualityScore: 评分结果
- """
- latest = df.iloc[-1]
-
- # 1. ADX趋势强度 (30分) - 阈值: ADX > 25
- adx = self.calculate_adx(df, 14)
- latest_adx = adx.iloc[-1]
- # 评分: ADX 0-50映射到0-30分,>25得满分
- if latest_adx >= 25:
- adx_score = 30
- else:
- adx_score = min(30, latest_adx * 30 / 25)
-
- # 2. 均线斜率 (25分) - 阈值: MA20/MA20[5] > 1.002
- ma20 = df['close'].rolling(20).mean()
- ma20_current = ma20.iloc[-1]
- ma20_5days_ago = ma20.iloc[-5] if len(ma20) >= 5 else ma20_current
- ma_slope = ma20_current / ma20_5days_ago if ma20_5days_ago > 0 else 1
- # 评分: 斜率 > 1.002得满分,线性递减
- if ma_slope >= 1.005:
- ma_slope_score = 25
- elif ma_slope >= 1.002:
- ma_slope_score = 25 * (ma_slope - 1.002) / (1.005 - 1.002) + 15
- elif ma_slope >= 1.0:
- ma_slope_score = 15 * (ma_slope - 1.0) / 0.002
- else:
- ma_slope_score = 0
-
- # 3. 波动率收缩 (20分) - 阈值: ATR(14)/ATR(50) < 0.8
- atr14 = self._calculate_atr(df, 14)
- atr50 = self._calculate_atr(df, 50)
- volatility_ratio = atr14 / atr50 if atr50 > 0 else 1
- # 评分: 比率 < 0.8得满分,越小越好
- if volatility_ratio <= 0.6:
- volatility_score = 20
- elif volatility_ratio <= 0.8:
- volatility_score = 20 - 10 * (volatility_ratio - 0.6) / 0.2
- elif volatility_ratio <= 1.0:
- volatility_score = 10 - 10 * (volatility_ratio - 0.8) / 0.2
- else:
- volatility_score = 0
-
- # 4. 多时间框架共振 (15分) - 日线突破+周线方向一致
- timeframe_score = 0
- if df_weekly is not None and len(df_weekly) >= 5:
- # 日线突破: 收盘价 > MA20
- daily_breakout = latest['close'] > ma20_current
- # 周线方向: 周线MA5 > MA10
- weekly_ma5 = df_weekly['close'].rolling(5).mean().iloc[-1]
- weekly_ma10 = df_weekly['close'].rolling(10).mean().iloc[-1]
- weekly_aligned = weekly_ma5 > weekly_ma10
-
- if daily_breakout and weekly_aligned:
- timeframe_score = 15
- elif daily_breakout or weekly_aligned:
- timeframe_score = 7.5
- else:
- # 无周线数据时,仅看日线突破
- daily_breakout = latest['close'] > ma20_current
- timeframe_score = 15 if daily_breakout else 0
-
- # 5. 成交量确认 (10分) - 突破当日成交量 > 20日均量1.5倍
- volume_ma20 = df['volume'].rolling(20).mean().iloc[-1]
- volume_ratio = latest['volume'] / volume_ma20 if volume_ma20 > 0 else 1
- # 评分: >1.5倍得满分
- if volume_ratio >= 2.0:
- volume_score = 10
- elif volume_ratio >= 1.5:
- volume_score = 10 - 5 * (2.0 - volume_ratio) / 0.5
- elif volume_ratio >= 1.0:
- volume_score = 5 - 5 * (1.5 - volume_ratio) / 0.5
- else:
- volume_score = 0
-
- # 计算总分
- total_score = adx_score + ma_slope_score + volatility_score + timeframe_score + volume_score
-
- return TrendQualityScore(
- total_score=round(total_score, 1),
- adx_score=round(adx_score, 1),
- ma_slope_score=round(ma_slope_score, 1),
- volatility_score=round(volatility_score, 1),
- timeframe_score=round(timeframe_score, 1),
- volume_score=round(volume_score, 1),
- is_tradeable=total_score >= 60,
- adx_value=round(latest_adx, 2),
- ma_slope=round(ma_slope, 4),
- volatility_ratio=round(volatility_ratio, 3),
- volume_ratio=round(volume_ratio, 2)
- )
-
- def _calculate_atr(self, df: pd.DataFrame, period: int) -> float:
- """计算ATR"""
- high, low, close = df['high'], df['low'], df['close']
- tr1 = high - low
- tr2 = (high - close.shift()).abs()
- tr3 = (low - close.shift()).abs()
- tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
- return tr.rolling(period).mean().iloc[-1]
- def fetch_stock_data(symbol: str, start_date: str, end_date: str, frequency: str = "d") -> Optional[pd.DataFrame]:
- """获取股票数据"""
- try:
- bs.login()
-
- if symbol.startswith('6'):
- code = f"sh.{symbol}"
- elif symbol.startswith('0') or symbol.startswith('3'):
- code = f"sz.{symbol}"
- else:
- code = symbol
-
- rs = bs.query_history_k_data_plus(
- code,
- "date,open,high,low,close,volume",
- start_date=start_date,
- end_date=end_date,
- frequency=frequency,
- adjustflag="3"
- )
-
- data = []
- while rs.error_code == '0' and rs.next():
- row = rs.get_row_data()
- data.append({
- 'date': row[0],
- 'open': float(row[1]),
- 'high': float(row[2]),
- 'low': float(row[3]),
- 'close': float(row[4]),
- 'volume': int(float(row[5]))
- })
-
- bs.logout()
-
- if not data:
- return None
-
- df = pd.DataFrame(data)
- df['date'] = pd.to_datetime(df['date'])
- df = df.set_index('date').sort_index()
- return df
-
- except Exception as e:
- print(f"数据获取失败: {e}")
- return None
- def main():
- """主函数 - 示例用法"""
- print("="*70)
- print("趋势质量评估器 (Trend Quality Evaluator)")
- print("="*70)
-
- # 示例: 评估创业板50
- symbol = "399673" # 创业板50
-
- print(f"\n评估标的: 创业板50 ({symbol})")
- print("-"*70)
-
- # 获取日线数据
- df_daily = fetch_stock_data(symbol, "2024-01-01", "2026-12-31", "d")
- if df_daily is None:
- print("数据获取失败")
- return
-
- # 获取周线数据(用于多时间框架共振)
- df_weekly = fetch_stock_data(symbol, "2023-01-01", "2026-12-31", "w")
-
- # 评估趋势质量
- evaluator = TrendQualityEvaluator()
- score = evaluator.evaluate(df_daily, df_weekly)
-
- # 打印结果
- print(f"\n📊 评估日期: {df_daily.index[-1].strftime('%Y-%m-%d')}")
- print(f"📈 当前价格: {df_daily['close'].iloc[-1]:.2f}")
- print()
- print("="*50)
- print("评分详情 (满分100分)")
- print("="*50)
- print(f"{'1. ADX趋势强度 (30分):':<25} {score.adx_score:>6.1f}分 (ADX={score.adx_value:.2f})")
- print(f"{'2. 均线斜率 (25分):':<25} {score.ma_slope_score:>6.1f}分 (斜率={score.ma_slope:.4f})")
- print(f"{'3. 波动率收缩 (20分):':<25} {score.volatility_score:>6.1f}分 (ATR比={score.volatility_ratio:.3f})")
- print(f"{'4. 多时间框架共振 (15分):':<25} {score.timeframe_score:>6.1f}分")
- print(f"{'5. 成交量确认 (10分):':<25} {score.volume_score:>6.1f}分 (量比={score.volume_ratio:.2f}x)")
- print("-"*50)
- print(f"{'总分:':<25} {score.total_score:>6.1f}分")
- print("="*50)
-
- # 交易建议
- print(f"\n🎯 交易建议:")
- if score.is_tradeable:
- print(f" ✅ 趋势质量良好 (≥60分),建议交易")
- if score.total_score >= 80:
- print(f" 💎 优秀趋势!建议重仓")
- elif score.total_score >= 70:
- print(f" ⭐ 良好趋势!建议中等仓位")
- else:
- print(f" 📌 及格趋势!建议轻仓试探")
- else:
- print(f" ❌ 趋势质量不足 (<60分),建议观望")
- if score.total_score < 40:
- print(f" ⚠️ 趋势混乱,避免交易")
-
- print("\n" + "="*70)
- if __name__ == "__main__":
- main()
|