Merge pull request #50023 from NousResearch/salvage/f3b-telegram-dmtopic

fix(cron): route Telegram DM-topic cron delivery through DeliveryRouter (#22773)
This commit is contained in:
kshitij 2026-06-21 13:47:30 +05:30 committed by GitHub
commit 3051a1634c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 303 additions and 78 deletions

View file

@ -847,16 +847,74 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
delivered = False
target_errors = []
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
send_metadata = {"thread_id": thread_id} if thread_id else None
# Telegram three-mode topic routing (#22773): a private chat
# (positive chat_id) with a NUMERIC topic id is a Bot API Direct
# Messages topic and must be addressed via ``direct_messages_topic_id``
# — a bare ``message_thread_id`` is rejected/mis-routed by Bot API
# 10.0 and lands in General. Forum/supergroup targets (negative
# chat_id) and named DM-topic lanes keep the default thread_id
# handling. Compute the routed metadata ONCE so both the text send
# (via DeliveryRouter) and the media send use the same routing.
from gateway.delivery import (
DeliveryRouter,
DeliveryTarget,
_looks_like_int,
_looks_like_telegram_private_chat_id,
)
is_private_dm_topic = (
platform == Platform.TELEGRAM
and thread_id is not None
and _looks_like_telegram_private_chat_id(str(chat_id))
and _looks_like_int(str(thread_id))
)
if is_private_dm_topic:
# Routed via direct_messages_topic_id (mode 2), no bare thread_id.
route_thread_id = None
route_metadata = {
"direct_messages_topic_id": str(thread_id),
"job_id": job["id"],
}
# Media metadata mirrors the text routing so attachments land in
# the same DM topic instead of the General lane (#22773).
media_metadata = {"direct_messages_topic_id": str(thread_id)}
else:
route_thread_id = str(thread_id) if thread_id is not None else None
route_metadata = {"job_id": job["id"]}
media_metadata = {"thread_id": thread_id} if thread_id else None
try:
# Send cleaned text (MEDIA tags stripped) — not the raw content
# Send cleaned text (MEDIA tags stripped) — not the raw content.
# Route through the gateway's DeliveryRouter so the live send
# gets the same platform-specific routing as live messages —
# in particular Telegram's three-mode topic routing. The
# standalone cron path lacked this, so DM-topic cron deliveries
# landed in the General topic or were rejected by Bot API 10.0
# (#22773).
text_to_send = cleaned_delivery_content.strip()
adapter_ok = True
timed_out = False
if text_to_send:
from agent.async_utils import safe_schedule_threadsafe
router = DeliveryRouter(config, adapters)
route_target = DeliveryTarget(
platform=platform,
chat_id=str(chat_id),
thread_id=route_thread_id,
is_explicit=True,
)
# Pass thread routing via the target (not a bare metadata
# "thread_id"): the router only applies its Telegram DM-topic
# detection when "thread_id"/"message_thread_id" are absent
# from metadata, deriving the routing from target.thread_id
# or the explicit direct_messages_topic_id above.
future = safe_schedule_threadsafe(
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
router._deliver_to_platform(
route_target,
text_to_send,
route_metadata,
),
loop,
)
if future is None:
@ -922,54 +980,69 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
# dispatched). send_result is None, so skip the
# confirmation/thread-fallback inspection below.
pass
elif not _confirm_adapter_delivery(send_result):
# A ``None`` return or a result object missing an
# explicit ``success`` attribute is NOT a confirmed
# delivery (#47056): the scheduler would log
# "delivered" while the gateway never saw it. Fall
# through to the standalone path.
err = (
getattr(send_result, "error", None)
if send_result is not None
else "no response from adapter"
)
shape = type(send_result).__name__ if send_result is not None else "None"
msg = (
f"live adapter send to {platform_name}:{chat_id} "
f"returned unconfirmed result ({shape}, error={err})"
)
logger.warning(
"Job '%s': %s, falling back to standalone",
job["id"], msg,
)
target_errors.append(msg)
adapter_ok = False # fall through to standalone path
elif (
send_result
and thread_id
and getattr(send_result, "raw_response", None)
and send_result.raw_response.get("thread_fallback")
):
requested_thread_id = send_result.raw_response.get("requested_thread_id") or thread_id
msg = (
f"configured thread_id {requested_thread_id} for "
f"{platform_name}:{chat_id} was not found; delivered without thread_id"
)
logger.warning("Job '%s': %s", job["id"], msg)
delivery_errors.append(msg)
else:
# _deliver_to_platform returns either a SendResult
# (.success attr) or, when the silence-narration
# filter drops the message, a plain dict
# {"success": True, "delivered": False, ...}.
# Normalize both shapes so a getattr default doesn't
# misread a dict, and so a None / success-less object
# is NOT counted as delivered (#47056).
if isinstance(send_result, dict):
send_success = bool(send_result.get("success", False))
send_raw_response = send_result.get("raw_response")
else:
send_success = _confirm_adapter_delivery(send_result)
send_raw_response = getattr(send_result, "raw_response", None)
# Send extracted media files as native attachments via the live adapter.
# Skip on an in-flight confirmation timeout: the gateway loop is
# contended, so each media send would also block its 30s budget,
# and the text payload is already assumed delivered (#38922).
# Record the skipped attachments so the drop is visible in the
# job's delivery error rather than silently lost.
if not send_success:
if isinstance(send_result, dict):
err = send_result.get("error", "unknown")
shape = "dict"
elif send_result is not None:
err = getattr(send_result, "error", None)
shape = type(send_result).__name__
else:
err = "no response from adapter"
shape = "None"
msg = (
f"live adapter send to {platform_name}:{chat_id} "
f"returned unconfirmed result ({shape}, error={err})"
)
logger.warning(
"Job '%s': %s, falling back to standalone",
job["id"], msg,
)
target_errors.append(msg)
adapter_ok = False # fall through to standalone path
elif (
send_raw_response
and thread_id
and send_raw_response.get("thread_fallback")
):
requested_thread_id = send_raw_response.get("requested_thread_id") or thread_id
msg = (
f"configured thread_id {requested_thread_id} for "
f"{platform_name}:{chat_id} was not found; delivered without thread_id"
)
logger.warning("Job '%s': %s", job["id"], msg)
delivery_errors.append(msg)
# Send extracted media files as native attachments via the live
# adapter, using the same DM-topic-aware routing as the text send
# (#22773 — media previously used a bare thread_id and landed in
# the General lane for private DM topics). Skip on an in-flight
# confirmation timeout: the gateway loop is contended, so each
# media send would also block its 30s budget, and the text
# payload is already assumed delivered (#38922). Record the
# skipped attachments so the drop is visible rather than silently
# lost.
if adapter_ok and not timed_out and media_files:
_send_media_via_adapter(
runtime_adapter,
chat_id,
media_files,
send_metadata,
media_metadata,
loop,
job,
platform=platform,

View file

@ -1458,6 +1458,7 @@ AUTHOR_MAP = {
"beastant1@gmail.com": "nekwo", # PR #26481 (PS5.1 UTF-8 BOM)
"43717185+nekwo@users.noreply.github.com": "nekwo",
"9785479+stepanov1975@users.noreply.github.com": "stepanov1975", # PR #22074 (setup config picker writes)
"devsart95@gmail.com": "devsart95", # PR #23249 (cron Telegram DM topic delivery)
"67979730+flooryyyy@users.noreply.github.com": "flooryyyy", # PR #26374 (tool_trace error detection)
"188585318+dgians@users.noreply.github.com": "dgians", # PR #26034 (.ts/.py/.sh docs types)
"zealy@tz.co": "dgians", # PR #26034 (bot-committed by zealy-tzco under dgians' PR)

View file

@ -625,9 +625,15 @@ class TestDeliverResultWrapping:
# run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg)
def fake_run_coro(coro, _loop):
# Actually run the routed coroutine (router._deliver_to_platform)
# so the underlying adapter.send is invoked, then wrap the real
# result in a completed Future (matching run_coroutine_threadsafe).
import asyncio as _asyncio
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
try:
future.set_result(_asyncio.run(coro))
except BaseException as _e: # noqa: BLE001
future.set_exception(_e)
return future
job = {
@ -676,9 +682,15 @@ class TestDeliverResultWrapping:
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
# Actually run the routed coroutine (router._deliver_to_platform)
# so the underlying adapter.send is invoked, then wrap the real
# result in a completed Future (matching run_coroutine_threadsafe).
import asyncio as _asyncio
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
try:
future.set_result(_asyncio.run(coro))
except BaseException as _e: # noqa: BLE001
future.set_exception(_e)
return future
job = {
@ -719,9 +731,15 @@ class TestDeliverResultWrapping:
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
# Actually run the routed coroutine (router._deliver_to_platform)
# so the underlying adapter.send is invoked, then wrap the real
# result in a completed Future (matching run_coroutine_threadsafe).
import asyncio as _asyncio
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
try:
future.set_result(_asyncio.run(coro))
except BaseException as _e: # noqa: BLE001
future.set_exception(_e)
return future
job = {
@ -763,9 +781,15 @@ class TestDeliverResultWrapping:
loop.is_running.return_value = True
def fake_run_coro(coro, _loop):
# Actually run the routed coroutine (router._deliver_to_platform)
# so the underlying adapter.send is invoked, then wrap the real
# result in a completed Future (matching run_coroutine_threadsafe).
import asyncio as _asyncio
future = Future()
future.set_result(MagicMock(success=True))
coro.close()
try:
future.set_result(_asyncio.run(coro))
except BaseException as _e: # noqa: BLE001
future.set_exception(_e)
return future
job = {
@ -2889,22 +2913,19 @@ class TestDeliverResultTimeoutCancelsFuture:
standalone_send.assert_awaited_once()
assert result is None, f"standalone should have delivered, got: {result!r}"
def test_live_adapter_thread_fallback_records_delivery_error(self):
"""A cron target with an explicit topic must not be marked clean if
Telegram falls back to the base chat after "thread not found".
def test_live_adapter_private_dm_topic_routes_via_direct_messages_topic_id(self):
"""#22773: a cron target to a PRIVATE Telegram chat with a numeric topic
id must be routed via ``direct_messages_topic_id`` (Bot API DM topics),
NOT a bare ``message_thread_id`` (which Bot API 10.0 rejects / mis-routes
to General). The cron live-adapter path routes through the gateway
DeliveryRouter, which applies the same three-mode routing as live
messages.
"""
from gateway.config import Platform
from gateway.platforms.base import SendResult
from concurrent.futures import Future
send_result = SendResult(
success=True,
message_id="42",
raw_response={
"requested_thread_id": 7072,
"thread_fallback": True,
},
)
send_result = SendResult(success=True, message_id="42")
adapter = MagicMock()
adapter.send = AsyncMock(return_value=send_result)
@ -2912,21 +2933,25 @@ class TestDeliverResultTimeoutCancelsFuture:
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
# DeliveryRouter consults the silence-narration config flag.
mock_cfg.filter_silence_narration = False
loop = MagicMock()
loop.is_running.return_value = True
job = {
"id": "thread-fallback-job",
"deliver": "telegram:226252250:7072",
"id": "dm-topic-job",
"deliver": "telegram:226252250:7072", # private chat + numeric topic
}
completed_future = Future()
completed_future.set_result(send_result)
def fake_run_coro(coro, _loop):
coro.close()
return completed_future
import asyncio as _asyncio
future = Future()
try:
future.set_result(_asyncio.run(coro))
except BaseException as _e: # noqa: BLE001
future.set_exception(_e)
return future
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
@ -2938,15 +2963,141 @@ class TestDeliverResultTimeoutCancelsFuture:
loop=loop,
)
assert result == (
"configured thread_id 7072 for telegram:226252250 was not found; "
"delivered without thread_id"
assert result is None, f"expected clean delivery, got: {result!r}"
adapter.send.assert_called_once()
sent_chat_id, sent_text = adapter.send.call_args[0][0], adapter.send.call_args[0][1]
sent_metadata = adapter.send.call_args[1]["metadata"]
assert sent_chat_id == "226252250"
assert sent_text == "Hello world"
# The topic must be addressed via direct_messages_topic_id, and a bare
# message_thread_id must NOT be set (that is the Bot API 10.0 bug).
assert str(sent_metadata.get("direct_messages_topic_id")) == "7072"
assert not sent_metadata.get("message_thread_id")
def test_live_adapter_private_dm_topic_media_routes_via_direct_messages_topic_id(self, tmp_path, monkeypatch):
"""#22773 (media): MEDIA attachments to a private DM topic must also be
routed via ``direct_messages_topic_id``, not a bare ``message_thread_id``
— the media path previously used the bare thread_id and landed
attachments in the General lane."""
from gateway.config import Platform
from gateway.platforms.base import SendResult
from concurrent.futures import Future
media_root = tmp_path / "media-cache"
media_file = media_root / "chart.png"
media_file.parent.mkdir(parents=True, exist_ok=True)
media_file.write_bytes(b"media")
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
(media_root,),
)
adapter.send.assert_called_once_with(
"226252250",
"Hello world",
metadata={"thread_id": "7072"},
media_path = media_file.resolve()
adapter = AsyncMock()
adapter.send.return_value = SendResult(success=True, message_id="1")
adapter.send_image_file.return_value = SendResult(success=True, message_id="2")
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
mock_cfg.filter_silence_narration = False
loop = MagicMock()
loop.is_running.return_value = True
job = {
"id": "dm-topic-media-job",
"deliver": "telegram:226252250:7072", # private chat + numeric topic
}
def fake_run_coro(coro, _loop):
import asyncio as _asyncio
future = Future()
try:
future.set_result(_asyncio.run(coro))
except BaseException as _e: # noqa: BLE001
future.set_exception(_e)
return future
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
f"Chart attached\nMEDIA:{media_path}",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
adapter.send_image_file.assert_called_once()
media_metadata = adapter.send_image_file.call_args[1]["metadata"]
assert str(media_metadata.get("direct_messages_topic_id")) == "7072"
assert not media_metadata.get("message_thread_id")
assert not media_metadata.get("thread_id")
def test_live_adapter_forum_thread_fallback_records_delivery_error(self):
"""A forum/supergroup cron target whose configured topic is gone must
NOT be reported as a clean delivery: when the Telegram adapter falls
back to the base chat (raw_response thread_fallback), the scheduler must
record the "delivered without thread_id" delivery error. Regression
coverage for the thread_fallback-recording branch (kept distinct from
the #22773 routing fix)."""
from gateway.config import Platform
from gateway.platforms.base import SendResult
from concurrent.futures import Future
# A successful send whose raw_response flags that the adapter fell back to
# the base chat because the requested topic (17) was not found.
send_result = SendResult(
success=True,
message_id="42",
raw_response={
"requested_thread_id": 17,
"thread_fallback": True,
},
)
# adapter.send resolves to the fallback-flagged result built above.
adapter = MagicMock()
adapter.send = AsyncMock(return_value=send_result)
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
mock_cfg.filter_silence_narration = False
# The loop only needs to report itself as running; coroutines are executed
# by fake_run_coro below rather than scheduled on this mock.
loop = MagicMock()
loop.is_running.return_value = True
# Forum supergroup (negative chat_id) + numeric topic → mode 1
# (message_thread_id); NOT a private DM topic.
job = {
"id": "forum-fallback-job",
"deliver": "telegram:-1001234567890:17",
}
# Stand-in for asyncio.run_coroutine_threadsafe (patched below): run the
# coroutine to completion synchronously and return an already-resolved
# Future holding its real result.
def fake_run_coro(coro, _loop):
import asyncio as _asyncio
future = Future()
try:
future.set_result(_asyncio.run(coro))
# Surface any failure through the Future instead of raising here.
except BaseException as _e: # noqa: BLE001
future.set_exception(_e)
return future
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
result = _deliver_result(
job,
"Hello world",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
# The return value is expected to carry the recorded delivery-error text
# rather than None.
assert result is not None
assert "was not found; delivered without thread_id" in result
# Forum target routes via message_thread_id (mode 1), not DM-topic.
sent_metadata = adapter.send.call_args[1]["metadata"]
assert not sent_metadata.get("direct_messages_topic_id")
class TestDeliverResultLiveAdapterUnconfirmed: