diff --git a/acp_adapter/server.py b/acp_adapter/server.py index 9c8bb416999..3031de161fd 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -889,14 +889,17 @@ class HermesACPAgent(acp.Agent): ).strip() async def _replay_session_history(self, state: SessionState) -> None: - """Send persisted user/assistant history to clients during session/load. + """Replay persisted user/assistant history during session/load or session/resume. - Zed's ACP history UI calls ``session/load`` after the user picks an item - from the Agents sidebar. The agent must then replay the full conversation - as user/assistant chunks, thinking-mode thought chunks, plus reconstructed - tool-call start/completion notifications; merely restoring server-side - state makes Hermes remember context, but leaves the editor looking like a - clean thread. + Invoked inline (``await``) from both ``load_session`` and + ``resume_session`` so that spec-compliant ACP clients receive the + full transcript within the request's lifetime — see the comment at + the call sites for the rationale and prior-art citations. + + Replays the conversation as user/assistant chunks, thinking-mode + thought chunks, plus reconstructed tool-call start/completion + notifications. Merely restoring server-side state makes Hermes + remember context, but leaves the editor looking like a clean thread. """ if not self._conn or not state.history: return @@ -991,18 +994,6 @@ class HermesACPAgent(acp.Agent): models=self._build_model_state(state), ) - def _schedule_history_replay(self, state: SessionState) -> None: - """Replay persisted history after session/load or session/resume returns. - - Zed only attaches streamed transcript/tool updates once the load/resume - response has completed. Sending replay notifications while the request is - still in-flight can make the server look correct in logs while the editor - drops or fails to attach the tool-call history. - """ - loop = asyncio.get_running_loop() - replay_coro = self._replay_session_history(state) - loop.call_soon(asyncio.create_task, replay_coro) - async def load_session( self, cwd: str, @@ -1016,7 +1007,30 @@ class HermesACPAgent(acp.Agent): return None await self._register_session_mcp_servers(state, mcp_servers) logger.info("Loaded session %s", session_id) - self._schedule_history_replay(state) + # Per ACP spec, `session/load` must stream the prior conversation back + # to the client via `session/update` notifications BEFORE responding, + # so the client receives the full transcript within the load request's + # lifetime. Awaiting the replay here matches Codex / Claude Code / + # OpenCode / Pi and the Zed client (which registers the session-update + # routing entry before awaiting the loadSession RPC specifically so + # in-call history replay updates can find the thread). Deferring this + # via `loop.call_soon` (as we did briefly in May 2026) broke every + # spec-compliant ACP client that measures notifications synchronously + # against the load response — see #12285 follow-up. + try: + await self._replay_session_history(state) + except Exception: + # Replay is best-effort — a corrupted or unexpected message shape + # must not turn a successful session/load into a JSON-RPC error + # response. Per-notification failures are already caught inside + # ``_replay_session_history``; this outer guard covers anything + # raised by the helpers themselves before reaching ``_send``. + logger.warning( + "ACP history replay raised during session/load for %s — " + "load will still succeed, partial transcript may be missing", + session_id, + exc_info=True, + ) self._schedule_available_commands_update(session_id) self._schedule_usage_update(state) return LoadSessionResponse(models=self._build_model_state(state)) @@ -1034,7 +1048,18 @@ class HermesACPAgent(acp.Agent): state = self.session_manager.create_session(cwd=cwd) await self._register_session_mcp_servers(state, mcp_servers) logger.info("Resumed session %s", state.session_id) - self._schedule_history_replay(state) + # See `load_session` above for the spec rationale — replay must + # complete before the response so clients receive the full transcript + # within the request's lifetime. + try: + await self._replay_session_history(state) + except Exception: + logger.warning( + "ACP history replay raised during session/resume for %s — " + "resume will still succeed, partial transcript may be missing", + state.session_id, + exc_info=True, + ) self._schedule_available_commands_update(state.session_id) self._schedule_usage_update(state) return ResumeSessionResponse(models=self._build_model_state(state)) diff --git a/tests/acp/test_server.py b/tests/acp/test_server.py index a7bd0367b67..65dd6fd6b72 100644 --- a/tests/acp/test_server.py +++ b/tests/acp/test_server.py @@ -673,25 +673,91 @@ class TestSessionOps: ] @pytest.mark.asyncio - async def test_load_session_schedules_history_replay_after_response(self, agent): - """Zed only attaches replayed updates after session/load has completed.""" + async def test_load_session_replays_history_before_returning_response(self, agent): + """Per ACP spec, replay must complete BEFORE load_session returns. + + Spec-compliant ACP clients (Codex, Claude Code, OpenCode, Pi, Zed) + attach their ``session/update`` listeners before awaiting the + ``loadSession`` RPC and rely on receiving the full transcript within + the request's lifetime. Deferring replay via ``loop.call_soon`` (the + prior behavior in May 2026) broke clients that read notification + counts synchronously against the load response — see #12285 follow-up. + """ new_resp = await agent.new_session(cwd="/tmp") state = agent.session_manager.get_session(new_resp.session_id) state.history = [{"role": "user", "content": "hello from history"}] - events = [] + events: list[str] = [] - async def replay_after_response(_state): + async def replay_records(_state): events.append("replay") - with patch.object(agent, "_replay_session_history", side_effect=replay_after_response): + with patch.object(agent, "_replay_session_history", side_effect=replay_records): resp = await agent.load_session(cwd="/tmp", session_id=new_resp.session_id) events.append("returned") assert isinstance(resp, LoadSessionResponse) - assert events == ["returned"] - await asyncio.sleep(0) - await asyncio.sleep(0) - assert events == ["returned", "replay"] + # Replay must have happened BEFORE the response was constructed — + # i.e. before the `events.append("returned")` after the await resolves. + assert events == ["replay", "returned"] + + @pytest.mark.asyncio + async def test_resume_session_replays_history_before_returning_response(self, agent): + """Same spec rationale as ``load_session`` — replay before responding.""" + new_resp = await agent.new_session(cwd="/tmp") + state = agent.session_manager.get_session(new_resp.session_id) + state.history = [{"role": "user", "content": "hello from history"}] + events: list[str] = [] + + async def replay_records(_state): + events.append("replay") + + with patch.object(agent, "_replay_session_history", side_effect=replay_records): + resp = await agent.resume_session(cwd="/tmp", session_id=new_resp.session_id) + events.append("returned") + + assert isinstance(resp, ResumeSessionResponse) + assert events == ["replay", "returned"] + + @pytest.mark.asyncio + async def test_load_session_survives_replay_helper_exception(self, agent, caplog): + """A replay helper raising must not turn load_session into an error. + + With awaited replay, an exception in ``_replay_session_history`` now + propagates into the ``load_session`` handler. The defensive try/except + guard at the call site must catch and log it so the JSON-RPC client + still receives a ``LoadSessionResponse`` — partial transcripts are + acceptable, total load failure is not. + """ + new_resp = await agent.new_session(cwd="/tmp") + state = agent.session_manager.get_session(new_resp.session_id) + state.history = [{"role": "user", "content": "hi"}] + + async def boom(_state): + raise RuntimeError("simulated replay helper crash") + + with caplog.at_level("WARNING", logger="acp_adapter.server"): + with patch.object(agent, "_replay_session_history", side_effect=boom): + resp = await agent.load_session(cwd="/tmp", session_id=new_resp.session_id) + + assert isinstance(resp, LoadSessionResponse) + assert "history replay raised during session/load" in caplog.text + + @pytest.mark.asyncio + async def test_resume_session_survives_replay_helper_exception(self, agent, caplog): + """Same guarantee as ``load_session`` for the resume path.""" + new_resp = await agent.new_session(cwd="/tmp") + state = agent.session_manager.get_session(new_resp.session_id) + state.history = [{"role": "user", "content": "hi"}] + + async def boom(_state): + raise RuntimeError("simulated replay helper crash") + + with caplog.at_level("WARNING", logger="acp_adapter.server"): + with patch.object(agent, "_replay_session_history", side_effect=boom): + resp = await agent.resume_session(cwd="/tmp", session_id=new_resp.session_id) + + assert isinstance(resp, ResumeSessionResponse) + assert "history replay raised during session/resume" in caplog.text @pytest.mark.asyncio async def test_resume_session_creates_new_if_missing(self, agent):