"""Agent coordination.

Coordinates the signals of multiple agents: resolves conflicts between
opposing signals and reinforces signals that point the same way.
"""

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime
from enum import Enum
import uuid

import pandas as pd

from agents.base import AgentBase, AgentSignal, SignalDirection


class ConflictResolutionMethod(Enum):
    """Strategy applied when both LONG and SHORT signals are present."""

    CONFIDENCE_WEIGHTED = "confidence_weighted"
    HIGHER_WINS = "higher_wins"
    CANCEL = "cancel"


@dataclass
class CoordinatedSignal:
    """Result of coordinating one or more agent signals."""

    original_signals: List[AgentSignal]
    final_direction: SignalDirection
    final_strength: float  # 0-1
    final_position: float  # 0-1
    coordination_type: str  # "conflict_resolved", "reinforced", "single"
    reasoning: str
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class ReinforcementResult:
    """Result of a same-direction signal reinforcement."""

    is_reinforced: bool
    direction: SignalDirection
    boost_factor: float
    original_count: int
    avg_confidence: float


class AgentCoordinator:
    """Combine the signals of several agents into one coordinated signal.

    Responsibilities:
        1. Conflict handling: when agents emit opposing signals, resolve
           them according to ``conflict_resolution``.
        2. Reinforcement: when several agents emit signals in the same
           direction, raise the position for that direction.

    Every ``CoordinatedSignal`` produced is appended to
    ``coordination_history`` so that ``get_coordination_summary`` reflects
    all coordination types, not only reinforced ones.
    """

    def __init__(
        self,
        conflict_resolution: ConflictResolutionMethod = ConflictResolutionMethod.CONFIDENCE_WEIGHTED,
        min_confidence_diff: float = 0.2,
        reinforcement_threshold: int = 3,
        position_boost: float = 0.2,
        stop_tighten: float = 0.5,
        max_position_boost: float = 0.3
    ):
        """Store the coordination parameters and start with an empty history.

        Args:
            conflict_resolution: Strategy used by ``_resolve_conflict``.
            min_confidence_diff: In CONFIDENCE_WEIGHTED mode, the minimum
                absolute net ratio below which the result is NEUTRAL.
            reinforcement_threshold: Minimum number of same-direction
                signals required before the position is boosted.
            position_boost: Boost added per signal at/above the threshold.
            stop_tighten: Only reported in the reasoning text of a boosted
                signal; this class does not apply it anywhere else.
            max_position_boost: Upper bound on the total boost.
        """
        self.conflict_resolution = conflict_resolution
        self.min_confidence_diff = min_confidence_diff
        self.reinforcement_threshold = reinforcement_threshold
        self.position_boost = position_boost
        self.stop_tighten = stop_tighten
        self.max_position_boost = max_position_boost
        self.coordination_history: List[CoordinatedSignal] = []

    def _record(self, coordinated: CoordinatedSignal) -> CoordinatedSignal:
        """Append ``coordinated`` to the history and return it unchanged."""
        self.coordination_history.append(coordinated)
        return coordinated

    def coordinate(
        self,
        signals: Dict[str, AgentSignal],
        weights: Dict[str, float]
    ) -> Optional[CoordinatedSignal]:
        """Coordinate the signals of multiple agents.

        Args:
            signals: Signal of each agent, keyed by agent name.
            weights: Weight of each agent, keyed by agent name.

        Returns:
            The coordinated signal, or ``None`` when ``signals`` is empty.
            Every non-``None`` result is recorded in
            ``coordination_history``.
        """
        if not signals:
            return None

        # A single signal is passed through as-is.
        if len(signals) == 1:
            signal = next(iter(signals.values()))
            return self._record(CoordinatedSignal(
                original_signals=[signal],
                final_direction=signal.direction,
                final_strength=signal.confidence,
                final_position=signal.suggested_position,
                coordination_type="single",
                reasoning="单信号,无需协同"
            ))

        # Group by direction; anything that is neither LONG nor SHORT is
        # treated as neutral.
        long_signals: List[Tuple[str, AgentSignal]] = []
        short_signals: List[Tuple[str, AgentSignal]] = []
        neutral_signals: List[Tuple[str, AgentSignal]] = []
        for name, signal in signals.items():
            if signal.direction == SignalDirection.LONG:
                long_signals.append((name, signal))
            elif signal.direction == SignalDirection.SHORT:
                short_signals.append((name, signal))
            else:
                neutral_signals.append((name, signal))

        # Opposing signals present: conflict handling.
        if long_signals and short_signals:
            return self._resolve_conflict(long_signals, short_signals, weights)

        # Only one direction present (neutral signals are ignored here):
        # reinforcement.
        if long_signals:
            return self._reinforce_signals(long_signals, weights, SignalDirection.LONG)
        if short_signals:
            return self._reinforce_signals(short_signals, weights, SignalDirection.SHORT)

        # Neither LONG nor SHORT: with at least two signals this means every
        # signal is neutral, so neutral_signals is non-empty.
        avg_confidence = (
            sum(signal.confidence for _, signal in neutral_signals)
            / len(neutral_signals)
        )
        return self._record(CoordinatedSignal(
            original_signals=[signal for _, signal in neutral_signals],
            final_direction=SignalDirection.NEUTRAL,
            final_strength=avg_confidence,
            final_position=0.0,
            coordination_type="conflict_resolved",
            reasoning="所有智能体均输出中性信号,观望"
        ))

    def _reinforce_signals(
        self,
        signals: List[Tuple[str, AgentSignal]],
        weights: Dict[str, float],
        direction: SignalDirection
    ) -> CoordinatedSignal:
        """Combine same-direction signals, boosting the position if enough agree.

        The base position is the weighted sum of the suggested positions; an
        agent missing from ``weights`` gets ``1 / len(signals)``. When at
        least ``reinforcement_threshold`` signals agree, the position is
        scaled by ``1 + boost`` and the confidence by 1.1, both capped at 1.0.
        The ``coordination_type`` is "reinforced" in both cases.

        Args:
            signals: Non-empty list of ``(agent name, signal)`` pairs that
                all point in ``direction``.
            weights: Weight of each agent, keyed by agent name.
            direction: The shared direction of ``signals``.

        Returns:
            The coordinated signal, also recorded in the history.
        """
        count = len(signals)
        default_weight = 1 / count
        base_position = sum(
            signal.suggested_position * weights.get(name, default_weight)
            for name, signal in signals
        )
        avg_confidence = sum(signal.confidence for _, signal in signals) / count
        originals = [signal for _, signal in signals]

        if count < self.reinforcement_threshold:
            # Below the threshold: plain weighted combination.
            coordinated = CoordinatedSignal(
                original_signals=originals,
                final_direction=direction,
                final_strength=avg_confidence,
                final_position=min(1.0, base_position),
                coordination_type="reinforced",
                reasoning=f"{count}个智能体同向,未达增强阈值"
            )
        else:
            # At/above the threshold: the boost grows with each extra
            # agreeing agent and is capped by max_position_boost.
            boost = min(
                self.max_position_boost,
                self.position_boost * (count - self.reinforcement_threshold + 1)
            )
            coordinated = CoordinatedSignal(
                original_signals=originals,
                final_direction=direction,
                final_strength=min(1.0, avg_confidence * 1.1),  # small confidence bump
                final_position=min(1.0, base_position * (1 + boost)),
                coordination_type="reinforced",
                reasoning=f"{count}个智能体同向,仓位提升{boost:.1%},止损收紧{self.stop_tighten:.1%}"
            )

        return self._record(coordinated)

    def _resolve_conflict(
        self,
        long_signals: List[Tuple[str, AgentSignal]],
        short_signals: List[Tuple[str, AgentSignal]],
        weights: Dict[str, float]
    ) -> CoordinatedSignal:
        """Resolve a conflict between LONG and SHORT signals.

        Each side's strength is the sum of ``confidence * weight`` over its
        signals (an agent missing from ``weights`` gets 0.5). The configured
        ``conflict_resolution`` strategy then decides the outcome; any value
        other than CANCEL or HIGHER_WINS is handled as CONFIDENCE_WEIGHTED.

        Args:
            long_signals: Non-empty ``(agent name, signal)`` pairs going LONG.
            short_signals: Non-empty ``(agent name, signal)`` pairs going SHORT.
            weights: Weight of each agent, keyed by agent name.

        Returns:
            The coordinated signal, also recorded in the history.
        """
        long_strength = sum(
            signal.confidence * weights.get(name, 0.5)
            for name, signal in long_signals
        )
        short_strength = sum(
            signal.confidence * weights.get(name, 0.5)
            for name, signal in short_signals
        )
        all_signals = (
            [signal for _, signal in long_signals]
            + [signal for _, signal in short_signals]
        )

        if self.conflict_resolution == ConflictResolutionMethod.CANCEL:
            # Drop every signal and stay out of the market.
            long_count = len(long_signals)
            short_count = len(short_signals)
            coordinated = CoordinatedSignal(
                original_signals=all_signals,
                final_direction=SignalDirection.NEUTRAL,
                final_strength=0.0,
                final_position=0.0,
                coordination_type="conflict_resolved",
                reasoning=f"信号冲突(多:{long_count} vs 空:{short_count}),取消所有信号观望"
            )
        elif self.conflict_resolution == ConflictResolutionMethod.HIGHER_WINS:
            coordinated = self._resolve_higher_wins(
                long_signals, short_signals, weights,
                long_strength, short_strength, all_signals
            )
        else:
            coordinated = self._resolve_confidence_weighted(
                long_strength, short_strength, all_signals
            )

        return self._record(coordinated)

    def _resolve_higher_wins(
        self,
        long_signals: List[Tuple[str, AgentSignal]],
        short_signals: List[Tuple[str, AgentSignal]],
        weights: Dict[str, float],
        long_strength: float,
        short_strength: float,
        all_signals: List[AgentSignal]
    ) -> CoordinatedSignal:
        """Build the HIGHER_WINS result: the stronger side is followed alone."""
        # NOTE: the comparison is strict, so an exact tie goes to SHORT.
        if long_strength > short_strength:
            winner_signals = long_signals
            winner_direction = SignalDirection.LONG
        else:
            winner_signals = short_signals
            winner_direction = SignalDirection.SHORT

        winner_count = len(winner_signals)
        avg_confidence = (
            sum(signal.confidence for _, signal in winner_signals) / winner_count
        )
        weighted_position = sum(
            signal.suggested_position * weights.get(name, 1 / winner_count)
            for name, signal in winner_signals
        )
        return CoordinatedSignal(
            original_signals=all_signals,
            final_direction=winner_direction,
            final_strength=avg_confidence,
            # Cap at 1.0 like every other path; final_position is a 0-1 value.
            final_position=min(1.0, weighted_position),
            coordination_type="conflict_resolved",
            reasoning=f"信号冲突,{winner_direction.value}方获胜(力量比{long_strength:.2f}:{short_strength:.2f})"
        )

    def _resolve_confidence_weighted(
        self,
        long_strength: float,
        short_strength: float,
        all_signals: List[AgentSignal]
    ) -> CoordinatedSignal:
        """Build the CONFIDENCE_WEIGHTED result from the net strength ratio."""
        total_strength = long_strength + short_strength
        if total_strength == 0:
            return CoordinatedSignal(
                original_signals=all_signals,
                final_direction=SignalDirection.NEUTRAL,
                final_strength=0.0,
                final_position=0.0,
                coordination_type="conflict_resolved",
                reasoning="信号冲突,双方力量均为0,观望"
            )

        long_ratio = long_strength / total_strength
        short_ratio = short_strength / total_strength
        net_position = long_ratio - short_ratio

        # The two sides are too evenly matched: stay neutral.
        if abs(net_position) < self.min_confidence_diff:
            return CoordinatedSignal(
                original_signals=all_signals,
                final_direction=SignalDirection.NEUTRAL,
                final_strength=abs(net_position),
                final_position=0.0,
                coordination_type="conflict_resolved",
                reasoning=f"信号冲突,净仓位{net_position:.2f}低于阈值,观望"
            )

        direction = SignalDirection.LONG if net_position > 0 else SignalDirection.SHORT
        position = min(1.0, abs(net_position))
        avg_confidence = (
            sum(signal.confidence for signal in all_signals) / len(all_signals)
        )
        return CoordinatedSignal(
            original_signals=all_signals,
            final_direction=direction,
            final_strength=avg_confidence,
            final_position=position,
            coordination_type="conflict_resolved",
            reasoning=f"信号冲突,置信度加权结果:{direction.value},仓位{position:.2f}(多{long_ratio:.2f} vs 空{short_ratio:.2f})"
        )

    def get_coordination_summary(self) -> Dict[str, Any]:
        """Summarize the coordination history.

        Returns:
            An empty dict when nothing has been recorded; otherwise a dict
            with the number of coordinations, the count per coordination
            type, the mean final position and the value of the most recent
            final direction.
        """
        history = self.coordination_history
        if not history:
            return {}

        total = len(history)
        type_counts: Dict[str, int] = {}
        for coord in history:
            type_counts[coord.coordination_type] = (
                type_counts.get(coord.coordination_type, 0) + 1
            )

        return {
            "total_coordinations": total,
            "type_distribution": type_counts,
            "avg_final_position": sum(c.final_position for c in history) / total,
            "last_direction": history[-1].final_direction.value
        }