From 4cc28aa3bbb83a974a3dc311909ce45a0726fb41 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Sun, 21 Jun 2026 13:07:10 +0530 Subject: [PATCH 1/2] fix(cron): route Telegram DM-topic cron delivery through DeliveryRouter (#22773) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #22410 added three-mode Telegram topic routing to the live message path (TelegramAdapter.send via the gateway DeliveryRouter), but the cron delivery path never got it. cron/scheduler.py::_deliver_result sent through the live adapter with a bare ``{"thread_id": ...}`` and fell back to the standalone _send_telegram, neither of which addresses Bot API Direct Messages topics correctly. After Bot API 10.0 (2026-05-08), sending to a private chat with a bare ``message_thread_id`` is rejected/mis-routed, so cron deliveries to a private DM topic landed in the General topic instead of the requested lane. Fix: the cron live-adapter branch now routes the text send through the gateway's ``DeliveryRouter._deliver_to_platform`` — the same canonical path live messages use — so it inherits all three Telegram routing modes: 1. Forum/supergroup (negative chat_id) -> message_thread_id 2. Bot API DM topics (private chat_id + numeric topic id) -> direct_messages_topic_id (the case #22773 reported) 3. Hermes-created named private DM-topic lanes -> ensure_dm_topic + reply anchor For mode 2, a private-chat target with a numeric topic id is passed as ``direct_messages_topic_id`` metadata (verified end-to-end: TelegramAdapter._thread_kwargs_for_send turns it into ``{message_thread_id: None, direct_messages_topic_id: }``), instead of a bare message_thread_id. Forum/supergroup and home-channel deliveries are unchanged. The standalone fallback (gateway down) is preserved. No new config knob and no duplicated routing logic — this reuses the existing DeliveryRouter rather than reimplementing topic routing in the cron path. Salvaged from #42051 (stepanov1975) and #23249 (devsart95), which both diagnosed the missing three-mode routing in the cron/standalone path; reimplemented onto the canonical DeliveryRouter that landed since those PRs were opened. Co-authored-by: Alex <9785479+stepanov1975@users.noreply.github.com> Co-authored-by: devsart95 --- cron/scheduler.py | 163 ++++++++++++++++++-------- tests/cron/test_scheduler.py | 217 +++++++++++++++++++++++++++++------ 2 files changed, 302 insertions(+), 78 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index bdea20b3d14..bd6d2b5359f 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -847,16 +847,74 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option delivered = False target_errors = [] if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)(): - send_metadata = {"thread_id": thread_id} if thread_id else None + # Telegram three-mode topic routing (#22773): a private chat + # (positive chat_id) with a NUMERIC topic id is a Bot API Direct + # Messages topic and must be addressed via ``direct_messages_topic_id`` + # — a bare ``message_thread_id`` is rejected/mis-routed by Bot API + # 10.0 and lands in General. Forum/supergroup targets (negative + # chat_id) and named DM-topic lanes keep the default thread_id + # handling. Compute the routed metadata ONCE so both the text send + # (via DeliveryRouter) and the media send use the same routing. + from gateway.delivery import ( + DeliveryRouter, + DeliveryTarget, + _looks_like_int, + _looks_like_telegram_private_chat_id, + ) + + is_private_dm_topic = ( + platform == Platform.TELEGRAM + and thread_id is not None + and _looks_like_telegram_private_chat_id(str(chat_id)) + and _looks_like_int(str(thread_id)) + ) + if is_private_dm_topic: + # Routed via direct_messages_topic_id (mode 2), no bare thread_id. + route_thread_id = None + route_metadata = { + "direct_messages_topic_id": str(thread_id), + "job_id": job["id"], + } + # Media metadata mirrors the text routing so attachments land in + # the same DM topic instead of the General lane (#22773). + media_metadata = {"direct_messages_topic_id": str(thread_id)} + else: + route_thread_id = str(thread_id) if thread_id is not None else None + route_metadata = {"job_id": job["id"]} + media_metadata = {"thread_id": thread_id} if thread_id else None + try: - # Send cleaned text (MEDIA tags stripped) — not the raw content + # Send cleaned text (MEDIA tags stripped) — not the raw content. + # Route through the gateway's DeliveryRouter so the live send + # gets the same platform-specific routing as live messages — + # in particular Telegram's three-mode topic routing. The + # standalone cron path lacked this, so DM-topic cron deliveries + # landed in the General topic or were rejected by Bot API 10.0 + # (#22773). text_to_send = cleaned_delivery_content.strip() adapter_ok = True timed_out = False if text_to_send: from agent.async_utils import safe_schedule_threadsafe + + router = DeliveryRouter(config, adapters) + route_target = DeliveryTarget( + platform=platform, + chat_id=str(chat_id), + thread_id=route_thread_id, + is_explicit=True, + ) + # Pass thread routing via the target (not a bare metadata + # "thread_id"): the router only applies its Telegram DM-topic + # detection when "thread_id"/"message_thread_id" are absent + # from metadata, deriving the routing from target.thread_id + # or the explicit direct_messages_topic_id above. future = safe_schedule_threadsafe( - runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata), + router._deliver_to_platform( + route_target, + text_to_send, + route_metadata, + ), loop, ) if future is None: @@ -922,54 +980,69 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option # dispatched). send_result is None, so skip the # confirmation/thread-fallback inspection below. pass - elif not _confirm_adapter_delivery(send_result): - # A ``None`` return or a result object missing an - # explicit ``success`` attribute is NOT a confirmed - # delivery (#47056): the scheduler would log - # "delivered" while the gateway never saw it. Fall - # through to the standalone path. - err = ( - getattr(send_result, "error", None) - if send_result is not None - else "no response from adapter" - ) - shape = type(send_result).__name__ if send_result is not None else "None" - msg = ( - f"live adapter send to {platform_name}:{chat_id} " - f"returned unconfirmed result ({shape}, error={err})" - ) - logger.warning( - "Job '%s': %s, falling back to standalone", - job["id"], msg, - ) - target_errors.append(msg) - adapter_ok = False # fall through to standalone path - elif ( - send_result - and thread_id - and getattr(send_result, "raw_response", None) - and send_result.raw_response.get("thread_fallback") - ): - requested_thread_id = send_result.raw_response.get("requested_thread_id") or thread_id - msg = ( - f"configured thread_id {requested_thread_id} for " - f"{platform_name}:{chat_id} was not found; delivered without thread_id" - ) - logger.warning("Job '%s': %s", job["id"], msg) - delivery_errors.append(msg) + else: + # _deliver_to_platform returns either a SendResult + # (.success attr) or, when the silence-narration + # filter drops the message, a plain dict + # {"success": True, "delivered": False, ...}. + # Normalize both shapes so a getattr default doesn't + # misread a dict, and so a None / success-less object + # is NOT counted as delivered (#47056). + if isinstance(send_result, dict): + send_success = bool(send_result.get("success", False)) + send_raw_response = send_result.get("raw_response") + else: + send_success = _confirm_adapter_delivery(send_result) + send_raw_response = getattr(send_result, "raw_response", None) - # Send extracted media files as native attachments via the live adapter. - # Skip on an in-flight confirmation timeout: the gateway loop is - # contended, so each media send would also block its 30s budget, - # and the text payload is already assumed delivered (#38922). - # Record the skipped attachments so the drop is visible in the - # job's delivery error rather than silently lost. + if not send_success: + if isinstance(send_result, dict): + err = send_result.get("error", "unknown") + shape = "dict" + elif send_result is not None: + err = getattr(send_result, "error", None) + shape = type(send_result).__name__ + else: + err = "no response from adapter" + shape = "None" + msg = ( + f"live adapter send to {platform_name}:{chat_id} " + f"returned unconfirmed result ({shape}, error={err})" + ) + logger.warning( + "Job '%s': %s, falling back to standalone", + job["id"], msg, + ) + target_errors.append(msg) + adapter_ok = False # fall through to standalone path + elif ( + send_raw_response + and thread_id + and send_raw_response.get("thread_fallback") + ): + requested_thread_id = send_raw_response.get("requested_thread_id") or thread_id + msg = ( + f"configured thread_id {requested_thread_id} for " + f"{platform_name}:{chat_id} was not found; delivered without thread_id" + ) + logger.warning("Job '%s': %s", job["id"], msg) + delivery_errors.append(msg) + + # Send extracted media files as native attachments via the live + # adapter, using the same DM-topic-aware routing as the text send + # (#22773 — media previously used a bare thread_id and landed in + # the General lane for private DM topics). Skip on an in-flight + # confirmation timeout: the gateway loop is contended, so each + # media send would also block its 30s budget, and the text + # payload is already assumed delivered (#38922). Record the + # skipped attachments so the drop is visible rather than silently + # lost. if adapter_ok and not timed_out and media_files: _send_media_via_adapter( runtime_adapter, chat_id, media_files, - send_metadata, + media_metadata, loop, job, platform=platform, diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index a57f0805f8b..27613e7e1ca 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -625,9 +625,15 @@ class TestDeliverResultWrapping: # run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg) def fake_run_coro(coro, _loop): + # Actually run the routed coroutine (router._deliver_to_platform) + # so the underlying adapter.send is invoked, then wrap the real + # result in a completed Future (matching run_coroutine_threadsafe). + import asyncio as _asyncio future = Future() - future.set_result(MagicMock(success=True)) - coro.close() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) return future job = { @@ -676,9 +682,15 @@ class TestDeliverResultWrapping: loop.is_running.return_value = True def fake_run_coro(coro, _loop): + # Actually run the routed coroutine (router._deliver_to_platform) + # so the underlying adapter.send is invoked, then wrap the real + # result in a completed Future (matching run_coroutine_threadsafe). + import asyncio as _asyncio future = Future() - future.set_result(MagicMock(success=True)) - coro.close() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) return future job = { @@ -719,9 +731,15 @@ class TestDeliverResultWrapping: loop.is_running.return_value = True def fake_run_coro(coro, _loop): + # Actually run the routed coroutine (router._deliver_to_platform) + # so the underlying adapter.send is invoked, then wrap the real + # result in a completed Future (matching run_coroutine_threadsafe). + import asyncio as _asyncio future = Future() - future.set_result(MagicMock(success=True)) - coro.close() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) return future job = { @@ -763,9 +781,15 @@ class TestDeliverResultWrapping: loop.is_running.return_value = True def fake_run_coro(coro, _loop): + # Actually run the routed coroutine (router._deliver_to_platform) + # so the underlying adapter.send is invoked, then wrap the real + # result in a completed Future (matching run_coroutine_threadsafe). + import asyncio as _asyncio future = Future() - future.set_result(MagicMock(success=True)) - coro.close() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) return future job = { @@ -2889,22 +2913,19 @@ class TestDeliverResultTimeoutCancelsFuture: standalone_send.assert_awaited_once() assert result is None, f"standalone should have delivered, got: {result!r}" - def test_live_adapter_thread_fallback_records_delivery_error(self): - """A cron target with an explicit topic must not be marked clean if - Telegram falls back to the base chat after "thread not found". + def test_live_adapter_private_dm_topic_routes_via_direct_messages_topic_id(self): + """#22773: a cron target to a PRIVATE Telegram chat with a numeric topic + id must be routed via ``direct_messages_topic_id`` (Bot API DM topics), + NOT a bare ``message_thread_id`` (which Bot API 10.0 rejects / mis-routes + to General). The cron live-adapter path routes through the gateway + DeliveryRouter, which applies the same three-mode routing as live + messages. """ from gateway.config import Platform from gateway.platforms.base import SendResult from concurrent.futures import Future - send_result = SendResult( - success=True, - message_id="42", - raw_response={ - "requested_thread_id": 7072, - "thread_fallback": True, - }, - ) + send_result = SendResult(success=True, message_id="42") adapter = MagicMock() adapter.send = AsyncMock(return_value=send_result) @@ -2912,21 +2933,25 @@ class TestDeliverResultTimeoutCancelsFuture: pconfig.enabled = True mock_cfg = MagicMock() mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + # DeliveryRouter consults the silence-narration config flag. + mock_cfg.filter_silence_narration = False loop = MagicMock() loop.is_running.return_value = True job = { - "id": "thread-fallback-job", - "deliver": "telegram:226252250:7072", + "id": "dm-topic-job", + "deliver": "telegram:226252250:7072", # private chat + numeric topic } - completed_future = Future() - completed_future.set_result(send_result) - def fake_run_coro(coro, _loop): - coro.close() - return completed_future + import asyncio as _asyncio + future = Future() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) + return future with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ @@ -2938,15 +2963,141 @@ class TestDeliverResultTimeoutCancelsFuture: loop=loop, ) - assert result == ( - "configured thread_id 7072 for telegram:226252250 was not found; " - "delivered without thread_id" + assert result is None, f"expected clean delivery, got: {result!r}" + adapter.send.assert_called_once() + sent_chat_id, sent_text = adapter.send.call_args[0][0], adapter.send.call_args[0][1] + sent_metadata = adapter.send.call_args[1]["metadata"] + assert sent_chat_id == "226252250" + assert sent_text == "Hello world" + # The topic must be addressed via direct_messages_topic_id, and a bare + # message_thread_id must NOT be set (that is the Bot API 10.0 bug). + assert str(sent_metadata.get("direct_messages_topic_id")) == "7072" + assert not sent_metadata.get("message_thread_id") + + def test_live_adapter_private_dm_topic_media_routes_via_direct_messages_topic_id(self, tmp_path, monkeypatch): + """#22773 (media): MEDIA attachments to a private DM topic must also be + routed via ``direct_messages_topic_id``, not a bare ``message_thread_id`` + — the media path previously used the bare thread_id and landed + attachments in the General lane.""" + from gateway.config import Platform + from gateway.platforms.base import SendResult + from concurrent.futures import Future + + media_root = tmp_path / "media-cache" + media_file = media_root / "chart.png" + media_file.parent.mkdir(parents=True, exist_ok=True) + media_file.write_bytes(b"media") + monkeypatch.setattr( + "gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS", + (media_root,), ) - adapter.send.assert_called_once_with( - "226252250", - "Hello world", - metadata={"thread_id": "7072"}, + media_path = media_file.resolve() + + adapter = AsyncMock() + adapter.send.return_value = SendResult(success=True, message_id="1") + adapter.send_image_file.return_value = SendResult(success=True, message_id="2") + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + mock_cfg.filter_silence_narration = False + + loop = MagicMock() + loop.is_running.return_value = True + + job = { + "id": "dm-topic-media-job", + "deliver": "telegram:226252250:7072", # private chat + numeric topic + } + + def fake_run_coro(coro, _loop): + import asyncio as _asyncio + future = Future() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) + return future + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + _deliver_result( + job, + f"Chart attached\nMEDIA:{media_path}", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + adapter.send_image_file.assert_called_once() + media_metadata = adapter.send_image_file.call_args[1]["metadata"] + assert str(media_metadata.get("direct_messages_topic_id")) == "7072" + assert not media_metadata.get("message_thread_id") + assert not media_metadata.get("thread_id") + + def test_live_adapter_forum_thread_fallback_records_delivery_error(self): + """A forum/supergroup cron target whose configured topic is gone must + NOT be reported as a clean delivery: when the Telegram adapter falls + back to the base chat (raw_response thread_fallback), the scheduler must + record the "delivered without thread_id" delivery error. Regression + coverage for the thread_fallback-recording branch (kept distinct from + the #22773 routing fix).""" + from gateway.config import Platform + from gateway.platforms.base import SendResult + from concurrent.futures import Future + + send_result = SendResult( + success=True, + message_id="42", + raw_response={ + "requested_thread_id": 17, + "thread_fallback": True, + }, ) + adapter = MagicMock() + adapter.send = AsyncMock(return_value=send_result) + + pconfig = MagicMock() + pconfig.enabled = True + mock_cfg = MagicMock() + mock_cfg.platforms = {Platform.TELEGRAM: pconfig} + mock_cfg.filter_silence_narration = False + + loop = MagicMock() + loop.is_running.return_value = True + + # Forum supergroup (negative chat_id) + numeric topic → mode 1 + # (message_thread_id); NOT a private DM topic. + job = { + "id": "forum-fallback-job", + "deliver": "telegram:-1001234567890:17", + } + + def fake_run_coro(coro, _loop): + import asyncio as _asyncio + future = Future() + try: + future.set_result(_asyncio.run(coro)) + except BaseException as _e: # noqa: BLE001 + future.set_exception(_e) + return future + + with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \ + patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \ + patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro): + result = _deliver_result( + job, + "Hello world", + adapters={Platform.TELEGRAM: adapter}, + loop=loop, + ) + + assert result is not None + assert "was not found; delivered without thread_id" in result + # Forum target routes via message_thread_id (mode 1), not DM-topic. + sent_metadata = adapter.send.call_args[1]["metadata"] + assert not sent_metadata.get("direct_messages_topic_id") class TestDeliverResultLiveAdapterUnconfirmed: From f43c61643d3e95b1aaab024d1ede5e2b5cbab378 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Sun, 21 Jun 2026 13:07:10 +0530 Subject: [PATCH 2/2] chore(release): add devsart95 to AUTHOR_MAP --- scripts/release.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/release.py b/scripts/release.py index e70fd8d5f3b..0c6ccf36659 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -1458,6 +1458,7 @@ AUTHOR_MAP = { "beastant1@gmail.com": "nekwo", # PR #26481 (PS5.1 UTF-8 BOM) "43717185+nekwo@users.noreply.github.com": "nekwo", "9785479+stepanov1975@users.noreply.github.com": "stepanov1975", # PR #22074 (setup config picker writes) + "devsart95@gmail.com": "devsart95", # PR #23249 (cron Telegram DM topic delivery) "67979730+flooryyyy@users.noreply.github.com": "flooryyyy", # PR #26374 (tool_trace error detection) "188585318+dgians@users.noreply.github.com": "dgians", # PR #26034 (.ts/.py/.sh docs types) "zealy@tz.co": "dgians", # PR #26034 (bot-committed by zealy-tzco under dgians' PR)