"""ACP agent server — exposes Hermes Agent via the Agent Client Protocol.

Defines ``HermesACPAgent``, an ``acp.Agent`` implementation that handles the
ACP lifecycle (initialize/authenticate), session management (new, load,
resume, fork, list), prompt turns (the synchronous Hermes ``AIAgent`` is run
in a thread pool while events are streamed back to the client), headless
slash commands, and model/mode/config switching.
"""

from __future__ import annotations

import asyncio
import logging
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Deque, Optional

import acp
from acp.schema import (
    AgentCapabilities,
    AuthenticateResponse,
    AvailableCommand,
    AvailableCommandsUpdate,
    ClientCapabilities,
    EmbeddedResourceContentBlock,
    ForkSessionResponse,
    ImageContentBlock,
    AudioContentBlock,
    Implementation,
    InitializeResponse,
    ListSessionsResponse,
    LoadSessionResponse,
    McpServerHttp,
    McpServerSse,
    McpServerStdio,
    ModelInfo,
    NewSessionResponse,
    PromptResponse,
    ResumeSessionResponse,
    SetSessionConfigOptionResponse,
    SetSessionModelResponse,
    SetSessionModeResponse,
    ResourceContentBlock,
    SessionCapabilities,
    SessionForkCapabilities,
    SessionListCapabilities,
    SessionModelState,
    SessionResumeCapabilities,
    SessionInfo,
    TextContentBlock,
    UnstructuredCommandInput,
    Usage,
)

# AuthMethodAgent was renamed from AuthMethod in agent-client-protocol 0.9.0
# (fall back to the old name so older SDK releases keep working).
try:
    from acp.schema import AuthMethodAgent
except ImportError:
    from acp.schema import AuthMethod as AuthMethodAgent  # type: ignore[attr-defined]

from acp_adapter.auth import detect_provider
from acp_adapter.events import (
    make_message_cb,
    make_step_cb,
    make_thinking_cb,
    make_tool_progress_cb,
)
from acp_adapter.permissions import make_approval_callback
from acp_adapter.session import SessionManager, SessionState

logger = logging.getLogger(__name__)

# Report a placeholder version instead of failing to start when the
# hermes_cli package (or its __version__) cannot be imported.
try:
    from hermes_cli import __version__ as HERMES_VERSION
except Exception:
    HERMES_VERSION = "0.0.0"

# Thread pool for running AIAgent (synchronous) in parallel.
_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="acp-agent")


def _extract_text(
    prompt: list[
        TextContentBlock
        | ImageContentBlock
        | AudioContentBlock
        | ResourceContentBlock
        | EmbeddedResourceContentBlock
    ],
) -> str:
    """Extract plain text from ACP content blocks.

    ``TextContentBlock`` entries contribute their ``text`` verbatim; any other
    block that exposes a ``text`` attribute contributes ``str(block.text)``.
    Blocks without text are skipped. The pieces are joined with newlines.
    """
    parts: list[str] = []
    for block in prompt:
        if isinstance(block, TextContentBlock):
            parts.append(block.text)
        elif hasattr(block, "text"):
            parts.append(str(block.text))
        # Non-text blocks are ignored for now.
    return "\n".join(parts)


