diff --git a/gateway/run.py b/gateway/run.py index b107a58f1a7..622881b83f5 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -14133,6 +14133,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig _adapter = self.adapters.get(source.platform) if _adapter: + _pause_typing_before_finalize = None + if source.platform == Platform.TELEGRAM and hasattr(_adapter, "pause_typing_for_chat"): + def _pause_typing_before_finalize( + _adapter=_adapter, + _chat_id=source.chat_id, + ) -> None: + _adapter.pause_typing_for_chat(_chat_id) _adapter_supports_edit = getattr(_adapter, "SUPPORTS_MESSAGE_EDITING", True) _effective_cursor = _scfg.cursor if _adapter_supports_edit else "" _buffer_only = False @@ -14162,6 +14169,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew chat_id=source.chat_id, config=_consumer_cfg, metadata=_thread_metadata, + on_before_finalize=_pause_typing_before_finalize, initial_reply_to_id=event_message_id, ) except Exception as _sc_err: @@ -15290,6 +15298,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig _adapter = self.adapters.get(source.platform) if _adapter: + _pause_typing_before_finalize = None + if source.platform == Platform.TELEGRAM and hasattr(_adapter, "pause_typing_for_chat"): + def _pause_typing_before_finalize( + _adapter=_adapter, + _chat_id=source.chat_id, + ) -> None: + _adapter.pause_typing_for_chat(_chat_id) # Platforms that don't support editing sent messages # (e.g. QQ, WeChat) should skip streaming entirely — # without edit support, the consumer sends a partial @@ -15334,6 +15349,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if progress_queue is not None else None ), + on_before_finalize=_pause_typing_before_finalize, initial_reply_to_id=event_message_id, ) if _want_stream_deltas: diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index f559d7ecd43..9e005754aa3 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -119,6 +119,7 @@ class GatewayStreamConsumer: config: Optional[StreamConsumerConfig] = None, metadata: Optional[dict] = None, on_new_message: Optional[callable] = None, + on_before_finalize: Optional[Callable[[], Any]] = None, initial_reply_to_id: Optional[str] = None, ): self.adapter = adapter @@ -133,6 +134,10 @@ class GatewayStreamConsumer: # the content, not edit the old bubble above it. # Called with no arguments. Exceptions are swallowed. self._on_new_message = on_new_message + # Fired once when the stream transitions into its finalization path. + # Gateway callers use this to pause typing refreshes before a slow + # final rich-text edit (Telegram MarkdownV2 finalize, etc.). + self._on_before_finalize = on_before_finalize self._initial_reply_to_id = initial_reply_to_id self._queue: queue.Queue = queue.Queue() self._accumulated = "" @@ -196,6 +201,7 @@ class GatewayStreamConsumer: # first failure we permanently disable drafts for the remainder of # this response and route through edit-based for graceful degradation. self._draft_failures = 0 + self._before_finalize_notified = False def _metadata_for_send( self, @@ -242,6 +248,20 @@ class GatewayStreamConsumer: the subsequent cosmetic edit (cursor removal) failed.""" return self._final_content_delivered + async def _notify_before_finalize(self) -> None: + """Run the pre-finalize hook exactly once, swallowing hook errors.""" + if self._before_finalize_notified: + return + self._before_finalize_notified = True + if self._on_before_finalize is None: + return + try: + result = self._on_before_finalize() + if inspect.isawaitable(result): + await result + except Exception: + pass + async def _edit_message( self, *, @@ -620,6 +640,8 @@ class GatewayStreamConsumer: self._last_edit_time = time.monotonic() if got_done: + if self._accumulated or self._message_id is not None or self._already_sent: + await self._notify_before_finalize() # Final edit without cursor. If progressive editing failed # mid-stream, send a single continuation/fallback message # here instead of letting the base gateway path send the diff --git a/tests/gateway/test_stream_consumer.py b/tests/gateway/test_stream_consumer.py index 0b8aebf07e5..9dca1f9bedd 100644 --- a/tests/gateway/test_stream_consumer.py +++ b/tests/gateway/test_stream_consumer.py @@ -361,6 +361,67 @@ class TestStreamRunMediaStripping: assert consumer.already_sent +class TestBeforeFinalizeHook: + """Verify the optional pre-finalize hook fires at the right time.""" + + @pytest.mark.asyncio + async def test_hook_runs_before_finalize_edit(self): + """Adapters that require finalize should pause typing before the edit.""" + events = [] + adapter = MagicMock() + adapter.REQUIRES_EDIT_FINALIZE = True + adapter.send = AsyncMock( + side_effect=lambda **_kw: ( + events.append("send"), + SimpleNamespace(success=True, message_id="msg_1"), + )[1] + ) + adapter.edit_message = AsyncMock( + side_effect=lambda **_kw: ( + events.append("edit"), + SimpleNamespace(success=True, message_id="msg_1"), + )[1] + ) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + on_before_finalize=lambda: events.append("pause"), + ) + consumer.on_delta("Hello") + consumer.finish() + + await consumer.run() + + assert events == ["send", "pause", "edit"] + + @pytest.mark.asyncio + async def test_hook_runs_once_when_final_text_already_visible(self): + """The hook still fires once even when no final edit is required.""" + events = [] + adapter = MagicMock() + adapter.REQUIRES_EDIT_FINALIZE = False + adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) + adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1")) + adapter.MAX_MESSAGE_LENGTH = 4096 + + consumer = GatewayStreamConsumer( + adapter, + "chat_123", + StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5), + on_before_finalize=lambda: events.append("pause"), + ) + consumer.on_delta("Hello") + consumer.finish() + + await consumer.run() + + assert events == ["pause"] + adapter.edit_message.assert_not_called() + + # ── Segment break (tool boundary) tests ──────────────────────────────────