mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(acp): wire stream_delta_callback for real-time ACP text delta streaming
Root cause: ACP turns emit partial session/update progress (thinking spinners, tool events) then go silent for the entire duration of run_conversation() because stream_delta_callback was never wired in the ACP adapter. The LLM's streamed text deltas were silently discarded since _fire_stream_delta() only dispatches to stream_delta_callback and _stream_callback — neither of which ACP set. Fix: Add make_stream_delta_cb() factory that forwards each text delta to conn.session_update() as an agent_message_chunk, and wire it as agent.stream_delta_callback in prompt(). This gives ACP clients (CAR/PMA/VS Code) real-time token-by-token progress during LLM streaming, eliminating the multi-minute silent gap before final prompt_return delivery. Also suppresses the final_response session_update when stream_delta_cb is active to avoid sending duplicate text to clients that append incremental chunks (P1 from automated review). Fixes #9
This commit is contained in:
parent
b05d30418d
commit
cb792b9f1e
4 changed files with 106 additions and 8 deletions
|
|
@ -192,3 +192,19 @@ def make_message_cb(
|
|||
_send_update(conn, session_id, loop, update)
|
||||
|
||||
return _message
|
||||
|
||||
|
||||
def make_stream_delta_cb(
|
||||
conn: acp.Client,
|
||||
session_id: str,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
) -> Callable:
|
||||
"""Create a callback that forwards live LLM text deltas to ACP."""
|
||||
|
||||
def _stream_delta(text: str) -> None:
|
||||
if not text:
|
||||
return
|
||||
update = acp.update_agent_message_text(text)
|
||||
_send_update(conn, session_id, loop, update)
|
||||
|
||||
return _stream_delta
|
||||
|
|
|
|||
|
|
@ -55,6 +55,7 @@ from acp_adapter.auth import detect_provider, has_provider
|
|||
from acp_adapter.events import (
|
||||
make_message_cb,
|
||||
make_step_cb,
|
||||
make_stream_delta_cb,
|
||||
make_thinking_cb,
|
||||
make_tool_progress_cb,
|
||||
)
|
||||
|
|
@ -503,12 +504,14 @@ class HermesACPAgent(acp.Agent):
|
|||
thinking_cb = make_thinking_cb(conn, session_id, loop)
|
||||
step_cb = make_step_cb(conn, session_id, loop, tool_call_ids, tool_call_meta)
|
||||
message_cb = make_message_cb(conn, session_id, loop)
|
||||
stream_delta_cb = make_stream_delta_cb(conn, session_id, loop)
|
||||
approval_cb = make_approval_callback(conn.request_permission, loop, session_id)
|
||||
else:
|
||||
tool_progress_cb = None
|
||||
thinking_cb = None
|
||||
step_cb = None
|
||||
message_cb = None
|
||||
stream_delta_cb = None
|
||||
approval_cb = None
|
||||
|
||||
agent = state.agent
|
||||
|
|
@ -516,6 +519,7 @@ class HermesACPAgent(acp.Agent):
|
|||
agent.thinking_callback = thinking_cb
|
||||
agent.step_callback = step_cb
|
||||
agent.message_callback = message_cb
|
||||
agent.stream_delta_callback = stream_delta_cb
|
||||
|
||||
if approval_cb:
|
||||
try:
|
||||
|
|
@ -569,7 +573,10 @@ class HermesACPAgent(acp.Agent):
|
|||
)
|
||||
except Exception:
|
||||
logger.debug("Failed to auto-title ACP session %s", session_id, exc_info=True)
|
||||
if final_response and conn:
|
||||
# When stream_delta_callback is active, the client already received
|
||||
# every text delta incrementally — sending final_response again would
|
||||
# duplicate the output for clients that append chunks.
|
||||
if final_response and conn and not stream_delta_cb:
|
||||
update = acp.update_agent_message_text(final_response)
|
||||
await conn.session_update(session_id, update)
|
||||
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ from acp.schema import ToolCallStart, ToolCallProgress, AgentThoughtChunk, Agent
|
|||
from acp_adapter.events import (
|
||||
make_message_cb,
|
||||
make_step_cb,
|
||||
make_stream_delta_cb,
|
||||
make_thinking_cb,
|
||||
make_tool_progress_cb,
|
||||
)
|
||||
|
|
@ -325,3 +326,30 @@ class TestMessageCallback:
|
|||
cb("")
|
||||
|
||||
mock_rcts.assert_not_called()
|
||||
|
||||
|
||||
class TestStreamDeltaCallback:
|
||||
def test_emits_agent_message_chunk(self, mock_conn, event_loop_fixture):
|
||||
"""Stream delta callback should emit AgentMessageChunk."""
|
||||
loop = event_loop_fixture
|
||||
|
||||
cb = make_stream_delta_cb(mock_conn, "session-1", loop)
|
||||
|
||||
with patch("acp_adapter.events._send_update") as mock_send:
|
||||
cb("Partial")
|
||||
|
||||
mock_send.assert_called_once()
|
||||
update = mock_send.call_args.args[3]
|
||||
assert update.session_update == "agent_message_chunk"
|
||||
assert update.content.text == "Partial"
|
||||
|
||||
def test_ignores_empty_message(self, mock_conn, event_loop_fixture):
|
||||
"""Empty stream deltas should not emit any update."""
|
||||
loop = event_loop_fixture
|
||||
|
||||
cb = make_stream_delta_cb(mock_conn, "session-1", loop)
|
||||
|
||||
with patch("acp_adapter.events._send_update") as mock_send:
|
||||
cb("")
|
||||
|
||||
mock_send.assert_not_called()
|
||||
|
|
|
|||
|
|
@ -424,9 +424,50 @@ class TestPrompt:
|
|||
|
||||
assert state.history == expected_history
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prompt_wires_stream_delta_callback(self, agent):
|
||||
"""prompt() should bridge live text deltas through ACP session updates."""
|
||||
new_resp = await agent.new_session(cwd=".")
|
||||
state = agent.session_manager.get_session(new_resp.session_id)
|
||||
|
||||
def _run_conversation(**kwargs):
|
||||
assert callable(state.agent.stream_delta_callback)
|
||||
state.agent.stream_delta_callback("live chunk")
|
||||
return {
|
||||
"final_response": "final message",
|
||||
"messages": [],
|
||||
}
|
||||
|
||||
state.agent.run_conversation = MagicMock(side_effect=_run_conversation)
|
||||
|
||||
mock_conn = MagicMock(spec=acp.Client)
|
||||
mock_conn.session_update = AsyncMock()
|
||||
agent._conn = mock_conn
|
||||
|
||||
prompt = [TextContentBlock(type="text", text="stream this")]
|
||||
await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
|
||||
|
||||
# Stream delta should have been sent as agent_message_chunk
|
||||
streamed_updates = [
|
||||
call[1].get("update") or call[0][1]
|
||||
for call in mock_conn.session_update.call_args_list
|
||||
if (call[1].get("update") or call[0][1]).session_update == "agent_message_chunk"
|
||||
]
|
||||
assert any(update.content.text == "live chunk" for update in streamed_updates)
|
||||
|
||||
# Final response should NOT be sent separately when stream_delta_cb is active
|
||||
final_texts = [u.content.text for u in streamed_updates if u.content.text == "final message"]
|
||||
assert len(final_texts) == 0, "Final response should be suppressed when streaming"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prompt_sends_final_message_update(self, agent):
|
||||
"""The final response should be sent as an AgentMessageChunk."""
|
||||
"""Final response is suppressed when stream_delta_callback is active.
|
||||
|
||||
When streaming is wired (the normal ACP path with a connected
|
||||
client), the client receives every text delta incrementally via
|
||||
stream_delta_callback, so sending final_response again would
|
||||
duplicate the output. Verify the suppression works.
|
||||
"""
|
||||
new_resp = await agent.new_session(cwd=".")
|
||||
state = agent.session_manager.get_session(new_resp.session_id)
|
||||
|
||||
|
|
@ -442,12 +483,18 @@ class TestPrompt:
|
|||
prompt = [TextContentBlock(type="text", text="help me")]
|
||||
await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
|
||||
|
||||
# session_update should have been called with the final message
|
||||
mock_conn.session_update.assert_called()
|
||||
# Get the last call's update argument
|
||||
last_call = mock_conn.session_update.call_args_list[-1]
|
||||
update = last_call[1].get("update") or last_call[0][1]
|
||||
assert update.session_update == "agent_message_chunk"
|
||||
# With stream_delta_cb active, final_response must NOT appear as
|
||||
# a separate agent_message_chunk — it was already delivered via deltas.
|
||||
all_updates = [
|
||||
call[1].get("update") or call[0][1]
|
||||
for call in mock_conn.session_update.call_args_list
|
||||
if hasattr(call[1].get("update") or call[0][1], "session_update")
|
||||
and (call[1].get("update") or call[0][1]).session_update == "agent_message_chunk"
|
||||
]
|
||||
final_chunks = [u for u in all_updates if u.content.text == "I can help with that!"]
|
||||
assert len(final_chunks) == 0, (
|
||||
"final_response should be suppressed when stream_delta_cb is active"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prompt_auto_titles_session(self, agent):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue