diff --git a/agent_orchestration/__init__.py b/agent_orchestration/__init__.py
new file mode 100644
index 000000000..34d4de780
--- /dev/null
+++ b/agent_orchestration/__init__.py
@@ -0,0 +1,46 @@
+"""
+Agent Orchestration Plugin
+============================
+
+Multi-agent orchestration for Hermes — spawn, communicate, and coordinate
+sub-agents via workflow patterns (parallel, sequential, map_reduce, dag).
+
+Ported from Claude Code's agent management patterns as an independent plugin.
+
+Usage:
+    This module is loaded by Hermes' plugin system. The plugin manager calls
+    ``register(ctx)`` which wires up tools and hooks via PluginContext.
+"""
+
+from __future__ import annotations
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def register(ctx) -> None:
+    """Plugin entry point — called by Hermes PluginManager.
+
+    Registers:
+    - 4 tools: spawn_agent, agent_status, send_agent_message, orchestrate_task
+    - 3 hooks: on_session_start, on_session_end, pre_tool_call
+    """
+    from agent_orchestration.config import load_orchestration_config
+
+    # Honor the ``orchestration.enabled`` switch before wiring anything up.
+    settings = load_orchestration_config()
+    enabled = settings.get("enabled", True)
+    if not enabled:
+        logger.info("Agent orchestration plugin disabled via config")
+        return
+
+    from agent_orchestration.hooks import register_all as register_hooks
+    from agent_orchestration.tools import register_all as register_tools
+
+    # Tools first, then lifecycle hooks — both receive the same context.
+    register_tools(ctx)
+    register_hooks(ctx)
+
+    logger.info(
+        "Agent orchestration plugin loaded: 4 tools, 3 hooks "
+        "(max_agents=%d)",
+        settings.get("max_agents", 5),
+    )
diff --git a/agent_orchestration/config.py b/agent_orchestration/config.py
new file mode 100644
index 000000000..2ee079717
--- /dev/null
+++ b/agent_orchestration/config.py
@@ -0,0 +1,78 @@
+"""
+Agent Orchestration Configuration
+===================================
+
+Reads ``orchestration:`` top-level key from Hermes config.yaml.
+Hermes deep-merges unknown keys transparently, so no core changes needed.
+""" + +import logging +import os +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + +_DEFAULTS: Dict[str, Any] = { + "enabled": True, + "max_agents": 5, + "default_max_iterations": 50, + "default_toolsets": ["terminal", "file", "web"], + "permissions": { + "mode": "inherit", + "allowlist": [], + "blocklist": [], + }, + "context_sharing": { + "enabled": True, + "max_shared_context_tokens": 4000, + }, + "mailbox_dir": "", # default: ~/.hermes/orchestration/mailboxes/ +} + + +def _load_hermes_config() -> Dict[str, Any]: + """Load the Hermes main config.yaml.""" + try: + from hermes_cli.config import load_config + return load_config() + except Exception: + return {} + + +def load_orchestration_config() -> Dict[str, Any]: + """Load and merge orchestration config with defaults.""" + cfg = _load_hermes_config() + orch = cfg.get("orchestration", {}) + if not isinstance(orch, dict): + orch = {} + + # Deep merge with defaults + merged = dict(_DEFAULTS) + for key, val in orch.items(): + if key in merged and isinstance(merged[key], dict) and isinstance(val, dict): + merged[key] = {**merged[key], **val} + else: + merged[key] = val + + # Resolve mailbox_dir + if not merged.get("mailbox_dir"): + try: + from hermes_constants import get_hermes_home + merged["mailbox_dir"] = str( + get_hermes_home() / "orchestration" / "mailboxes" + ) + except Exception: + merged["mailbox_dir"] = os.path.expanduser( + "~/.hermes/orchestration/mailboxes" + ) + + return merged + + +def get_max_agents() -> int: + return int(load_orchestration_config().get("max_agents", 5)) + + +def get_default_toolsets() -> List[str]: + return list(load_orchestration_config().get("default_toolsets", ["terminal", "file", "web"])) diff --git a/agent_orchestration/hooks/__init__.py b/agent_orchestration/hooks/__init__.py new file mode 100644 index 000000000..9846d4798 --- /dev/null +++ b/agent_orchestration/hooks/__init__.py @@ -0,0 +1,98 @@ +""" +Agent 
Orchestration Lifecycle Hooks +===================================== + +Registered hooks: + + - on_session_start: Initialize AgentManager + MailboxHub + - on_session_end: Cleanup all agents and mailboxes + - pre_tool_call: Permission checks for orchestration-managed agents +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +def on_session_start(**kwargs) -> None: + """Initialize orchestration components when a new session begins.""" + from agent_orchestration.manager import get_manager + from agent_orchestration.mailbox import MailboxHub, get_mailbox_hub, set_mailbox_hub + from agent_orchestration.config import load_orchestration_config + + config = load_orchestration_config() + if not config.get("enabled", True): + logger.debug("Agent orchestration disabled in config") + return + + # Initialize manager + manager = get_manager() + + # If the hook receives an agent reference, wire it up + agent = kwargs.get("agent") + if agent is not None: + manager.set_parent_agent(agent) + + # Initialize mailbox hub + hub = MailboxHub( + mailbox_dir=config.get("mailbox_dir", ""), + session_id=getattr(agent, "session_id", "") if agent else "", + ) + set_mailbox_hub(hub) + + logger.info( + "Agent orchestration initialized (max_agents=%d)", + config.get("max_agents", 5), + ) + + +def on_session_end(**kwargs) -> None: + """Cleanup all orchestration resources when session ends.""" + from agent_orchestration.manager import reset_manager + from agent_orchestration.mailbox import get_mailbox_hub + + # Terminate all running agents + reset_manager() + + # Cleanup all mailboxes + hub = get_mailbox_hub() + hub.cleanup_all() + + logger.info("Agent orchestration cleaned up") + + +def pre_tool_call(**kwargs) -> Optional[Dict[str, Any]]: + """Permission check for tools called by orchestration-managed agents. 
+ + Returns a block directive if the tool call should be prevented: + {"action": "block", "message": "reason"} + + Returns None to allow the call. + """ + tool_name = kwargs.get("tool_name", "") + args = kwargs.get("args", {}) + + # Block orchestration tools from being called by sub-agents + # (they should only be called by the parent agent) + # This prevents infinite recursion + orchestration_tools = {"spawn_agent", "agent_status", "send_agent_message", "orchestrate_task"} + if tool_name in orchestration_tools: + # Check if caller is a sub-agent (has _delegate_depth > 0) + agent = kwargs.get("agent") + if agent and getattr(agent, "_delegate_depth", 0) > 0: + return { + "action": "block", + "message": f"Sub-agents cannot call orchestration tools ({tool_name})", + } + + return None + + +def register_all(ctx) -> None: + """Register all lifecycle hooks via PluginContext.""" + ctx.register_hook("on_session_start", on_session_start) + ctx.register_hook("on_session_end", on_session_end) + ctx.register_hook("pre_tool_call", pre_tool_call) diff --git a/agent_orchestration/mailbox.py b/agent_orchestration/mailbox.py new file mode 100644 index 000000000..121d71dc3 --- /dev/null +++ b/agent_orchestration/mailbox.py @@ -0,0 +1,326 @@ +""" +MailboxHub — Inter-Agent Message Passing +========================================= + +Ported from Claude Code's ``src/utils/mailbox.ts`` file-mailbox pattern. + +Hybrid storage: in-memory queue (primary) + JSON file (persistence). +Thread-safe via ``threading.Lock``. Cross-process safety via ``fcntl.flock()`` +for gateway mode where agents run in separate processes. 
+ +Message types: + - message: generic text between agents + - task_assignment: parent assigns a new task to child + - result: child reports task result to parent + - permission_request: child asks parent for permission escalation + - shutdown: parent tells child to terminate + +Storage path: ``~/.hermes/orchestration/mailboxes/{session_id}/{agent_id}.json`` +""" + +from __future__ import annotations + +import fcntl +import json +import logging +import os +import threading +import time +import uuid +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Message types +MSG_MESSAGE = "message" +MSG_TASK_ASSIGNMENT = "task_assignment" +MSG_RESULT = "result" +MSG_PERMISSION_REQUEST = "permission_request" +MSG_SHUTDOWN = "shutdown" + + +@dataclass +class AgentMessage: + """A single message in an agent's mailbox.""" + id: str = "" + source: str = "" # agent_id of sender + target: str = "" # agent_id of recipient + msg_type: str = MSG_MESSAGE + content: str = "" + timestamp: float = 0.0 + metadata: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self): + if not self.id: + self.id = uuid.uuid4().hex[:12] + if not self.timestamp: + self.timestamp = time.time() + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "AgentMessage": + return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}) + + +class Mailbox: + """Per-agent mailbox with in-memory queue + optional file persistence. + + Ported from Claude Code's Mailbox class (src/utils/mailbox.ts). + Uses threading.Condition for efficient wait/notify instead of polling. 
+ """ + + def __init__(self, agent_id: str, persist_path: Optional[Path] = None): + self.agent_id = agent_id + self._queue: List[AgentMessage] = [] + self._lock = threading.Lock() + self._condition = threading.Condition(self._lock) + self._persist_path = persist_path + self._revision = 0 + self._closed = False + + # Load persisted messages on startup + if persist_path: + self._load_from_disk() + + @property + def length(self) -> int: + with self._lock: + return len(self._queue) + + @property + def revision(self) -> int: + with self._lock: + return self._revision + + def send(self, msg: AgentMessage) -> None: + """Add a message to the mailbox and wake any waiters.""" + with self._condition: + if self._closed: + logger.warning("Mailbox %s: send() on closed mailbox", self.agent_id) + return + self._revision += 1 + self._queue.append(msg) + self._condition.notify_all() + self._persist_to_disk() + + def poll(self, fn: Callable[[AgentMessage], bool] = None) -> Optional[AgentMessage]: + """Non-blocking: return and remove the first matching message, or None.""" + if fn is None: + fn = lambda _: True + with self._lock: + for i, msg in enumerate(self._queue): + if fn(msg): + self._revision += 1 + removed = self._queue.pop(i) + self._persist_to_disk() + return removed + return None + + def receive( + self, + fn: Callable[[AgentMessage], bool] = None, + timeout: float = 30.0, + ) -> Optional[AgentMessage]: + """Blocking: wait for and return a matching message. + + Returns None on timeout or if mailbox is closed. 
+ """ + if fn is None: + fn = lambda _: True + + with self._condition: + deadline = time.monotonic() + timeout + while True: + # Check existing queue first + for i, msg in enumerate(self._queue): + if fn(msg): + self._revision += 1 + removed = self._queue.pop(i) + self._persist_to_disk() + return removed + + if self._closed: + return None + + remaining = deadline - time.monotonic() + if remaining <= 0: + return None + + if not self._condition.wait(timeout=remaining): + return None + + def receive_all(self, fn: Callable[[AgentMessage], bool] = None) -> List[AgentMessage]: + """Non-blocking: return and remove all matching messages.""" + if fn is None: + fn = lambda _: True + with self._lock: + matched = [msg for msg in self._queue if fn(msg)] + if matched: + self._queue = [msg for msg in self._queue if not fn(msg)] + self._revision += len(matched) + self._persist_to_disk() + return matched + + def peek(self, fn: Callable[[AgentMessage], bool] = None) -> Optional[AgentMessage]: + """Non-blocking: return (without removing) the first matching message.""" + if fn is None: + fn = lambda _: True + with self._lock: + for msg in self._queue: + if fn(msg): + return msg + return None + + def close(self) -> None: + """Mark mailbox as closed, waking any waiting threads.""" + with self._condition: + self._closed = True + self._condition.notify_all() + + def _persist_to_disk(self) -> None: + """Write current queue to JSON file (called with lock held).""" + if not self._persist_path: + return + try: + self._persist_path.parent.mkdir(parents=True, exist_ok=True) + data = [msg.to_dict() for msg in self._queue] + tmp_path = self._persist_path.with_suffix(".tmp") + with open(tmp_path, "w") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + json.dump(data, f, ensure_ascii=False, indent=2) + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + tmp_path.replace(self._persist_path) + except Exception as exc: + logger.debug("Mailbox persist failed for %s: %s", self.agent_id, exc) + + def 
_load_from_disk(self) -> None:
        """Load persisted messages from JSON file (startup only, pre-lock)."""
        if not self._persist_path or not self._persist_path.exists():
            return
        try:
            with open(self._persist_path, "r") as f:
                fcntl.flock(f.fileno(), fcntl.LOCK_SH)
                data = json.load(f)
                fcntl.flock(f.fileno(), fcntl.LOCK_UN)
            for item in data:
                msg = AgentMessage.from_dict(item)
                self._queue.append(msg)
            self._revision = len(self._queue)
        except Exception as exc:
            # Best-effort persistence: a corrupt/unreadable file must not
            # prevent the mailbox from starting with an empty queue.
            logger.debug("Mailbox load failed for %s: %s", self.agent_id, exc)


class MailboxHub:
    """Central hub managing per-agent mailboxes.

    Provides lookup, creation, and cleanup. Each agent gets its own Mailbox
    instance, persisted to ``mailbox_dir/{session_id}/{agent_id}.json``.

    Note: ``self._lock`` is a non-reentrant ``threading.Lock``. Methods that
    hold it must NOT call other public methods of this class that also
    acquire it (doing so deadlocks the calling thread).
    """

    def __init__(self, mailbox_dir: Optional[str] = None, session_id: str = ""):
        self._mailboxes: Dict[str, Mailbox] = {}
        self._lock = threading.Lock()
        self._mailbox_dir = Path(mailbox_dir) if mailbox_dir else None
        self._session_id = session_id or uuid.uuid4().hex[:8]

    def get_or_create(self, agent_id: str) -> Mailbox:
        """Get existing mailbox or create a new one for the agent."""
        with self._lock:
            if agent_id not in self._mailboxes:
                persist_path = None
                if self._mailbox_dir:
                    persist_path = (
                        self._mailbox_dir
                        / self._session_id
                        / f"{agent_id}.json"
                    )
                self._mailboxes[agent_id] = Mailbox(
                    agent_id=agent_id, persist_path=persist_path
                )
            return self._mailboxes[agent_id]

    def send_message(
        self,
        source_id: str,
        target_id: str,
        content: str,
        msg_type: str = MSG_MESSAGE,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> AgentMessage:
        """Send a message from one agent to another.

        Acquires ``self._lock`` (via ``get_or_create``) — callers must not
        already hold it.
        """
        msg = AgentMessage(
            source=source_id,
            target=target_id,
            msg_type=msg_type,
            content=content,
            metadata=metadata or {},
        )
        target_mailbox = self.get_or_create(target_id)
        target_mailbox.send(msg)

        # Also keep a copy in sender's outbox for traceability
        return msg

    def broadcast(
        self,
        source_id: str,
        content: str,
        msg_type: str = MSG_MESSAGE,
        exclude: Optional[set] = None,
    ) -> List[str]:
        """Broadcast a message to all known mailboxes except excluded ones.

        BUG FIX: the original called ``send_message()`` while holding
        ``self._lock``; ``send_message -> get_or_create`` re-acquires the
        same non-reentrant lock, deadlocking on the first broadcast. We now
        snapshot the target list under the lock, then send after releasing
        it. Also: copy ``exclude`` instead of mutating the caller's set.
        """
        skip = set(exclude) if exclude else set()
        skip.add(source_id)
        with self._lock:
            targets = [aid for aid in self._mailboxes if aid not in skip]
        sent_to = []
        for agent_id in targets:
            self.send_message(source_id, agent_id, content, msg_type)
            sent_to.append(agent_id)
        return sent_to

    def cleanup(self, agent_id: str) -> None:
        """Remove and close an agent's mailbox."""
        with self._lock:
            mb = self._mailboxes.pop(agent_id, None)
        if mb:
            mb.close()

    def cleanup_all(self) -> None:
        """Close and remove all mailboxes."""
        with self._lock:
            mailboxes = list(self._mailboxes.values())
            self._mailboxes.clear()
        for mb in mailboxes:
            mb.close()

    def list_agents(self) -> List[str]:
        """Return list of agent IDs with mailboxes."""
        with self._lock:
            return list(self._mailboxes.keys())

    def get_pending_count(self, agent_id: str) -> int:
        """Return number of pending messages for an agent.

        BUG FIX: the original read ``self._mailboxes`` without the lock,
        racing concurrent ``get_or_create``/``cleanup`` calls.
        """
        with self._lock:
            mb = self._mailboxes.get(agent_id)
        return mb.length if mb else 0


# Module-level singleton
_mailbox_hub: Optional[MailboxHub] = None


def get_mailbox_hub() -> MailboxHub:
    """Get the global MailboxHub (creates empty one if not initialized)."""
    global _mailbox_hub
    if _mailbox_hub is None:
        _mailbox_hub = MailboxHub()
    return _mailbox_hub


def set_mailbox_hub(hub: MailboxHub) -> None:
    """Set the global MailboxHub (used by on_session_start hook)."""
    global _mailbox_hub
    _mailbox_hub = hub
diff --git a/agent_orchestration/manager.py b/agent_orchestration/manager.py
new file mode 100644
index 000000000..924b7b7dc
--- /dev/null
+++ b/agent_orchestration/manager.py
@@ -0,0 +1,379 @@
+"""
+AgentManager — Sub-Agent Lifecycle Management
+===============================================
+
+Ported from Claude Code's ``InProcessBackend + spawnInProcess`` pattern.
+Uses Hermes' existing ``delegate_tool._build_child_agent()`` construction +pattern for thread-safe agent creation. + +Key design: + - Child agents built on main thread (like delegate_tool) + - Execution via ThreadPoolExecutor + - Global variable save/restore (model_tools._last_resolved_tool_names) + - Delegation depth limit: max 2 levels +""" + +from __future__ import annotations + +import logging +import threading +import time +import uuid +from concurrent.futures import Future, ThreadPoolExecutor +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Status constants +STATUS_PENDING = "pending" +STATUS_RUNNING = "running" +STATUS_COMPLETED = "completed" +STATUS_FAILED = "failed" +STATUS_INTERRUPTED = "interrupted" + + +@dataclass +class AgentRecord: + """Tracks a spawned sub-agent's lifecycle.""" + agent_id: str + goal: str + status: str = STATUS_PENDING + created_at: float = 0.0 + finished_at: Optional[float] = None + future: Optional[Future] = None + child_agent: Any = None # AIAgent instance + result: Optional[Dict[str, Any]] = None + error: Optional[str] = None + iterations: int = 0 + model: Optional[str] = None + + def __post_init__(self): + if not self.created_at: + self.created_at = time.time() + + +class AgentManager: + """Manages the lifecycle of spawned sub-agents. + + Singleton per session. Created by ``on_session_start`` hook, + cleaned up by ``on_session_end`` hook. 
+ """ + + def __init__(self, max_agents: int = 5): + self._agents: Dict[str, AgentRecord] = {} + self._lock = threading.Lock() + self._executor = ThreadPoolExecutor(max_workers=max_agents) + self._max_agents = max_agents + self._parent_agent = None # Set when session starts + + @property + def active_count(self) -> int: + with self._lock: + return sum( + 1 for r in self._agents.values() + if r.status in (STATUS_PENDING, STATUS_RUNNING) + ) + + def set_parent_agent(self, parent_agent) -> None: + """Store reference to the parent agent for child construction.""" + self._parent_agent = parent_agent + + def spawn( + self, + goal: str, + context: Optional[str] = None, + toolsets: Optional[List[str]] = None, + model: Optional[str] = None, + max_iterations: Optional[int] = None, + agent_id: Optional[str] = None, + ) -> str: + """Spawn a background sub-agent and return its agent_id. + + The child is built on the calling thread (thread-safe per + delegate_tool pattern), then submitted to the executor pool. + + Returns: + agent_id string + Raises: + RuntimeError if max_agents limit reached or no parent agent set + """ + if self._parent_agent is None: + raise RuntimeError("No parent agent set — call set_parent_agent() first") + + with self._lock: + if self.active_count >= self._max_agents: + raise RuntimeError( + f"Max concurrent agents ({self._max_agents}) reached. " + "Wait for an agent to finish or increase orchestration.max_agents." 
+ ) + + agent_id = agent_id or f"agent-{uuid.uuid4().hex[:8]}" + record = AgentRecord(agent_id=agent_id, goal=goal) + self._agents[agent_id] = record + + # Build child agent on the calling (main) thread + child = self._build_child( + goal=goal, + context=context, + toolsets=toolsets, + model=model, + max_iterations=max_iterations or 50, + ) + record.child_agent = child + record.model = getattr(child, "model", None) + + # Submit to executor + record.status = STATUS_RUNNING + record.future = self._executor.submit( + self._run_agent, agent_id, goal, child + ) + + return agent_id + + def get_status(self, agent_id: Optional[str] = None) -> Dict[str, Any]: + """Get status of a specific agent or all agents.""" + if agent_id: + with self._lock: + rec = self._agents.get(agent_id) + if not rec: + return {"error": f"Agent '{agent_id}' not found"} + return self._record_to_dict(rec) + else: + with self._lock: + return { + "agents": { + aid: self._record_to_dict(rec) + for aid, rec in self._agents.items() + }, + "active_count": self.active_count, + "max_agents": self._max_agents, + } + + def send_message(self, agent_id: str, content: str) -> bool: + """Send a message to a running agent via clarify_callback injection. + + Uses the same inject_message pattern as PluginContext. 
+ """ + with self._lock: + rec = self._agents.get(agent_id) + if not rec or rec.status != STATUS_RUNNING: + return False + + child = rec.child_agent + if child is None: + return False + + # Try clarify_callback channel (set by _build_child if available) + clarify_cb = getattr(child, "clarify_callback", None) + if clarify_cb and callable(clarify_cb): + try: + clarify_cb(content) + return True + except Exception as exc: + logger.debug("clarify_callback injection failed: %s", exc) + + # Fallback: try interrupt queue + interrupt_q = getattr(child, "_interrupt_queue", None) + if interrupt_q: + try: + interrupt_q.put(content) + return True + except Exception as exc: + logger.debug("interrupt_queue injection failed: %s", exc) + + logger.warning("No message injection channel for agent %s", agent_id) + return False + + def terminate(self, agent_id: str) -> bool: + """Request termination of a running agent.""" + with self._lock: + rec = self._agents.get(agent_id) + if not rec or rec.status not in (STATUS_PENDING, STATUS_RUNNING): + return False + + child = rec.child_agent + if child: + # Set interrupt flag — agent checks this each iteration + if hasattr(child, "_interrupt_requested"): + child._interrupt_requested = True + # Also try cancel the future + if rec.future and not rec.future.done(): + rec.future.cancel() + + rec.status = STATUS_INTERRUPTED + rec.finished_at = time.time() + return True + + def cleanup_all(self) -> None: + """Terminate all running agents and shut down executor.""" + with self._lock: + for rec in self._agents.values(): + if rec.status in (STATUS_PENDING, STATUS_RUNNING): + child = rec.child_agent + if child and hasattr(child, "_interrupt_requested"): + child._interrupt_requested = True + if rec.future and not rec.future.done(): + rec.future.cancel() + rec.status = STATUS_INTERRUPTED + rec.finished_at = time.time() + # Close child resources + if rec.child_agent and hasattr(rec.child_agent, "close"): + try: + rec.child_agent.close() + except Exception: 
+ pass + + self._executor.shutdown(wait=False, cancel_futures=True) + self._agents.clear() + + def _build_child( + self, + goal: str, + context: Optional[str], + toolsets: Optional[List[str]], + model: Optional[str], + max_iterations: int, + ): + """Build a child AIAgent using delegate_tool's construction pattern.""" + from tools.delegate_tool import ( + _build_child_agent, + _build_child_system_prompt, + ) + + parent = self._parent_agent + + # Save parent tool names before child construction mutates the global + import model_tools + parent_tool_names = list(model_tools._last_resolved_tool_names) + + try: + child = _build_child_agent( + task_index=0, + goal=goal, + context=context, + toolsets=toolsets, + model=model, + max_iterations=max_iterations, + parent_agent=parent, + ) + # Save parent tool names for restoration after child runs + child._delegate_saved_tool_names = parent_tool_names + finally: + # Restore global immediately after construction + model_tools._last_resolved_tool_names = parent_tool_names + + return child + + def _run_agent( + self, agent_id: str, goal: str, child + ) -> Dict[str, Any]: + """Run a child agent to completion (called from executor thread).""" + start = time.monotonic() + try: + result = child.run_conversation(user_message=goal) + duration = round(time.monotonic() - start, 2) + + summary = result.get("final_response") or "" + completed = result.get("completed", False) + interrupted = result.get("interrupted", False) + + if interrupted: + status = STATUS_INTERRUPTED + elif summary: + status = STATUS_COMPLETED + else: + status = STATUS_FAILED + + entry = { + "agent_id": agent_id, + "status": status, + "summary": summary, + "api_calls": result.get("api_calls", 0), + "duration_seconds": duration, + "model": getattr(child, "model", None), + } + + # Update record + with self._lock: + rec = self._agents.get(agent_id) + if rec: + rec.status = status + rec.result = entry + rec.finished_at = time.time() + rec.iterations = 
result.get("api_calls", 0) + if status == STATUS_FAILED: + rec.error = result.get("error", "No response produced") + + return entry + + except Exception as exc: + duration = round(time.monotonic() - start, 2) + logger.exception("Agent %s failed", agent_id) + + with self._lock: + rec = self._agents.get(agent_id) + if rec: + rec.status = STATUS_FAILED + rec.error = str(exc) + rec.finished_at = time.time() + + return { + "agent_id": agent_id, + "status": STATUS_FAILED, + "error": str(exc), + "duration_seconds": duration, + } + + finally: + # Restore parent tool names + import model_tools + saved = getattr(child, "_delegate_saved_tool_names", None) + if isinstance(saved, list): + model_tools._last_resolved_tool_names = list(saved) + + # Close child resources + try: + if hasattr(child, "close"): + child.close() + except Exception: + pass + + @staticmethod + def _record_to_dict(rec: AgentRecord) -> Dict[str, Any]: + d: Dict[str, Any] = { + "agent_id": rec.agent_id, + "goal": rec.goal[:100], + "status": rec.status, + "model": rec.model, + "iterations": rec.iterations, + } + if rec.error: + d["error"] = rec.error + if rec.result: + d["summary"] = rec.result.get("summary", "")[:200] + return d + + +# Module-level singleton — one AgentManager per process +_manager: Optional[AgentManager] = None +_manager_lock = threading.Lock() + + +def get_manager() -> AgentManager: + """Get or create the global AgentManager.""" + global _manager + with _manager_lock: + if _manager is None: + from .config import get_max_agents + _manager = AgentManager(max_agents=get_max_agents()) + return _manager + + +def reset_manager() -> None: + """Reset the global manager (used by on_session_end hook).""" + global _manager + with _manager_lock: + if _manager is not None: + _manager.cleanup_all() + _manager = None diff --git a/agent_orchestration/plugin.yaml b/agent_orchestration/plugin.yaml new file mode 100644 index 000000000..0c0c3cc7d --- /dev/null +++ b/agent_orchestration/plugin.yaml @@ -0,0 
+1,12 @@ +name: agent_orchestration +version: "0.1.0" +description: "Multi-agent orchestration — spawn, communicate, and coordinate sub-agents via workflow patterns (parallel, sequential, map_reduce, dag)" +provides_tools: + - spawn_agent + - agent_status + - send_agent_message + - orchestrate_task +provides_hooks: + - on_session_start + - on_session_end + - pre_tool_call diff --git a/agent_orchestration/tools/__init__.py b/agent_orchestration/tools/__init__.py new file mode 100644 index 000000000..b17a079c7 --- /dev/null +++ b/agent_orchestration/tools/__init__.py @@ -0,0 +1,475 @@ +""" +Agent Orchestration Tools +========================== + +4 tools registered via PluginContext.register_tool(): + + - spawn_agent: spawn a background sub-agent + - agent_status: query agent status + - send_agent_message: send message to running agent + - orchestrate_task: execute a workflow (parallel/sequential/map_reduce/dag) + +Each tool returns JSON strings (convention in Hermes tool system). +""" + +from __future__ import annotations + +import json +import logging +import time +import uuid +from concurrent.futures import as_completed +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +# ── Tool Schemas ───────────────────────────────────────────────────────── + +SPAWN_AGENT_SCHEMA = { + "type": "object", + "properties": { + "goal": { + "type": "string", + "description": "The task for the sub-agent to accomplish", + }, + "context": { + "type": "string", + "description": "Additional context or instructions (optional)", + }, + "toolsets": { + "type": "array", + "items": {"type": "string"}, + "description": "Tool sets to make available (default: terminal, file, web)", + }, + "model": { + "type": "string", + "description": "Model to use (default: inherit from parent)", + }, + }, + "required": ["goal"], +} + +AGENT_STATUS_SCHEMA = { + "type": "object", + "properties": { + "agent_id": { + "type": "string", + "description": "Specific agent ID, or 
omit for all agents", + }, + }, +} + +SEND_AGENT_MESSAGE_SCHEMA = { + "type": "object", + "properties": { + "agent_id": { + "type": "string", + "description": "Target agent ID", + }, + "message": { + "type": "string", + "description": "Message content to send", + }, + }, + "required": ["agent_id", "message"], +} + +ORCHESTRATE_TASK_SCHEMA = { + "type": "object", + "properties": { + "workflow_type": { + "type": "string", + "enum": ["parallel", "sequential", "map_reduce", "dag"], + "description": "Workflow execution pattern", + }, + "tasks": { + "type": "array", + "items": { + "type": "object", + "properties": { + "goal": {"type": "string"}, + "context": {"type": "string"}, + "toolsets": { + "type": "array", + "items": {"type": "string"}, + }, + "depends_on": { + "type": "array", + "items": {"type": "string"}, + "description": "Task IDs this depends on (dag only)", + }, + }, + "required": ["goal"], + }, + "description": "List of tasks to execute", + }, + "aggregator_goal": { + "type": "string", + "description": "Goal for the aggregation agent (map_reduce only)", + }, + "max_parallel": { + "type": "integer", + "description": "Max parallel agents (default: orchestration.max_agents)", + }, + }, + "required": ["workflow_type", "tasks"], +} + + +# ── Tool Handlers ──────────────────────────────────────────────────────── + +def handle_spawn_agent(args: Dict[str, Any], **kwargs) -> str: + """Spawn a background sub-agent.""" + from agent_orchestration.manager import get_manager + + goal = args.get("goal", "").strip() + if not goal: + return json.dumps({"error": "Missing required parameter: goal"}) + + manager = get_manager() + try: + agent_id = manager.spawn( + goal=goal, + context=args.get("context"), + toolsets=args.get("toolsets"), + model=args.get("model"), + ) + return json.dumps({ + "agent_id": agent_id, + "status": "running", + "message": f"Agent {agent_id} spawned and running in background", + }) + except RuntimeError as exc: + return json.dumps({"error": str(exc)}) 
+ except Exception as exc: + logger.exception("spawn_agent failed") + return json.dumps({"error": str(exc)}) + + +def handle_agent_status(args: Dict[str, Any], **kwargs) -> str: + """Query status of spawned agents.""" + from agent_orchestration.manager import get_manager + + manager = get_manager() + result = manager.get_status(args.get("agent_id")) + return json.dumps(result, ensure_ascii=False) + + +def handle_send_agent_message(args: Dict[str, Any], **kwargs) -> str: + """Send a message to a running agent.""" + from agent_orchestration.manager import get_manager + from agent_orchestration.mailbox import get_mailbox_hub + + agent_id = args.get("agent_id", "").strip() + message = args.get("message", "").strip() + + if not agent_id or not message: + return json.dumps({"error": "Missing required: agent_id and message"}) + + manager = get_manager() + + # Send via manager (inject into running agent) + injected = manager.send_message(agent_id, message) + + # Also store in mailbox for async retrieval + hub = get_mailbox_hub() + hub.send_message( + source_id="parent", + target_id=agent_id, + content=message, + ) + + return json.dumps({ + "agent_id": agent_id, + "injected": injected, + "mailbox_queued": True, + }) + + +def handle_orchestrate_task(args: Dict[str, Any], **kwargs) -> str: + """Execute an orchestrated workflow across multiple agents.""" + from agent_orchestration.manager import get_manager + from agent_orchestration.config import get_max_agents + + workflow_type = args.get("workflow_type", "").strip() + tasks = args.get("tasks", []) + + if not workflow_type: + return json.dumps({"error": "Missing required: workflow_type"}) + if not tasks: + return json.dumps({"error": "Missing required: tasks"}) + + # Validate tasks + for i, t in enumerate(tasks): + if not t.get("goal", "").strip(): + return json.dumps({"error": f"Task {i} missing 'goal'"}) + + manager = get_manager() + max_parallel = args.get("max_parallel") or get_max_agents() + + try: + if workflow_type == 
"parallel": + result = _run_parallel(manager, tasks, max_parallel) + elif workflow_type == "sequential": + result = _run_sequential(manager, tasks) + elif workflow_type == "map_reduce": + result = _run_map_reduce( + manager, tasks, + args.get("aggregator_goal", ""), + max_parallel, + ) + elif workflow_type == "dag": + result = _run_dag(manager, tasks, max_parallel) + else: + return json.dumps({"error": f"Unknown workflow_type: {workflow_type}"}) + + return json.dumps(result, ensure_ascii=False) + + except Exception as exc: + logger.exception("orchestrate_task failed") + return json.dumps({"error": str(exc)}) + + +# ── Workflow Implementations ───────────────────────────────────────────── + +def _run_parallel( + manager, tasks: List[Dict], max_parallel: int +) -> Dict[str, Any]: + """Run all tasks in parallel, collect results.""" + start = time.monotonic() + results = [] + + # Assign IDs + for i, t in enumerate(tasks): + t["_task_id"] = f"task-{i}" + + # Build and spawn all agents (respecting max_parallel) + agent_map = {} # task_id -> agent_id + for t in tasks: + aid = manager.spawn( + goal=t["goal"], + context=t.get("context"), + toolsets=t.get("toolsets"), + ) + agent_map[t["_task_id"]] = aid + + # Wait for all to complete (poll status) + pending = set(agent_map.values()) + while pending: + time.sleep(1) + for aid in list(pending): + status = manager.get_status(aid) + s = status.get("status", "") + if s in ("completed", "failed", "interrupted"): + results.append(status) + pending.discard(aid) + + duration = round(time.monotonic() - start, 2) + return { + "workflow": "parallel", + "results": results, + "total_duration_seconds": duration, + "task_count": len(tasks), + } + + +def _run_sequential(manager, tasks: List[Dict]) -> Dict[str, Any]: + """Run tasks one after another, passing previous result as context.""" + start = time.monotonic() + results = [] + previous_summary = "" + + for i, t in enumerate(tasks): + # Enrich context with previous result + ctx = 
t.get("context", "") or "" + if previous_summary: + ctx = f"{ctx}\n\nPrevious step result:\n{previous_summary}" if ctx else \ + f"Previous step result:\n{previous_summary}" + + aid = manager.spawn(goal=t["goal"], context=ctx) + + # Wait for completion + while True: + status = manager.get_status(aid) + s = status.get("status", "") + if s in ("completed", "failed", "interrupted"): + results.append(status) + previous_summary = status.get("summary", "") + break + time.sleep(1) + + duration = round(time.monotonic() - start, 2) + return { + "workflow": "sequential", + "results": results, + "total_duration_seconds": duration, + "task_count": len(tasks), + } + + +def _run_map_reduce( + manager, tasks: List[Dict], aggregator_goal: str, max_parallel: int +) -> Dict[str, Any]: + """Run mappers in parallel, then aggregate with a single agent.""" + start = time.monotonic() + + # Map phase + map_results = [] + agent_ids = [] + for t in tasks: + aid = manager.spawn( + goal=t["goal"], context=t.get("context"), + toolsets=t.get("toolsets"), + ) + agent_ids.append(aid) + + # Wait for all mappers + for aid in agent_ids: + while True: + status = manager.get_status(aid) + s = status.get("status", "") + if s in ("completed", "failed", "interrupted"): + map_results.append(status) + break + time.sleep(1) + + # Reduce phase + agg_context = "Results from mapper agents:\n\n" + for i, r in enumerate(map_results): + agg_context += f"--- Mapper {i+1} ---\n" + agg_context += r.get("summary", "(no result)") + "\n\n" + + agg_goal = aggregator_goal or "Synthesize and summarize the following results from multiple agents into a coherent final answer." 
+ + agg_id = manager.spawn(goal=agg_goal, context=agg_context) + while True: + status = manager.get_status(agg_id) + s = status.get("status", "") + if s in ("completed", "failed", "interrupted"): + break + time.sleep(1) + + duration = round(time.monotonic() - start, 2) + return { + "workflow": "map_reduce", + "map_results": map_results, + "aggregation": status, + "total_duration_seconds": duration, + "task_count": len(tasks), + } + + +def _run_dag(manager, tasks: List[Dict], max_parallel: int) -> Dict[str, Any]: + """Run tasks as a DAG — tasks with satisfied dependencies execute in parallel. + + Uses Kahn's algorithm for topological sorting. + """ + start = time.monotonic() + + # Assign IDs and build dependency graph + task_map = {} + for i, t in enumerate(tasks): + tid = f"dag-{i}" + t["_task_id"] = tid + task_map[tid] = t + + # Track completed tasks and results + completed: Dict[str, str] = {} # task_id -> summary + results = [] + remaining = set(task_map.keys()) + + while remaining: + # Find tasks with all dependencies satisfied + ready = [] + for tid in list(remaining): + deps = task_map[tid].get("depends_on", []) + if all(d in completed for d in deps): + ready.append(tid) + + if not ready: + # Circular dependency or stuck + return { + "workflow": "dag", + "error": "Circular dependency or unresolvable tasks", + "remaining": list(remaining), + "results": results, + } + + # Spawn ready tasks + agent_ids = {} + for tid in ready: + t = task_map[tid] + # Build context from dependency results + ctx = t.get("context", "") or "" + dep_summaries = [] + for dep_id in t.get("depends_on", []): + if dep_id in completed: + dep_summaries.append(f"[{dep_id}]: {completed[dep_id]}") + if dep_summaries: + dep_ctx = "Dependency results:\n" + "\n".join(dep_summaries) + ctx = f"{ctx}\n\n{dep_ctx}" if ctx else dep_ctx + + aid = manager.spawn(goal=t["goal"], context=ctx) + agent_ids[tid] = aid + + # Wait for this batch to complete + for tid, aid in agent_ids.items(): + while True: + 
status = manager.get_status(aid) + s = status.get("status", "") + if s in ("completed", "failed", "interrupted"): + results.append({"task_id": tid, **status}) + completed[tid] = status.get("summary", "") + remaining.discard(tid) + break + time.sleep(1) + + duration = round(time.monotonic() - start, 2) + return { + "workflow": "dag", + "results": results, + "total_duration_seconds": duration, + "task_count": len(tasks), + } + + +# ── Registration Helper ────────────────────────────────────────────────── + +def register_all(ctx) -> None: + """Register all 4 orchestration tools via PluginContext.""" + ctx.register_tool( + name="spawn_agent", + toolset="orchestration", + schema=SPAWN_AGENT_SCHEMA, + handler=handle_spawn_agent, + description="Spawn a background sub-agent to execute a task autonomously", + emoji="🔀", + ) + + ctx.register_tool( + name="agent_status", + toolset="orchestration", + schema=AGENT_STATUS_SCHEMA, + handler=handle_agent_status, + description="Query status of spawned agents (running, completed, failed)", + emoji="📊", + ) + + ctx.register_tool( + name="send_agent_message", + toolset="orchestration", + schema=SEND_AGENT_MESSAGE_SCHEMA, + handler=handle_send_agent_message, + description="Send a message to a running sub-agent (injects into conversation)", + emoji="✉️", + ) + + ctx.register_tool( + name="orchestrate_task", + toolset="orchestration", + schema=ORCHESTRATE_TASK_SCHEMA, + handler=handle_orchestrate_task, + description="Execute a multi-agent workflow (parallel, sequential, map_reduce, or dag)", + emoji="🎯", + )