diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 259750bf1e9..4bc7f5da844 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -468,6 +468,10 @@ class TelegramAdapter(BasePlatformAdapter): # "all" — every message triggers a push notification (legacy # behavior; opt-in via display.platforms.telegram.notifications). self._notifications_mode: str = "important" + # send_or_update_status() bookkeeping: {(chat_id, status_key) -> bot message_id} + # Tracks status bubbles owned by this adapter so subsequent calls with the + # same key edit the same message instead of appending new ones (#30045). + self._status_message_ids: Dict[tuple, str] = {} def _notification_kwargs( self, metadata: Optional[Dict[str, Any]] @@ -1908,6 +1912,40 @@ class TelegramAdapter(BasePlatformAdapter): is_connect_timeout = self._looks_like_connect_timeout(e) return SendResult(success=False, error=str(e), retryable=(is_connect_timeout or not is_timeout)) + async def send_or_update_status( + self, + chat_id: str, + status_key: str, + content: str, + *, + metadata: Optional[Dict[str, Any]] = None, + ) -> SendResult: + """Send a status message, or edit the previous one with the same key. + + Issue #30045: progress/status callbacks (context-pressure, lifecycle, + compression, etc.) used to append a fresh bubble on every call. With + this method, the first call sends and the message id is remembered; + subsequent calls with the same (chat_id, status_key) edit that same + message in place. If the edit fails (message deleted, too old, etc.) + we drop the cached id and send fresh. + """ + key = (str(chat_id), str(status_key)) + cached_id = self._status_message_ids.get(key) + if cached_id is not None: + result = await self.edit_message( + chat_id, cached_id, content, finalize=True, metadata=metadata, + ) + if result.success: + if result.message_id: + self._status_message_ids[key] = str(result.message_id) + return result + # Edit failed — clear the cached id and fall through to a fresh send. + self._status_message_ids.pop(key, None) + result = await self.send(chat_id, content, metadata=metadata) + if result.success and result.message_id: + self._status_message_ids[key] = str(result.message_id) + return result + async def edit_message( self, chat_id: str, diff --git a/gateway/run.py b/gateway/run.py index 5eaa19b7568..c658cf8f430 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -238,6 +238,19 @@ def _prepare_gateway_status_message(platform: Any, event_type: str, message: str return text +async def _send_or_update_status_coro(adapter, chat_id, status_key, content, metadata): + """Route a status message through adapter.send_or_update_status when supported. + + Issue #30045: adapters that implement send_or_update_status (currently + Telegram) edit the previous bubble for the same status_key instead of + appending a new one. Adapters without the method fall back to plain send. + """ + sender = getattr(adapter, "send_or_update_status", None) + if callable(sender): + return await sender(chat_id, status_key, content, metadata=metadata) + return await adapter.send(chat_id, content, metadata=metadata) + + def _telegramize_command_mentions(text: str, platform: Any) -> str: """Rewrite slash-command mentions to Telegram-valid command names. @@ -16141,11 +16154,7 @@ class GatewayRunner: ) return _fut = safe_schedule_threadsafe( - _status_adapter.send( - _status_chat_id, - prepared_message, - metadata=_status_thread_metadata, - ), + _send_or_update_status_coro(_status_adapter, _status_chat_id, event_type, prepared_message, _status_thread_metadata), _loop_for_step, logger=logger, log_message=f"status_callback ({event_type}) scheduling error", diff --git a/tests/gateway/test_telegram_status_update.py b/tests/gateway/test_telegram_status_update.py new file mode 100644 index 00000000000..f49ca9c60e1 --- /dev/null +++ b/tests/gateway/test_telegram_status_update.py @@ -0,0 +1,162 @@ +"""Tests for TelegramAdapter.send_or_update_status (issue #30045). + +The status-update path must: + 1. Send a fresh message on the first call for a (chat_id, status_key) pair. + 2. Edit that same message on subsequent calls with the same key. + 3. Fall back to sending fresh when the cached message edit fails. + 4. Keep distinct keys independent (no cross-talk). +""" + +from __future__ import annotations + +import sys +import types +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from gateway.config import PlatformConfig +from gateway.platforms.base import SendResult + + +def _install_fake_telegram(monkeypatch): + """Stub the python-telegram-bot package so TelegramAdapter can be imported.""" + fake_telegram = types.ModuleType("telegram") + fake_telegram.Update = SimpleNamespace(ALL_TYPES=()) + fake_telegram.Bot = object + fake_telegram.Message = object + fake_telegram.InlineKeyboardButton = object + fake_telegram.InlineKeyboardMarkup = object + + fake_error = types.ModuleType("telegram.error") + fake_error.NetworkError = type("NetworkError", (Exception,), {}) + fake_error.BadRequest = type("BadRequest", (Exception,), {}) + fake_error.TimedOut = type("TimedOut", (Exception,), {}) + fake_telegram.error = fake_error + + fake_constants = types.ModuleType("telegram.constants") + fake_constants.ParseMode = SimpleNamespace(MARKDOWN_V2="MarkdownV2") + fake_constants.ChatType = SimpleNamespace( + GROUP="group", SUPERGROUP="supergroup", + CHANNEL="channel", PRIVATE="private", + ) + fake_telegram.constants = fake_constants + + fake_ext = types.ModuleType("telegram.ext") + fake_ext.Application = object + fake_ext.CommandHandler = object + fake_ext.CallbackQueryHandler = object + fake_ext.MessageHandler = object + fake_ext.ContextTypes = SimpleNamespace(DEFAULT_TYPE=object) + fake_ext.filters = object + + fake_request = types.ModuleType("telegram.request") + fake_request.HTTPXRequest = object + + monkeypatch.setitem(sys.modules, "telegram", fake_telegram) + monkeypatch.setitem(sys.modules, "telegram.error", fake_error) + monkeypatch.setitem(sys.modules, "telegram.constants", fake_constants) + monkeypatch.setitem(sys.modules, "telegram.ext", fake_ext) + monkeypatch.setitem(sys.modules, "telegram.request", fake_request) + + +@pytest.fixture +def adapter(monkeypatch): + _install_fake_telegram(monkeypatch) + from gateway.platforms.telegram import TelegramAdapter + + a = TelegramAdapter(PlatformConfig(enabled=True, token="fake-token")) + a._bot = MagicMock() + # Patch send / edit_message so tests can drive them directly. + a.send = AsyncMock() + a.edit_message = AsyncMock() + return a + + +@pytest.mark.asyncio +async def test_first_call_sends_and_caches_message_id(adapter): + """First call for a (chat, key) pair must send and remember the id.""" + adapter.send.return_value = SendResult(success=True, message_id="100") + + result = await adapter.send_or_update_status("chat-1", "lifecycle", "starting") + + assert result.success is True + assert result.message_id == "100" + adapter.send.assert_awaited_once() + adapter.edit_message.assert_not_awaited() + assert adapter._status_message_ids[("chat-1", "lifecycle")] == "100" + + +@pytest.mark.asyncio +async def test_second_call_edits_in_place(adapter): + """Same (chat, key) on the second call must edit, not send.""" + adapter.send.return_value = SendResult(success=True, message_id="100") + adapter.edit_message.return_value = SendResult(success=True, message_id="100") + + await adapter.send_or_update_status("chat-1", "lifecycle", "step 1") + await adapter.send_or_update_status("chat-1", "lifecycle", "step 2") + + adapter.send.assert_awaited_once() + adapter.edit_message.assert_awaited_once() + # Edit was directed at the cached message id. + args, kwargs = adapter.edit_message.call_args + assert args[0] == "chat-1" + assert args[1] == "100" + assert args[2] == "step 2" + + +@pytest.mark.asyncio +async def test_edit_failure_falls_back_to_fresh_send(adapter): + """When edit_message fails the cache is cleared and a new send happens.""" + adapter.send.side_effect = [ + SendResult(success=True, message_id="100"), + SendResult(success=True, message_id="200"), + ] + adapter.edit_message.return_value = SendResult( + success=False, error="Bad Request: message to edit not found", + ) + + await adapter.send_or_update_status("chat-1", "lifecycle", "step 1") + result = await adapter.send_or_update_status("chat-1", "lifecycle", "step 2") + + assert result.success is True + assert result.message_id == "200" + assert adapter.send.await_count == 2 + assert adapter.edit_message.await_count == 1 + # Cache now points at the fresh message id. + assert adapter._status_message_ids[("chat-1", "lifecycle")] == "200" + + +@pytest.mark.asyncio +async def test_distinct_status_keys_do_not_collide(adapter): + """A different status_key gets its own message; the original isn't touched.""" + adapter.send.side_effect = [ + SendResult(success=True, message_id="100"), + SendResult(success=True, message_id="200"), + ] + + await adapter.send_or_update_status("chat-1", "lifecycle", "ctx pressure") + await adapter.send_or_update_status("chat-1", "model-switch", "switched to opus") + + assert adapter.send.await_count == 2 + adapter.edit_message.assert_not_awaited() + assert adapter._status_message_ids[("chat-1", "lifecycle")] == "100" + assert adapter._status_message_ids[("chat-1", "model-switch")] == "200" + + +@pytest.mark.asyncio +async def test_distinct_chat_ids_do_not_collide(adapter): + """Same status_key in different chats must not edit each other's messages.""" + adapter.send.side_effect = [ + SendResult(success=True, message_id="100"), + SendResult(success=True, message_id="200"), + ] + + await adapter.send_or_update_status("chat-1", "lifecycle", "first") + await adapter.send_or_update_status("chat-2", "lifecycle", "second") + + assert adapter.send.await_count == 2 + adapter.edit_message.assert_not_awaited() + assert adapter._status_message_ids[("chat-1", "lifecycle")] == "100" + assert adapter._status_message_ids[("chat-2", "lifecycle")] == "200"