From d81b888807a6e09e088a13423f2f30f1fdee440a Mon Sep 17 00:00:00 2001 From: analista Date: Thu, 14 May 2026 11:25:44 +0900 Subject: [PATCH] fix(telegram): report cron topic fallback --- cron/scheduler.py | 13 ++++ gateway/platforms/telegram.py | 21 ++++++- tests/cron/test_scheduler.py | 59 +++++++++++++++++++ .../gateway/test_telegram_thread_fallback.py | 40 ++++++++++++- 4 files changed, 129 insertions(+), 4 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 08221c6640a..ebfc2b153b0 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -691,6 +691,19 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option job["id"], platform_name, chat_id, err, ) adapter_ok = False # fall through to standalone path + elif ( + send_result + and thread_id + and getattr(send_result, "raw_response", None) + and send_result.raw_response.get("thread_fallback") + ): + requested_thread_id = send_result.raw_response.get("requested_thread_id") or thread_id + msg = ( + f"configured thread_id {requested_thread_id} for " + f"{platform_name}:{chat_id} was not found; delivered without thread_id" + ) + logger.warning("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) # Send extracted media files as native attachments via the live adapter if adapter_ok and media_files: diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 5aea4708951..b0b6adcc36d 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -1649,6 +1649,8 @@ class TelegramAdapter(BasePlatformAdapter): message_ids = [] thread_id = self._metadata_thread_id(metadata) + requested_thread_id = self._message_thread_id_for_send(thread_id) + used_thread_fallback = False try: from telegram.error import NetworkError as _NetErr @@ -1666,6 +1668,7 @@ class TelegramAdapter(BasePlatformAdapter): _TimedOut = None # type: ignore[assignment,misc] for i, chunk in enumerate(chunks): + retried_thread_not_found = False metadata_reply_to = self._metadata_reply_to_message_id(metadata) reply_to_source = reply_to or ( str(metadata_reply_to) @@ -1686,6 +1689,9 @@ class TelegramAdapter(BasePlatformAdapter): reply_to_message_id=reply_to_id, reply_to_mode=self._reply_to_mode, ) + if used_thread_fallback and thread_kwargs.get("message_thread_id") is not None: + thread_kwargs = dict(thread_kwargs) + thread_kwargs["message_thread_id"] = None effective_thread_id = thread_kwargs.get("message_thread_id") msg = None @@ -1726,6 +1732,14 @@ class TelegramAdapter(BasePlatformAdapter): # specific cases instead of blindly retrying. if _BadReq and isinstance(send_err, _BadReq): if self._is_thread_not_found_error(send_err) and effective_thread_id is not None: + if not retried_thread_not_found: + retried_thread_not_found = True + logger.warning( + "[%s] Thread %s not found, retrying once with message_thread_id", + self.name, effective_thread_id, + ) + await asyncio.sleep(1) + continue # Thread doesn't exist — retry without # message_thread_id so the message still # reaches the chat. @@ -1733,6 +1747,7 @@ class TelegramAdapter(BasePlatformAdapter): "[%s] Thread %s not found, retrying without message_thread_id", self.name, effective_thread_id, ) + used_thread_fallback = True effective_thread_id = None thread_kwargs = {"message_thread_id": None} continue @@ -1809,7 +1824,11 @@ class TelegramAdapter(BasePlatformAdapter): return SendResult( success=True, message_id=message_ids[0] if message_ids else None, - raw_response={"message_ids": message_ids} + raw_response={ + "message_ids": message_ids, + "requested_thread_id": requested_thread_id, + "thread_fallback": used_thread_fallback, + }, ) except Exception as e: diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 473e7e98b44..32485a917e0 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -2396,6 +2396,65 @@ class TestDeliverResultTimeoutCancelsFuture: assert result is None, f"expected successful delivery, got error: {result!r}" standalone_send.assert_awaited_once() + def test_live_adapter_thread_fallback_records_delivery_error(self): + """A cron target with an explicit topic must not be marked clean if + Telegram falls back to the base chat after "thread not found". + """ + from gateway.config import Platform + from gateway.platforms.base import SendResult + from concurrent.futures import Future + + send_result = SendResult( + success=True, + message_id="42", + raw_response={ + "requested_thread_id": 7072, + "thread_fallback": True, + }, + ) + adapter = MagicMock() + adapter.send = AsyncMock(return_value=send_result) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + + loop = MagicMock() + loop.is_running.return_value = True + + job = { + "id": "thread-fallback-job", + "deliver": "telegram:226252250:7072", + } + + completed_future = Future() + completed_future.set_result(send_result) + + def fake_run_coro(coro, _loop): + coro.close() + return completed_future + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + assert result == ( + "configured thread_id 7072 for telegram:226252250 was not found; " + "delivered without thread_id" + ) + adapter.send.assert_called_once_with( + "226252250", + "Hello world", + metadata={"thread_id": "7072"}, + ) + class TestSendMediaTimeoutCancelsFuture: """Same orphan-coroutine guarantee for _send_media_via_adapter's diff --git a/tests/gateway/test_telegram_thread_fallback.py b/tests/gateway/test_telegram_thread_fallback.py index 02ff7262397..642306c142c 100644 --- a/tests/gateway/test_telegram_thread_fallback.py +++ b/tests/gateway/test_telegram_thread_fallback.py @@ -372,7 +372,7 @@ async def test_send_typing_falls_back_without_thread_on_bad_request(): @pytest.mark.asyncio async def test_send_retries_without_thread_on_thread_not_found(): - """When message_thread_id causes 'thread not found', retry without it.""" + """When message_thread_id keeps failing, retry once then fall back.""" adapter = _make_adapter() call_log = [] @@ -394,10 +394,43 @@ async def test_send_retries_without_thread_on_thread_not_found(): assert result.success is True assert result.message_id == "42" - # First call has thread_id, second call retries without + assert result.raw_response["requested_thread_id"] == 99999 + assert result.raw_response["thread_fallback"] is True + # First two calls keep the configured thread, then final fallback drops it. + assert len(call_log) == 3 + assert call_log[0]["message_thread_id"] == 99999 + assert call_log[1]["message_thread_id"] == 99999 + assert call_log[2]["message_thread_id"] is None + + +@pytest.mark.asyncio +async def test_send_retries_transient_thread_not_found_before_fallback(): + """A one-off Telegram thread-not-found response should still land in the topic.""" + adapter = _make_adapter() + + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(dict(kwargs)) + if len(call_log) == 1: + raise FakeBadRequest("Message thread not found") + return SimpleNamespace(message_id=43) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="test message", + metadata={"thread_id": "99999"}, + ) + + assert result.success is True + assert result.message_id == "43" + assert result.raw_response["requested_thread_id"] == 99999 + assert result.raw_response["thread_fallback"] is False assert len(call_log) == 2 assert call_log[0]["message_thread_id"] == 99999 - assert call_log[1]["message_thread_id"] is None + assert call_log[1]["message_thread_id"] == 99999 @pytest.mark.asyncio @@ -1079,6 +1112,7 @@ async def test_send_without_thread_id_unaffected(): ) assert result.success is True + assert result.raw_response["thread_fallback"] is False assert len(call_log) == 1 assert call_log[0]["message_thread_id"] is None