This commit is contained in:
David Zhang 2026-04-24 17:27:52 -05:00 committed by GitHub
commit a4ab79bc09
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 106 additions and 8 deletions

View file

@ -192,3 +192,19 @@ def make_message_cb(
_send_update(conn, session_id, loop, update)
return _message
def make_stream_delta_cb(
conn: acp.Client,
session_id: str,
loop: asyncio.AbstractEventLoop,
) -> Callable:
"""Create a callback that forwards live LLM text deltas to ACP."""
def _stream_delta(text: str) -> None:
if not text:
return
update = acp.update_agent_message_text(text)
_send_update(conn, session_id, loop, update)
return _stream_delta

View file

@ -56,6 +56,7 @@ from acp_adapter.auth import detect_provider
from acp_adapter.events import (
make_message_cb,
make_step_cb,
make_stream_delta_cb,
make_thinking_cb,
make_tool_progress_cb,
)
@ -545,12 +546,14 @@ class HermesACPAgent(acp.Agent):
thinking_cb = make_thinking_cb(conn, session_id, loop)
step_cb = make_step_cb(conn, session_id, loop, tool_call_ids, tool_call_meta)
message_cb = make_message_cb(conn, session_id, loop)
stream_delta_cb = make_stream_delta_cb(conn, session_id, loop)
approval_cb = make_approval_callback(conn.request_permission, loop, session_id)
else:
tool_progress_cb = None
thinking_cb = None
step_cb = None
message_cb = None
stream_delta_cb = None
approval_cb = None
agent = state.agent
@ -558,6 +561,7 @@ class HermesACPAgent(acp.Agent):
agent.thinking_callback = thinking_cb
agent.step_callback = step_cb
agent.message_callback = message_cb
agent.stream_delta_callback = stream_delta_cb
# Approval callback is per-thread (thread-local, GHSA-qg5c-hvr5-hjgr).
# Set it INSIDE _run_agent so the TLS write happens in the executor
@ -633,7 +637,10 @@ class HermesACPAgent(acp.Agent):
)
except Exception:
logger.debug("Failed to auto-title ACP session %s", session_id, exc_info=True)
if final_response and conn:
# When stream_delta_callback is active, the client already received
# every text delta incrementally — sending final_response again would
# duplicate the output for clients that append chunks.
if final_response and conn and not stream_delta_cb:
update = acp.update_agent_message_text(final_response)
await conn.session_update(session_id, update)

View file

@ -12,6 +12,7 @@ from acp.schema import ToolCallStart, ToolCallProgress, AgentThoughtChunk, Agent
from acp_adapter.events import (
make_message_cb,
make_step_cb,
make_stream_delta_cb,
make_thinking_cb,
make_tool_progress_cb,
)
@ -325,3 +326,30 @@ class TestMessageCallback:
cb("")
mock_rcts.assert_not_called()
class TestStreamDeltaCallback:
def test_emits_agent_message_chunk(self, mock_conn, event_loop_fixture):
"""Stream delta callback should emit AgentMessageChunk."""
loop = event_loop_fixture
cb = make_stream_delta_cb(mock_conn, "session-1", loop)
with patch("acp_adapter.events._send_update") as mock_send:
cb("Partial")
mock_send.assert_called_once()
update = mock_send.call_args.args[3]
assert update.session_update == "agent_message_chunk"
assert update.content.text == "Partial"
def test_ignores_empty_message(self, mock_conn, event_loop_fixture):
"""Empty stream deltas should not emit any update."""
loop = event_loop_fixture
cb = make_stream_delta_cb(mock_conn, "session-1", loop)
with patch("acp_adapter.events._send_update") as mock_send:
cb("")
mock_send.assert_not_called()

View file

@ -493,9 +493,50 @@ class TestPrompt:
assert state.history == expected_history
@pytest.mark.asyncio
async def test_prompt_wires_stream_delta_callback(self, agent):
"""prompt() should bridge live text deltas through ACP session updates."""
new_resp = await agent.new_session(cwd=".")
state = agent.session_manager.get_session(new_resp.session_id)
def _run_conversation(**kwargs):
assert callable(state.agent.stream_delta_callback)
state.agent.stream_delta_callback("live chunk")
return {
"final_response": "final message",
"messages": [],
}
state.agent.run_conversation = MagicMock(side_effect=_run_conversation)
mock_conn = MagicMock(spec=acp.Client)
mock_conn.session_update = AsyncMock()
agent._conn = mock_conn
prompt = [TextContentBlock(type="text", text="stream this")]
await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
# Stream delta should have been sent as agent_message_chunk
streamed_updates = [
call[1].get("update") or call[0][1]
for call in mock_conn.session_update.call_args_list
if (call[1].get("update") or call[0][1]).session_update == "agent_message_chunk"
]
assert any(update.content.text == "live chunk" for update in streamed_updates)
# Final response should NOT be sent separately when stream_delta_cb is active
final_texts = [u.content.text for u in streamed_updates if u.content.text == "final message"]
assert len(final_texts) == 0, "Final response should be suppressed when streaming"
@pytest.mark.asyncio
async def test_prompt_sends_final_message_update(self, agent):
"""The final response should be sent as an AgentMessageChunk."""
"""Final response is suppressed when stream_delta_callback is active.
When streaming is wired (the normal ACP path with a connected
client), the client receives every text delta incrementally via
stream_delta_callback, so sending final_response again would
duplicate the output. Verify the suppression works.
"""
new_resp = await agent.new_session(cwd=".")
state = agent.session_manager.get_session(new_resp.session_id)
@ -511,12 +552,18 @@ class TestPrompt:
prompt = [TextContentBlock(type="text", text="help me")]
await agent.prompt(prompt=prompt, session_id=new_resp.session_id)
# session_update should have been called with the final message
mock_conn.session_update.assert_called()
# Get the last call's update argument
last_call = mock_conn.session_update.call_args_list[-1]
update = last_call[1].get("update") or last_call[0][1]
assert update.session_update == "agent_message_chunk"
# With stream_delta_cb active, final_response must NOT appear as
# a separate agent_message_chunk — it was already delivered via deltas.
all_updates = [
call[1].get("update") or call[0][1]
for call in mock_conn.session_update.call_args_list
if hasattr(call[1].get("update") or call[0][1], "session_update")
and (call[1].get("update") or call[0][1]).session_update == "agent_message_chunk"
]
final_chunks = [u for u in all_updates if u.content.text == "I can help with that!"]
assert len(final_chunks) == 0, (
"final_response should be suppressed when stream_delta_cb is active"
)
@pytest.mark.asyncio
async def test_prompt_auto_titles_session(self, agent):