fix(telegram): report cron topic fallback

This commit is contained in:
analista 2026-05-14 11:25:44 +09:00 committed by Teknium
parent 16d8e44f7a
commit d81b888807
4 changed files with 129 additions and 4 deletions

View file

@ -691,6 +691,19 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
job["id"], platform_name, chat_id, err,
)
adapter_ok = False # fall through to standalone path
elif (
send_result
and thread_id
and getattr(send_result, "raw_response", None)
and send_result.raw_response.get("thread_fallback")
):
requested_thread_id = send_result.raw_response.get("requested_thread_id") or thread_id
msg = (
f"configured thread_id {requested_thread_id} for "
f"{platform_name}:{chat_id} was not found; delivered without thread_id"
)
logger.warning("Job '%s': %s", job["id"], msg)
delivery_errors.append(msg)
# Send extracted media files as native attachments via the live adapter
if adapter_ok and media_files:

View file

@ -1649,6 +1649,8 @@ class TelegramAdapter(BasePlatformAdapter):
message_ids = []
thread_id = self._metadata_thread_id(metadata)
requested_thread_id = self._message_thread_id_for_send(thread_id)
used_thread_fallback = False
try:
from telegram.error import NetworkError as _NetErr
@ -1666,6 +1668,7 @@ class TelegramAdapter(BasePlatformAdapter):
_TimedOut = None # type: ignore[assignment,misc]
for i, chunk in enumerate(chunks):
retried_thread_not_found = False
metadata_reply_to = self._metadata_reply_to_message_id(metadata)
reply_to_source = reply_to or (
str(metadata_reply_to)
@ -1686,6 +1689,9 @@ class TelegramAdapter(BasePlatformAdapter):
reply_to_message_id=reply_to_id,
reply_to_mode=self._reply_to_mode,
)
if used_thread_fallback and thread_kwargs.get("message_thread_id") is not None:
thread_kwargs = dict(thread_kwargs)
thread_kwargs["message_thread_id"] = None
effective_thread_id = thread_kwargs.get("message_thread_id")
msg = None
@ -1726,6 +1732,14 @@ class TelegramAdapter(BasePlatformAdapter):
# specific cases instead of blindly retrying.
if _BadReq and isinstance(send_err, _BadReq):
if self._is_thread_not_found_error(send_err) and effective_thread_id is not None:
if not retried_thread_not_found:
retried_thread_not_found = True
logger.warning(
"[%s] Thread %s not found, retrying once with message_thread_id",
self.name, effective_thread_id,
)
await asyncio.sleep(1)
continue
# Thread doesn't exist — retry without
# message_thread_id so the message still
# reaches the chat.
@ -1733,6 +1747,7 @@ class TelegramAdapter(BasePlatformAdapter):
"[%s] Thread %s not found, retrying without message_thread_id",
self.name, effective_thread_id,
)
used_thread_fallback = True
effective_thread_id = None
thread_kwargs = {"message_thread_id": None}
continue
@ -1809,7 +1824,11 @@ class TelegramAdapter(BasePlatformAdapter):
return SendResult(
success=True,
message_id=message_ids[0] if message_ids else None,
raw_response={"message_ids": message_ids}
raw_response={
"message_ids": message_ids,
"requested_thread_id": requested_thread_id,
"thread_fallback": used_thread_fallback,
},
)
except Exception as e:

View file

@ -2396,6 +2396,65 @@ class TestDeliverResultTimeoutCancelsFuture:
assert result is None, f"expected successful delivery, got error: {result!r}"
standalone_send.assert_awaited_once()
def test_live_adapter_thread_fallback_records_delivery_error(self):
"""A cron target with an explicit topic must not be marked clean if
Telegram falls back to the base chat after "thread not found".
"""
from gateway.config import Platform
from gateway.platforms.base import SendResult
from concurrent.futures import Future
send_result = SendResult(
success=True,
message_id="42",
raw_response={
"requested_thread_id": 7072,
"thread_fallback": True,
},
)
adapter = MagicMock()
adapter.send = AsyncMock(return_value=send_result)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
loop = MagicMock()
loop.is_running.return_value = True
job = {
"id": "thread-fallback-job",
"deliver": "telegram:226252250:7072",
}
completed_future = Future()
completed_future.set_result(send_result)
def fake_run_coro(coro, _loop):
coro.close()
return completed_future
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
result = _deliver_result(
job,
"Hello world",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
assert result == (
"configured thread_id 7072 for telegram:226252250 was not found; "
"delivered without thread_id"
)
adapter.send.assert_called_once_with(
"226252250",
"Hello world",
metadata={"thread_id": "7072"},
)
class TestSendMediaTimeoutCancelsFuture:
"""Same orphan-coroutine guarantee for _send_media_via_adapter's

View file

@ -372,7 +372,7 @@ async def test_send_typing_falls_back_without_thread_on_bad_request():
@pytest.mark.asyncio
async def test_send_retries_without_thread_on_thread_not_found():
"""When message_thread_id causes 'thread not found', retry without it."""
"""When message_thread_id keeps failing, retry once then fall back."""
adapter = _make_adapter()
call_log = []
@ -394,10 +394,43 @@ async def test_send_retries_without_thread_on_thread_not_found():
assert result.success is True
assert result.message_id == "42"
# First call has thread_id, second call retries without
assert result.raw_response["requested_thread_id"] == 99999
assert result.raw_response["thread_fallback"] is True
# First two calls keep the configured thread, then final fallback drops it.
assert len(call_log) == 3
assert call_log[0]["message_thread_id"] == 99999
assert call_log[1]["message_thread_id"] == 99999
assert call_log[2]["message_thread_id"] is None
@pytest.mark.asyncio
async def test_send_retries_transient_thread_not_found_before_fallback():
"""A one-off Telegram thread-not-found response should still land in the topic."""
adapter = _make_adapter()
call_log = []
async def mock_send_message(**kwargs):
call_log.append(dict(kwargs))
if len(call_log) == 1:
raise FakeBadRequest("Message thread not found")
return SimpleNamespace(message_id=43)
adapter._bot = SimpleNamespace(send_message=mock_send_message)
result = await adapter.send(
chat_id="123",
content="test message",
metadata={"thread_id": "99999"},
)
assert result.success is True
assert result.message_id == "43"
assert result.raw_response["requested_thread_id"] == 99999
assert result.raw_response["thread_fallback"] is False
assert len(call_log) == 2
assert call_log[0]["message_thread_id"] == 99999
assert call_log[1]["message_thread_id"] is None
assert call_log[1]["message_thread_id"] == 99999
@pytest.mark.asyncio
@ -1079,6 +1112,7 @@ async def test_send_without_thread_id_unaffected():
)
assert result.success is True
assert result.raw_response["thread_fallback"] is False
assert len(call_log) == 1
assert call_log[0]["message_thread_id"] is None