From f40b20d13ce6e2604d70cec187c143a0e8100dfc Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 27 Apr 2026 19:09:32 -0700 Subject: [PATCH] fix(gateway): keep typing indicator alive across slow send_typing calls (#16763) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The typing-indicator refresh loop in BasePlatformAdapter._keep_typing awaited each send_typing call unconditionally. Each call is an HTTP round-trip to the platform API (Telegram/Discord), normally ~100ms. When the same network instability that causes upstream provider timeouts (e.g. Anthropic capacity blips slowing first-token latency past the 120s stream-read timeout) also slows the platform typing API to multi-second response times, the refresh loop stalls inside the await. Platform-side typing expires at ~5s, so the bubble dies and stays dead until the stuck send_typing call returns — right when the user most needs the 'still working' signal and instead sees a bot that looks dead, then asks 'wtf are you doing' which itself interrupts the eventually-recovering turn. Bound each send_typing with asyncio.wait_for (1.5s cap, derived from interval so it's always below the 2s cadence). Slow calls get abandoned so the next scheduled tick fires a fresh send_typing on schedule. As long as any one of them reaches the platform within its ~5s typing-expiry window, the bubble stays visible across the stall. Also catches non-timeout send_typing exceptions (transient HTTP errors) so one bad tick doesn't terminate the whole loop. Tests: 4 new in tests/gateway/test_keep_typing_timeout.py covering slow-send non-blocking, fast-send still-awaited, exception resilience, and paused-chat regression guard. --- gateway/platforms/base.py | 30 +++- tests/gateway/test_keep_typing_timeout.py | 200 ++++++++++++++++++++++ 2 files changed, 229 insertions(+), 1 deletion(-) create mode 100644 tests/gateway/test_keep_typing_timeout.py diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 72054e3364..d03978cd1b 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -1702,13 +1702,41 @@ class BasePlatformAdapter(ABC): the agent is waiting for dangerous-command approval). This is critical for Slack's Assistant API where ``assistant_threads_setStatus`` disables the compose box — pausing lets the user type ``/approve`` or ``/deny``. + + Each ``send_typing`` call is bounded by a ~1.5s timeout so a slow + network round-trip can't stall the refresh cadence. Telegram- and + Discord-side typing expire after ~5s; if any individual send_typing + takes longer than the refresh interval, the bubble would die and + stay dead until that call returns. Abandoning the slow call lets + the next tick fire a fresh send_typing on schedule — as long as + one of them succeeds within the 5s platform-side window, the bubble + stays visible across provider stalls / upstream API timeouts. """ + # Bound each send_typing round-trip so the refresh cadence isn't + # gated on network health. Must stay below ``interval`` so a slow + # call gets abandoned before the next scheduled tick. + _send_typing_timeout = max(0.25, min(1.5, interval - 0.25)) try: while True: if stop_event is not None and stop_event.is_set(): return if chat_id not in self._typing_paused: - await self.send_typing(chat_id, metadata=metadata) + try: + await asyncio.wait_for( + self.send_typing(chat_id, metadata=metadata), + timeout=_send_typing_timeout, + ) + except asyncio.TimeoutError: + # Slow network — abandon this tick, keep the loop + # on schedule so the next send_typing fires fresh. + pass + except asyncio.CancelledError: + raise + except Exception as typing_err: + logger.debug( + "[%s] send_typing error (non-fatal): %s", + self.name, typing_err, + ) if stop_event is None: await asyncio.sleep(interval) continue diff --git a/tests/gateway/test_keep_typing_timeout.py b/tests/gateway/test_keep_typing_timeout.py new file mode 100644 index 0000000000..2cabe2f7d1 --- /dev/null +++ b/tests/gateway/test_keep_typing_timeout.py @@ -0,0 +1,200 @@ +"""Tests for BasePlatformAdapter._keep_typing timeout-per-tick behavior. + +When the gateway is waiting on a long upstream provider response (e.g. +Anthropic/opus-4.7 first-token latency climbing during an upstream blip), +the model-call socket is blocked on the worker thread but the asyncio loop +is still running, and ``_keep_typing`` refreshes the platform typing +indicator every 2 seconds. + +The bug: each ``send_typing`` call is an HTTP round-trip to the platform API +(Telegram/Discord). If the same network instability that's slowing the model +call also makes ``send_typing`` slow (5-30s response time), the refresh loop +stalls inside the ``await self.send_typing(...)`` call. Platform-side typing +expires at ~5s, so the bubble dies and doesn't come back until that stuck +call returns — exactly when the user most needs the "yes, still working" +signal. + +The fix: bound each ``send_typing`` with ``asyncio.wait_for``. If a +send_typing takes longer than the per-tick budget (default 1.5s when +interval=2.0), abandon it and let the next scheduled tick fire a fresh +call. As long as any one of them succeeds within the ~5s platform window, +the bubble stays visible across provider stalls. +""" + +import asyncio +from unittest.mock import MagicMock + +import pytest + +from gateway.platforms.base import ( + BasePlatformAdapter, + Platform, + PlatformConfig, + SendResult, +) + + +class _StubAdapter(BasePlatformAdapter): + def __init__(self): + super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM) + + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + self._mark_disconnected() + + async def send(self, chat_id, content, reply_to=None, metadata=None): + return SendResult(success=True, message_id="m1") + + async def get_chat_info(self, chat_id): + return {"id": chat_id, "type": "dm"} + + +class TestKeepTypingTimeoutPerTick: + @pytest.mark.asyncio + async def test_slow_send_typing_does_not_block_cadence(self, monkeypatch): + """A send_typing that hangs longer than the per-tick budget must be + abandoned so the next scheduled tick can fire a fresh call.""" + adapter = _StubAdapter() + call_events = [] + + async def slow_send_typing(chat_id, metadata=None): + # Simulate a stuck HTTP round-trip. If _keep_typing awaits this + # unconditionally, the loop stalls for the full duration. + call_events.append("start") + try: + await asyncio.sleep(10) + finally: + call_events.append("finish-or-cancel") + + monkeypatch.setattr(adapter, "send_typing", slow_send_typing) + # Avoid stop_typing side-effects in the finally block. + adapter.stop_typing = MagicMock(return_value=asyncio.sleep(0)) + + stop_event = asyncio.Event() + # Start the typing loop, let it run ~3s (should fire 2 ticks) then stop. + task = asyncio.create_task( + adapter._keep_typing( + chat_id="123", + interval=1.0, + stop_event=stop_event, + ) + ) + await asyncio.sleep(3.0) + stop_event.set() + try: + await asyncio.wait_for(task, timeout=2.0) + except asyncio.TimeoutError: + task.cancel() + pytest.fail( + "_keep_typing did not exit within 2s of stop_event.set() — " + "it is blocked on a slow send_typing call" + ) + + # With per-tick timeout, we should see MULTIPLE send_typing starts + # despite each being slow (abandoned via TimeoutError). Without the + # fix there would be exactly 1 start (the one still stuck). + starts = [e for e in call_events if e == "start"] + assert len(starts) >= 2, ( + f"expected at least 2 send_typing ticks across 3s of slow " + f"operation, got {len(starts)} — refresh cadence is stalled " + f"on a slow send_typing" + ) + + @pytest.mark.asyncio + async def test_fast_send_typing_still_gets_awaited(self, monkeypatch): + """When send_typing is fast (normal case), it must still complete + normally — the timeout is only an upper bound, not a cap on + successful calls.""" + adapter = _StubAdapter() + completed = [] + + async def fast_send_typing(chat_id, metadata=None): + await asyncio.sleep(0.01) # well under the timeout + completed.append(chat_id) + + monkeypatch.setattr(adapter, "send_typing", fast_send_typing) + adapter.stop_typing = MagicMock(return_value=asyncio.sleep(0)) + + stop_event = asyncio.Event() + task = asyncio.create_task( + adapter._keep_typing( + chat_id="456", + interval=0.5, + stop_event=stop_event, + ) + ) + await asyncio.sleep(1.2) # ~3 ticks + stop_event.set() + await asyncio.wait_for(task, timeout=1.0) + + assert len(completed) >= 2, ( + f"expected multiple completed send_typing calls, got " + f"{len(completed)}" + ) + assert all(c == "456" for c in completed) + + @pytest.mark.asyncio + async def test_send_typing_exception_does_not_kill_loop(self, monkeypatch): + """A send_typing that raises (e.g. transient HTTP 500) must be + caught so the loop continues refreshing on schedule.""" + adapter = _StubAdapter() + tick_count = {"n": 0} + + async def flaky_send_typing(chat_id, metadata=None): + tick_count["n"] += 1 + if tick_count["n"] == 1: + raise RuntimeError("transient upstream error") + # Subsequent calls succeed. + + monkeypatch.setattr(adapter, "send_typing", flaky_send_typing) + adapter.stop_typing = MagicMock(return_value=asyncio.sleep(0)) + + stop_event = asyncio.Event() + task = asyncio.create_task( + adapter._keep_typing( + chat_id="789", + interval=0.3, + stop_event=stop_event, + ) + ) + await asyncio.sleep(1.0) + stop_event.set() + await asyncio.wait_for(task, timeout=1.0) + + assert tick_count["n"] >= 2, ( + f"loop exited after first send_typing exception; expected it to " + f"keep ticking (got {tick_count['n']} ticks)" + ) + + @pytest.mark.asyncio + async def test_paused_chat_skips_send_typing(self, monkeypatch): + """When a chat is in _typing_paused (e.g. awaiting approval), the + loop must not call send_typing at all. Regression guard — existing + behavior, preserved through the timeout change.""" + adapter = _StubAdapter() + calls = [] + + async def recording_send_typing(chat_id, metadata=None): + calls.append(chat_id) + + monkeypatch.setattr(adapter, "send_typing", recording_send_typing) + adapter.stop_typing = MagicMock(return_value=asyncio.sleep(0)) + adapter._typing_paused.add("paused-chat") + + stop_event = asyncio.Event() + task = asyncio.create_task( + adapter._keep_typing( + chat_id="paused-chat", + interval=0.3, + stop_event=stop_event, + ) + ) + await asyncio.sleep(1.0) + stop_event.set() + await asyncio.wait_for(task, timeout=1.0) + + assert calls == [], ( + f"send_typing was called on a paused chat: {calls}" + )