diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index 56aebf848b3..f5fc0d12a31 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -4047,7 +4047,6 @@ def run_conversation( model=agent.model, platform=getattr(agent, "platform", None) or "", ) - _response_transformed = False for _hook_result in _transform_results: if isinstance(_hook_result, str) and _hook_result: final_response = _hook_result diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 8d74d3be8cf..4ba65ddf4c5 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -197,11 +197,6 @@ class GatewayStreamConsumer: """The Discord/chat message ID of the last-sent or edited message.""" return self._message_id - @property - def accumulated_text(self) -> str: - """The accumulated streamed text (without think-block content).""" - return self._accumulated - @property def final_content_delivered(self) -> bool: """True when the final response content reached the user, even if diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index 8f218dfc11c..5b7dfb821b0 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -942,6 +942,62 @@ async def test_run_agent_matrix_streaming_omits_cursor(monkeypatch, tmp_path): assert any("Continuing to refine:" in text for text in all_text) +class TransformedStreamAgent: + """Streams a response, then signals the gateway that a plugin hook + (``transform_llm_output``) modified the final text after streaming + finished. ``run_conversation`` returns ``response_transformed=True`` + plus a ``final_response`` that diverges from what was streamed. + """ + + def __init__(self, **kwargs): + self.stream_delta_callback = kwargs.get("stream_delta_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + if self.stream_delta_callback: + self.stream_delta_callback("original answer") + return { + "final_response": "original answer\n\n[plugin appended this]", + "response_previewed": True, + "response_transformed": True, + "messages": [], + "api_calls": 1, + } + + +@pytest.mark.asyncio +async def test_transformed_response_edits_streamed_message_in_place(monkeypatch, tmp_path): + """When a transform_llm_output hook modifies the response after streaming, + the gateway must edit the existing streamed message in place with the full + transformed content (so plugins like content filters / appenders reach the + user) and still mark already_sent=True (no duplicate send). + """ + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + TransformedStreamAgent, + session_id="sess-transformed-stream", + config_data={ + "display": {"tool_progress": "off", "interim_assistant_messages": False}, + "streaming": {"enabled": True, "edit_interval": 0.01, "buffer_threshold": 1}, + }, + platform=Platform.MATRIX, + chat_id="!room:matrix.example.org", + chat_type="group", + thread_id="$thread", + adapter_cls=MetadataEditProgressCaptureAdapter, + ) + + # Final delivery happened (no duplicate send fallback). + assert result.get("already_sent") is True + # The transformed final text reached the user — appended portion is present + # in an edit_message call (not just in the streamed sends). + edited_texts = [e["content"] for e in adapter.edits] + assert any("[plugin appended this]" in text for text in edited_texts), ( + f"expected transformed text in adapter.edits, got: {edited_texts!r}" + ) + + @pytest.mark.asyncio async def test_run_agent_queued_message_does_not_treat_commentary_as_final(monkeypatch, tmp_path): QueuedCommentaryAgent.calls = 0