瀏覽代碼

修改数据获取逻辑: 只使用实时在线数据,移除本地文件备选,添加重试机制

openclaw 3 月之前
父節點
當前提交
bfd1ae8fde
共有 1 個文件被更改,包括 47 次插入106 次删除
  1. 47 106
      cat-fly/auto_report.py

+ 47 - 106
cat-fly/auto_report.py

@@ -67,123 +67,64 @@ def send_email(subject, html_content, text_content=""):
 
 # ==================== 数据获取 ====================
 class DataFetcher:
-    """数据获取类"""
+    """数据获取类 - 使用实时在线数据"""
     
     @staticmethod
     def fetch_recent_2months():
-        """获取近2个月数据 - 优先在线获取,失败则使用本地数据"""
+        """获取近2个月数据 - 使用实时在线数据"""
         end_date = datetime.now()
         start_date = end_date - timedelta(days=70)  # 2个月+10天缓冲
         
         print(f"获取数据: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
         
-        # 首先尝试在线获取
-        try:
-            df = ak.index_zh_a_hist_min_em(
-                symbol="399673",
-                period="30",
-                start_date=start_date.strftime('%Y%m%d%H%M%S'),
-                end_date=end_date.strftime('%Y%m%d%H%M%S')
-            )
-            
-            if df is not None and len(df) > 0:
-                df = df.rename(columns={
-                    '时间': 'datetime',
-                    '开盘': 'open',
-                    '收盘': 'close',
-                    '最高': 'high',
-                    '最低': 'low',
-                    '成交量': 'volume'
-                })
+        # 尝试在线获取(带重试机制)
+        max_retries = 3
+        for attempt in range(max_retries):
+            try:
+                print(f"[尝试 {attempt + 1}/{max_retries}] 正在使用东方财富30分钟K线接口...")
                 
-                df['datetime'] = pd.to_datetime(df['datetime'])
-                df = df.set_index('datetime').sort_index()
+                # 使用东方财富接口获取30分钟K线
+                df = ak.index_zh_a_hist_min_em(symbol="399673", period="30")
                 
-                backtest_start = end_date - timedelta(days=60)
-                df_backtest = df[df.index >= backtest_start]
-                
-                print(f"✅ 在线数据获取成功: 共{len(df_backtest)}条30分钟K线")
-                print(f"   数据区间: {df_backtest.index[0]} 至 {df_backtest.index[-1]}")
-                return df_backtest
-        except Exception as e:
-            print(f"⚠️ 在线数据获取失败: {e}")
-            print("   尝试使用本地数据...")
-        
-        # 在线获取失败,使用本地数据
-        return DataFetcher._load_local_data(start_date, end_date)
-    
-    @staticmethod
-    def _load_local_data(start_date, end_date):
-        """从本地文件加载数据"""
-        local_file = 'SZ#399673.txt'
-        
-        try:
-            if not os.path.exists(local_file):
-                print(f"❌ 本地文件不存在: {local_file}")
-                return None
-            
-            print(f"正在从本地文件读取数据: {local_file}")
-            
-            # 读取文本格式数据
-            data_list = []
-            encodings = ['gbk', 'gb2312', 'utf-8', 'latin-1']
-            lines = None
-            
-            for encoding in encodings:
-                try:
-                    with open(local_file, 'r', encoding=encoding) as f:
-                        lines = f.readlines()
-                    print(f"✅ 成功使用编码: {encoding}")
-                    break
-                except UnicodeDecodeError:
-                    continue
-            
-            if lines is None:
-                raise ValueError("无法读取文件,尝试了多种编码格式都失败")
-            
-            # 跳过前两行(标题行)
-            for line in lines[2:]:
-                line = line.strip()
-                if not line:
-                    continue
+                if df is not None and not df.empty and len(df) >= 50:
+                    # 标准化列名
+                    df = df.rename(columns={
+                        '时间': 'datetime',
+                        '开盘': 'open',
+                        '收盘': 'close',
+                        '最高': 'high',
+                        '最低': 'low',
+                        '成交量': 'volume'
+                    })
                     
-                parts = line.split()
-                if len(parts) >= 7:
-                    try:
-                        # 格式: 日期 时间 开盘 最高 最低 收盘 成交量 成交额
-                        date_time_str = f"{parts[0]} {parts[1]}"
-                        datetime_obj = pd.to_datetime(date_time_str, format='%Y/%m/%d %H%M')
-                        
-                        data_list.append({
-                            'datetime': datetime_obj,
-                            'open': float(parts[2]),
-                            'high': float(parts[3]),
-                            'low': float(parts[4]),
-                            'close': float(parts[5]),
-                            'volume': float(parts[6])
-                        })
-                    except (ValueError, IndexError) as e:
-                        continue
-            
-            if not data_list:
-                raise ValueError("文本文件中没有解析到有效数据")
-            
-            df = pd.DataFrame(data_list)
-            df = df.set_index('datetime').sort_index()
-            
-            # 只保留最近2个月的数据
-            backtest_start = end_date - timedelta(days=60)
-            df_backtest = df[df.index >= backtest_start]
-            
-            print(f"✅ 本地数据加载成功: 共{len(df_backtest)}条30分钟K线")
-            if len(df_backtest) > 0:
-                print(f"   数据区间: {df_backtest.index[0]} 至 {df_backtest.index[-1]}")
-            
-            return df_backtest
-            
-        except Exception as e:
-            print(f"❌ 本地数据加载失败: {e}")
-            return None
+                    df['datetime'] = pd.to_datetime(df['datetime'])
+                    df = df.set_index('datetime').sort_index()
+                    
+                    # 只保留最近2个月的数据用于回测
+                    backtest_start = end_date - timedelta(days=60)
+                    df_backtest = df[df.index >= backtest_start]
+                    
+                    print(f"✅ 数据获取成功: 共{len(df_backtest)}条30分钟K线")
+                    print(f"   数据区间: {df_backtest.index[0]} 至 {df_backtest.index[-1]}")
+                    
+                    # 检查数据时效性
+                    latest_time = df_backtest.index[-1]
+                    time_delay = end_date - latest_time
+                    print(f"   数据延迟: {time_delay}")
+                    
+                    return df_backtest
+                else:
+                    print(f"⚠️ 获取到的数据不足: {len(df) if df is not None else 0}条")
+                    
+            except Exception as e:
+                print(f"❌ 尝试 {attempt + 1} 失败: {e}")
+                if attempt < max_retries - 1:
+                    import time
+                    print(f"   等待3秒后重试...")
+                    time.sleep(3)
+        
+        # 所有尝试都失败
+        raise Exception("无法获取实时数据,所有数据源均失败。请检查网络连接或稍后重试。")
 
 
 # ==================== 策略类 ====================