fix(acp): replay session history on load

This commit is contained in:
Henkey 2026-04-29 23:30:01 +01:00 committed by Teknium
parent 5d253e65b7
commit d2536a72bf
2 changed files with 143 additions and 0 deletions

View file

@ -6,6 +6,7 @@ import asyncio
import contextvars
import logging
import os
import uuid
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Deque, Optional
@ -13,6 +14,7 @@ from typing import Any, Deque, Optional
import acp
from acp.schema import (
AgentCapabilities,
AgentMessageChunk,
AuthenticateResponse,
AvailableCommand,
AvailableCommandsUpdate,
@ -45,6 +47,7 @@ from acp.schema import (
TextContentBlock,
UnstructuredCommandInput,
Usage,
UserMessageChunk,
)
# AuthMethodAgent was renamed from AuthMethod in agent-client-protocol 0.9.0
@ -377,6 +380,88 @@ class HermesACPAgent(acp.Agent):
# ---- Session management -------------------------------------------------
@staticmethod
def _history_message_text(message: dict[str, Any]) -> str:
"""Extract displayable text from a persisted OpenAI-style message."""
content = message.get("content")
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
parts: list[str] = []
for item in content:
if isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
parts.append(text)
elif item.get("type") == "text" and isinstance(item.get("content"), str):
parts.append(item["content"])
elif isinstance(item, str):
parts.append(item)
return "\n".join(part.strip() for part in parts if part and part.strip()).strip()
return ""
@staticmethod
def _history_message_update(
*,
session_id: str,
index: int,
role: str,
text: str,
) -> UserMessageChunk | AgentMessageChunk | None:
"""Build an ACP history replay update for a user/assistant message."""
message_id = str(uuid.uuid5(uuid.NAMESPACE_URL, f"hermes-acp:{session_id}:{index}:{role}"))
block = TextContentBlock(type="text", text=text)
if role == "user":
return UserMessageChunk(
session_update="user_message_chunk",
content=block,
message_id=message_id,
)
if role == "assistant":
return AgentMessageChunk(
session_update="agent_message_chunk",
content=block,
message_id=message_id,
)
return None
async def _replay_session_history(self, state: SessionState) -> None:
"""Send persisted user/assistant history to clients during session/load.
Zed's ACP history UI calls ``session/load`` after the user picks an item
from the Agents sidebar. The agent must then replay the full conversation
as ``user_message_chunk`` / ``agent_message_chunk`` notifications; merely
restoring server-side state makes Hermes remember context, but leaves the
editor looking like a clean thread.
"""
if not self._conn or not state.history:
return
for index, message in enumerate(state.history):
role = str(message.get("role") or "")
if role not in {"user", "assistant"}:
continue
text = self._history_message_text(message)
if not text:
continue
update = self._history_message_update(
session_id=state.session_id,
index=index,
role=role,
text=text,
)
if update is None:
continue
try:
await self._conn.session_update(session_id=state.session_id, update=update)
except Exception:
logger.warning(
"Failed to replay ACP history for session %s",
state.session_id,
exc_info=True,
)
return
async def new_session(
self,
cwd: str,
@ -405,6 +490,7 @@ class HermesACPAgent(acp.Agent):
return None
await self._register_session_mcp_servers(state, mcp_servers)
logger.info("Loaded session %s", session_id)
await self._replay_session_history(state)
self._schedule_available_commands_update(session_id)
return LoadSessionResponse(models=self._build_model_state(state))
@ -421,6 +507,7 @@ class HermesACPAgent(acp.Agent):
state = self.session_manager.create_session(cwd=cwd)
await self._register_session_mcp_servers(state, mcp_servers)
logger.info("Resumed session %s", state.session_id)
await self._replay_session_history(state)
self._schedule_available_commands_update(state.session_id)
return ResumeSessionResponse(models=self._build_model_state(state))

View file

@ -11,6 +11,7 @@ import acp
from acp.agent.router import build_agent_router
from acp.schema import (
AgentCapabilities,
AgentMessageChunk,
AuthenticateResponse,
AvailableCommandsUpdate,
Implementation,
@ -27,6 +28,7 @@ from acp.schema import (
SessionInfo,
TextContentBlock,
Usage,
UserMessageChunk,
)
from acp_adapter.server import HermesACPAgent, HERMES_VERSION
from acp_adapter.session import SessionManager
@ -224,6 +226,60 @@ class TestSessionOps:
resp = await agent.load_session(cwd="/tmp", session_id="bogus")
assert resp is None
@pytest.mark.asyncio
async def test_load_session_replays_persisted_history_to_client(self, agent):
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
new_resp = await agent.new_session(cwd="/tmp")
state = agent.session_manager.get_session(new_resp.session_id)
state.history = [
{"role": "system", "content": "hidden system"},
{"role": "user", "content": "what controls the / slash commands?"},
{"role": "assistant", "content": "HermesACPAgent._ADVERTISED_COMMANDS controls them."},
{"role": "tool", "content": "tool output should not replay"},
]
mock_conn.session_update.reset_mock()
resp = await agent.load_session(cwd="/tmp", session_id=new_resp.session_id)
assert isinstance(resp, LoadSessionResponse)
calls = mock_conn.session_update.await_args_list
replay_calls = [
call for call in calls
if getattr(call.kwargs.get("update"), "session_update", None)
in {"user_message_chunk", "agent_message_chunk"}
]
assert len(replay_calls) == 2
assert isinstance(replay_calls[0].kwargs["update"], UserMessageChunk)
assert replay_calls[0].kwargs["update"].content.text == "what controls the / slash commands?"
assert replay_calls[0].kwargs["update"].message_id
assert isinstance(replay_calls[1].kwargs["update"], AgentMessageChunk)
assert replay_calls[1].kwargs["update"].content.text.startswith("HermesACPAgent")
assert replay_calls[1].kwargs["update"].message_id
@pytest.mark.asyncio
async def test_resume_session_replays_persisted_history_to_client(self, agent):
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
new_resp = await agent.new_session(cwd="/tmp")
state = agent.session_manager.get_session(new_resp.session_id)
state.history = [{"role": "user", "content": "So tell me the current state"}]
mock_conn.session_update.reset_mock()
resp = await agent.resume_session(cwd="/tmp", session_id=new_resp.session_id)
assert isinstance(resp, ResumeSessionResponse)
updates = [call.kwargs["update"] for call in mock_conn.session_update.await_args_list]
assert any(
isinstance(update, UserMessageChunk)
and update.content.text == "So tell me the current state"
for update in updates
)
@pytest.mark.asyncio
async def test_resume_session_creates_new_if_missing(self, agent):
resume_resp = await agent.resume_session(cwd="/tmp", session_id="nonexistent")