fix(gateway): pause Telegram typing before stream finalize

In Telegram streaming, the typing indicator persisted through the slow
final rich-text/MarkdownV2 finalize edit, so the '...typing' bubble
lingered for seconds after the last streamed token. Add a one-shot
on_before_finalize hook to GatewayStreamConsumer, fired once when the
stream transitions into its finalization path, and wire it on both
Telegram streaming call sites to call pause_typing_for_chat() before
the final edit. Cover hook ordering and once-only behavior in tests.

Fixes #49712
This commit is contained in:
LeonSGP43 2026-06-21 12:42:16 -07:00 committed by Teknium
parent 6902eb3913
commit 09a96ba0f6
3 changed files with 99 additions and 0 deletions

View file

@ -14133,6 +14133,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
_adapter = self.adapters.get(source.platform)
if _adapter:
_pause_typing_before_finalize = None
if source.platform == Platform.TELEGRAM and hasattr(_adapter, "pause_typing_for_chat"):
def _pause_typing_before_finalize(
_adapter=_adapter,
_chat_id=source.chat_id,
) -> None:
_adapter.pause_typing_for_chat(_chat_id)
_adapter_supports_edit = getattr(_adapter, "SUPPORTS_MESSAGE_EDITING", True)
_effective_cursor = _scfg.cursor if _adapter_supports_edit else ""
_buffer_only = False
@ -14162,6 +14169,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
chat_id=source.chat_id,
config=_consumer_cfg,
metadata=_thread_metadata,
on_before_finalize=_pause_typing_before_finalize,
initial_reply_to_id=event_message_id,
)
except Exception as _sc_err:
@ -15290,6 +15298,13 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig
_adapter = self.adapters.get(source.platform)
if _adapter:
_pause_typing_before_finalize = None
if source.platform == Platform.TELEGRAM and hasattr(_adapter, "pause_typing_for_chat"):
def _pause_typing_before_finalize(
_adapter=_adapter,
_chat_id=source.chat_id,
) -> None:
_adapter.pause_typing_for_chat(_chat_id)
# Platforms that don't support editing sent messages
# (e.g. QQ, WeChat) should skip streaming entirely —
# without edit support, the consumer sends a partial
@ -15334,6 +15349,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if progress_queue is not None
else None
),
on_before_finalize=_pause_typing_before_finalize,
initial_reply_to_id=event_message_id,
)
if _want_stream_deltas:

View file

@ -119,6 +119,7 @@ class GatewayStreamConsumer:
config: Optional[StreamConsumerConfig] = None,
metadata: Optional[dict] = None,
on_new_message: Optional[callable] = None,
on_before_finalize: Optional[Callable[[], Any]] = None,
initial_reply_to_id: Optional[str] = None,
):
self.adapter = adapter
@ -133,6 +134,10 @@ class GatewayStreamConsumer:
# the content, not edit the old bubble above it.
# Called with no arguments. Exceptions are swallowed.
self._on_new_message = on_new_message
# Fired once when the stream transitions into its finalization path.
# Gateway callers use this to pause typing refreshes before a slow
# final rich-text edit (Telegram MarkdownV2 finalize, etc.).
self._on_before_finalize = on_before_finalize
self._initial_reply_to_id = initial_reply_to_id
self._queue: queue.Queue = queue.Queue()
self._accumulated = ""
@ -196,6 +201,7 @@ class GatewayStreamConsumer:
# first failure we permanently disable drafts for the remainder of
# this response and route through edit-based for graceful degradation.
self._draft_failures = 0
self._before_finalize_notified = False
def _metadata_for_send(
self,
@ -242,6 +248,20 @@ class GatewayStreamConsumer:
the subsequent cosmetic edit (cursor removal) failed."""
return self._final_content_delivered
async def _notify_before_finalize(self) -> None:
"""Run the pre-finalize hook exactly once, swallowing hook errors."""
if self._before_finalize_notified:
return
self._before_finalize_notified = True
if self._on_before_finalize is None:
return
try:
result = self._on_before_finalize()
if inspect.isawaitable(result):
await result
except Exception:
pass
async def _edit_message(
self,
*,
@ -620,6 +640,8 @@ class GatewayStreamConsumer:
self._last_edit_time = time.monotonic()
if got_done:
if self._accumulated or self._message_id is not None or self._already_sent:
await self._notify_before_finalize()
# Final edit without cursor. If progressive editing failed
# mid-stream, send a single continuation/fallback message
# here instead of letting the base gateway path send the

View file

@ -361,6 +361,67 @@ class TestStreamRunMediaStripping:
assert consumer.already_sent
class TestBeforeFinalizeHook:
"""Verify the optional pre-finalize hook fires at the right time."""
@pytest.mark.asyncio
async def test_hook_runs_before_finalize_edit(self):
"""Adapters that require finalize should pause typing before the edit."""
events = []
adapter = MagicMock()
adapter.REQUIRES_EDIT_FINALIZE = True
adapter.send = AsyncMock(
side_effect=lambda **_kw: (
events.append("send"),
SimpleNamespace(success=True, message_id="msg_1"),
)[1]
)
adapter.edit_message = AsyncMock(
side_effect=lambda **_kw: (
events.append("edit"),
SimpleNamespace(success=True, message_id="msg_1"),
)[1]
)
adapter.MAX_MESSAGE_LENGTH = 4096
consumer = GatewayStreamConsumer(
adapter,
"chat_123",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
on_before_finalize=lambda: events.append("pause"),
)
consumer.on_delta("Hello")
consumer.finish()
await consumer.run()
assert events == ["send", "pause", "edit"]
@pytest.mark.asyncio
async def test_hook_runs_once_when_final_text_already_visible(self):
"""The hook still fires once even when no final edit is required."""
events = []
adapter = MagicMock()
adapter.REQUIRES_EDIT_FINALIZE = False
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.MAX_MESSAGE_LENGTH = 4096
consumer = GatewayStreamConsumer(
adapter,
"chat_123",
StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5),
on_before_finalize=lambda: events.append("pause"),
)
consumer.on_delta("Hello")
consumer.finish()
await consumer.run()
assert events == ["pause"]
adapter.edit_message.assert_not_called()
# ── Segment break (tool boundary) tests ──────────────────────────────────