fix: route Telegram DM topic deliveries directly

This commit is contained in:
Hermes Agent 2026-05-16 18:00:05 +00:00
parent fb05f5d4b5
commit ad8f97db6c
4 changed files with 142 additions and 26 deletions

View file

@ -25,6 +25,15 @@ from .config import Platform, GatewayConfig
from .session import SessionSource
def _looks_like_telegram_private_chat_id(chat_id: Optional[str]) -> bool:
if chat_id is None:
return False
try:
return int(chat_id) > 0
except (TypeError, ValueError):
return False
@dataclass
class DeliveryTarget:
"""
@ -249,9 +258,22 @@ class DeliveryRouter:
)
send_metadata = dict(metadata or {})
if target.thread_id and "thread_id" not in send_metadata:
send_metadata["thread_id"] = target.thread_id
return await adapter.send(target.chat_id, content, metadata=send_metadata or None)
if target.thread_id:
if (
target.platform == Platform.TELEGRAM
and _looks_like_telegram_private_chat_id(target.chat_id)
and "thread_id" not in send_metadata
and "message_thread_id" not in send_metadata
and "direct_messages_topic_id" not in send_metadata
and "telegram_direct_messages_topic_id" not in send_metadata
):
send_metadata["telegram_direct_messages_topic_id"] = target.thread_id
elif "thread_id" not in send_metadata:
send_metadata["thread_id"] = target.thread_id
result = await adapter.send(target.chat_id, content, metadata=send_metadata or None)
if getattr(result, "success", True) is False:
raise RuntimeError(getattr(result, "error", None) or f"{target.platform.value} delivery failed")
return result

View file

@ -531,6 +531,34 @@ class TelegramAdapter(BasePlatformAdapter):
reply_to = metadata.get("telegram_reply_to_message_id")
return int(reply_to) if reply_to is not None else None
@staticmethod
def _looks_like_private_chat_id(chat_id: str) -> bool:
try:
return int(chat_id) > 0
except (TypeError, ValueError):
return False
@classmethod
def _is_private_dm_topic_send(
cls,
chat_id: str,
thread_id: Optional[str],
metadata: Optional[Dict[str, Any]],
) -> bool:
if cls._metadata_direct_messages_topic_id(metadata) is not None:
return False
return bool(
thread_id
and (
metadata and metadata.get("telegram_dm_topic_reply_fallback")
or cls._looks_like_private_chat_id(chat_id)
)
)
@staticmethod
def _dm_topic_missing_anchor_error() -> str:
return "Telegram DM topic delivery requires a reply anchor; refusing to send outside the requested topic"
@classmethod
def _reply_to_message_id_for_send(
cls,
@ -1545,15 +1573,21 @@ class TelegramAdapter(BasePlatformAdapter):
for i, chunk in enumerate(chunks):
metadata_reply_to = self._metadata_reply_to_message_id(metadata)
private_dm_topic_send = self._is_private_dm_topic_send(chat_id, thread_id, metadata)
reply_to_source = reply_to or (
str(metadata_reply_to)
if metadata and metadata.get("telegram_dm_topic_reply_fallback") and metadata_reply_to is not None else None
str(metadata_reply_to) if private_dm_topic_send and metadata_reply_to is not None else None
)
if metadata and metadata.get("telegram_dm_topic_reply_fallback"):
if private_dm_topic_send:
should_thread = reply_to_source is not None
else:
should_thread = self._should_thread_reply(reply_to_source, i)
reply_to_id = int(reply_to_source) if should_thread and reply_to_source else None
if private_dm_topic_send and reply_to_id is None:
return SendResult(
success=False,
error=self._dm_topic_missing_anchor_error(),
retryable=False,
)
thread_kwargs = self._thread_kwargs_for_send(
chat_id,
thread_id,
@ -1600,6 +1634,8 @@ class TelegramAdapter(BasePlatformAdapter):
# specific cases instead of blindly retrying.
if _BadReq and isinstance(send_err, _BadReq):
if self._is_thread_not_found_error(send_err) and effective_thread_id is not None:
if private_dm_topic_send:
raise
# Thread doesn't exist — retry without
# message_thread_id so the message still
# reaches the chat.
@ -1612,6 +1648,12 @@ class TelegramAdapter(BasePlatformAdapter):
continue
err_lower = str(send_err).lower()
if "message to be replied not found" in err_lower and reply_to_id is not None:
if private_dm_topic_send:
return SendResult(
success=False,
error=str(send_err),
retryable=False,
)
# Original message was deleted before we
# could reply. For private-topic fallback
# sends, message_thread_id is only valid with

View file

@ -1,7 +1,10 @@
"""Tests for the delivery routing module."""
from gateway.config import Platform
from gateway.delivery import DeliveryTarget
import pytest
from gateway.config import GatewayConfig, Platform
from gateway.delivery import DeliveryRouter, DeliveryTarget
from gateway.platforms.base import SendResult
from gateway.session import SessionSource
@ -122,5 +125,57 @@ class TestPlatformNameCaseInsensitivity:
assert target.platform == Platform.TELEGRAM
assert target.chat_id == "12345"
class RecordingAdapter:
def __init__(self):
self.calls = []
async def send(self, chat_id, content, metadata=None):
self.calls.append({"chat_id": chat_id, "content": content, "metadata": metadata})
return {"success": True}
@pytest.mark.asyncio
async def test_explicit_telegram_private_thread_uses_direct_messages_topic_id(tmp_path, monkeypatch):
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
adapter = RecordingAdapter()
router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter})
target = DeliveryTarget.parse("telegram:722341991:32344")
await router._deliver_to_platform(target, "hello", metadata=None)
assert adapter.calls == [
{
"chat_id": "722341991",
"content": "hello",
"metadata": {
"telegram_direct_messages_topic_id": "32344",
},
}
]
@pytest.mark.asyncio
async def test_explicit_telegram_group_thread_does_not_mark_dm_fallback(tmp_path, monkeypatch):
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
adapter = RecordingAdapter()
router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter})
target = DeliveryTarget.parse("telegram:-100123:42")
await router._deliver_to_platform(target, "hello", metadata=None)
assert adapter.calls[0]["metadata"] == {"thread_id": "42"}
class FailingAdapter:
async def send(self, chat_id, content, metadata=None):
return SendResult(success=False, error="route failed", retryable=False)
@pytest.mark.asyncio
async def test_platform_send_failure_raises_for_delivery_result(tmp_path, monkeypatch):
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: FailingAdapter()})
target = DeliveryTarget.parse("telegram:722341991:32344")
with pytest.raises(RuntimeError, match="route failed"):
await router._deliver_to_platform(target, "hello", metadata=None)

View file

@ -282,7 +282,7 @@ async def test_send_retries_without_thread_on_thread_not_found():
adapter._bot = SimpleNamespace(send_message=mock_send_message)
result = await adapter.send(
chat_id="123",
chat_id="-100123",
content="test message",
metadata={"thread_id": "99999"},
)
@ -530,8 +530,8 @@ async def test_send_model_picker_uses_metadata_reply_fallback_for_dm_topics():
@pytest.mark.asyncio
async def test_send_dm_topic_fallback_without_anchor_does_not_crash():
"""DM-topic fallback without an anchor must not use message_thread_id alone."""
async def test_send_dm_topic_fallback_without_anchor_fails_closed():
"""DM-topic fallback without an anchor must not send outside the topic."""
adapter = _make_adapter()
call_log = []
@ -550,23 +550,21 @@ async def test_send_dm_topic_fallback_without_anchor_does_not_crash():
},
)
assert result.success is True
assert call_log[0]["reply_to_message_id"] is None
assert "message_thread_id" not in call_log[0]
assert "direct_messages_topic_id" not in call_log[0]
assert result.success is False
assert result.retryable is False
assert "requires a reply anchor" in (result.error or "")
assert call_log == []
@pytest.mark.asyncio
async def test_send_dm_topic_reply_not_found_retry_drops_thread_id():
"""If Telegram deletes the reply anchor, private-topic retry must drop thread id too."""
async def test_send_dm_topic_reply_not_found_fails_closed():
"""If Telegram deletes the reply anchor, private-topic sends must not fall back elsewhere."""
adapter = _make_adapter()
call_log = []
async def mock_send_message(**kwargs):
call_log.append(dict(kwargs))
if len(call_log) == 1:
raise FakeBadRequest("Message to be replied not found")
return SimpleNamespace(message_id=781)
raise FakeBadRequest("Message to be replied not found")
adapter._bot = SimpleNamespace(send_message=mock_send_message)
@ -580,12 +578,11 @@ async def test_send_dm_topic_reply_not_found_retry_drops_thread_id():
},
)
assert result.success is True
assert result.success is False
assert result.retryable is False
assert call_log[0]["reply_to_message_id"] == 462
assert call_log[0]["message_thread_id"] == 20197
assert call_log[1]["reply_to_message_id"] is None
assert "message_thread_id" not in call_log[1]
assert "direct_messages_topic_id" not in call_log[1]
assert len(call_log) == 1
@pytest.mark.asyncio
@ -926,7 +923,7 @@ async def test_send_raises_on_other_bad_request():
adapter._bot = SimpleNamespace(send_message=mock_send_message)
result = await adapter.send(
chat_id="123",
chat_id="-100123",
content="test message",
metadata={"thread_id": "99999"},
)
@ -1029,7 +1026,7 @@ async def test_thread_fallback_only_fires_once():
# Send a long message that gets split into chunks
long_msg = "A" * 5000 # Exceeds Telegram's 4096 limit
result = await adapter.send(
chat_id="123",
chat_id="-100123",
content=long_msg,
metadata={"thread_id": "99999"},
)