diff --git a/gateway/run.py b/gateway/run.py index 7b9622485..294e71287 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7816,6 +7816,11 @@ class GatewayRunner: # response, just without the typing indicator. _adapter_supports_edit = getattr(_adapter, "SUPPORTS_MESSAGE_EDITING", True) _effective_cursor = _scfg.cursor if _adapter_supports_edit else "" + # Some Matrix clients render the streaming cursor + # as a visible tofu/white-box artifact. Keep + # streaming text on Matrix, but suppress the cursor. + if source.platform == Platform.MATRIX: + _effective_cursor = "" _consumer_cfg = StreamConsumerConfig( edit_interval=_scfg.edit_interval, buffer_threshold=_scfg.buffer_threshold, diff --git a/tests/gateway/test_run_progress_topics.py b/tests/gateway/test_run_progress_topics.py index c1dda60b5..7859edd74 100644 --- a/tests/gateway/test_run_progress_topics.py +++ b/tests/gateway/test_run_progress_topics.py @@ -378,6 +378,25 @@ class PreviewedResponseAgent: } +class StreamingRefineAgent: + def __init__(self, **kwargs): + self.stream_delta_callback = kwargs.get("stream_delta_callback") + self.tools = [] + + def run_conversation(self, message, conversation_history=None, task_id=None): + if self.stream_delta_callback: + self.stream_delta_callback("Continuing to refine:") + time.sleep(0.1) + if self.stream_delta_callback: + self.stream_delta_callback(" Final answer.") + return { + "final_response": "Continuing to refine: Final answer.", + "response_previewed": True, + "messages": [], + "api_calls": 1, + } + + class QueuedCommentaryAgent: calls = 0 @@ -425,6 +444,10 @@ async def _run_with_agent( session_id, pending_text=None, config_data=None, + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_type="group", + thread_id="17585", ): if config_data: import yaml @@ -439,7 +462,7 @@ async def _run_with_agent( fake_run_agent.AIAgent = agent_cls monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) - adapter = ProgressCaptureAdapter() + adapter = ProgressCaptureAdapter(platform=platform) runner = _make_runner(adapter) gateway_run = importlib.import_module("gateway.run") if config_data and "streaming" in config_data: @@ -447,12 +470,14 @@ async def _run_with_agent( monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"}) source = SessionSource( - platform=Platform.TELEGRAM, - chat_id="-1001", - chat_type="group", - thread_id="17585", + platform=platform, + chat_id=chat_id, + chat_type=chat_type, + thread_id=thread_id, ) - session_key = "agent:main:telegram:group:-1001:17585" + session_key = f"agent:main:{platform.value}:{chat_type}:{chat_id}" + if thread_id: + session_key = f"{session_key}:{thread_id}" if pending_text is not None: adapter._pending_messages[session_key] = MessageEvent( text=pending_text, @@ -580,6 +605,30 @@ async def test_run_agent_previewed_final_marks_already_sent(monkeypatch, tmp_pat assert [call["content"] for call in adapter.sent] == ["You're welcome."] +@pytest.mark.asyncio +async def test_run_agent_matrix_streaming_omits_cursor(monkeypatch, tmp_path): + adapter, result = await _run_with_agent( + monkeypatch, + tmp_path, + StreamingRefineAgent, + session_id="sess-matrix-streaming", + config_data={ + "display": {"tool_progress": "off", "interim_assistant_messages": False}, + "streaming": {"enabled": True, "edit_interval": 0.01, "buffer_threshold": 1}, + }, + platform=Platform.MATRIX, + chat_id="!room:matrix.example.org", + chat_type="group", + thread_id="$thread", + ) + + assert result.get("already_sent") is True + all_text = [call["content"] for call in adapter.sent] + [call["content"] for call in adapter.edits] + assert all_text, "expected streamed Matrix content to be sent or edited" + assert all("▉" not in text for text in all_text) + assert any("Continuing to refine:" in text for text in all_text) + + @pytest.mark.asyncio async def test_run_agent_queued_message_does_not_treat_commentary_as_final(monkeypatch, tmp_path): QueuedCommentaryAgent.calls = 0