fix: auto-create Telegram DM topics for delivery

This commit is contained in:
Hermes Agent 2026-05-16 19:39:48 +00:00
parent 6daafb3fd4
commit 5cde0614e8
5 changed files with 205 additions and 24 deletions

View file

@ -34,6 +34,16 @@ def _looks_like_telegram_private_chat_id(chat_id: Optional[str]) -> bool:
return False
def _looks_like_int(value: Optional[str]) -> bool:
if value is None:
return False
try:
int(value)
return True
except (TypeError, ValueError):
return False
@dataclass
class DeliveryTarget:
"""
@ -263,30 +273,50 @@ class DeliveryRouter:
"direct_messages_topic_id" in send_metadata
or "telegram_direct_messages_topic_id" in send_metadata
)
if (
target_thread_id = target.thread_id
is_named_telegram_private_topic = (
target.platform == Platform.TELEGRAM
and _looks_like_telegram_private_chat_id(target.chat_id)
and not _looks_like_int(target_thread_id)
and "thread_id" not in send_metadata
and "message_thread_id" not in send_metadata
and not has_explicit_direct_topic
)
if is_named_telegram_private_topic:
ensure_dm_topic = getattr(adapter, "ensure_dm_topic", None)
if ensure_dm_topic is None:
raise RuntimeError(
"Telegram adapter cannot create named private DM topics"
)
created_thread_id = await ensure_dm_topic(target.chat_id, target_thread_id)
if not created_thread_id:
raise RuntimeError(
f"Failed to create Telegram private DM topic '{target_thread_id}'"
)
target_thread_id = str(created_thread_id)
send_metadata["thread_id"] = target_thread_id
send_metadata["telegram_dm_topic_created_for_send"] = True
elif (
target.platform == Platform.TELEGRAM
and _looks_like_telegram_private_chat_id(target.chat_id)
and "thread_id" not in send_metadata
and "message_thread_id" not in send_metadata
and not has_explicit_direct_topic
):
# Telegram has two similar-but-not-equivalent private topic modes:
# true Bot API Direct Messages topics use direct_messages_topic_id,
# while Hermes-created private DM lanes only route reliably with
# message_thread_id plus a reply anchor to a message in that lane.
# DeliveryRouter often handles proactive/cron sends, so an anchor
# may not exist. Refuse the send rather than reporting success for
# a message that lands in General/All Messages or is invisible.
# Legacy private topic/thread ids that were not created by this
# send path may still need a reply anchor to stay visible in the
# requested lane. Named targets are created above via
# createForumTopic and can use message_thread_id directly.
reply_anchor = send_metadata.get("telegram_reply_to_message_id")
if reply_anchor is None:
raise RuntimeError(
"Telegram private DM topic delivery requires telegram_reply_to_message_id; "
"send to the bare chat or provide a reply anchor"
)
send_metadata["thread_id"] = target.thread_id
send_metadata["thread_id"] = target_thread_id
send_metadata["telegram_dm_topic_reply_fallback"] = True
elif "thread_id" not in send_metadata and "message_thread_id" not in send_metadata and not has_explicit_direct_topic:
send_metadata["thread_id"] = target.thread_id
send_metadata["thread_id"] = target_thread_id
result = await adapter.send(target.chat_id, content, metadata=send_metadata or None)
if getattr(result, "success", True) is False:
raise RuntimeError(getattr(result, "error", None) or f"{target.platform.value} delivery failed")

View file

@ -547,6 +547,8 @@ class TelegramAdapter(BasePlatformAdapter):
) -> bool:
if cls._metadata_direct_messages_topic_id(metadata) is not None:
return False
if metadata and metadata.get("telegram_dm_topic_created_for_send"):
return False
return bool(
thread_id
and (
@ -1029,6 +1031,59 @@ class TelegramAdapter(BasePlatformAdapter):
thread_id = await self._create_dm_topic(chat_id_int, name=name)
return str(thread_id) if thread_id else None
async def ensure_dm_topic(self, chat_id: str, topic_name: str) -> Optional[str]:
"""Return a private DM topic thread id, creating and persisting it if needed."""
name = str(topic_name or "").strip()
if not name:
return None
try:
chat_id_int = int(chat_id)
except (TypeError, ValueError):
return None
cache_key = f"{chat_id_int}:{name}"
cached = self._dm_topics.get(cache_key)
if cached:
return str(cached)
topic_conf: Optional[Dict[str, Any]] = None
chat_entry: Optional[Dict[str, Any]] = None
for entry in self._dm_topics_config:
if str(entry.get("chat_id")) != str(chat_id_int):
continue
chat_entry = entry
for candidate in entry.get("topics", []):
if candidate.get("name") == name:
topic_conf = candidate
break
break
if topic_conf and topic_conf.get("thread_id"):
thread_id = int(topic_conf["thread_id"])
self._dm_topics[cache_key] = thread_id
return str(thread_id)
if chat_entry is None:
chat_entry = {"chat_id": chat_id_int, "topics": []}
self._dm_topics_config.append(chat_entry)
if topic_conf is None:
topic_conf = {"name": name}
chat_entry.setdefault("topics", []).append(topic_conf)
thread_id = await self._create_dm_topic(
chat_id_int,
name=name,
icon_color=topic_conf.get("icon_color"),
icon_custom_emoji_id=topic_conf.get("icon_custom_emoji_id"),
)
if not thread_id:
return None
topic_conf["thread_id"] = thread_id
self._dm_topics[cache_key] = int(thread_id)
self._persist_dm_topic_thread_id(chat_id_int, name, int(thread_id))
return str(thread_id)
async def rename_dm_topic(
self,
chat_id: int,
@ -1065,25 +1120,43 @@ class TelegramAdapter(BasePlatformAdapter):
with open(config_path, "r", encoding="utf-8") as f:
config = _yaml.safe_load(f) or {}
# Navigate to platforms.telegram.extra.dm_topics
dm_topics = (
config.get("platforms", {})
.get("telegram", {})
.get("extra", {})
.get("dm_topics", [])
)
if not dm_topics:
return
# Navigate to platforms.telegram.extra.dm_topics, creating the path
# when a named delivery target asks us to create a topic that was
# not predeclared in config.yaml.
platforms = config.setdefault("platforms", {})
telegram_config = platforms.setdefault("telegram", {})
extra = telegram_config.setdefault("extra", {})
dm_topics = extra.setdefault("dm_topics", [])
changed = False
matching_chat_entry = None
for chat_entry in dm_topics:
if int(chat_entry.get("chat_id", 0)) != int(chat_id):
try:
chat_matches = int(chat_entry.get("chat_id", 0)) == int(chat_id)
except (TypeError, ValueError):
chat_matches = False
if not chat_matches:
continue
for t in chat_entry.get("topics", []):
if t.get("name") == topic_name and not t.get("thread_id"):
t["thread_id"] = thread_id
changed = True
matching_chat_entry = chat_entry
for t in chat_entry.setdefault("topics", []):
if t.get("name") == topic_name:
if not t.get("thread_id"):
t["thread_id"] = thread_id
changed = True
break
else:
chat_entry.setdefault("topics", []).append(
{"name": topic_name, "thread_id": thread_id}
)
changed = True
break
if matching_chat_entry is None:
dm_topics.append({
"chat_id": chat_id,
"topics": [{"name": topic_name, "thread_id": thread_id}],
})
changed = True
if changed:
fd, tmp_path = tempfile.mkstemp(

View file

@ -128,11 +128,16 @@ class TestPlatformNameCaseInsensitivity:
class RecordingAdapter:
def __init__(self):
self.calls = []
self.ensure_dm_topic_calls = []
async def send(self, chat_id, content, metadata=None):
self.calls.append({"chat_id": chat_id, "content": content, "metadata": metadata})
return {"success": True}
async def ensure_dm_topic(self, chat_id, topic_name):
self.ensure_dm_topic_calls.append({"chat_id": chat_id, "topic_name": topic_name})
return "38049"
@pytest.mark.asyncio
async def test_explicit_telegram_private_thread_requires_reply_anchor(tmp_path, monkeypatch):
@ -147,6 +152,30 @@ async def test_explicit_telegram_private_thread_requires_reply_anchor(tmp_path,
assert adapter.calls == []
@pytest.mark.asyncio
async def test_named_telegram_private_topic_is_created_before_delivery(tmp_path, monkeypatch):
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)
adapter = RecordingAdapter()
router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter})
target = DeliveryTarget.parse("telegram:722341991:Hermes API Test")
await router._deliver_to_platform(target, "hello", metadata=None)
assert adapter.ensure_dm_topic_calls == [
{"chat_id": "722341991", "topic_name": "Hermes API Test"}
]
assert adapter.calls == [
{
"chat_id": "722341991",
"content": "hello",
"metadata": {
"thread_id": "38049",
"telegram_dm_topic_created_for_send": True,
},
}
]
@pytest.mark.asyncio
async def test_explicit_telegram_private_thread_uses_reply_fallback_with_anchor(tmp_path, monkeypatch):
monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path)

View file

@ -198,6 +198,28 @@ async def test_create_dm_topic_returns_none_without_bot():
assert result is None
@pytest.mark.asyncio
async def test_ensure_dm_topic_creates_on_demand_and_persists():
"""Named delivery targets should create missing private DM topics on demand."""
adapter = _make_adapter()
adapter._bot = AsyncMock()
adapter._bot.create_forum_topic.return_value = SimpleNamespace(message_thread_id=444)
adapter._persist_dm_topic_thread_id = MagicMock()
result = await adapter.ensure_dm_topic("111", "On Demand")
assert result == "444"
adapter._bot.create_forum_topic.assert_called_once_with(
chat_id=111,
name="On Demand",
)
assert adapter._dm_topics["111:On Demand"] == 444
assert adapter._dm_topics_config == [
{"chat_id": 111, "topics": [{"name": "On Demand", "thread_id": 444}]}
]
adapter._persist_dm_topic_thread_id.assert_called_once_with(111, "On Demand", 444)
# ── _persist_dm_topic_thread_id ──

View file

@ -439,6 +439,33 @@ async def test_send_uses_reply_fallback_for_hermes_dm_topics():
assert "direct_messages_topic_id" not in call_log[0]
@pytest.mark.asyncio
async def test_send_created_private_topic_uses_message_thread_without_anchor():
"""Topics created via createForumTopic are addressable by message_thread_id directly."""
adapter = _make_adapter()
call_log = []
async def mock_send_message(**kwargs):
call_log.append(kwargs)
return SimpleNamespace(message_id=781)
adapter._bot = SimpleNamespace(send_message=mock_send_message)
result = await adapter.send(
chat_id="123",
content="created topic message",
metadata={
"thread_id": "38049",
"telegram_dm_topic_created_for_send": True,
},
)
assert result.success is True
assert call_log[0]["reply_to_message_id"] is None
assert call_log[0]["message_thread_id"] == 38049
assert "direct_messages_topic_id" not in call_log[0]
@pytest.mark.asyncio
async def test_send_uses_metadata_reply_fallback_for_streaming_dm_topics():
"""Metadata-only sends still stay in Hermes-created Telegram DM topics."""