From 6dd188d7861e3c18c262cad4ef8eb3b13dcd0b5f Mon Sep 17 00:00:00 2001 From: jasonQin6 <39369769+jasonQin6@users.noreply.github.com> Date: Tue, 30 Jun 2026 03:03:03 -0700 Subject: [PATCH] fix(gateway): add session staleness guard to stream consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit GatewayStreamConsumer.run() processed queued deltas in an infinite loop with no check on whether the session was still current. On /new or /stop mid-stream, the consumer kept editing and delivering stale response fragments alongside the 'Session reset!' ack. PR #11016 (b7bdf32d) fixed the runner side via sentinel promotion/release but left the stream consumer unguarded. Every other async callback in run.py already bails via _run_still_current(); the stream consumer was the only one missing it. - stream_consumer.py: optional run_still_current callback, checked at the top of the run() loop; returns early when the session is stale. - run.py: pass the existing _run_still_current closure at both call sites (proxy path and agent path). - tests: TestRunStillCurrentGuard — immediate staleness, mid-stream staleness, always-current, no-callback default, pending-finish. Co-authored-by: jasonQin6 <39369769+jasonQin6@users.noreply.github.com> --- gateway/run.py | 2 + gateway/stream_consumer.py | 12 +++ tests/gateway/test_stream_consumer.py | 134 ++++++++++++++++++++++++++ 3 files changed, 148 insertions(+) diff --git a/gateway/run.py b/gateway/run.py index a919a0f2768..92ae75a4e06 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -15148,6 +15148,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew metadata=_thread_metadata, on_before_finalize=_pause_typing_before_finalize, initial_reply_to_id=event_message_id, + run_still_current=_run_still_current, ) except Exception as _sc_err: logger.debug("Proxy: could not set up stream consumer: %s", _sc_err) @@ -16368,6 +16369,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew ), on_before_finalize=_pause_typing_before_finalize, initial_reply_to_id=event_message_id, + run_still_current=_run_still_current, ) if _want_stream_deltas: def _stream_delta_cb(text: str) -> None: diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index d5641b66409..63853b24645 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -121,6 +121,7 @@ class GatewayStreamConsumer: on_new_message: Optional[callable] = None, on_before_finalize: Optional[Callable[[], Any]] = None, initial_reply_to_id: Optional[str] = None, + run_still_current: Optional[Callable[[], bool]] = None, ): self.adapter = adapter self.chat_id = chat_id @@ -185,6 +186,11 @@ class GatewayStreamConsumer: getattr(adapter, "REQUIRES_EDIT_FINALIZE", False) is True ) + # Session staleness guard — when set to False (e.g. after /new or + # /stop), the run() loop will abandon the stream early instead of + # continuing to edit and deliver stale deltas. + self._run_still_current = run_still_current or (lambda: True) + # Think-block filter state (mirrors CLI's _stream_delta tag suppression) self._in_think_block = False self._think_buffer = "" @@ -504,6 +510,12 @@ class GatewayStreamConsumer: try: while True: + # Abandon the stream early if the session has been reset + # (e.g. /new or /stop). Prevents stale deltas from being + # delivered after the user has already moved on. + if not self._run_still_current(): + return + # Drain all available items from the queue got_done = False got_segment_break = False diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index d564f6b1dce..d83d086d650 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -2112,3 +2112,137 @@ class TestFreshFinalRespectsAdapterDecline: f"Expected 2 send calls (initial + fresh-final), got {adapter.send.call_count}" ) + +# ── run_still_current staleness guard ──────────────────────────────────── + +class TestRunStillCurrentGuard: + """Verify that the stream consumer abandons delivery when the session is + reset (e.g. /new or /stop), preventing stale deltas from reaching the user.""" + + @pytest.mark.asyncio + async def test_abandons_stream_when_session_reset_before_first_send(self): + """If _run_still_current returns False immediately, the consumer + exits without sending anything — even with queued deltas.""" + adapter = MagicMock() + adapter.send = AsyncMock() + adapter.edit_message = AsyncMock() + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=3) + consumer = GatewayStreamConsumer( + adapter, "chat_123", config, + run_still_current=lambda: False, + ) + + consumer.on_delta("ABC") + consumer.on_delta("DEF") + consumer.on_delta("GHI") + + await consumer.run() + + adapter.send.assert_not_called() + adapter.edit_message.assert_not_called() + assert consumer._final_response_sent is False + + @pytest.mark.asyncio + async def test_abandons_stream_after_one_edit_when_session_reset(self): + """If staleness flips after the first edit, the consumer stops + on the next loop iteration and does not send the final response.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id="msg_1") + adapter.send = AsyncMock(return_value=send_result) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + call_count = [0] + + def is_current(): + call_count[0] += 1 + return call_count[0] == 1 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=3) + consumer = GatewayStreamConsumer( + adapter, "chat_123", config, + run_still_current=is_current, + ) + + consumer.on_delta("First segment") + consumer.on_delta(None) # segment break → resets message_id + consumer.on_delta("Second segment text that will be stale") + # No finish() — staleness should prevent second segment from sending + + await consumer.run() + + # First segment was sent, second was abandoned + assert adapter.send.call_count == 1 + assert "First segment" in adapter.send.call_args_list[0][1]["content"] + assert consumer._final_response_sent is False + + @pytest.mark.asyncio + async def test_normal_delivery_when_session_stays_current(self): + """When _run_still_current always returns True, the consumer + behaves normally and delivers the full response.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id="msg_1") + adapter.send = AsyncMock(return_value=send_result) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer( + adapter, "chat_123", config, + run_still_current=lambda: True, + ) + + consumer.on_delta("Hello, world!") + consumer.finish() + + await consumer.run() + + assert adapter.send.call_count >= 1 + assert consumer._final_response_sent is True + + @pytest.mark.asyncio + async def test_no_callback_defaults_to_always_current(self): + """When run_still_current is not provided (default), the consumer + always considers the session current — backward compatible.""" + adapter = MagicMock() + send_result = SimpleNamespace(success=True, message_id="msg_1") + adapter.send = AsyncMock(return_value=send_result) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True)) + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer(adapter, "chat_123", config) + + consumer.on_delta("Normal message") + consumer.finish() + + await consumer.run() + + assert adapter.send.call_count >= 1 + assert consumer._final_response_sent is True + + @pytest.mark.asyncio + async def test_abandons_even_with_pending_finish(self): + """If finish() has been called but the session is already reset + before the run loop starts, nothing is sent.""" + adapter = MagicMock() + adapter.send = AsyncMock() + adapter.edit_message = AsyncMock() + adapter.MAX_MESSAGE_LENGTH = 4096 + + config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) + consumer = GatewayStreamConsumer( + adapter, "chat_123", config, + run_still_current=lambda: False, + ) + + consumer.on_delta("Stale text") + consumer.finish() + + await consumer.run() + + adapter.send.assert_not_called() + adapter.edit_message.assert_not_called() + assert consumer._final_response_sent is False