class HermesACPAgent(acp.Agent):
    """ACP Agent implementation wrapping Hermes AIAgent.

    Sessions are owned by a ``SessionManager``; each prompt turn runs the
    session's synchronous agent on the module-level ``_executor`` thread pool
    and forwards progress to the connected ACP client via callbacks.
    """

    # Help text shown by the local /help command.
    _SLASH_COMMANDS = {
        "help": "Show available commands",
        "model": "Show or change current model",
        "tools": "List available tools",
        "context": "Show conversation context info",
        "reset": "Clear conversation history",
        "compact": "Compress conversation context",
        "version": "Show Hermes version",
    }

    # Commands advertised to the client via ``available_commands_update``.
    _ADVERTISED_COMMANDS = (
        {
            "name": "help",
            "description": "List available commands",
        },
        {
            "name": "model",
            "description": "Show current model and provider, or switch models",
            "input_hint": "model name to switch to",
        },
        {
            "name": "tools",
            "description": "List available tools with descriptions",
        },
        {
            "name": "context",
            "description": "Show conversation message counts by role",
        },
        {
            "name": "reset",
            "description": "Clear conversation history",
        },
        {
            "name": "compact",
            "description": "Compress conversation context",
        },
        {
            "name": "version",
            "description": "Show Hermes version",
        },
    )

    def __init__(self, session_manager: SessionManager | None = None):
        super().__init__()
        self.session_manager = session_manager or SessionManager()
        self._conn: Optional[acp.Client] = None
        # Strong references to fire-and-forget tasks. The event loop keeps
        # only weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task[Any]] = set()

    # ---- Connection lifecycle -----------------------------------------------

    def on_connect(self, conn: acp.Client) -> None:
        """Store the client connection for sending session updates."""
        self._conn = conn
        logger.info("ACP client connected")

    @staticmethod
    def _encode_model_choice(provider: str | None, model: str | None) -> str:
        """Encode a model selection so ACP clients can keep provider context.

        Returns ``""`` for an empty model, the bare model when the provider is
        empty, and ``"<provider-lowercased>:<model>"`` otherwise.
        """
        raw_model = str(model or "").strip()
        if not raw_model:
            return ""
        raw_provider = str(provider or "").strip().lower()
        if not raw_provider:
            return raw_model
        return f"{raw_provider}:{raw_model}"

    def _build_model_state(self, state: SessionState) -> SessionModelState | None:
        """Return the ACP model selector payload for editors like Zed.

        Lists the provider's curated models (plus the current model if it is
        not curated). If that lookup fails or yields nothing, falls back to a
        single-entry list with the current model, or ``None`` when no model is
        known.
        """
        model = str(state.model or getattr(state.agent, "model", "") or "").strip()
        provider = getattr(state.agent, "provider", None) or detect_provider() or "openrouter"

        try:
            from hermes_cli.models import curated_models_for_provider, normalize_provider, provider_label

            normalized_provider = normalize_provider(provider)
            provider_name = provider_label(normalized_provider)
            available_models: list[ModelInfo] = []
            seen_ids: set[str] = set()

            for model_id, description in curated_models_for_provider(normalized_provider):
                rendered_model = str(model_id or "").strip()
                if not rendered_model:
                    continue
                choice_id = self._encode_model_choice(normalized_provider, rendered_model)
                if choice_id in seen_ids:
                    continue
                desc_parts = [f"Provider: {provider_name}"]
                if description:
                    desc_parts.append(str(description).strip())
                if rendered_model == model:
                    desc_parts.append("current")
                available_models.append(
                    ModelInfo(
                        model_id=choice_id,
                        name=rendered_model,
                        description=" • ".join(part for part in desc_parts if part),
                    )
                )
                seen_ids.add(choice_id)

            current_model_id = self._encode_model_choice(normalized_provider, model)
            # A non-curated current model is still shown, pinned to the top.
            if current_model_id and current_model_id not in seen_ids:
                available_models.insert(
                    0,
                    ModelInfo(
                        model_id=current_model_id,
                        name=model,
                        description=f"Provider: {provider_name} • current",
                    ),
                )

            if available_models:
                return SessionModelState(
                    available_models=available_models,
                    current_model_id=current_model_id or available_models[0].model_id,
                )
        except Exception:
            logger.debug("Could not build ACP model state", exc_info=True)

        if not model:
            return None

        fallback_choice = self._encode_model_choice(provider, model)
        return SessionModelState(
            available_models=[ModelInfo(model_id=fallback_choice, name=model)],
            current_model_id=fallback_choice,
        )

    @staticmethod
    def _resolve_model_selection(raw_model: str, current_provider: str) -> tuple[str, str]:
        """Resolve ``provider:model`` input into the provider and normalized model id.

        If parsing keeps the current provider, provider auto-detection for the
        model is attempted. On any failure the stripped input and
        *current_provider* are returned unchanged.
        """
        target_provider = current_provider
        new_model = raw_model.strip()

        try:
            from hermes_cli.models import detect_provider_for_model, parse_model_input

            target_provider, new_model = parse_model_input(new_model, current_provider)
            if target_provider == current_provider:
                detected = detect_provider_for_model(new_model, current_provider)
                if detected:
                    target_provider, new_model = detected
        except Exception:
            logger.debug("Provider detection failed, using model as-is", exc_info=True)

        return target_provider, new_model

    def _switch_session_model(self, state: SessionState, raw_model: str) -> tuple[str, str]:
        """Rebuild ``state.agent`` for *raw_model* and persist the session.

        Shared by the ``/model`` slash command and ``set_session_model`` so both
        paths behave the same. When the resolved provider equals the agent's
        current provider (or the agent has none), the agent's ``base_url`` and
        ``api_mode`` are carried over so custom endpoints survive a model
        switch; when the provider changes they are reset to ``None``.

        Returns ``(provider, model)`` as resolved by ``_resolve_model_selection``.
        """
        current_provider = getattr(state.agent, "provider", None)
        target_provider, new_model = self._resolve_model_selection(
            raw_model,
            current_provider or "openrouter",
        )
        provider_changed = bool(current_provider and target_provider != current_provider)
        base_url = None if provider_changed else getattr(state.agent, "base_url", None)
        api_mode = None if provider_changed else getattr(state.agent, "api_mode", None)

        state.model = new_model
        state.agent = self.session_manager._make_agent(
            session_id=state.session_id,
            cwd=state.cwd,
            model=new_model,
            requested_provider=target_provider,
            base_url=base_url,
            api_mode=api_mode,
        )
        self.session_manager.save_session(state.session_id)
        return target_provider, new_model

    async def _register_session_mcp_servers(
        self,
        state: SessionState,
        mcp_servers: list[McpServerStdio | McpServerHttp | McpServerSse] | None,
    ) -> None:
        """Register ACP-provided MCP servers and refresh the agent tool surface.

        Failures are logged and swallowed: a broken MCP server must not block
        session creation.
        """
        if not mcp_servers:
            return

        try:
            from tools.mcp_tool import register_mcp_servers

            config_map: dict[str, dict] = {}
            for server in mcp_servers:
                name = server.name
                if isinstance(server, McpServerStdio):
                    config = {
                        "command": server.command,
                        "args": list(server.args),
                        "env": {item.name: item.value for item in server.env},
                    }
                else:
                    config = {
                        "url": server.url,
                        "headers": {item.name: item.value for item in server.headers},
                    }
                config_map[name] = config

            # Registration is blocking; keep it off the event loop.
            await asyncio.to_thread(register_mcp_servers, config_map)
        except Exception:
            logger.warning(
                "Session %s: failed to register ACP MCP servers",
                state.session_id,
                exc_info=True,
            )
            return

        try:
            from model_tools import get_tool_definitions

            enabled_toolsets = getattr(state.agent, "enabled_toolsets", None) or ["hermes-acp"]
            disabled_toolsets = getattr(state.agent, "disabled_toolsets", None)
            state.agent.tools = get_tool_definitions(
                enabled_toolsets=enabled_toolsets,
                disabled_toolsets=disabled_toolsets,
                quiet_mode=True,
            )
            state.agent.valid_tool_names = {
                tool["function"]["name"] for tool in state.agent.tools or []
            }
            # The cached system prompt may describe the old tool set.
            invalidate = getattr(state.agent, "_invalidate_system_prompt", None)
            if callable(invalidate):
                invalidate()
            logger.info(
                "Session %s: refreshed tool surface after ACP MCP registration (%d tools)",
                state.session_id,
                len(state.agent.tools or []),
            )
        except Exception:
            logger.warning(
                "Session %s: failed to refresh tool surface after ACP MCP registration",
                state.session_id,
                exc_info=True,
            )

    # ---- ACP lifecycle ------------------------------------------------------

    async def initialize(
        self,
        protocol_version: int | None = None,
        client_capabilities: ClientCapabilities | None = None,
        client_info: Implementation | None = None,
        **kwargs: Any,
    ) -> InitializeResponse:
        """Answer the ACP handshake with agent info, capabilities and auth methods.

        One auth method (id = detected provider) is advertised when provider
        credentials are configured; otherwise ``auth_methods`` is ``None``.
        """
        resolved_protocol_version = (
            protocol_version if isinstance(protocol_version, int) else acp.PROTOCOL_VERSION
        )
        provider = detect_provider()
        auth_methods = None
        if provider:
            auth_methods = [
                AuthMethodAgent(
                    id=provider,
                    name=f"{provider} runtime credentials",
                    description=f"Authenticate Hermes using the currently configured {provider} runtime credentials.",
                )
            ]

        client_name = client_info.name if client_info else "unknown"
        logger.info(
            "Initialize from %s (protocol v%s)",
            client_name,
            resolved_protocol_version,
        )

        return InitializeResponse(
            protocol_version=acp.PROTOCOL_VERSION,
            agent_info=Implementation(name="hermes-agent", version=HERMES_VERSION),
            agent_capabilities=AgentCapabilities(
                load_session=True,
                session_capabilities=SessionCapabilities(
                    fork=SessionForkCapabilities(),
                    list=SessionListCapabilities(),
                    resume=SessionResumeCapabilities(),
                ),
            ),
            auth_methods=auth_methods,
        )

    async def authenticate(self, method_id: str, **kwargs: Any) -> AuthenticateResponse | None:
        """Accept only the auth method id advertised by ``initialize``.

        Returns ``None`` when no provider is configured or *method_id* does not
        match it (compared case-insensitively, ignoring surrounding whitespace).
        """
        # Only accept authenticate() calls whose method_id matches the
        # provider we advertised in initialize(). Without this check,
        # authenticate() would acknowledge any method_id as long as the
        # server has provider credentials configured — harmless under
        # Hermes' threat model (ACP is stdio-only, local-trust), but poor
        # API hygiene and confusing if ACP ever grows multi-method auth.
        provider = detect_provider()
        if not provider:
            return None
        # Normalise BOTH sides: initialize() advertises the provider id
        # verbatim, so lowercasing only the client's value would reject an
        # exact echo of a mixed-case id.
        expected = str(provider).strip().lower()
        if not isinstance(method_id, str) or method_id.strip().lower() != expected:
            return None
        return AuthenticateResponse()

    # ---- Session management -------------------------------------------------

    async def new_session(
        self,
        cwd: str,
        mcp_servers: list | None = None,
        **kwargs: Any,
    ) -> NewSessionResponse:
        """Create a session rooted at *cwd* and attach any client MCP servers."""
        state = self.session_manager.create_session(cwd=cwd)
        await self._register_session_mcp_servers(state, mcp_servers)
        logger.info("New session %s (cwd=%s)", state.session_id, cwd)
        self._schedule_available_commands_update(state.session_id)
        return NewSessionResponse(
            session_id=state.session_id,
            models=self._build_model_state(state),
        )

    async def load_session(
        self,
        cwd: str,
        session_id: str,
        mcp_servers: list | None = None,
        **kwargs: Any,
    ) -> LoadSessionResponse | None:
        """Reattach an existing session with a new *cwd*; ``None`` if unknown."""
        state = self.session_manager.update_cwd(session_id, cwd)
        if state is None:
            logger.warning("load_session: session %s not found", session_id)
            return None
        await self._register_session_mcp_servers(state, mcp_servers)
        logger.info("Loaded session %s", session_id)
        self._schedule_available_commands_update(session_id)
        return LoadSessionResponse(models=self._build_model_state(state))

    async def resume_session(
        self,
        cwd: str,
        session_id: str,
        mcp_servers: list | None = None,
        **kwargs: Any,
    ) -> ResumeSessionResponse:
        """Resume *session_id*, creating a fresh session if it does not exist."""
        state = self.session_manager.update_cwd(session_id, cwd)
        if state is None:
            logger.warning("resume_session: session %s not found, creating new", session_id)
            # NOTE(review): the fallback session gets a new id, but
            # ResumeSessionResponse carries no session id, so the client keeps
            # using the original (unknown) id — confirm how clients recover.
            state = self.session_manager.create_session(cwd=cwd)
        await self._register_session_mcp_servers(state, mcp_servers)
        logger.info("Resumed session %s", state.session_id)
        self._schedule_available_commands_update(state.session_id)
        return ResumeSessionResponse(models=self._build_model_state(state))

    async def cancel(self, session_id: str, **kwargs: Any) -> None:
        """Signal cancellation for a running turn and interrupt the agent."""
        state = self.session_manager.get_session(session_id)
        if state is None:
            logger.debug("cancel: session %s not found", session_id)
            return
        if state.cancel_event:
            state.cancel_event.set()
            try:
                if getattr(state, "agent", None) and hasattr(state.agent, "interrupt"):
                    state.agent.interrupt()
            except Exception:
                logger.debug("Failed to interrupt ACP session %s", session_id, exc_info=True)
            logger.info("Cancelled session %s", session_id)

    async def fork_session(
        self,
        cwd: str,
        session_id: str,
        mcp_servers: list | None = None,
        **kwargs: Any,
    ) -> ForkSessionResponse:
        """Fork *session_id*; the response carries ``""`` if the fork failed."""
        state = self.session_manager.fork_session(session_id, cwd=cwd)
        new_id = state.session_id if state else ""
        if state is not None:
            await self._register_session_mcp_servers(state, mcp_servers)
        logger.info("Forked session %s -> %s", session_id, new_id)
        if new_id:
            self._schedule_available_commands_update(new_id)
        return ForkSessionResponse(session_id=new_id)

    async def list_sessions(
        self,
        cursor: str | None = None,
        cwd: str | None = None,
        **kwargs: Any,
    ) -> ListSessionsResponse:
        """Return one page of sessions, paginated by session-id cursor.

        *cursor* is the last session id of the previous page; an unknown
        cursor yields an empty page. ``kwargs["limit"]`` (default 50) sets the
        page size; invalid values fall back to 50 and the size is at least 1.
        """
        infos = self.session_manager.list_sessions(cwd=cwd)

        if cursor:
            # Find the cursor index
            for idx, s in enumerate(infos):
                if s["session_id"] == cursor:
                    infos = infos[idx + 1:]
                    break
            else:
                # Cursor not found, return empty
                infos = []

        # Cap limit. Coerce defensively: None/non-numeric values would make
        # the comparison below raise, and 0 would produce an empty page with
        # no cursor to continue from.
        raw_limit = kwargs.get("limit")
        try:
            limit = int(raw_limit) if raw_limit is not None else 50
        except (TypeError, ValueError):
            limit = 50
        limit = max(limit, 1)

        has_more = len(infos) > limit
        infos = infos[:limit]

        sessions = []
        for s in infos:
            updated_at = s.get("updated_at")
            if updated_at is not None and not isinstance(updated_at, str):
                updated_at = str(updated_at)
            sessions.append(
                SessionInfo(
                    session_id=s["session_id"],
                    cwd=s["cwd"],
                    title=s.get("title"),
                    updated_at=updated_at,
                )
            )

        next_cursor = sessions[-1].session_id if has_more and sessions else None
        return ListSessionsResponse(sessions=sessions, nextCursor=next_cursor)

    # ---- Prompt (core) ------------------------------------------------------

    async def prompt(
        self,
        prompt: list[
            TextContentBlock
            | ImageContentBlock
            | AudioContentBlock
            | ResourceContentBlock
            | EmbeddedResourceContentBlock
        ],
        session_id: str,
        **kwargs: Any,
    ) -> PromptResponse:
        """Run Hermes on the user's prompt and stream events back to the editor.

        Known slash commands are answered locally. Otherwise the agent runs on
        ``_executor``; history is saved, the final response is sent to the
        client, and token usage is reported when the agent returned any.
        """
        state = self.session_manager.get_session(session_id)
        if state is None:
            logger.error("prompt: session %s not found", session_id)
            return PromptResponse(stop_reason="refusal")

        user_text = _extract_text(prompt).strip()
        if not user_text:
            return PromptResponse(stop_reason="end_turn")

        # Intercept slash commands — handle locally without calling the LLM.
        # NOTE(review): handlers run synchronously on the event loop; /compact
        # and /model may block it while they work.
        if user_text.startswith("/"):
            response_text = self._handle_slash_command(user_text, state)
            if response_text is not None:
                if self._conn:
                    update = acp.update_agent_message_text(response_text)
                    await self._conn.session_update(session_id, update)
                return PromptResponse(stop_reason="end_turn")

        logger.info("Prompt on session %s: %s", session_id, user_text[:100])

        conn = self._conn
        loop = asyncio.get_running_loop()

        if state.cancel_event:
            state.cancel_event.clear()

        tool_call_ids: dict[str, Deque[str]] = defaultdict(deque)
        tool_call_meta: dict[str, dict[str, Any]] = {}
        previous_approval_cb = None

        if conn:
            tool_progress_cb = make_tool_progress_cb(conn, session_id, loop, tool_call_ids, tool_call_meta)
            thinking_cb = make_thinking_cb(conn, session_id, loop)
            step_cb = make_step_cb(conn, session_id, loop, tool_call_ids, tool_call_meta)
            message_cb = make_message_cb(conn, session_id, loop)
            approval_cb = make_approval_callback(conn.request_permission, loop, session_id)
        else:
            tool_progress_cb = None
            thinking_cb = None
            step_cb = None
            message_cb = None
            approval_cb = None

        agent = state.agent
        agent.tool_progress_callback = tool_progress_cb
        agent.thinking_callback = thinking_cb
        agent.step_callback = step_cb
        agent.message_callback = message_cb

        if approval_cb:
            try:
                from tools import terminal_tool as _terminal_tool
                previous_approval_cb = getattr(_terminal_tool, "_approval_callback", None)
                _terminal_tool.set_approval_callback(approval_cb)
            except Exception:
                logger.debug("Could not set ACP approval callback", exc_info=True)

        def _run_agent() -> dict:
            # Runs on a worker thread. The approval callback is restored here
            # (not after the await) so it stays installed for as long as the
            # agent thread is actually running.
            try:
                result = agent.run_conversation(
                    user_message=user_text,
                    conversation_history=state.history,
                    task_id=session_id,
                )
                return result
            except Exception as e:
                logger.exception("Agent error in session %s", session_id)
                return {"final_response": f"Error: {e}", "messages": state.history}
            finally:
                if approval_cb:
                    try:
                        from tools import terminal_tool as _terminal_tool
                        _terminal_tool.set_approval_callback(previous_approval_cb)
                    except Exception:
                        logger.debug("Could not restore approval callback", exc_info=True)

        try:
            result = await loop.run_in_executor(_executor, _run_agent)
        except Exception:
            logger.exception("Executor error for session %s", session_id)
            return PromptResponse(stop_reason="end_turn")

        if result.get("messages"):
            state.history = result["messages"]
            # Persist updated history so sessions survive process restarts.
            self.session_manager.save_session(session_id)

        final_response = result.get("final_response", "")
        if final_response:
            try:
                from agent.title_generator import maybe_auto_title

                maybe_auto_title(
                    self.session_manager._get_db(),
                    session_id,
                    user_text,
                    final_response,
                    state.history,
                )
            except Exception:
                logger.debug("Failed to auto-title ACP session %s", session_id, exc_info=True)
        if final_response and conn:
            update = acp.update_agent_message_text(final_response)
            await conn.session_update(session_id, update)

        usage = None
        if any(result.get(key) is not None for key in ("prompt_tokens", "completion_tokens", "total_tokens")):
            # `or 0` (not a .get default): a key that is present but None must
            # not reach the integer token fields.
            usage = Usage(
                input_tokens=result.get("prompt_tokens") or 0,
                output_tokens=result.get("completion_tokens") or 0,
                total_tokens=result.get("total_tokens") or 0,
                thought_tokens=result.get("reasoning_tokens"),
                cached_read_tokens=result.get("cache_read_tokens"),
            )

        stop_reason = "cancelled" if state.cancel_event and state.cancel_event.is_set() else "end_turn"
        return PromptResponse(stop_reason=stop_reason, usage=usage)

    # ---- Slash commands (headless) -------------------------------------------

    @classmethod
    def _available_commands(cls) -> list[AvailableCommand]:
        """Build ACP ``AvailableCommand`` entries from ``_ADVERTISED_COMMANDS``."""
        commands: list[AvailableCommand] = []
        for spec in cls._ADVERTISED_COMMANDS:
            input_hint = spec.get("input_hint")
            commands.append(
                AvailableCommand(
                    name=spec["name"],
                    description=spec["description"],
                    input=UnstructuredCommandInput(hint=input_hint) if input_hint else None,
                )
            )
        return commands

    async def _send_available_commands_update(self, session_id: str) -> None:
        """Advertise supported slash commands to the connected ACP client."""
        if not self._conn:
            return
        try:
            await self._conn.session_update(
                session_id=session_id,
                update=AvailableCommandsUpdate(
                    session_update="available_commands_update",
                    available_commands=self._available_commands(),
                ),
            )
        except Exception:
            logger.warning(
                "Failed to advertise ACP slash commands for session %s",
                session_id,
                exc_info=True,
            )

    def _schedule_available_commands_update(self, session_id: str) -> None:
        """Send the command advertisement after the session response is queued.

        Must be called from within the running event loop.
        """
        if not self._conn:
            return
        loop = asyncio.get_running_loop()
        loop.call_soon(self._start_available_commands_update, session_id)

    def _start_available_commands_update(self, session_id: str) -> None:
        """Event-loop callback: start the advertisement task and keep it alive.

        The coroutine is created here rather than at scheduling time, so no
        un-awaited coroutine is left behind if the loop stops first. The task
        is held in ``_background_tasks`` until it completes, because the event
        loop only keeps weak references to tasks.
        """
        task = asyncio.get_running_loop().create_task(
            self._send_available_commands_update(session_id)
        )
        tasks = getattr(self, "_background_tasks", None)
        if tasks is None:
            # Tolerate instances that bypassed __init__.
            tasks = self._background_tasks = set()
        tasks.add(task)
        task.add_done_callback(tasks.discard)

    def _handle_slash_command(self, text: str, state: SessionState) -> str | None:
        """Dispatch a slash command and return the response text.

        Returns ``None`` for unrecognized commands so they fall through to
        the LLM (the user may have typed ``/something`` as prose). Handler
        exceptions are logged and turned into an error message.
        """
        parts = text.split(maxsplit=1)
        cmd = parts[0].lstrip("/").lower()
        args = parts[1].strip() if len(parts) > 1 else ""

        handler = {
            "help": self._cmd_help,
            "model": self._cmd_model,
            "tools": self._cmd_tools,
            "context": self._cmd_context,
            "reset": self._cmd_reset,
            "compact": self._cmd_compact,
            "version": self._cmd_version,
        }.get(cmd)

        if handler is None:
            return None  # not a known command — let the LLM handle it

        try:
            return handler(args, state)
        except Exception as e:
            logger.error("Slash command /%s error: %s", cmd, e, exc_info=True)
            return f"Error executing /{cmd}: {e}"

    def _cmd_help(self, args: str, state: SessionState) -> str:
        """List the locally handled slash commands."""
        lines = ["Available commands:", ""]
        for cmd, desc in self._SLASH_COMMANDS.items():
            lines.append(f" /{cmd:10s} {desc}")
        lines.append("")
        lines.append("Unrecognized /commands are sent to the model as normal messages.")
        return "\n".join(lines)

    def _cmd_model(self, args: str, state: SessionState) -> str:
        """Show the current model, or switch to the model named in *args*."""
        if not args:
            model = state.model or getattr(state.agent, "model", "unknown")
            provider = getattr(state.agent, "provider", None) or "auto"
            return f"Current model: {model}\nProvider: {provider}"

        # Captured before the switch replaces state.agent.
        fallback_provider = getattr(state.agent, "provider", None) or "openrouter"
        target_provider, new_model = self._switch_session_model(state, args)
        provider_label = getattr(state.agent, "provider", None) or target_provider or fallback_provider
        logger.info("Session %s: model switched to %s", state.session_id, new_model)
        return f"Model switched to: {new_model}\nProvider: {provider_label}"

    def _cmd_tools(self, args: str, state: SessionState) -> str:
        """List tools for the agent's toolsets (descriptions cut to 80 chars)."""
        try:
            from model_tools import get_tool_definitions

            toolsets = getattr(state.agent, "enabled_toolsets", None) or ["hermes-acp"]
            # Honour disabled toolsets, as _register_session_mcp_servers does,
            # so the listing matches what the agent can actually call.
            disabled_toolsets = getattr(state.agent, "disabled_toolsets", None)
            tools = get_tool_definitions(
                enabled_toolsets=toolsets,
                disabled_toolsets=disabled_toolsets,
                quiet_mode=True,
            )
            if not tools:
                return "No tools available."
            lines = [f"Available tools ({len(tools)}):"]
            for t in tools:
                name = t.get("function", {}).get("name", "?")
                desc = t.get("function", {}).get("description", "")
                # Truncate long descriptions
                if len(desc) > 80:
                    desc = desc[:77] + "..."
                lines.append(f" {name}: {desc}")
            return "\n".join(lines)
        except Exception as e:
            return f"Could not list tools: {e}"

    def _cmd_context(self, args: str, state: SessionState) -> str:
        """Summarise the conversation: total messages and counts per role."""
        n_messages = len(state.history)
        if n_messages == 0:
            return "Conversation is empty (no messages yet)."
        # Count by role
        roles: dict[str, int] = {}
        for msg in state.history:
            role = msg.get("role", "unknown")
            roles[role] = roles.get(role, 0) + 1
        lines = [
            f"Conversation: {n_messages} messages",
            f" user: {roles.get('user', 0)}, assistant: {roles.get('assistant', 0)}, "
            f"tool: {roles.get('tool', 0)}, system: {roles.get('system', 0)}",
        ]
        model = state.model or getattr(state.agent, "model", "")
        if model:
            lines.append(f"Model: {model}")
        return "\n".join(lines)

    def _cmd_reset(self, args: str, state: SessionState) -> str:
        """Clear the session history in place and persist the session."""
        state.history.clear()
        self.session_manager.save_session(state.session_id)
        return "Conversation history cleared."

    def _cmd_compact(self, args: str, state: SessionState) -> str:
        """Compress the history via the agent's ``_compress_context`` and persist it."""
        if not state.history:
            return "Nothing to compress — conversation is empty."
        try:
            agent = state.agent
            if not getattr(agent, "compression_enabled", True):
                return "Context compression is disabled for this agent."
            if not hasattr(agent, "_compress_context"):
                return "Context compression not available for this agent."

            from agent.model_metadata import estimate_messages_tokens_rough

            original_count = len(state.history)
            approx_tokens = estimate_messages_tokens_rough(state.history)
            original_session_db = getattr(agent, "_session_db", None)

            try:
                # ACP sessions must keep a stable session id, so avoid the
                # SQLite session-splitting side effect inside _compress_context.
                agent._session_db = None
                compressed, _ = agent._compress_context(
                    state.history,
                    getattr(agent, "_cached_system_prompt", "") or "",
                    approx_tokens=approx_tokens,
                    task_id=state.session_id,
                )
            finally:
                agent._session_db = original_session_db

            state.history = compressed
            self.session_manager.save_session(state.session_id)

            new_count = len(state.history)
            new_tokens = estimate_messages_tokens_rough(state.history)
            return (
                f"Context compressed: {original_count} -> {new_count} messages\n"
                f"~{approx_tokens:,} -> ~{new_tokens:,} tokens"
            )
        except Exception as e:
            return f"Compression failed: {e}"

    def _cmd_version(self, args: str, state: SessionState) -> str:
        """Report the Hermes version string."""
        return f"Hermes Agent v{HERMES_VERSION}"

    # ---- Model switching (ACP protocol method) -------------------------------

    async def set_session_model(
        self, model_id: str, session_id: str, **kwargs: Any
    ) -> SetSessionModelResponse | None:
        """Switch the model for a session (called by ACP protocol).

        *model_id* may be a ``provider:model`` choice id as produced by
        ``_build_model_state``. Returns ``None`` for an unknown session.
        """
        state = self.session_manager.get_session(session_id)
        if state is None:
            logger.warning("Session %s: model switch requested for missing session", session_id)
            return None

        requested_provider, resolved_model = self._switch_session_model(state, model_id)
        logger.info(
            "Session %s: model switched to %s via provider %s",
            session_id,
            resolved_model,
            requested_provider,
        )
        return SetSessionModelResponse()

    async def set_session_mode(
        self, mode_id: str, session_id: str, **kwargs: Any
    ) -> SetSessionModeResponse | None:
        """Persist the editor-requested mode so ACP clients do not fail on mode switches."""
        state = self.session_manager.get_session(session_id)
        if state is None:
            logger.warning("Session %s: mode switch requested for missing session", session_id)
            return None
        # NOTE(review): whether save_session() persists ad-hoc attributes such
        # as `mode` is not visible here — confirm against SessionManager.
        setattr(state, "mode", mode_id)
        self.session_manager.save_session(session_id)
        logger.info("Session %s: mode switched to %s", session_id, mode_id)
        return SetSessionModeResponse()

    async def set_config_option(
        self, config_id: str, session_id: str, value: str, **kwargs: Any
    ) -> SetSessionConfigOptionResponse | None:
        """Accept ACP config option updates even when Hermes has no typed ACP config surface yet.

        The value is stored in ``state.config_options`` (created on first use);
        the response always reports an empty option list.
        """
        state = self.session_manager.get_session(session_id)
        if state is None:
            logger.warning("Session %s: config update requested for missing session", session_id)
            return None

        options = getattr(state, "config_options", None)
        if not isinstance(options, dict):
            options = {}
        options[str(config_id)] = value
        setattr(state, "config_options", options)
        self.session_manager.save_session(session_id)
        logger.info("Session %s: config option %s updated", session_id, config_id)
        return SetSessionConfigOptionResponse(config_options=[])