mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-23 10:42:00 +00:00
fix(cron): route Telegram DM-topic cron delivery through DeliveryRouter (#22773)
PR #22410 added three-mode Telegram topic routing to the live message path (TelegramAdapter.send via the gateway DeliveryRouter), but the cron delivery path never got it. cron/scheduler.py::_deliver_result sent through the live adapter with a bare ``{"thread_id": ...}`` and fell back to the standalone _send_telegram, neither of which addresses Bot API Direct Messages topics correctly. After Bot API 10.0 (2026-05-08), sending to a private chat with a bare ``message_thread_id`` is rejected/mis-routed, so cron deliveries to a private DM topic landed in the General topic instead of the requested lane.

Fix: the cron live-adapter branch now routes the text send through the gateway's ``DeliveryRouter._deliver_to_platform`` — the same canonical path live messages use — so it inherits all three Telegram routing modes:

1. Forum/supergroup (negative chat_id) -> message_thread_id
2. Bot API DM topics (private chat_id + numeric topic id) -> direct_messages_topic_id (the case #22773 reported)
3. Hermes-created named private DM-topic lanes -> ensure_dm_topic + reply anchor

For mode 2, a private-chat target with a numeric topic id is passed as ``direct_messages_topic_id`` metadata (verified end-to-end: TelegramAdapter._thread_kwargs_for_send turns it into ``{message_thread_id: None, direct_messages_topic_id: <int>}``), instead of a bare message_thread_id. Forum/supergroup and home-channel deliveries are unchanged. The standalone fallback (gateway down) is preserved.

No new config knob and no duplicated routing logic — this reuses the existing DeliveryRouter rather than reimplementing topic routing in the cron path.

Salvaged from #42051 (stepanov1975) and #23249 (devsart95), which both diagnosed the missing three-mode routing in the cron/standalone path; reimplemented onto the canonical DeliveryRouter that landed since those PRs were opened.

Co-authored-by: Alex <9785479+stepanov1975@users.noreply.github.com>
Co-authored-by: devsart95 <devsart95@gmail.com>
This commit is contained in:
parent
02a3288de3
commit
4cc28aa3bb
2 changed files with 302 additions and 78 deletions
|
|
@ -847,16 +847,74 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
delivered = False
|
||||
target_errors = []
|
||||
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
||||
send_metadata = {"thread_id": thread_id} if thread_id else None
|
||||
# Telegram three-mode topic routing (#22773): a private chat
|
||||
# (positive chat_id) with a NUMERIC topic id is a Bot API Direct
|
||||
# Messages topic and must be addressed via ``direct_messages_topic_id``
|
||||
# — a bare ``message_thread_id`` is rejected/mis-routed by Bot API
|
||||
# 10.0 and lands in General. Forum/supergroup targets (negative
|
||||
# chat_id) and named DM-topic lanes keep the default thread_id
|
||||
# handling. Compute the routed metadata ONCE so both the text send
|
||||
# (via DeliveryRouter) and the media send use the same routing.
|
||||
from gateway.delivery import (
|
||||
DeliveryRouter,
|
||||
DeliveryTarget,
|
||||
_looks_like_int,
|
||||
_looks_like_telegram_private_chat_id,
|
||||
)
|
||||
|
||||
is_private_dm_topic = (
|
||||
platform == Platform.TELEGRAM
|
||||
and thread_id is not None
|
||||
and _looks_like_telegram_private_chat_id(str(chat_id))
|
||||
and _looks_like_int(str(thread_id))
|
||||
)
|
||||
if is_private_dm_topic:
|
||||
# Routed via direct_messages_topic_id (mode 2), no bare thread_id.
|
||||
route_thread_id = None
|
||||
route_metadata = {
|
||||
"direct_messages_topic_id": str(thread_id),
|
||||
"job_id": job["id"],
|
||||
}
|
||||
# Media metadata mirrors the text routing so attachments land in
|
||||
# the same DM topic instead of the General lane (#22773).
|
||||
media_metadata = {"direct_messages_topic_id": str(thread_id)}
|
||||
else:
|
||||
route_thread_id = str(thread_id) if thread_id is not None else None
|
||||
route_metadata = {"job_id": job["id"]}
|
||||
media_metadata = {"thread_id": thread_id} if thread_id else None
|
||||
|
||||
try:
|
||||
# Send cleaned text (MEDIA tags stripped) — not the raw content
|
||||
# Send cleaned text (MEDIA tags stripped) — not the raw content.
|
||||
# Route through the gateway's DeliveryRouter so the live send
|
||||
# gets the same platform-specific routing as live messages —
|
||||
# in particular Telegram's three-mode topic routing. The
|
||||
# standalone cron path lacked this, so DM-topic cron deliveries
|
||||
# landed in the General topic or were rejected by Bot API 10.0
|
||||
# (#22773).
|
||||
text_to_send = cleaned_delivery_content.strip()
|
||||
adapter_ok = True
|
||||
timed_out = False
|
||||
if text_to_send:
|
||||
from agent.async_utils import safe_schedule_threadsafe
|
||||
|
||||
router = DeliveryRouter(config, adapters)
|
||||
route_target = DeliveryTarget(
|
||||
platform=platform,
|
||||
chat_id=str(chat_id),
|
||||
thread_id=route_thread_id,
|
||||
is_explicit=True,
|
||||
)
|
||||
# Pass thread routing via the target (not a bare metadata
|
||||
# "thread_id"): the router only applies its Telegram DM-topic
|
||||
# detection when "thread_id"/"message_thread_id" are absent
|
||||
# from metadata, deriving the routing from target.thread_id
|
||||
# or the explicit direct_messages_topic_id above.
|
||||
future = safe_schedule_threadsafe(
|
||||
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
|
||||
router._deliver_to_platform(
|
||||
route_target,
|
||||
text_to_send,
|
||||
route_metadata,
|
||||
),
|
||||
loop,
|
||||
)
|
||||
if future is None:
|
||||
|
|
@ -922,54 +980,69 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
|||
# dispatched). send_result is None, so skip the
|
||||
# confirmation/thread-fallback inspection below.
|
||||
pass
|
||||
elif not _confirm_adapter_delivery(send_result):
|
||||
# A ``None`` return or a result object missing an
|
||||
# explicit ``success`` attribute is NOT a confirmed
|
||||
# delivery (#47056): the scheduler would log
|
||||
# "delivered" while the gateway never saw it. Fall
|
||||
# through to the standalone path.
|
||||
err = (
|
||||
getattr(send_result, "error", None)
|
||||
if send_result is not None
|
||||
else "no response from adapter"
|
||||
)
|
||||
shape = type(send_result).__name__ if send_result is not None else "None"
|
||||
msg = (
|
||||
f"live adapter send to {platform_name}:{chat_id} "
|
||||
f"returned unconfirmed result ({shape}, error={err})"
|
||||
)
|
||||
logger.warning(
|
||||
"Job '%s': %s, falling back to standalone",
|
||||
job["id"], msg,
|
||||
)
|
||||
target_errors.append(msg)
|
||||
adapter_ok = False # fall through to standalone path
|
||||
elif (
|
||||
send_result
|
||||
and thread_id
|
||||
and getattr(send_result, "raw_response", None)
|
||||
and send_result.raw_response.get("thread_fallback")
|
||||
):
|
||||
requested_thread_id = send_result.raw_response.get("requested_thread_id") or thread_id
|
||||
msg = (
|
||||
f"configured thread_id {requested_thread_id} for "
|
||||
f"{platform_name}:{chat_id} was not found; delivered without thread_id"
|
||||
)
|
||||
logger.warning("Job '%s': %s", job["id"], msg)
|
||||
delivery_errors.append(msg)
|
||||
else:
|
||||
# _deliver_to_platform returns either a SendResult
|
||||
# (.success attr) or, when the silence-narration
|
||||
# filter drops the message, a plain dict
|
||||
# {"success": True, "delivered": False, ...}.
|
||||
# Normalize both shapes so a getattr default doesn't
|
||||
# misread a dict, and so a None / success-less object
|
||||
# is NOT counted as delivered (#47056).
|
||||
if isinstance(send_result, dict):
|
||||
send_success = bool(send_result.get("success", False))
|
||||
send_raw_response = send_result.get("raw_response")
|
||||
else:
|
||||
send_success = _confirm_adapter_delivery(send_result)
|
||||
send_raw_response = getattr(send_result, "raw_response", None)
|
||||
|
||||
# Send extracted media files as native attachments via the live adapter.
|
||||
# Skip on an in-flight confirmation timeout: the gateway loop is
|
||||
# contended, so each media send would also block its 30s budget,
|
||||
# and the text payload is already assumed delivered (#38922).
|
||||
# Record the skipped attachments so the drop is visible in the
|
||||
# job's delivery error rather than silently lost.
|
||||
if not send_success:
|
||||
if isinstance(send_result, dict):
|
||||
err = send_result.get("error", "unknown")
|
||||
shape = "dict"
|
||||
elif send_result is not None:
|
||||
err = getattr(send_result, "error", None)
|
||||
shape = type(send_result).__name__
|
||||
else:
|
||||
err = "no response from adapter"
|
||||
shape = "None"
|
||||
msg = (
|
||||
f"live adapter send to {platform_name}:{chat_id} "
|
||||
f"returned unconfirmed result ({shape}, error={err})"
|
||||
)
|
||||
logger.warning(
|
||||
"Job '%s': %s, falling back to standalone",
|
||||
job["id"], msg,
|
||||
)
|
||||
target_errors.append(msg)
|
||||
adapter_ok = False # fall through to standalone path
|
||||
elif (
|
||||
send_raw_response
|
||||
and thread_id
|
||||
and send_raw_response.get("thread_fallback")
|
||||
):
|
||||
requested_thread_id = send_raw_response.get("requested_thread_id") or thread_id
|
||||
msg = (
|
||||
f"configured thread_id {requested_thread_id} for "
|
||||
f"{platform_name}:{chat_id} was not found; delivered without thread_id"
|
||||
)
|
||||
logger.warning("Job '%s': %s", job["id"], msg)
|
||||
delivery_errors.append(msg)
|
||||
|
||||
# Send extracted media files as native attachments via the live
|
||||
# adapter, using the same DM-topic-aware routing as the text send
|
||||
# (#22773 — media previously used a bare thread_id and landed in
|
||||
# the General lane for private DM topics). Skip on an in-flight
|
||||
# confirmation timeout: the gateway loop is contended, so each
|
||||
# media send would also block its 30s budget, and the text
|
||||
# payload is already assumed delivered (#38922). Record the
|
||||
# skipped attachments so the drop is visible rather than silently
|
||||
# lost.
|
||||
if adapter_ok and not timed_out and media_files:
|
||||
_send_media_via_adapter(
|
||||
runtime_adapter,
|
||||
chat_id,
|
||||
media_files,
|
||||
send_metadata,
|
||||
media_metadata,
|
||||
loop,
|
||||
job,
|
||||
platform=platform,
|
||||
|
|
|
|||
|
|
@ -625,9 +625,15 @@ class TestDeliverResultWrapping:
|
|||
|
||||
# run_coroutine_threadsafe returns concurrent.futures.Future (has timeout kwarg)
|
||||
def fake_run_coro(coro, _loop):
|
||||
# Actually run the routed coroutine (router._deliver_to_platform)
|
||||
# so the underlying adapter.send is invoked, then wrap the real
|
||||
# result in a completed Future (matching run_coroutine_threadsafe).
|
||||
import asyncio as _asyncio
|
||||
future = Future()
|
||||
future.set_result(MagicMock(success=True))
|
||||
coro.close()
|
||||
try:
|
||||
future.set_result(_asyncio.run(coro))
|
||||
except BaseException as _e: # noqa: BLE001
|
||||
future.set_exception(_e)
|
||||
return future
|
||||
|
||||
job = {
|
||||
|
|
@ -676,9 +682,15 @@ class TestDeliverResultWrapping:
|
|||
loop.is_running.return_value = True
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
# Actually run the routed coroutine (router._deliver_to_platform)
|
||||
# so the underlying adapter.send is invoked, then wrap the real
|
||||
# result in a completed Future (matching run_coroutine_threadsafe).
|
||||
import asyncio as _asyncio
|
||||
future = Future()
|
||||
future.set_result(MagicMock(success=True))
|
||||
coro.close()
|
||||
try:
|
||||
future.set_result(_asyncio.run(coro))
|
||||
except BaseException as _e: # noqa: BLE001
|
||||
future.set_exception(_e)
|
||||
return future
|
||||
|
||||
job = {
|
||||
|
|
@ -719,9 +731,15 @@ class TestDeliverResultWrapping:
|
|||
loop.is_running.return_value = True
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
# Actually run the routed coroutine (router._deliver_to_platform)
|
||||
# so the underlying adapter.send is invoked, then wrap the real
|
||||
# result in a completed Future (matching run_coroutine_threadsafe).
|
||||
import asyncio as _asyncio
|
||||
future = Future()
|
||||
future.set_result(MagicMock(success=True))
|
||||
coro.close()
|
||||
try:
|
||||
future.set_result(_asyncio.run(coro))
|
||||
except BaseException as _e: # noqa: BLE001
|
||||
future.set_exception(_e)
|
||||
return future
|
||||
|
||||
job = {
|
||||
|
|
@ -763,9 +781,15 @@ class TestDeliverResultWrapping:
|
|||
loop.is_running.return_value = True
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
# Actually run the routed coroutine (router._deliver_to_platform)
|
||||
# so the underlying adapter.send is invoked, then wrap the real
|
||||
# result in a completed Future (matching run_coroutine_threadsafe).
|
||||
import asyncio as _asyncio
|
||||
future = Future()
|
||||
future.set_result(MagicMock(success=True))
|
||||
coro.close()
|
||||
try:
|
||||
future.set_result(_asyncio.run(coro))
|
||||
except BaseException as _e: # noqa: BLE001
|
||||
future.set_exception(_e)
|
||||
return future
|
||||
|
||||
job = {
|
||||
|
|
@ -2889,22 +2913,19 @@ class TestDeliverResultTimeoutCancelsFuture:
|
|||
standalone_send.assert_awaited_once()
|
||||
assert result is None, f"standalone should have delivered, got: {result!r}"
|
||||
|
||||
def test_live_adapter_thread_fallback_records_delivery_error(self):
|
||||
"""A cron target with an explicit topic must not be marked clean if
|
||||
Telegram falls back to the base chat after "thread not found".
|
||||
def test_live_adapter_private_dm_topic_routes_via_direct_messages_topic_id(self):
|
||||
"""#22773: a cron target to a PRIVATE Telegram chat with a numeric topic
|
||||
id must be routed via ``direct_messages_topic_id`` (Bot API DM topics),
|
||||
NOT a bare ``message_thread_id`` (which Bot API 10.0 rejects / mis-routes
|
||||
to General). The cron live-adapter path routes through the gateway
|
||||
DeliveryRouter, which applies the same three-mode routing as live
|
||||
messages.
|
||||
"""
|
||||
from gateway.config import Platform
|
||||
from gateway.platforms.base import SendResult
|
||||
from concurrent.futures import Future
|
||||
|
||||
send_result = SendResult(
|
||||
success=True,
|
||||
message_id="42",
|
||||
raw_response={
|
||||
"requested_thread_id": 7072,
|
||||
"thread_fallback": True,
|
||||
},
|
||||
)
|
||||
send_result = SendResult(success=True, message_id="42")
|
||||
adapter = MagicMock()
|
||||
adapter.send = AsyncMock(return_value=send_result)
|
||||
|
||||
|
|
@ -2912,21 +2933,25 @@ class TestDeliverResultTimeoutCancelsFuture:
|
|||
pconfig.enabled = True
|
||||
mock_cfg = MagicMock()
|
||||
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
||||
# DeliveryRouter consults the silence-narration config flag.
|
||||
mock_cfg.filter_silence_narration = False
|
||||
|
||||
loop = MagicMock()
|
||||
loop.is_running.return_value = True
|
||||
|
||||
job = {
|
||||
"id": "thread-fallback-job",
|
||||
"deliver": "telegram:226252250:7072",
|
||||
"id": "dm-topic-job",
|
||||
"deliver": "telegram:226252250:7072", # private chat + numeric topic
|
||||
}
|
||||
|
||||
completed_future = Future()
|
||||
completed_future.set_result(send_result)
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
coro.close()
|
||||
return completed_future
|
||||
import asyncio as _asyncio
|
||||
future = Future()
|
||||
try:
|
||||
future.set_result(_asyncio.run(coro))
|
||||
except BaseException as _e: # noqa: BLE001
|
||||
future.set_exception(_e)
|
||||
return future
|
||||
|
||||
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||
|
|
@ -2938,15 +2963,141 @@ class TestDeliverResultTimeoutCancelsFuture:
|
|||
loop=loop,
|
||||
)
|
||||
|
||||
assert result == (
|
||||
"configured thread_id 7072 for telegram:226252250 was not found; "
|
||||
"delivered without thread_id"
|
||||
assert result is None, f"expected clean delivery, got: {result!r}"
|
||||
adapter.send.assert_called_once()
|
||||
sent_chat_id, sent_text = adapter.send.call_args[0][0], adapter.send.call_args[0][1]
|
||||
sent_metadata = adapter.send.call_args[1]["metadata"]
|
||||
assert sent_chat_id == "226252250"
|
||||
assert sent_text == "Hello world"
|
||||
# The topic must be addressed via direct_messages_topic_id, and a bare
|
||||
# message_thread_id must NOT be set (that is the Bot API 10.0 bug).
|
||||
assert str(sent_metadata.get("direct_messages_topic_id")) == "7072"
|
||||
assert not sent_metadata.get("message_thread_id")
|
||||
|
||||
def test_live_adapter_private_dm_topic_media_routes_via_direct_messages_topic_id(self, tmp_path, monkeypatch):
|
||||
"""#22773 (media): MEDIA attachments to a private DM topic must also be
|
||||
routed via ``direct_messages_topic_id``, not a bare ``message_thread_id``
|
||||
— the media path previously used the bare thread_id and landed
|
||||
attachments in the General lane."""
|
||||
from gateway.config import Platform
|
||||
from gateway.platforms.base import SendResult
|
||||
from concurrent.futures import Future
|
||||
|
||||
media_root = tmp_path / "media-cache"
|
||||
media_file = media_root / "chart.png"
|
||||
media_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
media_file.write_bytes(b"media")
|
||||
monkeypatch.setattr(
|
||||
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
|
||||
(media_root,),
|
||||
)
|
||||
adapter.send.assert_called_once_with(
|
||||
"226252250",
|
||||
"Hello world",
|
||||
metadata={"thread_id": "7072"},
|
||||
media_path = media_file.resolve()
|
||||
|
||||
adapter = AsyncMock()
|
||||
adapter.send.return_value = SendResult(success=True, message_id="1")
|
||||
adapter.send_image_file.return_value = SendResult(success=True, message_id="2")
|
||||
|
||||
pconfig = MagicMock()
|
||||
pconfig.enabled = True
|
||||
mock_cfg = MagicMock()
|
||||
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
||||
mock_cfg.filter_silence_narration = False
|
||||
|
||||
loop = MagicMock()
|
||||
loop.is_running.return_value = True
|
||||
|
||||
job = {
|
||||
"id": "dm-topic-media-job",
|
||||
"deliver": "telegram:226252250:7072", # private chat + numeric topic
|
||||
}
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
import asyncio as _asyncio
|
||||
future = Future()
|
||||
try:
|
||||
future.set_result(_asyncio.run(coro))
|
||||
except BaseException as _e: # noqa: BLE001
|
||||
future.set_exception(_e)
|
||||
return future
|
||||
|
||||
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||
_deliver_result(
|
||||
job,
|
||||
f"Chart attached\nMEDIA:{media_path}",
|
||||
adapters={Platform.TELEGRAM: adapter},
|
||||
loop=loop,
|
||||
)
|
||||
|
||||
adapter.send_image_file.assert_called_once()
|
||||
media_metadata = adapter.send_image_file.call_args[1]["metadata"]
|
||||
assert str(media_metadata.get("direct_messages_topic_id")) == "7072"
|
||||
assert not media_metadata.get("message_thread_id")
|
||||
assert not media_metadata.get("thread_id")
|
||||
|
||||
def test_live_adapter_forum_thread_fallback_records_delivery_error(self):
|
||||
"""A forum/supergroup cron target whose configured topic is gone must
|
||||
NOT be reported as a clean delivery: when the Telegram adapter falls
|
||||
back to the base chat (raw_response thread_fallback), the scheduler must
|
||||
record the "delivered without thread_id" delivery error. Regression
|
||||
coverage for the thread_fallback-recording branch (kept distinct from
|
||||
the #22773 routing fix)."""
|
||||
from gateway.config import Platform
|
||||
from gateway.platforms.base import SendResult
|
||||
from concurrent.futures import Future
|
||||
|
||||
send_result = SendResult(
|
||||
success=True,
|
||||
message_id="42",
|
||||
raw_response={
|
||||
"requested_thread_id": 17,
|
||||
"thread_fallback": True,
|
||||
},
|
||||
)
|
||||
adapter = MagicMock()
|
||||
adapter.send = AsyncMock(return_value=send_result)
|
||||
|
||||
pconfig = MagicMock()
|
||||
pconfig.enabled = True
|
||||
mock_cfg = MagicMock()
|
||||
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
|
||||
mock_cfg.filter_silence_narration = False
|
||||
|
||||
loop = MagicMock()
|
||||
loop.is_running.return_value = True
|
||||
|
||||
# Forum supergroup (negative chat_id) + numeric topic → mode 1
|
||||
# (message_thread_id); NOT a private DM topic.
|
||||
job = {
|
||||
"id": "forum-fallback-job",
|
||||
"deliver": "telegram:-1001234567890:17",
|
||||
}
|
||||
|
||||
def fake_run_coro(coro, _loop):
|
||||
import asyncio as _asyncio
|
||||
future = Future()
|
||||
try:
|
||||
future.set_result(_asyncio.run(coro))
|
||||
except BaseException as _e: # noqa: BLE001
|
||||
future.set_exception(_e)
|
||||
return future
|
||||
|
||||
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
|
||||
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
|
||||
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
|
||||
result = _deliver_result(
|
||||
job,
|
||||
"Hello world",
|
||||
adapters={Platform.TELEGRAM: adapter},
|
||||
loop=loop,
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
assert "was not found; delivered without thread_id" in result
|
||||
# Forum target routes via message_thread_id (mode 1), not DM-topic.
|
||||
sent_metadata = adapter.send.call_args[1]["metadata"]
|
||||
assert not sent_metadata.get("direct_messages_topic_id")
|
||||
|
||||
|
||||
class TestDeliverResultLiveAdapterUnconfirmed:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue