| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331 |
- """
- 智能体协同机制
- 管理多智能体协同,处理信号冲突和叠加
- """
- from dataclasses import dataclass, field
- from typing import Dict, List, Optional, Tuple, Any
- from datetime import datetime
- from enum import Enum
- import uuid
- import pandas as pd
- from agents.base import AgentBase, AgentSignal, SignalDirection
class ConflictResolutionMethod(Enum):
    """Strategy used to settle opposing (long vs. short) agent signals."""

    # Blend both sides in proportion to their weighted confidence.
    CONFIDENCE_WEIGHTED = "confidence_weighted"
    # The side with the larger weighted confidence takes the decision.
    HIGHER_WINS = "higher_wins"
    # Discard the conflicting signals and stay out of the market.
    CANCEL = "cancel"
@dataclass
class CoordinatedSignal:
    """Outcome of coordinating one or more agent signals."""

    # Signals that took part in this coordination step.
    original_signals: List[AgentSignal]
    # Direction chosen after coordination.
    final_direction: SignalDirection
    # Strength of the coordinated signal, expected range 0-1.
    final_strength: float
    # Position size of the coordinated signal, expected range 0-1.
    final_position: float
    # One of "conflict_resolved", "reinforced", "single".
    coordination_type: str
    # Human-readable explanation of how the result was reached.
    reasoning: str
    # Creation time; defaults to the (naive) local time at construction.
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class ReinforcementResult:
    """Result record for a same-direction signal reinforcement."""

    # Whether reinforcement was applied.
    is_reinforced: bool
    # Direction shared by the reinforcing signals.
    direction: SignalDirection
    # NOTE(review): presumably the multiplier applied to the position; confirm with users of this type.
    boost_factor: float
    # NOTE(review): presumably the number of contributing signals; confirm with users of this type.
    original_count: int
    # Average confidence of the contributing signals.
    avg_confidence: float
class AgentCoordinator:
    """Merge the signals of several agents into a single coordinated signal.

    Two situations are handled:

    1. Conflict resolution: agents emit opposite (long vs. short) signals and
       the configured ``ConflictResolutionMethod`` decides the outcome.
    2. Reinforcement: agents agree on a direction; once at least
       ``reinforcement_threshold`` of them agree, the combined position is
       boosted.

    Every ``CoordinatedSignal`` produced by ``coordinate`` is appended to
    ``coordination_history`` exactly once, so ``get_coordination_summary``
    reflects all coordination types.
    """

    def __init__(
        self,
        conflict_resolution: ConflictResolutionMethod = ConflictResolutionMethod.CONFIDENCE_WEIGHTED,
        min_confidence_diff: float = 0.2,
        reinforcement_threshold: int = 3,
        position_boost: float = 0.2,
        stop_tighten: float = 0.5,
        max_position_boost: float = 0.3
    ):
        """Store the coordination parameters.

        Args:
            conflict_resolution: Strategy applied when long and short signals
                coexist.
            min_confidence_diff: Minimum absolute net ratio required by the
                confidence-weighted strategy to take a direction.
            reinforcement_threshold: Number of same-direction signals needed
                before the position is boosted.
            position_boost: Boost added per agreeing agent at or above the
                threshold.
            stop_tighten: Stop-tightening ratio; only reported in the
                reasoning text of reinforced signals.
            max_position_boost: Upper bound on the total boost.
        """
        self.conflict_resolution = conflict_resolution
        self.min_confidence_diff = min_confidence_diff
        self.reinforcement_threshold = reinforcement_threshold
        self.position_boost = position_boost
        self.stop_tighten = stop_tighten
        self.max_position_boost = max_position_boost
        self.coordination_history: List[CoordinatedSignal] = []

    def _record(self, coordinated: CoordinatedSignal) -> CoordinatedSignal:
        """Append ``coordinated`` to the history and return it."""
        self.coordination_history.append(coordinated)
        return coordinated

    @staticmethod
    def _weighted_position(
        signals: List[Tuple[str, AgentSignal]],
        weights: Dict[str, float]
    ) -> float:
        """Sum suggested positions, weighting unknown agents equally (1/n)."""
        default_weight = 1 / len(signals)
        return sum(
            signal.suggested_position * weights.get(name, default_weight)
            for name, signal in signals
        )

    @staticmethod
    def _weighted_confidence(
        signals: List[Tuple[str, AgentSignal]],
        weights: Dict[str, float]
    ) -> float:
        """Sum confidences weighted per agent; unknown agents weigh 0.5."""
        return sum(
            signal.confidence * weights.get(name, 0.5)
            for name, signal in signals
        )

    @staticmethod
    def _average_confidence(signals: List[AgentSignal]) -> float:
        """Return the plain mean confidence of a non-empty signal list."""
        return sum(s.confidence for s in signals) / len(signals)

    def coordinate(
        self,
        signals: Dict[str, AgentSignal],
        weights: Dict[str, float]
    ) -> Optional[CoordinatedSignal]:
        """Coordinate the signals of multiple agents.

        Neutral signals are ignored whenever at least one long or short
        signal is present.

        Args:
            signals: Signal of each agent, keyed by agent name.
            weights: Weight of each agent, keyed by agent name.

        Returns:
            The coordinated signal, or ``None`` when ``signals`` is empty.
        """
        if not signals:
            return None

        # A single signal is passed through unchanged.
        if len(signals) == 1:
            signal = next(iter(signals.values()))
            return self._record(CoordinatedSignal(
                original_signals=[signal],
                final_direction=signal.direction,
                final_strength=signal.confidence,
                final_position=signal.suggested_position,
                coordination_type="single",
                reasoning="单信号,无需协同"
            ))

        # Group by direction: long, short, everything else (neutral).
        long_signals: List[Tuple[str, AgentSignal]] = []
        short_signals: List[Tuple[str, AgentSignal]] = []
        neutral_signals: List[Tuple[str, AgentSignal]] = []
        for name, signal in signals.items():
            if signal.direction == SignalDirection.LONG:
                long_signals.append((name, signal))
            elif signal.direction == SignalDirection.SHORT:
                short_signals.append((name, signal))
            else:
                neutral_signals.append((name, signal))

        # Opposite directions present: resolve the conflict.
        if long_signals and short_signals:
            return self._resolve_conflict(long_signals, short_signals, weights)

        # Only one direction present: reinforce it.
        if long_signals:
            return self._reinforce_signals(long_signals, weights, SignalDirection.LONG)
        if short_signals:
            return self._reinforce_signals(short_signals, weights, SignalDirection.SHORT)

        # Only neutral signals remain (there are at least two signals here).
        # The "conflict_resolved" label is kept for backward compatibility.
        neutral_only = [s for _, s in neutral_signals]
        return self._record(CoordinatedSignal(
            original_signals=neutral_only,
            final_direction=SignalDirection.NEUTRAL,
            final_strength=self._average_confidence(neutral_only),
            final_position=0.0,
            coordination_type="conflict_resolved",
            reasoning="所有智能体均输出中性信号,观望"
        ))

    def _reinforce_signals(
        self,
        signals: List[Tuple[str, AgentSignal]],
        weights: Dict[str, float],
        direction: SignalDirection
    ) -> CoordinatedSignal:
        """Combine same-direction signals, boosting the position if enough agree.

        Below ``reinforcement_threshold`` the weighted position and the mean
        confidence are used as-is. At or above it, the position is scaled by
        ``1 + boost`` (boost capped at ``max_position_boost``) and the
        confidence by 1.1; both results are capped at 1.0.

        Args:
            signals: Non-empty ``(agent name, signal)`` pairs sharing one direction.
            weights: Weight of each agent, keyed by agent name.
            direction: The shared direction.

        Returns:
            The coordinated signal, which is also appended to the history.
        """
        count = len(signals)
        members = [s for _, s in signals]
        base_position = self._weighted_position(signals, weights)
        avg_confidence = self._average_confidence(members)

        if count < self.reinforcement_threshold:
            # Threshold not reached: plain weighting.
            strength = avg_confidence
            position = min(1.0, base_position)
            reasoning = f"{count}个智能体同向,未达增强阈值"
        else:
            # Threshold reached: boost grows with each agent at or above it.
            boost = min(
                self.max_position_boost,
                self.position_boost * (count - self.reinforcement_threshold + 1)
            )
            strength = min(1.0, avg_confidence * 1.1)  # slight confidence bump
            position = min(1.0, base_position * (1 + boost))
            reasoning = f"{count}个智能体同向,仓位提升{boost:.1%},止损收紧{self.stop_tighten:.1%}"

        return self._record(CoordinatedSignal(
            original_signals=members,
            final_direction=direction,
            final_strength=strength,
            final_position=position,
            coordination_type="reinforced",
            reasoning=reasoning
        ))

    def _resolve_conflict(
        self,
        long_signals: List[Tuple[str, AgentSignal]],
        short_signals: List[Tuple[str, AgentSignal]],
        weights: Dict[str, float]
    ) -> CoordinatedSignal:
        """Resolve opposing long and short signals with the configured strategy.

        Args:
            long_signals: Non-empty ``(agent name, signal)`` pairs going long.
            short_signals: Non-empty ``(agent name, signal)`` pairs going short.
            weights: Weight of each agent, keyed by agent name.

        Returns:
            The coordinated signal, which is also appended to the history.
        """
        long_strength = self._weighted_confidence(long_signals, weights)
        short_strength = self._weighted_confidence(short_signals, weights)
        all_signals = [s for _, s in long_signals] + [s for _, s in short_signals]

        if self.conflict_resolution == ConflictResolutionMethod.CANCEL:
            # Drop every signal and stay out.
            long_count = len(long_signals)
            short_count = len(short_signals)
            coordinated = CoordinatedSignal(
                original_signals=all_signals,
                final_direction=SignalDirection.NEUTRAL,
                final_strength=0.0,
                final_position=0.0,
                coordination_type="conflict_resolved",
                reasoning=f"信号冲突(多:{long_count} vs 空:{short_count}),取消所有信号观望"
            )
        elif self.conflict_resolution == ConflictResolutionMethod.HIGHER_WINS:
            coordinated = self._resolve_higher_wins(
                long_signals, short_signals, weights,
                long_strength, short_strength, all_signals
            )
        else:  # CONFIDENCE_WEIGHTED
            coordinated = self._resolve_confidence_weighted(
                long_strength, short_strength, all_signals
            )

        return self._record(coordinated)

    def _resolve_higher_wins(
        self,
        long_signals: List[Tuple[str, AgentSignal]],
        short_signals: List[Tuple[str, AgentSignal]],
        weights: Dict[str, float],
        long_strength: float,
        short_strength: float,
        all_signals: List[AgentSignal]
    ) -> CoordinatedSignal:
        """Let the side with the larger weighted confidence take the decision."""
        if long_strength == short_strength:
            # Neither side is stronger: do not silently favour one direction.
            return CoordinatedSignal(
                original_signals=all_signals,
                final_direction=SignalDirection.NEUTRAL,
                final_strength=0.0,
                final_position=0.0,
                coordination_type="conflict_resolved",
                reasoning=f"信号冲突,双方力量相当({long_strength:.2f}:{short_strength:.2f}),观望"
            )

        if long_strength > short_strength:
            winner_signals = long_signals
            winner_direction = SignalDirection.LONG
        else:
            winner_signals = short_signals
            winner_direction = SignalDirection.SHORT

        avg_confidence = self._average_confidence([s for _, s in winner_signals])
        weighted_position = self._weighted_position(winner_signals, weights)
        return CoordinatedSignal(
            original_signals=all_signals,
            final_direction=winner_direction,
            final_strength=avg_confidence,
            # Cap at 1.0 like every other coordination path.
            final_position=min(1.0, weighted_position),
            coordination_type="conflict_resolved",
            reasoning=f"信号冲突,{winner_direction.value}方获胜(力量比{long_strength:.2f}:{short_strength:.2f})"
        )

    def _resolve_confidence_weighted(
        self,
        long_strength: float,
        short_strength: float,
        all_signals: List[AgentSignal]
    ) -> CoordinatedSignal:
        """Net both sides in proportion to their weighted confidence."""
        total_strength = long_strength + short_strength
        if total_strength == 0:
            return CoordinatedSignal(
                original_signals=all_signals,
                final_direction=SignalDirection.NEUTRAL,
                final_strength=0.0,
                final_position=0.0,
                coordination_type="conflict_resolved",
                reasoning="信号冲突,双方力量均为0,观望"
            )

        long_ratio = long_strength / total_strength
        short_ratio = short_strength / total_strength
        # Net position: positive favours long, negative favours short.
        net_position = long_ratio - short_ratio

        if abs(net_position) < self.min_confidence_diff:
            # Sides are too close to justify taking a position.
            return CoordinatedSignal(
                original_signals=all_signals,
                final_direction=SignalDirection.NEUTRAL,
                final_strength=abs(net_position),
                final_position=0.0,
                coordination_type="conflict_resolved",
                reasoning=f"信号冲突,净仓位{net_position:.2f}低于阈值,观望"
            )

        direction = SignalDirection.LONG if net_position > 0 else SignalDirection.SHORT
        position = min(1.0, abs(net_position))
        # Overall confidence is the plain mean over both sides.
        avg_confidence = self._average_confidence(all_signals)
        return CoordinatedSignal(
            original_signals=all_signals,
            final_direction=direction,
            final_strength=avg_confidence,
            final_position=position,
            coordination_type="conflict_resolved",
            reasoning=f"信号冲突,置信度加权结果:{direction.value},仓位{position:.2f}(多{long_ratio:.2f} vs 空{short_ratio:.2f})"
        )

    def get_coordination_summary(self) -> Dict[str, Any]:
        """Summarise the coordination history.

        Returns:
            An empty dict when nothing has been recorded; otherwise a dict
            with the total count, the count per coordination type, the mean
            final position and the direction value of the latest entry.
        """
        history = self.coordination_history
        if not history:
            return {}

        total = len(history)
        type_counts: Dict[str, int] = {}
        for coord in history:
            type_counts[coord.coordination_type] = type_counts.get(coord.coordination_type, 0) + 1

        return {
            "total_coordinations": total,
            "type_distribution": type_counts,
            "avg_final_position": sum(c.final_position for c in history) / total,
            "last_direction": history[-1].final_direction.value
        }
|