fix(acp): replay assistant reasoning as agent_thought_chunk on session/load (#12285) (#26943)

Persisted assistant `reasoning_content` / `reasoning` fields are now emitted
as ACP `agent_thought_chunk` notifications during `_replay_session_history`,
so editor clients (Zed, etc.) rebuild collapsed Thinking panes when the user
re-opens a session that used a thinking model.

Ordering matches live streaming: thought precedes message text within the
same assistant turn, mirroring how `reasoning_callback` deltas arrive before
`stream_delta_callback` deltas in `events.py::make_thinking_cb` /
`make_message_cb`.

Behavior on non-reasoning histories is unchanged; the replay loop's existing
text / tool_call / tool_call_update / plan emission is preserved bit-for-bit.

Closes #12285.

Credit:
- @Yukipukii1 (#14691) — original thought-replay design via
  `acp.update_agent_thought_text`; the tool-call portion of that PR has
  since landed via #19139, but the reasoning replay is theirs.
- @HenkDz (#17652 / #18578) — established the `_replay_session_history` and
  `_history_*` helper conventions this builds on.
- @D1zzyDwarf (#16531) — also closed by this work.
This commit is contained in:
kshitij 2026-05-16 06:45:29 -07:00 committed by GitHub
parent a91a57fa5a
commit f3a4af9cf2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 277 additions and 22 deletions

View file

@ -18,6 +18,7 @@ import acp
from acp.schema import (
AgentCapabilities,
AgentMessageChunk,
AgentThoughtChunk,
AuthenticateResponse,
AvailableCommand,
AvailableCommandsUpdate,
@ -788,14 +789,20 @@ class HermesACPAgent(acp.Agent):
# ---- Session management -------------------------------------------------
@staticmethod
def _history_message_text(message: dict[str, Any]) -> str:
"""Extract displayable text from a persisted OpenAI-style message."""
content = message.get("content")
if isinstance(content, str):
return content.strip()
if isinstance(content, list):
def _flatten_history_text(value: Any) -> str:
"""Normalize a persisted text-or-text-parts value into a single string.
OpenAI-style assistant content (and provider reasoning fields) can arrive
as either a scalar string or a list of ``{"text": ...}`` /
``{"type": "text", "content": ...}`` parts. Whitespace-only inputs
collapse to an empty string so callers can treat ``""`` as "nothing to
emit".
"""
if isinstance(value, str):
return value.strip()
if isinstance(value, list):
parts: list[str] = []
for item in content:
for item in value:
if isinstance(item, dict):
text = item.get("text")
if isinstance(text, str):
@ -807,6 +814,29 @@ class HermesACPAgent(acp.Agent):
return "\n".join(part.strip() for part in parts if part and part.strip()).strip()
return ""
@classmethod
def _history_message_text(cls, message: dict[str, Any]) -> str:
"""Extract displayable text from a persisted OpenAI-style message."""
return cls._flatten_history_text(message.get("content"))
@classmethod
def _history_reasoning_text(cls, message: dict[str, Any]) -> str:
"""Extract displayable reasoning/thought text from a persisted assistant message.
Returns the first non-empty value among ``reasoning_content`` (the
canonical field used by DeepSeek / Moonshot and the post-#16892
chat-completions normalizer) and ``reasoning`` (used by the codex
event projector and several other transports). Both keys are
actively written by live code paths, so neither branch is
deprecated they cover different transports rather than old vs.
new sessions.
"""
for key in ("reasoning_content", "reasoning"):
text = cls._flatten_history_text(message.get(key))
if text:
return text
return ""
@staticmethod
def _history_message_update(
*,
@ -827,6 +857,11 @@ class HermesACPAgent(acp.Agent):
)
return None
@staticmethod
def _history_thought_update(text: str) -> AgentThoughtChunk:
"""Build an ACP history replay update for an assistant thought."""
return acp.update_agent_thought_text(text)
@staticmethod
def _history_tool_call_name_args(tool_call: dict[str, Any]) -> tuple[str, dict[str, Any]]:
"""Extract function name/arguments from an OpenAI-style tool_call."""
@ -858,9 +893,10 @@ class HermesACPAgent(acp.Agent):
Zed's ACP history UI calls ``session/load`` after the user picks an item
from the Agents sidebar. The agent must then replay the full conversation
as user/assistant chunks plus reconstructed tool-call start/completion
notifications; merely restoring server-side state makes Hermes remember
context, but leaves the editor looking like a clean thread.
as user/assistant chunks, thinking-mode thought chunks, plus reconstructed
tool-call start/completion notifications; merely restoring server-side
state makes Hermes remember context, but leaves the editor looking like a
clean thread.
"""
if not self._conn or not state.history:
return
@ -882,24 +918,37 @@ class HermesACPAgent(acp.Agent):
for message in state.history:
role = str(message.get("role") or "")
if role in {"user", "assistant"}:
if role == "user":
text = self._history_message_text(message)
if text:
update = self._history_message_update(role=role, text=text)
if update is not None and not await _send(update):
return
continue
if role == "assistant":
thought = self._history_reasoning_text(message)
if thought and not await _send(self._history_thought_update(thought)):
return
text = self._history_message_text(message)
if text:
update = self._history_message_update(role=role, text=text)
if update is not None and not await _send(update):
return
if role == "assistant" and isinstance(message.get("tool_calls"), list):
for tool_call in message["tool_calls"]:
if not isinstance(tool_call, dict):
continue
tool_call_id = self._history_tool_call_id(tool_call)
if not tool_call_id:
continue
tool_name, args = self._history_tool_call_name_args(tool_call)
active_tool_calls[tool_call_id] = (tool_name, args)
if not await _send(build_tool_start(tool_call_id, tool_name, args)):
return
tool_calls = message.get("tool_calls")
if isinstance(tool_calls, list):
for tool_call in tool_calls:
if not isinstance(tool_call, dict):
continue
tool_call_id = self._history_tool_call_id(tool_call)
if not tool_call_id:
continue
tool_name, args = self._history_tool_call_name_args(tool_call)
active_tool_calls[tool_call_id] = (tool_name, args)
if not await _send(build_tool_start(tool_call_id, tool_name, args)):
return
continue
if role == "tool":

View file

@ -13,6 +13,7 @@ from acp.schema import (
AgentCapabilities,
AgentMessageChunk,
AgentPlanUpdate,
AgentThoughtChunk,
AuthenticateResponse,
AvailableCommandsUpdate,
Implementation,
@ -466,6 +467,211 @@ class TestSessionOps:
for update in updates
)
@pytest.mark.asyncio
async def test_load_session_replays_reasoning_thought_before_message(self, agent):
"""Thinking-model thoughts must be replayed via ``agent_thought_chunk``.
Regression for #12285 — when a session is loaded, persisted assistant
``reasoning_content`` / ``reasoning`` fields must surface as ACP
``AgentThoughtChunk`` notifications in the same relative position they
had live (thought streams before the assistant message text), so Zed's
collapsed Thinking pane rebuilds instead of vanishing on reconnect.
"""
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
new_resp = await agent.new_session(cwd="/tmp")
state = agent.session_manager.get_session(new_resp.session_id)
state.history = [
{"role": "user", "content": "Walk me through it."},
{
"role": "assistant",
"reasoning_content": "Let me think step by step about the request.",
"content": "Here is the plan.",
},
{"role": "user", "content": "And the legacy case?"},
{
"role": "assistant",
# No reasoning_content — exercise the legacy "reasoning" fallback
# path so sessions persisted before #16892 still replay thoughts.
"reasoning": "Older sessions stored the trace under the internal key.",
"content": "Same idea, older field name.",
},
]
mock_conn.session_update.reset_mock()
resp = await agent.load_session(cwd="/tmp", session_id=new_resp.session_id)
await asyncio.sleep(0)
await asyncio.sleep(0)
assert isinstance(resp, LoadSessionResponse)
replay_kinds = [
getattr(call.kwargs.get("update"), "session_update", None)
for call in mock_conn.session_update.await_args_list
if getattr(call.kwargs.get("update"), "session_update", None)
in {"user_message_chunk", "agent_message_chunk", "agent_thought_chunk"}
]
assert replay_kinds == [
"user_message_chunk",
"agent_thought_chunk",
"agent_message_chunk",
"user_message_chunk",
"agent_thought_chunk",
"agent_message_chunk",
]
thought_updates = [
call.kwargs["update"]
for call in mock_conn.session_update.await_args_list
if isinstance(call.kwargs.get("update"), AgentThoughtChunk)
]
assert len(thought_updates) == 2
assert thought_updates[0].content.text == "Let me think step by step about the request."
assert thought_updates[1].content.text == "Older sessions stored the trace under the internal key."
@pytest.mark.asyncio
async def test_load_session_replays_reasoning_only_turn(self, agent):
"""Assistant turns with reasoning but no content should still emit a thought.
Pure reasoning-only assistant entries (e.g. a thinking step before a
tool-call turn) commonly carry ``reasoning_content`` with empty
``content``. The replay must still surface the thought so the editor's
Thinking pane rebuilds, even when there is no message text to follow.
"""
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
new_resp = await agent.new_session(cwd="/tmp")
state = agent.session_manager.get_session(new_resp.session_id)
state.history = [
{
"role": "assistant",
"reasoning_content": "I should call the search tool next.",
"content": "",
},
]
mock_conn.session_update.reset_mock()
await agent.load_session(cwd="/tmp", session_id=new_resp.session_id)
await asyncio.sleep(0)
await asyncio.sleep(0)
thought_updates = [
call.kwargs["update"]
for call in mock_conn.session_update.await_args_list
if isinstance(call.kwargs.get("update"), AgentThoughtChunk)
]
message_updates = [
call.kwargs["update"]
for call in mock_conn.session_update.await_args_list
if isinstance(call.kwargs.get("update"), AgentMessageChunk)
]
assert len(thought_updates) == 1
assert thought_updates[0].content.text == "I should call the search tool next."
assert message_updates == []
@pytest.mark.asyncio
async def test_load_session_skips_empty_reasoning_fields(self, agent):
"""Empty/whitespace reasoning fields must not produce notifications."""
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
new_resp = await agent.new_session(cwd="/tmp")
state = agent.session_manager.get_session(new_resp.session_id)
state.history = [
{
"role": "assistant",
"reasoning_content": "",
"reasoning": " \n\t",
"content": "Just a regular answer.",
},
]
mock_conn.session_update.reset_mock()
await agent.load_session(cwd="/tmp", session_id=new_resp.session_id)
await asyncio.sleep(0)
await asyncio.sleep(0)
thought_updates = [
call.kwargs["update"]
for call in mock_conn.session_update.await_args_list
if isinstance(call.kwargs.get("update"), AgentThoughtChunk)
]
assert thought_updates == []
@pytest.mark.asyncio
async def test_load_session_replays_thought_then_tool_call_without_message(self, agent):
"""Canonical thinking-model shape: reasoning + tool_call + no body text.
Thinking models commonly emit a pre-tool thought followed by a
tool_calls turn with empty ``content``. Replay must emit:
``agent_thought_chunk`` then ``tool_call`` then ``tool_call_update``
for the matching tool result and crucially, NO ``agent_message_chunk``
for the empty-text assistant body. Regression for the canonical
thinking-then-tool flow on #12285.
"""
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
new_resp = await agent.new_session(cwd="/tmp")
state = agent.session_manager.get_session(new_resp.session_id)
state.history = [
{"role": "user", "content": "Find the bug."},
{
"role": "assistant",
"reasoning_content": "I should grep for the function name first.",
"content": "",
"tool_calls": [
{
"id": "call_grep_1",
"type": "function",
"function": {
"name": "search_files",
"arguments": '{"pattern":"foo","path":"."}',
},
}
],
},
{
"role": "tool",
"tool_call_id": "call_grep_1",
"content": '{"total_count":1,"matches":[{"path":"x.py","line":1,"content":"foo"}]}',
},
]
mock_conn.session_update.reset_mock()
await agent.load_session(cwd="/tmp", session_id=new_resp.session_id)
await asyncio.sleep(0)
await asyncio.sleep(0)
kinds = [
getattr(call.kwargs.get("update"), "session_update", None)
for call in mock_conn.session_update.await_args_list
if getattr(call.kwargs.get("update"), "session_update", None)
in {
"user_message_chunk",
"agent_thought_chunk",
"agent_message_chunk",
"tool_call",
"tool_call_update",
}
]
# No agent_message_chunk for the empty-content assistant turn.
assert "agent_message_chunk" not in kinds
# Thought must precede the tool_call_start within the assistant turn,
# and the tool result follows.
assert kinds == [
"user_message_chunk",
"agent_thought_chunk",
"tool_call",
"tool_call_update",
]
@pytest.mark.asyncio
async def test_load_session_schedules_history_replay_after_response(self, agent):
"""Zed only attaches replayed updates after session/load has completed."""