""" 结构化日志系统 支持生态状态、智能体决策、风险事件分级记录 """ import json import logging import sys from datetime import datetime from enum import Enum from pathlib import Path from typing import Any, Dict, Optional import structlog class LogLevel(Enum): """日志级别""" DEBUG = "debug" INFO = "info" WARNING = "warning" ERROR = "error" CRITICAL = "critical" class EventType(Enum): """事件类型""" # 生态事件 ECOSYSTEM_CHANGE = "ecosystem_change" REGIME_TRANSITION = "regime_transition" # 智能体事件 AGENT_SIGNAL = "agent_signal" AGENT_ROUTING = "agent_routing" AGENT_HEALTH = "agent_health" # 风险事件 RISK_BUDGET = "risk_budget" RISK_HEDGE = "risk_hedge" RISK_ALERT = "risk_alert" RISK_WARNING = "risk_warning" RISK_STOP = "risk_stop" # 执行事件 EXECUTION_ORDER = "execution_order" EXECUTION_FILL = "execution_fill" EXECUTION_COST = "execution_cost" # 系统事件 SYSTEM_START = "system_start" SYSTEM_STOP = "system_stop" SYSTEM_ERROR = "system_error" class CYB50Logger: """CYB50-Pro 结构化日志器""" def __init__(self, log_dir: str = "logs", log_level: str = "INFO"): self.log_dir = Path(log_dir) self.log_dir.mkdir(exist_ok=True) self.log_level = getattr(logging, log_level.upper()) self._configure_structlog() self._configure_standard_logging() self.logger = structlog.get_logger() def _configure_structlog(self): """配置 structlog""" structlog.configure( processors=[ structlog.stdlib.filter_by_level, structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), structlog.processors.TimeStamper(fmt="iso"), structlog.processors.StackInfoRenderer(), structlog.processors.format_exc_info, structlog.processors.UnicodeDecoder(), structlog.processors.JSONRenderer() ], context_class=dict, logger_factory=structlog.stdlib.LoggerFactory(), wrapper_class=structlog.stdlib.BoundLogger, cache_logger_on_first_use=True, ) def _configure_standard_logging(self): """配置标准日志""" # 文件处理器 - 详细日志 detailed_handler = logging.FileHandler( self.log_dir / f"cyb50_pro_{datetime.now():%Y%m%d}.log" ) detailed_handler.setLevel(self.log_level) # 文件处理器 - 仅错误 error_handler = logging.FileHandler( self.log_dir / f"cyb50_pro_error_{datetime.now():%Y%m%d}.log" ) error_handler.setLevel(logging.ERROR) # 控制台处理器 console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(self.log_level) # 格式化器 formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) detailed_handler.setFormatter(formatter) error_handler.setFormatter(formatter) console_handler.setFormatter(formatter) # 根日志配置 logging.basicConfig( level=self.log_level, handlers=[detailed_handler, error_handler, console_handler] ) def log_ecosystem( self, event_type: EventType, macro_regime: str, meso_health: float, micro_state: str, confidence: float, details: Optional[Dict[str, Any]] = None ): """记录生态状态""" self.logger.info( event_type=event_type.value, category="ecosystem", macro_regime=macro_regime, meso_health=meso_health, micro_state=micro_state, confidence=confidence, details=details or {} ) def log_agent( self, event_type: EventType, agent_name: str, signal: Optional[str] = None, weight: Optional[float] = None, health_score: Optional[float] = None, expected_return: Optional[float] = None, details: Optional[Dict[str, Any]] = None ): """记录智能体决策""" self.logger.info( event_type=event_type.value, category="agent", agent_name=agent_name, signal=signal, weight=weight, health_score=health_score, expected_return=expected_return, details=details or {} ) def log_risk( self, event_type: EventType, level: str, # "info", "warning", "critical" metric_name: Optional[str] = None, metric_value: Optional[float] = None, threshold: Optional[float] = None, action: Optional[str] = None, details: Optional[Dict[str, Any]] = None ): """记录风险事件""" log_method = getattr(self.logger, level) log_method( event_type=event_type.value, category="risk", metric_name=metric_name, metric_value=metric_value, threshold=threshold, action=action, details=details or {} ) def log_execution( self, event_type: EventType, order_id: str, symbol: str, side: str, quantity: int, price: Optional[float] = None, cost: Optional[float] = None, strategy: Optional[str] = None, details: Optional[Dict[str, Any]] = None ): """记录执行事件""" self.logger.info( event_type=event_type.value, category="execution", order_id=order_id, symbol=symbol, side=side, quantity=quantity, price=price, cost=cost, strategy=strategy, details=details or {} ) def log_system( self, event_type: EventType, message: str, level: str = "info", details: Optional[Dict[str, Any]] = None ): """记录系统事件""" log_method = getattr(self.logger, level) log_method( event_type=event_type.value, category="system", message=message, details=details or {} ) # 全局日志实例 _logger: Optional[CYB50Logger] = None def get_logger(log_dir: str = "logs", log_level: str = "INFO") -> CYB50Logger: """获取全局日志实例""" global _logger if _logger is None: _logger = CYB50Logger(log_dir, log_level) return _logger def configure_logging(log_dir: str = "logs", log_level: str = "INFO"): """配置日志系统""" global _logger _logger = CYB50Logger(log_dir, log_level)