From 5cde0614e894c73400bc7e4fe9df1fe523a2e547 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Sat, 16 May 2026 19:39:48 +0000 Subject: [PATCH] fix: auto-create Telegram DM topics for delivery --- gateway/delivery.py | 50 +++++++-- gateway/platforms/telegram.py | 101 +++++++++++++++--- tests/gateway/test_delivery.py | 29 +++++ tests/gateway/test_dm_topics.py | 22 ++++ .../gateway/test_telegram_thread_fallback.py | 27 +++++ 5 files changed, 205 insertions(+), 24 deletions(-) diff --git a/gateway/delivery.py b/gateway/delivery.py index 16b4195b948..88701cd61d5 100644 --- a/gateway/delivery.py +++ b/gateway/delivery.py @@ -34,6 +34,16 @@ def _looks_like_telegram_private_chat_id(chat_id: Optional[str]) -> bool: return False +def _looks_like_int(value: Optional[str]) -> bool: + if value is None: + return False + try: + int(value) + return True + except (TypeError, ValueError): + return False + + @dataclass class DeliveryTarget: """ @@ -263,30 +273,50 @@ class DeliveryRouter: "direct_messages_topic_id" in send_metadata or "telegram_direct_messages_topic_id" in send_metadata ) - if ( + target_thread_id = target.thread_id + is_named_telegram_private_topic = ( + target.platform == Platform.TELEGRAM + and _looks_like_telegram_private_chat_id(target.chat_id) + and not _looks_like_int(target_thread_id) + and "thread_id" not in send_metadata + and "message_thread_id" not in send_metadata + and not has_explicit_direct_topic + ) + if is_named_telegram_private_topic: + ensure_dm_topic = getattr(adapter, "ensure_dm_topic", None) + if ensure_dm_topic is None: + raise RuntimeError( + "Telegram adapter cannot create named private DM topics" + ) + created_thread_id = await ensure_dm_topic(target.chat_id, target_thread_id) + if not created_thread_id: + raise RuntimeError( + f"Failed to create Telegram private DM topic '{target_thread_id}'" + ) + target_thread_id = str(created_thread_id) + send_metadata["thread_id"] = target_thread_id + send_metadata["telegram_dm_topic_created_for_send"] = True + elif ( target.platform == Platform.TELEGRAM and _looks_like_telegram_private_chat_id(target.chat_id) and "thread_id" not in send_metadata and "message_thread_id" not in send_metadata and not has_explicit_direct_topic ): - # Telegram has two similar-but-not-equivalent private topic modes: - # true Bot API Direct Messages topics use direct_messages_topic_id, - # while Hermes-created private DM lanes only route reliably with - # message_thread_id plus a reply anchor to a message in that lane. - # DeliveryRouter often handles proactive/cron sends, so an anchor - # may not exist. Refuse the send rather than reporting success for - # a message that lands in General/All Messages or is invisible. + # Legacy private topic/thread ids that were not created by this + # send path may still need a reply anchor to stay visible in the + # requested lane. Named targets are created above via + # createForumTopic and can use message_thread_id directly. reply_anchor = send_metadata.get("telegram_reply_to_message_id") if reply_anchor is None: raise RuntimeError( "Telegram private DM topic delivery requires telegram_reply_to_message_id; " "send to the bare chat or provide a reply anchor" ) - send_metadata["thread_id"] = target.thread_id + send_metadata["thread_id"] = target_thread_id send_metadata["telegram_dm_topic_reply_fallback"] = True elif "thread_id" not in send_metadata and "message_thread_id" not in send_metadata and not has_explicit_direct_topic: - send_metadata["thread_id"] = target.thread_id + send_metadata["thread_id"] = target_thread_id result = await adapter.send(target.chat_id, content, metadata=send_metadata or None) if getattr(result, "success", True) is False: raise RuntimeError(getattr(result, "error", None) or f"{target.platform.value} delivery failed") diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 35d72192a26..25bbdc4544f 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -547,6 +547,8 @@ class TelegramAdapter(BasePlatformAdapter): ) -> bool: if cls._metadata_direct_messages_topic_id(metadata) is not None: return False + if metadata and metadata.get("telegram_dm_topic_created_for_send"): + return False return bool( thread_id and ( @@ -1029,6 +1031,59 @@ class TelegramAdapter(BasePlatformAdapter): thread_id = await self._create_dm_topic(chat_id_int, name=name) return str(thread_id) if thread_id else None + async def ensure_dm_topic(self, chat_id: str, topic_name: str) -> Optional[str]: + """Return a private DM topic thread id, creating and persisting it if needed.""" + name = str(topic_name or "").strip() + if not name: + return None + try: + chat_id_int = int(chat_id) + except (TypeError, ValueError): + return None + + cache_key = f"{chat_id_int}:{name}" + cached = self._dm_topics.get(cache_key) + if cached: + return str(cached) + + topic_conf: Optional[Dict[str, Any]] = None + chat_entry: Optional[Dict[str, Any]] = None + for entry in self._dm_topics_config: + if str(entry.get("chat_id")) != str(chat_id_int): + continue + chat_entry = entry + for candidate in entry.get("topics", []): + if candidate.get("name") == name: + topic_conf = candidate + break + break + + if topic_conf and topic_conf.get("thread_id"): + thread_id = int(topic_conf["thread_id"]) + self._dm_topics[cache_key] = thread_id + return str(thread_id) + + if chat_entry is None: + chat_entry = {"chat_id": chat_id_int, "topics": []} + self._dm_topics_config.append(chat_entry) + if topic_conf is None: + topic_conf = {"name": name} + chat_entry.setdefault("topics", []).append(topic_conf) + + thread_id = await self._create_dm_topic( + chat_id_int, + name=name, + icon_color=topic_conf.get("icon_color"), + icon_custom_emoji_id=topic_conf.get("icon_custom_emoji_id"), + ) + if not thread_id: + return None + + topic_conf["thread_id"] = thread_id + self._dm_topics[cache_key] = int(thread_id) + self._persist_dm_topic_thread_id(chat_id_int, name, int(thread_id)) + return str(thread_id) + async def rename_dm_topic( self, chat_id: int, @@ -1065,25 +1120,43 @@ class TelegramAdapter(BasePlatformAdapter): with open(config_path, "r", encoding="utf-8") as f: config = _yaml.safe_load(f) or {} - # Navigate to platforms.telegram.extra.dm_topics - dm_topics = ( - config.get("platforms", {}) - .get("telegram", {}) - .get("extra", {}) - .get("dm_topics", []) - ) - if not dm_topics: - return + # Navigate to platforms.telegram.extra.dm_topics, creating the path + # when a named delivery target asks us to create a topic that was + # not predeclared in config.yaml. + platforms = config.setdefault("platforms", {}) + telegram_config = platforms.setdefault("telegram", {}) + extra = telegram_config.setdefault("extra", {}) + dm_topics = extra.setdefault("dm_topics", []) changed = False + matching_chat_entry = None for chat_entry in dm_topics: - if int(chat_entry.get("chat_id", 0)) != int(chat_id): + try: + chat_matches = int(chat_entry.get("chat_id", 0)) == int(chat_id) + except (TypeError, ValueError): + chat_matches = False + if not chat_matches: continue - for t in chat_entry.get("topics", []): - if t.get("name") == topic_name and not t.get("thread_id"): - t["thread_id"] = thread_id - changed = True + matching_chat_entry = chat_entry + for t in chat_entry.setdefault("topics", []): + if t.get("name") == topic_name: + if not t.get("thread_id"): + t["thread_id"] = thread_id + changed = True break + else: + chat_entry.setdefault("topics", []).append( + {"name": topic_name, "thread_id": thread_id} + ) + changed = True + break + + if matching_chat_entry is None: + dm_topics.append({ + "chat_id": chat_id, + "topics": [{"name": topic_name, "thread_id": thread_id}], + }) + changed = True if changed: fd, tmp_path = tempfile.mkstemp( diff --git a/tests/gateway/test_delivery.py b/tests/gateway/test_delivery.py index ae959161c9b..69a62fb4330 100644 --- a/tests/gateway/test_delivery.py +++ b/tests/gateway/test_delivery.py @@ -128,11 +128,16 @@ class TestPlatformNameCaseInsensitivity: class RecordingAdapter: def __init__(self): self.calls = [] + self.ensure_dm_topic_calls = [] async def send(self, chat_id, content, metadata=None): self.calls.append({"chat_id": chat_id, "content": content, "metadata": metadata}) return {"success": True} + async def ensure_dm_topic(self, chat_id, topic_name): + self.ensure_dm_topic_calls.append({"chat_id": chat_id, "topic_name": topic_name}) + return "38049" + @pytest.mark.asyncio async def test_explicit_telegram_private_thread_requires_reply_anchor(tmp_path, monkeypatch): @@ -147,6 +152,30 @@ async def test_explicit_telegram_private_thread_requires_reply_anchor(tmp_path, assert adapter.calls == [] +@pytest.mark.asyncio +async def test_named_telegram_private_topic_is_created_before_delivery(tmp_path, monkeypatch): + monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path) + adapter = RecordingAdapter() + router = DeliveryRouter(GatewayConfig(), adapters={Platform.TELEGRAM: adapter}) + target = DeliveryTarget.parse("telegram:722341991:Hermes API Test") + + await router._deliver_to_platform(target, "hello", metadata=None) + + assert adapter.ensure_dm_topic_calls == [ + {"chat_id": "722341991", "topic_name": "Hermes API Test"} + ] + assert adapter.calls == [ + { + "chat_id": "722341991", + "content": "hello", + "metadata": { + "thread_id": "38049", + "telegram_dm_topic_created_for_send": True, + }, + } + ] + + @pytest.mark.asyncio async def test_explicit_telegram_private_thread_uses_reply_fallback_with_anchor(tmp_path, monkeypatch): monkeypatch.setattr("gateway.delivery.get_hermes_home", lambda: tmp_path) diff --git a/tests/gateway/test_dm_topics.py b/tests/gateway/test_dm_topics.py index 1d1cf365e0e..971dc3da6d7 100644 --- a/tests/gateway/test_dm_topics.py +++ b/tests/gateway/test_dm_topics.py @@ -198,6 +198,28 @@ async def test_create_dm_topic_returns_none_without_bot(): assert result is None +@pytest.mark.asyncio +async def test_ensure_dm_topic_creates_on_demand_and_persists(): + """Named delivery targets should create missing private DM topics on demand.""" + adapter = _make_adapter() + adapter._bot = AsyncMock() + adapter._bot.create_forum_topic.return_value = SimpleNamespace(message_thread_id=444) + adapter._persist_dm_topic_thread_id = MagicMock() + + result = await adapter.ensure_dm_topic("111", "On Demand") + + assert result == "444" + adapter._bot.create_forum_topic.assert_called_once_with( + chat_id=111, + name="On Demand", + ) + assert adapter._dm_topics["111:On Demand"] == 444 + assert adapter._dm_topics_config == [ + {"chat_id": 111, "topics": [{"name": "On Demand", "thread_id": 444}]} + ] + adapter._persist_dm_topic_thread_id.assert_called_once_with(111, "On Demand", 444) + + # ── _persist_dm_topic_thread_id ── diff --git a/tests/gateway/test_telegram_thread_fallback.py b/tests/gateway/test_telegram_thread_fallback.py index 1c6e7cab338..248c9f3b02c 100644 --- a/tests/gateway/test_telegram_thread_fallback.py +++ b/tests/gateway/test_telegram_thread_fallback.py @@ -439,6 +439,33 @@ async def test_send_uses_reply_fallback_for_hermes_dm_topics(): assert "direct_messages_topic_id" not in call_log[0] +@pytest.mark.asyncio +async def test_send_created_private_topic_uses_message_thread_without_anchor(): + """Topics created via createForumTopic are addressable by message_thread_id directly.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(kwargs) + return SimpleNamespace(message_id=781) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="created topic message", + metadata={ + "thread_id": "38049", + "telegram_dm_topic_created_for_send": True, + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] is None + assert call_log[0]["message_thread_id"] == 38049 + assert "direct_messages_topic_id" not in call_log[0] + + @pytest.mark.asyncio async def test_send_uses_metadata_reply_fallback_for_streaming_dm_topics(): """Metadata-only sends still stay in Hermes-created Telegram DM topics."""