| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- """
- 结构化日志系统
- 支持生态状态、智能体决策、风险事件分级记录
- """
- import json
- import logging
- import sys
- from datetime import datetime
- from enum import Enum
- from pathlib import Path
- from typing import Any, Dict, Optional
- import structlog
- class LogLevel(Enum):
- """日志级别"""
- DEBUG = "debug"
- INFO = "info"
- WARNING = "warning"
- ERROR = "error"
- CRITICAL = "critical"
- class EventType(Enum):
- """事件类型"""
- # 生态事件
- ECOSYSTEM_CHANGE = "ecosystem_change"
- REGIME_TRANSITION = "regime_transition"
- # 智能体事件
- AGENT_SIGNAL = "agent_signal"
- AGENT_ROUTING = "agent_routing"
- AGENT_HEALTH = "agent_health"
- # 风险事件
- RISK_BUDGET = "risk_budget"
- RISK_HEDGE = "risk_hedge"
- RISK_ALERT = "risk_alert"
- RISK_WARNING = "risk_warning"
- RISK_STOP = "risk_stop"
- # 执行事件
- EXECUTION_ORDER = "execution_order"
- EXECUTION_FILL = "execution_fill"
- EXECUTION_COST = "execution_cost"
- # 系统事件
- SYSTEM_START = "system_start"
- SYSTEM_STOP = "system_stop"
- SYSTEM_ERROR = "system_error"
- class CYB50Logger:
- """CYB50-Pro 结构化日志器"""
- def __init__(self, log_dir: str = "logs", log_level: str = "INFO"):
- self.log_dir = Path(log_dir)
- self.log_dir.mkdir(exist_ok=True)
- self.log_level = getattr(logging, log_level.upper())
- self._configure_structlog()
- self._configure_standard_logging()
- self.logger = structlog.get_logger()
- def _configure_structlog(self):
- """配置 structlog"""
- structlog.configure(
- processors=[
- structlog.stdlib.filter_by_level,
- structlog.stdlib.add_logger_name,
- structlog.stdlib.add_log_level,
- structlog.stdlib.PositionalArgumentsFormatter(),
- structlog.processors.TimeStamper(fmt="iso"),
- structlog.processors.StackInfoRenderer(),
- structlog.processors.format_exc_info,
- structlog.processors.UnicodeDecoder(),
- structlog.processors.JSONRenderer()
- ],
- context_class=dict,
- logger_factory=structlog.stdlib.LoggerFactory(),
- wrapper_class=structlog.stdlib.BoundLogger,
- cache_logger_on_first_use=True,
- )
- def _configure_standard_logging(self):
- """配置标准日志"""
- # 文件处理器 - 详细日志
- detailed_handler = logging.FileHandler(
- self.log_dir / f"cyb50_pro_{datetime.now():%Y%m%d}.log"
- )
- detailed_handler.setLevel(self.log_level)
- # 文件处理器 - 仅错误
- error_handler = logging.FileHandler(
- self.log_dir / f"cyb50_pro_error_{datetime.now():%Y%m%d}.log"
- )
- error_handler.setLevel(logging.ERROR)
- # 控制台处理器
- console_handler = logging.StreamHandler(sys.stdout)
- console_handler.setLevel(self.log_level)
- # 格式化器
- formatter = logging.Formatter(
- "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
- )
- detailed_handler.setFormatter(formatter)
- error_handler.setFormatter(formatter)
- console_handler.setFormatter(formatter)
- # 根日志配置
- logging.basicConfig(
- level=self.log_level,
- handlers=[detailed_handler, error_handler, console_handler]
- )
- def log_ecosystem(
- self,
- event_type: EventType,
- macro_regime: str,
- meso_health: float,
- micro_state: str,
- confidence: float,
- details: Optional[Dict[str, Any]] = None
- ):
- """记录生态状态"""
- self.logger.info(
- event_type=event_type.value,
- category="ecosystem",
- macro_regime=macro_regime,
- meso_health=meso_health,
- micro_state=micro_state,
- confidence=confidence,
- details=details or {}
- )
- def log_agent(
- self,
- event_type: EventType,
- agent_name: str,
- signal: Optional[str] = None,
- weight: Optional[float] = None,
- health_score: Optional[float] = None,
- expected_return: Optional[float] = None,
- details: Optional[Dict[str, Any]] = None
- ):
- """记录智能体决策"""
- self.logger.info(
- event_type=event_type.value,
- category="agent",
- agent_name=agent_name,
- signal=signal,
- weight=weight,
- health_score=health_score,
- expected_return=expected_return,
- details=details or {}
- )
- def log_risk(
- self,
- event_type: EventType,
- level: str, # "info", "warning", "critical"
- metric_name: Optional[str] = None,
- metric_value: Optional[float] = None,
- threshold: Optional[float] = None,
- action: Optional[str] = None,
- details: Optional[Dict[str, Any]] = None
- ):
- """记录风险事件"""
- log_method = getattr(self.logger, level)
- log_method(
- event_type=event_type.value,
- category="risk",
- metric_name=metric_name,
- metric_value=metric_value,
- threshold=threshold,
- action=action,
- details=details or {}
- )
- def log_execution(
- self,
- event_type: EventType,
- order_id: str,
- symbol: str,
- side: str,
- quantity: int,
- price: Optional[float] = None,
- cost: Optional[float] = None,
- strategy: Optional[str] = None,
- details: Optional[Dict[str, Any]] = None
- ):
- """记录执行事件"""
- self.logger.info(
- event_type=event_type.value,
- category="execution",
- order_id=order_id,
- symbol=symbol,
- side=side,
- quantity=quantity,
- price=price,
- cost=cost,
- strategy=strategy,
- details=details or {}
- )
- def log_system(
- self,
- event_type: EventType,
- message: str,
- level: str = "info",
- details: Optional[Dict[str, Any]] = None
- ):
- """记录系统事件"""
- log_method = getattr(self.logger, level)
- log_method(
- event_type=event_type.value,
- category="system",
- message=message,
- details=details or {}
- )
- # 全局日志实例
- _logger: Optional[CYB50Logger] = None
- def get_logger(log_dir: str = "logs", log_level: str = "INFO") -> CYB50Logger:
- """获取全局日志实例"""
- global _logger
- if _logger is None:
- _logger = CYB50Logger(log_dir, log_level)
- return _logger
- def configure_logging(log_dir: str = "logs", log_level: str = "INFO"):
- """配置日志系统"""
- global _logger
- _logger = CYB50Logger(log_dir, log_level)
|