diff --git a/acp_adapter/server.py b/acp_adapter/server.py index b21e6fa3bf..543d49cb3a 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -6,6 +6,7 @@ import asyncio import contextvars import logging import os +import uuid from collections import defaultdict, deque from concurrent.futures import ThreadPoolExecutor from typing import Any, Deque, Optional @@ -13,6 +14,7 @@ from typing import Any, Deque, Optional import acp from acp.schema import ( AgentCapabilities, + AgentMessageChunk, AuthenticateResponse, AvailableCommand, AvailableCommandsUpdate, @@ -45,6 +47,7 @@ from acp.schema import ( TextContentBlock, UnstructuredCommandInput, Usage, + UserMessageChunk, ) # AuthMethodAgent was renamed from AuthMethod in agent-client-protocol 0.9.0 @@ -377,6 +380,88 @@ class HermesACPAgent(acp.Agent): # ---- Session management ------------------------------------------------- + @staticmethod + def _history_message_text(message: dict[str, Any]) -> str: + """Extract displayable text from a persisted OpenAI-style message.""" + content = message.get("content") + if isinstance(content, str): + return content.strip() + if isinstance(content, list): + parts: list[str] = [] + for item in content: + if isinstance(item, dict): + text = item.get("text") + if isinstance(text, str): + parts.append(text) + elif item.get("type") == "text" and isinstance(item.get("content"), str): + parts.append(item["content"]) + elif isinstance(item, str): + parts.append(item) + return "\n".join(part.strip() for part in parts if part and part.strip()).strip() + return "" + + @staticmethod + def _history_message_update( + *, + session_id: str, + index: int, + role: str, + text: str, + ) -> UserMessageChunk | AgentMessageChunk | None: + """Build an ACP history replay update for a user/assistant message.""" + message_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"hermes-acp:{session_id}:{index}:{role}")) + block = TextContentBlock(type="text", text=text) + if role == "user": + return UserMessageChunk( + session_update="user_message_chunk", + content=block, + message_id=message_id, + ) + if role == "assistant": + return AgentMessageChunk( + session_update="agent_message_chunk", + content=block, + message_id=message_id, + ) + return None + + async def _replay_session_history(self, state: SessionState) -> None: + """Send persisted user/assistant history to clients during session/load. + + Zed's ACP history UI calls ``session/load`` after the user picks an item + from the Agents sidebar. The agent must then replay the full conversation + as ``user_message_chunk`` / ``agent_message_chunk`` notifications; merely + restoring server-side state makes Hermes remember context, but leaves the + editor looking like a clean thread. + """ + if not self._conn or not state.history: + return + + for index, message in enumerate(state.history): + role = str(message.get("role") or "") + if role not in {"user", "assistant"}: + continue + text = self._history_message_text(message) + if not text: + continue + update = self._history_message_update( + session_id=state.session_id, + index=index, + role=role, + text=text, + ) + if update is None: + continue + try: + await self._conn.session_update(session_id=state.session_id, update=update) + except Exception: + logger.warning( + "Failed to replay ACP history for session %s", + state.session_id, + exc_info=True, + ) + return + async def new_session( self, cwd: str, @@ -405,6 +490,7 @@ class HermesACPAgent(acp.Agent): return None await self._register_session_mcp_servers(state, mcp_servers) logger.info("Loaded session %s", session_id) + await self._replay_session_history(state) self._schedule_available_commands_update(session_id) return LoadSessionResponse(models=self._build_model_state(state)) @@ -421,6 +507,7 @@ class HermesACPAgent(acp.Agent): state = self.session_manager.create_session(cwd=cwd) await self._register_session_mcp_servers(state, mcp_servers) logger.info("Resumed session %s", state.session_id) + await self._replay_session_history(state) self._schedule_available_commands_update(state.session_id) return ResumeSessionResponse(models=self._build_model_state(state)) diff --git a/tests/acp/test_server.py b/tests/acp/test_server.py index d4afed101f..2fee2bf109 100644 --- a/tests/acp/test_server.py +++ b/tests/acp/test_server.py @@ -11,6 +11,7 @@ import acp from acp.agent.router import build_agent_router from acp.schema import ( AgentCapabilities, + AgentMessageChunk, AuthenticateResponse, AvailableCommandsUpdate, Implementation, @@ -27,6 +28,7 @@ from acp.schema import ( SessionInfo, TextContentBlock, Usage, + UserMessageChunk, ) from acp_adapter.server import HermesACPAgent, HERMES_VERSION from acp_adapter.session import SessionManager @@ -224,6 +226,60 @@ class TestSessionOps: resp = await agent.load_session(cwd="/tmp", session_id="bogus") assert resp is None + @pytest.mark.asyncio + async def test_load_session_replays_persisted_history_to_client(self, agent): + mock_conn = MagicMock(spec=acp.Client) + mock_conn.session_update = AsyncMock() + agent._conn = mock_conn + + new_resp = await agent.new_session(cwd="/tmp") + state = agent.session_manager.get_session(new_resp.session_id) + state.history = [ + {"role": "system", "content": "hidden system"}, + {"role": "user", "content": "what controls the / slash commands?"}, + {"role": "assistant", "content": "HermesACPAgent._ADVERTISED_COMMANDS controls them."}, + {"role": "tool", "content": "tool output should not replay"}, + ] + + mock_conn.session_update.reset_mock() + resp = await agent.load_session(cwd="/tmp", session_id=new_resp.session_id) + + assert isinstance(resp, LoadSessionResponse) + calls = mock_conn.session_update.await_args_list + replay_calls = [ + call for call in calls + if getattr(call.kwargs.get("update"), "session_update", None) + in {"user_message_chunk", "agent_message_chunk"} + ] + assert len(replay_calls) == 2 + assert isinstance(replay_calls[0].kwargs["update"], UserMessageChunk) + assert replay_calls[0].kwargs["update"].content.text == "what controls the / slash commands?" + assert replay_calls[0].kwargs["update"].message_id + assert isinstance(replay_calls[1].kwargs["update"], AgentMessageChunk) + assert replay_calls[1].kwargs["update"].content.text.startswith("HermesACPAgent") + assert replay_calls[1].kwargs["update"].message_id + + @pytest.mark.asyncio + async def test_resume_session_replays_persisted_history_to_client(self, agent): + mock_conn = MagicMock(spec=acp.Client) + mock_conn.session_update = AsyncMock() + agent._conn = mock_conn + + new_resp = await agent.new_session(cwd="/tmp") + state = agent.session_manager.get_session(new_resp.session_id) + state.history = [{"role": "user", "content": "So tell me the current state"}] + + mock_conn.session_update.reset_mock() + resp = await agent.resume_session(cwd="/tmp", session_id=new_resp.session_id) + + assert isinstance(resp, ResumeSessionResponse) + updates = [call.kwargs["update"] for call in mock_conn.session_update.await_args_list] + assert any( + isinstance(update, UserMessageChunk) + and update.content.text == "So tell me the current state" + for update in updates + ) + @pytest.mark.asyncio async def test_resume_session_creates_new_if_missing(self, agent): resume_resp = await agent.resume_session(cwd="/tmp", session_id="nonexistent")