diff --git a/acp_adapter/events.py b/acp_adapter/events.py index 1257f902eb..e2842f8ce6 100644 --- a/acp_adapter/events.py +++ b/acp_adapter/events.py @@ -192,3 +192,19 @@ def make_message_cb( _send_update(conn, session_id, loop, update) return _message + + +def make_stream_delta_cb( + conn: acp.Client, + session_id: str, + loop: asyncio.AbstractEventLoop, +) -> Callable: + """Create a callback that forwards live LLM text deltas to ACP.""" + + def _stream_delta(text: str) -> None: + if not text: + return + update = acp.update_agent_message_text(text) + _send_update(conn, session_id, loop, update) + + return _stream_delta diff --git a/acp_adapter/server.py b/acp_adapter/server.py index 612748d568..66774e39c4 100644 --- a/acp_adapter/server.py +++ b/acp_adapter/server.py @@ -56,6 +56,7 @@ from acp_adapter.auth import detect_provider from acp_adapter.events import ( make_message_cb, make_step_cb, + make_stream_delta_cb, make_thinking_cb, make_tool_progress_cb, ) @@ -545,12 +546,14 @@ class HermesACPAgent(acp.Agent): thinking_cb = make_thinking_cb(conn, session_id, loop) step_cb = make_step_cb(conn, session_id, loop, tool_call_ids, tool_call_meta) message_cb = make_message_cb(conn, session_id, loop) + stream_delta_cb = make_stream_delta_cb(conn, session_id, loop) approval_cb = make_approval_callback(conn.request_permission, loop, session_id) else: tool_progress_cb = None thinking_cb = None step_cb = None message_cb = None + stream_delta_cb = None approval_cb = None agent = state.agent @@ -558,6 +561,7 @@ class HermesACPAgent(acp.Agent): agent.thinking_callback = thinking_cb agent.step_callback = step_cb agent.message_callback = message_cb + agent.stream_delta_callback = stream_delta_cb # Approval callback is per-thread (thread-local, GHSA-qg5c-hvr5-hjgr). # Set it INSIDE _run_agent so the TLS write happens in the executor @@ -633,7 +637,10 @@ class HermesACPAgent(acp.Agent): ) except Exception: logger.debug("Failed to auto-title ACP session %s", session_id, exc_info=True) - if final_response and conn: + # When stream_delta_callback is active, the client already received + # every text delta incrementally — sending final_response again would + # duplicate the output for clients that append chunks. + if final_response and conn and not stream_delta_cb: update = acp.update_agent_message_text(final_response) await conn.session_update(session_id, update) diff --git a/tests/acp/test_events.py b/tests/acp/test_events.py index c9f91a181e..9754c02df5 100644 --- a/tests/acp/test_events.py +++ b/tests/acp/test_events.py @@ -12,6 +12,7 @@ from acp.schema import ToolCallStart, ToolCallProgress, AgentThoughtChunk, Agent from acp_adapter.events import ( make_message_cb, make_step_cb, + make_stream_delta_cb, make_thinking_cb, make_tool_progress_cb, ) @@ -325,3 +326,30 @@ class TestMessageCallback: cb("") mock_rcts.assert_not_called() + + +class TestStreamDeltaCallback: + def test_emits_agent_message_chunk(self, mock_conn, event_loop_fixture): + """Stream delta callback should emit AgentMessageChunk.""" + loop = event_loop_fixture + + cb = make_stream_delta_cb(mock_conn, "session-1", loop) + + with patch("acp_adapter.events._send_update") as mock_send: + cb("Partial") + + mock_send.assert_called_once() + update = mock_send.call_args.args[3] + assert update.session_update == "agent_message_chunk" + assert update.content.text == "Partial" + + def test_ignores_empty_message(self, mock_conn, event_loop_fixture): + """Empty stream deltas should not emit any update.""" + loop = event_loop_fixture + + cb = make_stream_delta_cb(mock_conn, "session-1", loop) + + with patch("acp_adapter.events._send_update") as mock_send: + cb("") + + mock_send.assert_not_called() diff --git a/tests/acp/test_server.py b/tests/acp/test_server.py index d4afed101f..57e0dd5968 100644 --- a/tests/acp/test_server.py +++ b/tests/acp/test_server.py @@ -493,9 +493,50 @@ class TestPrompt: assert state.history == expected_history + @pytest.mark.asyncio + async def test_prompt_wires_stream_delta_callback(self, agent): + """prompt() should bridge live text deltas through ACP session updates.""" + new_resp = await agent.new_session(cwd=".") + state = agent.session_manager.get_session(new_resp.session_id) + + def _run_conversation(**kwargs): + assert callable(state.agent.stream_delta_callback) + state.agent.stream_delta_callback("live chunk") + return { + "final_response": "final message", + "messages": [], + } + + state.agent.run_conversation = MagicMock(side_effect=_run_conversation) + + mock_conn = MagicMock(spec=acp.Client) + mock_conn.session_update = AsyncMock() + agent._conn = mock_conn + + prompt = [TextContentBlock(type="text", text="stream this")] + await agent.prompt(prompt=prompt, session_id=new_resp.session_id) + + # Stream delta should have been sent as agent_message_chunk + streamed_updates = [ + call[1].get("update") or call[0][1] + for call in mock_conn.session_update.call_args_list + if (call[1].get("update") or call[0][1]).session_update == "agent_message_chunk" + ] + assert any(update.content.text == "live chunk" for update in streamed_updates) + + # Final response should NOT be sent separately when stream_delta_cb is active + final_texts = [u.content.text for u in streamed_updates if u.content.text == "final message"] + assert len(final_texts) == 0, "Final response should be suppressed when streaming" + @pytest.mark.asyncio async def test_prompt_sends_final_message_update(self, agent): - """The final response should be sent as an AgentMessageChunk.""" + """Final response is suppressed when stream_delta_callback is active. + + When streaming is wired (the normal ACP path with a connected + client), the client receives every text delta incrementally via + stream_delta_callback, so sending final_response again would + duplicate the output. Verify the suppression works. + """ new_resp = await agent.new_session(cwd=".") state = agent.session_manager.get_session(new_resp.session_id) @@ -511,12 +552,18 @@ class TestPrompt: prompt = [TextContentBlock(type="text", text="help me")] await agent.prompt(prompt=prompt, session_id=new_resp.session_id) - # session_update should have been called with the final message - mock_conn.session_update.assert_called() - # Get the last call's update argument - last_call = mock_conn.session_update.call_args_list[-1] - update = last_call[1].get("update") or last_call[0][1] - assert update.session_update == "agent_message_chunk" + # With stream_delta_cb active, final_response must NOT appear as + # a separate agent_message_chunk — it was already delivered via deltas. + all_updates = [ + call[1].get("update") or call[0][1] + for call in mock_conn.session_update.call_args_list + if hasattr(call[1].get("update") or call[0][1], "session_update") + and (call[1].get("update") or call[0][1]).session_update == "agent_message_chunk" + ] + final_chunks = [u for u in all_updates if u.content.text == "I can help with that!"] + assert len(final_chunks) == 0, ( + "final_response should be suppressed when stream_delta_cb is active" + ) @pytest.mark.asyncio async def test_prompt_auto_titles_session(self, agent):