From b3239572f0e85187aed153cd2e2ef24e4f6abede Mon Sep 17 00:00:00 2001 From: Jhin Lee Date: Fri, 8 May 2026 21:33:20 -0400 Subject: [PATCH] fix(telegram): preserve DM topic routing via reply fallback --- gateway/platforms/base.py | 90 ++- gateway/platforms/telegram.py | 492 ++++++++++--- gateway/run.py | 111 ++- tests/gateway/test_background_command.py | 83 +++ .../gateway/test_telegram_thread_fallback.py | 676 +++++++++++++++++- tests/gateway/test_voice_command.py | 31 + 6 files changed, 1331 insertions(+), 152 deletions(-) diff --git a/gateway/platforms/base.py b/gateway/platforms/base.py index 3e8c1433e6b..90888d7b3d2 100644 --- a/gateway/platforms/base.py +++ b/gateway/platforms/base.py @@ -40,6 +40,52 @@ def _platform_name(platform) -> str: return str(value or "").lower() +def _thread_metadata_for_source(source, reply_to_message_id: str | None = None) -> dict | None: + """Build platform-aware thread metadata for adapter sends. + + Most platforms route threaded sends with a generic ``thread_id`` metadata + value. Telegram private-chat topics created through Hermes' DM-topic helper + are exposed in updates as ``message_thread_id`` plus a reply anchor, but + outbound sends only render in the correct Telegram lane when the adapter + supplies both ``message_thread_id`` and ``reply_to_message_id``. Mark those + lanes so the Telegram adapter can avoid the known-bad partial routes. + """ + thread_id = getattr(source, "thread_id", None) + if thread_id is None: + return None + metadata = {"thread_id": thread_id} + if _platform_name(getattr(source, "platform", None)) == "telegram" and getattr(source, "chat_type", None) == "dm": + metadata["telegram_dm_topic_reply_fallback"] = True + anchor = reply_to_message_id or getattr(source, "message_id", None) + if anchor is not None: + metadata["telegram_reply_to_message_id"] = str(anchor) + return metadata + + +def _reply_anchor_for_event(event) -> str | None: + """Return reply_to id for platforms that need reply semantics. + + Telegram forum/supergroup topics should be routed by topic metadata, not by + replying to the triggering message. Hermes-created Telegram private-chat + topic lanes are different: Bot API sends reject their ``message_thread_id`` + and do not route with ``direct_messages_topic_id``. Those lanes only remain + visible when sent with both the private topic thread id and a reply to the + triggering user message. + """ + source = getattr(event, "source", None) + platform = _platform_name(getattr(source, "platform", None)) + thread_id = getattr(source, "thread_id", None) + if platform == "telegram" and thread_id and getattr(source, "chat_type", None) == "dm": + # Reply to the triggering user message. Replying to Telegram's earlier + # topic seed/anchor can render the bot response outside the active lane. + return getattr(event, "message_id", None) or getattr(event, "reply_to_message_id", None) + if platform == "telegram" and thread_id: + return None + if platform == "feishu" and thread_id and getattr(event, "reply_to_message_id", None): + return getattr(event, "reply_to_message_id", None) + return getattr(event, "message_id", None) + + def should_send_media_as_audio(platform, ext: str, is_voice: bool = False) -> bool: """Return True when a media file should use the platform's audio sender. @@ -1719,7 +1765,7 @@ class BasePlatformAdapter(ABC): """ # Fallback: send URL as text (subclasses override for native images) text = f"{caption}\n{image_url}" if caption else image_url - return await self.send(chat_id=chat_id, content=text, reply_to=reply_to) + return await self.send(chat_id=chat_id, content=text, reply_to=reply_to, metadata=metadata) async def send_animation( self, @@ -1798,6 +1844,7 @@ class BasePlatformAdapter(ABC): audio_path: str, caption: Optional[str] = None, reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, **kwargs, ) -> SendResult: """ @@ -1810,7 +1857,7 @@ class BasePlatformAdapter(ABC): text = f"🔊 Audio: {audio_path}" if caption: text = f"{caption}\n{text}" - return await self.send(chat_id=chat_id, content=text, reply_to=reply_to) + return await self.send(chat_id=chat_id, content=text, reply_to=reply_to, metadata=metadata) async def play_tts( self, @@ -1832,6 +1879,7 @@ class BasePlatformAdapter(ABC): video_path: str, caption: Optional[str] = None, reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, **kwargs, ) -> SendResult: """ @@ -1843,7 +1891,7 @@ class BasePlatformAdapter(ABC): text = f"🎬 Video: {video_path}" if caption: text = f"{caption}\n{text}" - return await self.send(chat_id=chat_id, content=text, reply_to=reply_to) + return await self.send(chat_id=chat_id, content=text, reply_to=reply_to, metadata=metadata) async def send_document( self, @@ -1852,6 +1900,7 @@ class BasePlatformAdapter(ABC): caption: Optional[str] = None, file_name: Optional[str] = None, reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, **kwargs, ) -> SendResult: """ @@ -1863,7 +1912,7 @@ class BasePlatformAdapter(ABC): text = f"📎 File: {file_path}" if caption: text = f"{caption}\n{text}" - return await self.send(chat_id=chat_id, content=text, reply_to=reply_to) + return await self.send(chat_id=chat_id, content=text, reply_to=reply_to, metadata=metadata) async def send_image_file( self, @@ -1871,6 +1920,7 @@ class BasePlatformAdapter(ABC): image_path: str, caption: Optional[str] = None, reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, **kwargs, ) -> SendResult: """ @@ -1883,7 +1933,7 @@ class BasePlatformAdapter(ABC): text = f"🖼️ Image: {image_path}" if caption: text = f"{caption}\n{text}" - return await self.send(chat_id=chat_id, content=text, reply_to=reply_to) + return await self.send(chat_id=chat_id, content=text, reply_to=reply_to, metadata=metadata) @staticmethod def extract_media(content: str) -> Tuple[List[Tuple[str, bool]], str]: @@ -2558,7 +2608,7 @@ class BasePlatformAdapter(ABC): current_guard = self._active_sessions.get(session_key) command_guard = asyncio.Event() self._active_sessions[session_key] = command_guard - thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None + thread_meta = _thread_metadata_for_source(event.source, _reply_anchor_for_event(event)) try: response = await self._message_handler(event) @@ -2579,13 +2629,7 @@ class BasePlatformAdapter(ABC): _r = await self._send_with_retry( chat_id=event.source.chat_id, content=_text, - reply_to=( - event.reply_to_message_id - if event.source.platform == Platform.FEISHU - and event.source.thread_id - and event.reply_to_message_id - else event.message_id - ), + reply_to=_reply_anchor_for_event(event), metadata=thread_meta, ) if _eph_ttl > 0 and _r.success and _r.message_id: @@ -2678,20 +2722,14 @@ class BasePlatformAdapter(ABC): self.name, cmd, session_key, ) try: - _thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None + _thread_meta = _thread_metadata_for_source(event.source, _reply_anchor_for_event(event)) response = await self._message_handler(event) _text, _eph_ttl = self._unwrap_ephemeral(response) if _text: _r = await self._send_with_retry( chat_id=event.source.chat_id, content=_text, - reply_to=( - event.reply_to_message_id - if event.source.platform == Platform.FEISHU - and event.source.thread_id - and event.reply_to_message_id - else event.message_id - ), + reply_to=_reply_anchor_for_event(event), metadata=_thread_meta, ) if _eph_ttl > 0 and _r.success and _r.message_id: @@ -2783,7 +2821,7 @@ class BasePlatformAdapter(ABC): self._active_sessions[session_key] = interrupt_event # Start continuous typing indicator (refreshes every 2 seconds) - _thread_metadata = {"thread_id": event.source.thread_id} if event.source.thread_id else None + _thread_metadata = _thread_metadata_for_source(event.source, _reply_anchor_for_event(event)) _keep_typing_kwargs = {"metadata": _thread_metadata} try: _keep_typing_sig = inspect.signature(self._keep_typing) @@ -2911,11 +2949,7 @@ class BasePlatformAdapter(ABC): # Send the text portion if text_content: logger.info("[%s] Sending response (%d chars) to %s", self.name, len(text_content), event.source.chat_id) - _reply_anchor = ( - event.reply_to_message_id - if event.source.platform == Platform.FEISHU and event.source.thread_id and event.reply_to_message_id - else event.message_id - ) + _reply_anchor = _reply_anchor_for_event(event) result = await self._send_with_retry( chat_id=event.source.chat_id, content=text_content, @@ -3108,7 +3142,7 @@ class BasePlatformAdapter(ABC): try: error_type = type(e).__name__ error_detail = str(e)[:300] if str(e) else "no details available" - _thread_metadata = {"thread_id": event.source.thread_id} if event.source.thread_id else None + _thread_metadata = _thread_metadata_for_source(event.source, _reply_anchor_for_event(event)) await self.send( chat_id=event.source.chat_id, content=( diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index e4bba209b82..f692aa4fd16 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -361,6 +361,63 @@ class TelegramAdapter(BasePlatformAdapter): thread_id = metadata.get("thread_id") or metadata.get("message_thread_id") return str(thread_id) if thread_id is not None else None + @classmethod + def _metadata_direct_messages_topic_id(cls, metadata: Optional[Dict[str, Any]]) -> Optional[str]: + if not metadata: + return None + topic_id = metadata.get("direct_messages_topic_id") or metadata.get("telegram_direct_messages_topic_id") + return str(topic_id) if topic_id is not None else None + + @classmethod + def _metadata_reply_to_message_id(cls, metadata: Optional[Dict[str, Any]]) -> Optional[int]: + if not metadata: + return None + reply_to = metadata.get("telegram_reply_to_message_id") + return int(reply_to) if reply_to is not None else None + + @classmethod + def _reply_to_message_id_for_send( + cls, + reply_to: Optional[str], + metadata: Optional[Dict[str, Any]] = None, + ) -> Optional[int]: + if reply_to: + return int(reply_to) + if metadata and metadata.get("telegram_dm_topic_reply_fallback"): + return cls._metadata_reply_to_message_id(metadata) + return None + + @classmethod + def _thread_kwargs_for_send( + cls, + chat_id: str, + thread_id: Optional[str], + metadata: Optional[Dict[str, Any]] = None, + reply_to_message_id: Optional[int] = None, + ) -> Dict[str, Any]: + """Return Telegram send kwargs for forum and direct-message topic routing. + + Supergroup/forum topics use ``message_thread_id``. True Bot API Direct + Messages topics can opt in with explicit ``direct_messages_topic_id`` + metadata. Hermes-created private-chat topic lanes are marked with + ``telegram_dm_topic_reply_fallback`` and must send the private topic + thread id together with a reply anchor. Live testing showed that either + parameter alone can render outside the visible lane. + """ + if metadata and metadata.get("telegram_dm_topic_reply_fallback"): + if reply_to_message_id is None: + reply_to_message_id = cls._metadata_reply_to_message_id(metadata) + if reply_to_message_id is None: + return {} + return {"message_thread_id": cls._message_thread_id_for_send(thread_id)} + direct_topic_id = cls._metadata_direct_messages_topic_id(metadata) + if direct_topic_id is not None: + return { + "message_thread_id": None, + "direct_messages_topic_id": int(direct_topic_id), + } + return {"message_thread_id": cls._message_thread_id_for_send(thread_id)} + @classmethod def _message_thread_id_for_send(cls, thread_id: Optional[str]) -> Optional[int]: if not thread_id or str(thread_id) == cls._GENERAL_TOPIC_THREAD_ID: @@ -384,6 +441,65 @@ class TelegramAdapter(BasePlatformAdapter): def _is_thread_not_found_error(error: Exception) -> bool: return "thread not found" in str(error).lower() + @staticmethod + def _is_bad_request_error(error: Exception) -> bool: + name = error.__class__.__name__.lower() + if name == "badrequest" or name.endswith("badrequest"): + return True + try: + from telegram.error import BadRequest + return isinstance(error, BadRequest) + except ImportError: + return False + + @classmethod + def _should_retry_without_dm_topic_reply_anchor( + cls, + error: Exception, + metadata: Optional[Dict[str, Any]], + reply_to_message_id: Optional[int], + ) -> bool: + return ( + bool(metadata and metadata.get("telegram_dm_topic_reply_fallback")) + and reply_to_message_id is not None + and cls._is_bad_request_error(error) + and "message to be replied not found" in str(error).lower() + ) + + async def _send_with_dm_topic_reply_anchor_retry( + self, + send_fn: Any, + send_kwargs: Dict[str, Any], + metadata: Optional[Dict[str, Any]], + reply_to_message_id: Optional[int], + media_label: str, + reset_media: Optional[Any] = None, + ) -> Any: + """Retry stale private-topic media replies once without the topic anchor.""" + try: + return await send_fn(**send_kwargs) + except Exception as send_err: + if not self._should_retry_without_dm_topic_reply_anchor( + send_err, + metadata, + reply_to_message_id, + ): + raise + logger.warning( + "[%s] Reply target deleted for Telegram %s, " + "retrying without reply/topic anchor: %s", + self.name, + media_label, + send_err, + ) + if reset_media is not None: + reset_media() + retry_kwargs = dict(send_kwargs) + retry_kwargs["reply_to_message_id"] = None + retry_kwargs.pop("message_thread_id", None) + retry_kwargs.pop("direct_messages_topic_id", None) + return await send_fn(**retry_kwargs) + def _fallback_ips(self) -> list[str]: """Return validated fallback IPs from config (populated by _apply_env_overrides).""" configured = self.config.extra.get("fallback_ips", []) if getattr(self.config, "extra", None) else [] @@ -1254,9 +1370,23 @@ class TelegramAdapter(BasePlatformAdapter): _TimedOut = None # type: ignore[assignment,misc] for i, chunk in enumerate(chunks): - should_thread = self._should_thread_reply(reply_to, i) - reply_to_id = int(reply_to) if should_thread else None - effective_thread_id = self._message_thread_id_for_send(thread_id) + metadata_reply_to = self._metadata_reply_to_message_id(metadata) + reply_to_source = reply_to or ( + str(metadata_reply_to) + if metadata and metadata.get("telegram_dm_topic_reply_fallback") and metadata_reply_to is not None else None + ) + if metadata and metadata.get("telegram_dm_topic_reply_fallback"): + should_thread = reply_to_source is not None + else: + should_thread = self._should_thread_reply(reply_to_source, i) + reply_to_id = int(reply_to_source) if should_thread and reply_to_source else None + thread_kwargs = self._thread_kwargs_for_send( + chat_id, + thread_id, + metadata, + reply_to_message_id=reply_to_id, + ) + effective_thread_id = thread_kwargs.get("message_thread_id") msg = None for _send_attempt in range(3): @@ -1268,7 +1398,7 @@ class TelegramAdapter(BasePlatformAdapter): text=chunk, parse_mode=ParseMode.MARKDOWN_V2, reply_to_message_id=reply_to_id, - message_thread_id=effective_thread_id, + **thread_kwargs, **self._link_preview_kwargs(), ) except Exception as md_error: @@ -1281,7 +1411,7 @@ class TelegramAdapter(BasePlatformAdapter): text=plain_chunk, parse_mode=None, reply_to_message_id=reply_to_id, - message_thread_id=effective_thread_id, + **thread_kwargs, **self._link_preview_kwargs(), ) else: @@ -1302,17 +1432,30 @@ class TelegramAdapter(BasePlatformAdapter): self.name, effective_thread_id, ) effective_thread_id = None + thread_kwargs = {"message_thread_id": None} continue err_lower = str(send_err).lower() if "message to be replied not found" in err_lower and reply_to_id is not None: # Original message was deleted before we - # could reply — clear reply target and retry - # so the response is still delivered. + # could reply. For private-topic fallback + # sends, message_thread_id is only valid with + # the reply anchor, so drop both together. logger.warning( "[%s] Reply target deleted, retrying without reply_to: %s", self.name, send_err, ) reply_to_id = None + if metadata and metadata.get("telegram_dm_topic_reply_fallback"): + thread_kwargs = {} + effective_thread_id = None + else: + thread_kwargs = self._thread_kwargs_for_send( + chat_id, + thread_id, + metadata, + reply_to_message_id=reply_to_id, + ) + effective_thread_id = thread_kwargs.get("message_thread_id") continue # Other BadRequest errors are permanent — don't retry raise @@ -1494,13 +1637,19 @@ class TelegramAdapter(BasePlatformAdapter): ] ]) thread_id = self._metadata_thread_id(metadata) - message_thread_id = self._message_thread_id_for_send(thread_id) + reply_to_id = self._reply_to_message_id_for_send(None, metadata) msg = await self._bot.send_message( chat_id=int(chat_id), text=text, parse_mode=ParseMode.MARKDOWN, reply_markup=keyboard, - message_thread_id=message_thread_id, + reply_to_message_id=reply_to_id, + **self._thread_kwargs_for_send( + chat_id, + thread_id, + metadata, + reply_to_message_id=reply_to_id, + ), **self._link_preview_kwargs(), ) return SendResult(success=True, message_id=str(msg.message_id)) @@ -1558,9 +1707,16 @@ class TelegramAdapter(BasePlatformAdapter): "reply_markup": keyboard, **self._link_preview_kwargs(), } - message_thread_id = self._message_thread_id_for_send(thread_id) - if message_thread_id is not None: - kwargs["message_thread_id"] = message_thread_id + reply_to_id = self._reply_to_message_id_for_send(None, metadata) + kwargs["reply_to_message_id"] = reply_to_id + kwargs.update( + self._thread_kwargs_for_send( + chat_id, + thread_id, + metadata, + reply_to_message_id=reply_to_id, + ) + ) msg = await self._bot.send_message(**kwargs) @@ -1603,9 +1759,16 @@ class TelegramAdapter(BasePlatformAdapter): "reply_markup": keyboard, **self._link_preview_kwargs(), } - message_thread_id = self._message_thread_id_for_send(thread_id) - if message_thread_id is not None: - kwargs["message_thread_id"] = message_thread_id + reply_to_id = self._reply_to_message_id_for_send(None, metadata) + kwargs["reply_to_message_id"] = reply_to_id + kwargs.update( + self._thread_kwargs_for_send( + chat_id, + thread_id, + metadata, + reply_to_message_id=reply_to_id, + ) + ) msg = await self._bot.send_message(**kwargs) self._slash_confirm_state[confirm_id] = session_key @@ -1664,12 +1827,19 @@ class TelegramAdapter(BasePlatformAdapter): ) thread_id = metadata.get("thread_id") if metadata else None + reply_to_id = self._reply_to_message_id_for_send(None, metadata) msg = await self._bot.send_message( chat_id=int(chat_id), text=text, parse_mode=ParseMode.MARKDOWN, reply_markup=keyboard, - message_thread_id=int(thread_id) if thread_id else None, + reply_to_message_id=reply_to_id, + **self._thread_kwargs_for_send( + chat_id, + thread_id, + metadata, + reply_to_message_id=reply_to_id, + ), **self._link_preview_kwargs(), ) @@ -2046,17 +2216,47 @@ class TelegramAdapter(BasePlatformAdapter): session_key, confirm_id, choice, ) if result_text and query.message: - # Inherit the prompt message's thread so the reply - # lands in the same supergroup topic / reply chain. + # Inherit the prompt message's topic. Supergroup forums + # use message_thread_id; Telegram private DM-topic lanes + # need both the private topic id and the prompt reply anchor. thread_id = getattr(query.message, "message_thread_id", None) + chat = getattr(query.message, "chat", None) + chat_type = getattr(chat, "type", None) + prompt_message_id = getattr(query.message, "message_id", None) send_kwargs: Dict[str, Any] = { "chat_id": int(query.message.chat_id), "text": result_text, "parse_mode": ParseMode.MARKDOWN, **self._link_preview_kwargs(), } - if thread_id is not None: - send_kwargs["message_thread_id"] = thread_id + chat_type_value = getattr(chat_type, "value", chat_type) + is_private_chat = str(chat_type_value).lower() in { + "private", + str(ChatType.PRIVATE).lower(), + str(getattr(ChatType.PRIVATE, "value", ChatType.PRIVATE)).lower(), + } + if thread_id is not None and is_private_chat and prompt_message_id is not None: + reply_to_id = int(prompt_message_id) + send_kwargs["reply_to_message_id"] = reply_to_id + send_kwargs.update( + self._thread_kwargs_for_send( + str(query.message.chat_id), + str(thread_id), + { + "thread_id": str(thread_id), + "telegram_dm_topic_reply_fallback": True, + }, + reply_to_message_id=reply_to_id, + ) + ) + elif thread_id is not None: + send_kwargs.update( + self._thread_kwargs_for_send( + str(query.message.chat_id), + str(thread_id), + {"thread_id": str(thread_id)}, + ) + ) await self._bot.send_message(**send_kwargs) except Exception as exc: logger.error("[%s] slash-confirm callback failed: %s", self.name, exc, exc_info=True) @@ -2137,22 +2337,50 @@ class TelegramAdapter(BasePlatformAdapter): # .ogg / .opus files -> send as voice (round playable bubble) if ext in (".ogg", ".opus"): _voice_thread = self._metadata_thread_id(metadata) - msg = await self._bot.send_voice( - chat_id=int(chat_id), - voice=audio_file, - caption=caption[:1024] if caption else None, - reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=self._message_thread_id_for_send(_voice_thread), + reply_to_id = self._reply_to_message_id_for_send(reply_to, metadata) + voice_thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _voice_thread, + metadata, + reply_to_message_id=reply_to_id, + ) + msg = await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_voice, + { + "chat_id": int(chat_id), + "voice": audio_file, + "caption": caption[:1024] if caption else None, + "reply_to_message_id": reply_to_id, + **voice_thread_kwargs, + }, + metadata, + reply_to_id, + "voice", + reset_media=lambda: audio_file.seek(0), ) elif ext in (".mp3", ".m4a"): # Telegram's Bot API sendAudio only accepts MP3 / M4A. _audio_thread = self._metadata_thread_id(metadata) - msg = await self._bot.send_audio( - chat_id=int(chat_id), - audio=audio_file, - caption=caption[:1024] if caption else None, - reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=self._message_thread_id_for_send(_audio_thread), + reply_to_id = self._reply_to_message_id_for_send(reply_to, metadata) + audio_thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _audio_thread, + metadata, + reply_to_message_id=reply_to_id, + ) + msg = await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_audio, + { + "chat_id": int(chat_id), + "audio": audio_file, + "caption": caption[:1024] if caption else None, + "reply_to_message_id": reply_to_id, + **audio_thread_kwargs, + }, + metadata, + reply_to_id, + "audio", + reset_media=lambda: audio_file.seek(0), ) else: # Formats Telegram can't play natively (.wav, .flac, ...) @@ -2172,7 +2400,7 @@ class TelegramAdapter(BasePlatformAdapter): e, exc_info=True, ) - return await super().send_voice(chat_id, audio_path, caption, reply_to) + return await super().send_voice(chat_id, audio_path, caption, reply_to, metadata=metadata) async def send_multiple_images( self, @@ -2227,7 +2455,6 @@ class TelegramAdapter(BasePlatformAdapter): from urllib.parse import unquote as _unquote _thread = self._metadata_thread_id(metadata) - _thread_id = self._message_thread_id_for_send(_thread) # Chunk into groups of 10 (Telegram's album limit) CHUNK = 10 @@ -2263,10 +2490,33 @@ class TelegramAdapter(BasePlatformAdapter): "[%s] Sending media group of %d photo(s) (chunk %d/%d)", self.name, len(media), chunk_idx + 1, len(chunks), ) - await self._bot.send_media_group( - chat_id=int(chat_id), - media=media, - message_thread_id=_thread_id, + reply_to_id = self._reply_to_message_id_for_send(None, metadata) + thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _thread, + metadata, + reply_to_message_id=reply_to_id, + ) + + def _reset_opened_files() -> None: + for fh in opened_files: + try: + fh.seek(0) + except Exception: + pass + + await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_media_group, + { + "chat_id": int(chat_id), + "media": media, + "reply_to_message_id": reply_to_id, + **thread_kwargs, + }, + metadata, + reply_to_id, + "media group", + reset_media=_reset_opened_files, ) except Exception as e: logger.warning( @@ -2303,13 +2553,27 @@ class TelegramAdapter(BasePlatformAdapter): return SendResult(success=False, error=self._missing_media_path_error("Image", image_path)) _thread = self._metadata_thread_id(metadata) + reply_to_id = self._reply_to_message_id_for_send(reply_to, metadata) + thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _thread, + metadata, + reply_to_message_id=reply_to_id, + ) with open(image_path, "rb") as image_file: - msg = await self._bot.send_photo( - chat_id=int(chat_id), - photo=image_file, - caption=caption[:1024] if caption else None, - reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=self._message_thread_id_for_send(_thread), + msg = await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_photo, + { + "chat_id": int(chat_id), + "photo": image_file, + "caption": caption[:1024] if caption else None, + "reply_to_message_id": reply_to_id, + **thread_kwargs, + }, + metadata, + reply_to_id, + "photo", + reset_media=lambda: image_file.seek(0), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -2360,7 +2624,7 @@ class TelegramAdapter(BasePlatformAdapter): doc_err, exc_info=True, ) - return await super().send_image_file(chat_id, image_path, caption, reply_to) + return await super().send_image_file(chat_id, image_path, caption, reply_to, metadata=metadata) async def send_document( self, @@ -2382,20 +2646,34 @@ class TelegramAdapter(BasePlatformAdapter): display_name = file_name or os.path.basename(file_path) _thread = self._metadata_thread_id(metadata) + reply_to_id = self._reply_to_message_id_for_send(reply_to, metadata) + thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _thread, + metadata, + reply_to_message_id=reply_to_id, + ) with open(file_path, "rb") as f: - msg = await self._bot.send_document( - chat_id=int(chat_id), - document=f, - filename=display_name, - caption=caption[:1024] if caption else None, - reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=self._message_thread_id_for_send(_thread), + msg = await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_document, + { + "chat_id": int(chat_id), + "document": f, + "filename": display_name, + "caption": caption[:1024] if caption else None, + "reply_to_message_id": reply_to_id, + **thread_kwargs, + }, + metadata, + reply_to_id, + "document", + reset_media=lambda: f.seek(0), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: print(f"[{self.name}] Failed to send document: {e}") - return await super().send_document(chat_id, file_path, caption, file_name, reply_to) + return await super().send_document(chat_id, file_path, caption, file_name, reply_to, metadata=metadata) async def send_video( self, @@ -2415,18 +2693,32 @@ class TelegramAdapter(BasePlatformAdapter): return SendResult(success=False, error=self._missing_media_path_error("Video", video_path)) _thread = self._metadata_thread_id(metadata) + reply_to_id = self._reply_to_message_id_for_send(reply_to, metadata) + thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _thread, + metadata, + reply_to_message_id=reply_to_id, + ) with open(video_path, "rb") as f: - msg = await self._bot.send_video( - chat_id=int(chat_id), - video=f, - caption=caption[:1024] if caption else None, - reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=self._message_thread_id_for_send(_thread), + msg = await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_video, + { + "chat_id": int(chat_id), + "video": f, + "caption": caption[:1024] if caption else None, + "reply_to_message_id": reply_to_id, + **thread_kwargs, + }, + metadata, + reply_to_id, + "video", + reset_media=lambda: f.seek(0), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: print(f"[{self.name}] Failed to send video: {e}") - return await super().send_video(chat_id, video_path, caption, reply_to) + return await super().send_video(chat_id, video_path, caption, reply_to, metadata=metadata) async def send_image( self, @@ -2452,12 +2744,25 @@ class TelegramAdapter(BasePlatformAdapter): try: # Telegram can send photos directly from URLs (up to ~5MB) _photo_thread = self._metadata_thread_id(metadata) - msg = await self._bot.send_photo( - chat_id=int(chat_id), - photo=image_url, - caption=caption[:1024] if caption else None, # Telegram caption limit - reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=self._message_thread_id_for_send(_photo_thread), + reply_to_id = self._reply_to_message_id_for_send(reply_to, metadata) + photo_thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _photo_thread, + metadata, + reply_to_message_id=reply_to_id, + ) + msg = await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_photo, + { + "chat_id": int(chat_id), + "photo": image_url, + "caption": caption[:1024] if caption else None, + "reply_to_message_id": reply_to_id, + **photo_thread_kwargs, + }, + metadata, + reply_to_id, + "URL photo", ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -2474,13 +2779,25 @@ class TelegramAdapter(BasePlatformAdapter): resp = await client.get(image_url) resp.raise_for_status() image_data = resp.content - - msg = await self._bot.send_photo( - chat_id=int(chat_id), - photo=image_data, - caption=caption[:1024] if caption else None, - reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=self._message_thread_id_for_send(_photo_thread), + + upload_thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _photo_thread, + metadata, + reply_to_message_id=reply_to_id, + ) + msg = await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_photo, + { + "chat_id": int(chat_id), + "photo": image_data, + "caption": caption[:1024] if caption else None, + "reply_to_message_id": reply_to_id, + **upload_thread_kwargs, + }, + metadata, + reply_to_id, + "uploaded photo", ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e2: @@ -2491,7 +2808,7 @@ class TelegramAdapter(BasePlatformAdapter): exc_info=True, ) # Final fallback: send URL as text - return await super().send_image(chat_id, image_url, caption, reply_to) + return await super().send_image(chat_id, image_url, caption, reply_to, metadata=metadata) async def send_animation( self, @@ -2507,12 +2824,25 @@ class TelegramAdapter(BasePlatformAdapter): try: _anim_thread = self._metadata_thread_id(metadata) - msg = await self._bot.send_animation( - chat_id=int(chat_id), - animation=animation_url, - caption=caption[:1024] if caption else None, - reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=self._message_thread_id_for_send(_anim_thread), + reply_to_id = self._reply_to_message_id_for_send(reply_to, metadata) + animation_thread_kwargs = self._thread_kwargs_for_send( + chat_id, + _anim_thread, + metadata, + reply_to_message_id=reply_to_id, + ) + msg = await self._send_with_dm_topic_reply_anchor_retry( + self._bot.send_animation, + { + "chat_id": int(chat_id), + "animation": animation_url, + "caption": caption[:1024] if caption else None, + "reply_to_message_id": reply_to_id, + **animation_thread_kwargs, + }, + metadata, + reply_to_id, + "animation", ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -2523,7 +2853,7 @@ class TelegramAdapter(BasePlatformAdapter): exc_info=True, ) # Fallback: try as a regular photo - return await self.send_image(chat_id, animation_url, caption, reply_to) + return await self.send_image(chat_id, animation_url, caption, reply_to, metadata=metadata) async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None: """Send typing indicator.""" diff --git a/gateway/run.py b/gateway/run.py index 457dc60d757..1c89044b72f 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -571,6 +571,7 @@ from gateway.platforms.base import ( EphemeralReply, MessageEvent, MessageType, + _reply_anchor_for_event, merge_pending_message_event, ) from gateway.restart import ( @@ -2406,7 +2407,8 @@ class GatewayRunner: if not adapter: return True - thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None + reply_anchor = self._reply_anchor_for_event(event) + thread_meta = self._thread_metadata_for_source(event.source, reply_anchor) if self._queue_during_drain_enabled(): self._queue_or_replace_pending_event(session_key, event) message = f"⏳ Gateway {self._status_action_gerund()} — queued for the next turn after it comes back." @@ -2416,7 +2418,13 @@ class GatewayRunner: await adapter._send_with_retry( chat_id=event.source.chat_id, content=message, - reply_to=event.message_id, + reply_to=( + reply_anchor + if event.source.platform == Platform.TELEGRAM + and event.source.chat_type == "dm" + and event.source.thread_id + else (None if event.source.platform == Platform.TELEGRAM and event.source.thread_id else event.message_id) + ), metadata=thread_meta, ) return True @@ -2553,12 +2561,19 @@ class GatewayRunner: except Exception as _onb_err: logger.debug("Failed to apply busy-input onboarding hint: %s", _onb_err) - thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None + reply_anchor = self._reply_anchor_for_event(event) + thread_meta = self._thread_metadata_for_source(event.source, reply_anchor) try: await adapter._send_with_retry( chat_id=event.source.chat_id, content=message, - reply_to=event.message_id, + reply_to=( + reply_anchor + if event.source.platform == Platform.TELEGRAM + and event.source.chat_type == "dm" + and event.source.thread_id + else (None if event.source.platform == Platform.TELEGRAM and event.source.thread_id else event.message_id) + ), metadata=thread_meta, ) except Exception as e: @@ -5063,7 +5078,7 @@ class GatewayRunner: if config and hasattr(config, "get_notice_delivery"): notice_delivery = config.get_notice_delivery(source.platform) - metadata = {"thread_id": source.thread_id} if getattr(source, "thread_id", None) else None + metadata = self._thread_metadata_for_source(source) if notice_delivery == "private" and getattr(source, "user_id", None): try: result = await adapter.send_private_notice( @@ -6158,7 +6173,7 @@ class GatewayRunner: ) if any(marker in message_text for marker in _stt_fail_markers): _stt_adapter = self.adapters.get(source.platform) - _stt_meta = {"thread_id": source.thread_id} if source.thread_id else None + _stt_meta = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event)) if _stt_adapter: try: _stt_msg = ( @@ -6679,7 +6694,7 @@ class GatewayRunner: f"{_compress_token_threshold:,}", ) - _hyg_meta = {"thread_id": source.thread_id} if source.thread_id else None + _hyg_meta = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event)) try: from run_agent import AIAgent @@ -6908,7 +6923,7 @@ class GatewayRunner: session_id=session_entry.session_id, session_key=session_key, run_generation=run_generation, - event_message_id=event.message_id, + event_message_id=self._reply_anchor_for_event(event), channel_prompt=event.channel_prompt, ) @@ -7249,7 +7264,11 @@ class GatewayRunner: try: _foot_adapter = self.adapters.get(source.platform) if _foot_adapter: - await _foot_adapter.send(source.chat_id, _footer_line) + await _foot_adapter.send( + source.chat_id, + _footer_line, + metadata=self._thread_metadata_for_source(source, self._reply_anchor_for_event(event)), + ) except Exception as _e: logger.debug("trailing footer send failed: %s", _e) return None @@ -8264,7 +8283,7 @@ class GatewayRunner: lines.append("_(session only — use `/model --global` to persist)_") return "\n".join(lines) - metadata = {"thread_id": source.thread_id} if source.thread_id else None + metadata = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event)) result = await adapter.send_model_picker( chat_id=source.chat_id, providers=providers, @@ -8685,7 +8704,7 @@ class GatewayRunner: try: metadata = self._thread_metadata_for_source(source) except Exception: - metadata = {"thread_id": source.thread_id} if getattr(source, "thread_id", None) else None + metadata = None result = await adapter.send(source.chat_id, message, metadata=metadata) if result is not None and not getattr(result, "success", True): @@ -9250,13 +9269,15 @@ class GatewayRunner: and adapter.is_in_voice_channel(guild_id)): await adapter.play_in_voice_channel(guild_id, actual_path) elif adapter and hasattr(adapter, "send_voice"): + reply_anchor = self._reply_anchor_for_event(event) + thread_meta = self._thread_metadata_for_source(event.source, reply_anchor) send_kwargs: Dict[str, Any] = { "chat_id": event.source.chat_id, "audio_path": actual_path, - "reply_to": event.message_id, + "reply_to": reply_anchor, } - if event.source.thread_id: - send_kwargs["metadata"] = {"thread_id": event.source.thread_id} + if thread_meta: + send_kwargs["metadata"] = thread_meta await adapter.send_voice(**send_kwargs) except Exception as e: logger.warning("Auto voice reply failed: %s", e, exc_info=True) @@ -9293,7 +9314,7 @@ class GatewayRunner: _, cleaned = adapter.extract_images(response) local_files, _ = adapter.extract_local_files(cleaned) - _thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None + _thread_meta = self._thread_metadata_for_source(event.source, self._reply_anchor_for_event(event)) from gateway.platforms.base import should_send_media_as_audio @@ -9457,9 +9478,16 @@ class GatewayRunner: source = event.source task_id = f"bg_{datetime.now().strftime('%H%M%S')}_{os.urandom(3).hex()}" + event_message_id = self._reply_anchor_for_event(event) + # Fire-and-forget the background task _task = asyncio.create_task( - self._run_background_task(prompt, source, task_id) + self._run_background_task( + prompt, + source, + task_id, + event_message_id=event_message_id, + ) ) self._background_tasks.add(_task) _task.add_done_callback(self._background_tasks.discard) @@ -9468,7 +9496,11 @@ class GatewayRunner: return f'🔄 Background task started: "{preview}"\nTask ID: {task_id}\nYou can keep chatting — results will appear when done.' async def _run_background_task( - self, prompt: str, source: "SessionSource", task_id: str + self, + prompt: str, + source: "SessionSource", + task_id: str, + event_message_id: Optional[str] = None, ) -> None: """Execute a background agent task and deliver the result to the chat.""" from run_agent import AIAgent @@ -9478,7 +9510,7 @@ class GatewayRunner: logger.warning("No adapter for platform %s in background task %s", source.platform, task_id) return - _thread_metadata = {"thread_id": source.thread_id} if source.thread_id else None + _thread_metadata = self._thread_metadata_for_source(source, event_message_id) try: user_config = _load_gateway_config() @@ -11293,7 +11325,7 @@ class GatewayRunner: _slash_confirm_mod.register(session_key, confirm_id, command, handler) adapter = self.adapters.get(source.platform) - metadata = self._thread_metadata_for_source(source) + metadata = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event)) used_buttons = False if adapter is not None: @@ -11333,12 +11365,30 @@ class GatewayRunner: except Exception: return {} - def _thread_metadata_for_source(self, source) -> Optional[Dict[str, Any]]: + def _thread_metadata_for_source( + self, + source, + reply_to_message_id: Optional[str] = None, + ) -> Optional[Dict[str, Any]]: """Build the metadata dict platforms need for thread-aware replies.""" thread_id = getattr(source, "thread_id", None) if thread_id is None: return None - return {"thread_id": thread_id} + metadata: Dict[str, Any] = {"thread_id": thread_id} + if ( + getattr(source, "platform", None) == Platform.TELEGRAM + and getattr(source, "chat_type", None) == "dm" + ): + metadata["telegram_dm_topic_reply_fallback"] = True + anchor = reply_to_message_id or getattr(source, "message_id", None) + if anchor is not None: + metadata["telegram_reply_to_message_id"] = str(anchor) + return metadata + + @staticmethod + def _reply_anchor_for_event(event: MessageEvent) -> Optional[str]: + """Return the platform-specific reply anchor for GatewayRunner sends.""" + return _reply_anchor_for_event(event) # ------------------------------------------------------------------ @@ -13131,10 +13181,7 @@ class GatewayRunner: else bool(_plat_streaming) ) - if source.thread_id: - _thread_metadata: Optional[Dict[str, Any]] = {"thread_id": source.thread_id} - else: - _thread_metadata = None + _thread_metadata: Optional[Dict[str, Any]] = self._thread_metadata_for_source(source, event_message_id) if _streaming_enabled: try: @@ -13564,8 +13611,8 @@ class GatewayRunner: # # Threading metadata is platform-specific: # - Slack DM threading needs event_message_id fallback (reply thread) - # - Telegram uses message_thread_id only for forum topics; passing a - # normal DM/group message id as thread_id causes send failures + # - Telegram forum topics use message_thread_id; Hermes-created private + # DM topic lanes require both thread metadata and a reply anchor # - Feishu only honors reply_in_thread when sending a reply, so topic # progress uses the triggering event message as the reply target # - Other platforms should use explicit source.thread_id only @@ -13573,7 +13620,11 @@ class GatewayRunner: _progress_thread_id = source.thread_id or event_message_id else: _progress_thread_id = source.thread_id - _progress_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None + _progress_metadata = ( + self._thread_metadata_for_source(source, event_message_id) + if _progress_thread_id == source.thread_id + else {"thread_id": _progress_thread_id} + ) if _progress_thread_id else None _progress_reply_to = ( event_message_id if source.platform == Platform.FEISHU and source.thread_id and event_message_id @@ -13833,7 +13884,7 @@ class GatewayRunner: "reply_to_message_id": event_message_id, } else: - _status_thread_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None + _status_thread_metadata = self._thread_metadata_for_source(source, event_message_id) if _progress_thread_id else None def _status_callback_sync(event_type: str, message: str) -> None: if not _status_adapter or not _run_still_current(): @@ -15118,7 +15169,7 @@ class GatewayRunner: ) if next_message is None: return result - next_message_id = getattr(pending_event, "message_id", None) + next_message_id = self._reply_anchor_for_event(pending_event) next_channel_prompt = getattr(pending_event, "channel_prompt", None) # Restart typing indicator so the user sees activity while diff --git a/tests/gateway/test_background_command.py b/tests/gateway/test_background_command.py index 559c04ea79b..9c156960c70 100644 --- a/tests/gateway/test_background_command.py +++ b/tests/gateway/test_background_command.py @@ -108,6 +108,38 @@ class TestHandleBackgroundCommand: assert "Summarize the top HN stories" in result assert len(created_tasks) == 1 # background task was created + @pytest.mark.asyncio + async def test_telegram_dm_topic_passes_trigger_anchor_to_task(self): + """Telegram private-topic completion sends need the original command message id.""" + runner = _make_runner() + runner._run_background_task = AsyncMock() + + def capture_task(coro, *args, **kwargs): + coro.close() + mock_task = MagicMock() + return mock_task + + source = SessionSource( + platform=Platform.TELEGRAM, + user_id="12345", + chat_id="67890", + chat_type="dm", + thread_id="20197", + ) + event = MessageEvent( + text="/background summarize", + source=source, + message_id="463", + reply_to_message_id="462", + ) + + with patch("gateway.run.asyncio.create_task", side_effect=capture_task): + result = await runner._handle_background_command(event) + + assert "Background task started" in result + runner._run_background_task.assert_called_once() + assert runner._run_background_task.call_args.kwargs["event_message_id"] == "463" + @pytest.mark.asyncio async def test_prompt_truncated_in_preview(self): """Long prompts are truncated to 60 chars in the confirmation message.""" @@ -236,6 +268,57 @@ class TestRunBackgroundTask: mock_agent_instance.shutdown_memory_provider.assert_called_once() mock_agent_instance.close.assert_called_once() + @pytest.mark.asyncio + async def test_telegram_dm_topic_completion_preserves_reply_anchor_metadata(self, monkeypatch): + """Background completion metadata must let Telegram send thread id plus reply id.""" + from gateway import run as gateway_run + + runner = _make_runner() + runner._resolve_session_agent_runtime = MagicMock( + return_value=("test-model", {"api_key": "test-key"}) + ) + runner._resolve_session_reasoning_config = MagicMock(return_value=None) + runner._load_service_tier = MagicMock(return_value=None) + runner._resolve_turn_agent_config = MagicMock( + return_value={ + "model": "test-model", + "runtime": {"api_key": "test-key"}, + "request_overrides": None, + } + ) + runner._run_in_executor_with_context = AsyncMock( + return_value={"final_response": "done", "messages": []} + ) + monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: {}) + + mock_adapter = AsyncMock() + mock_adapter.send = AsyncMock() + mock_adapter.extract_media = MagicMock(return_value=([], "done")) + mock_adapter.extract_images = MagicMock(return_value=([], "done")) + runner.adapters[Platform.TELEGRAM] = mock_adapter + + source = SessionSource( + platform=Platform.TELEGRAM, + user_id="12345", + chat_id="67890", + chat_type="dm", + thread_id="20197", + ) + + await runner._run_background_task( + "say hello", + source, + "bg_test", + event_message_id="463", + ) + + mock_adapter.send.assert_called_once() + assert mock_adapter.send.call_args.kwargs["metadata"] == { + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "463", + } + @pytest.mark.asyncio async def test_agent_cleanup_runs_when_background_agent_raises(self): """Temporary background agents must be cleaned up on error paths too.""" diff --git a/tests/gateway/test_telegram_thread_fallback.py b/tests/gateway/test_telegram_thread_fallback.py index 7b982e9588c..92702f6fd00 100644 --- a/tests/gateway/test_telegram_thread_fallback.py +++ b/tests/gateway/test_telegram_thread_fallback.py @@ -1,13 +1,11 @@ -"""Tests for Telegram send() thread_id fallback. +"""Tests for Telegram topic/thread routing fallbacks. -When message_thread_id points to a non-existent thread, Telegram returns -BadRequest('Message thread not found'). Since BadRequest is a subclass of -NetworkError in python-telegram-bot, the old retry loop treated this as a -transient error and retried 3 times before silently failing — killing all -tool progress messages, streaming responses, and typing indicators. - -The fix detects "thread not found" BadRequest errors and retries the send -WITHOUT message_thread_id so the message still reaches the chat. +Supergroup forum topics route with ``message_thread_id``. Hermes-created +private DM topic lanes are different: live Telegram testing showed they only +stay in the expected lane when sends include both the private topic +``message_thread_id`` and a ``reply_to_message_id`` anchor to the triggering +user message. If either anchor is unavailable or rejected, the adapter must +avoid retrying with a partial topic route that can render outside the lane. """ import sys @@ -17,7 +15,14 @@ from types import SimpleNamespace import pytest from gateway.config import PlatformConfig, Platform -from gateway.platforms.base import SendResult +from gateway.platforms.base import ( + MessageEvent, + MessageType, + SendResult, + _reply_anchor_for_event, + _thread_metadata_for_source, +) +from gateway.session import build_session_key # ── Fake telegram.error hierarchy ────────────────────────────────────── @@ -44,23 +49,48 @@ class FakeRetryAfter(Exception): # Build a fake telegram module tree so the adapter's internal imports work +class _FakeInlineKeyboardButton: + def __init__(self, text, callback_data=None, **kwargs): + self.text = text + self.callback_data = callback_data + self.kwargs = kwargs + + +class _FakeInlineKeyboardMarkup: + def __init__(self, inline_keyboard): + self.inline_keyboard = inline_keyboard + + +class _FakeInputMediaPhoto: + def __init__(self, media, caption=None, **kwargs): + self.media = media + self.caption = caption + self.kwargs = kwargs + + _fake_telegram = types.ModuleType("telegram") _fake_telegram.Update = object _fake_telegram.Bot = object _fake_telegram.Message = object -_fake_telegram.InlineKeyboardButton = object -_fake_telegram.InlineKeyboardMarkup = object +_fake_telegram.InlineKeyboardButton = _FakeInlineKeyboardButton +_fake_telegram.InlineKeyboardMarkup = _FakeInlineKeyboardMarkup +_fake_telegram.InputMediaPhoto = _FakeInputMediaPhoto _fake_telegram_error = types.ModuleType("telegram.error") _fake_telegram_error.NetworkError = FakeNetworkError _fake_telegram_error.BadRequest = FakeBadRequest _fake_telegram_error.TimedOut = FakeTimedOut _fake_telegram.error = _fake_telegram_error _fake_telegram_constants = types.ModuleType("telegram.constants") -_fake_telegram_constants.ParseMode = SimpleNamespace(MARKDOWN_V2="MarkdownV2") +_fake_telegram_constants.ParseMode = SimpleNamespace( + MARKDOWN_V2="MarkdownV2", + MARKDOWN="Markdown", + HTML="HTML", +) _fake_telegram_constants.ChatType = SimpleNamespace( GROUP="group", SUPERGROUP="supergroup", CHANNEL="channel", + PRIVATE="private", ) _fake_telegram.constants = _fake_telegram_constants _fake_telegram_ext = types.ModuleType("telegram.ext") @@ -235,6 +265,626 @@ async def test_send_retries_without_thread_on_thread_not_found(): assert call_log[1]["message_thread_id"] is None +@pytest.mark.asyncio +async def test_send_private_dm_topic_uses_direct_messages_topic_id(): + """Private Telegram topics route sends via direct_messages_topic_id.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(dict(kwargs)) + return SimpleNamespace(message_id=42) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="test message", + metadata={"thread_id": "99999", "direct_messages_topic_id": "99999"}, + ) + + assert result.success is True + assert call_log[0]["message_thread_id"] is None + assert call_log[0]["direct_messages_topic_id"] == 99999 + + +def test_base_gateway_metadata_marks_telegram_dm_topics_as_reply_fallback(): + source = SimpleNamespace( + platform=Platform.TELEGRAM, + chat_type="dm", + thread_id="20189", + ) + + metadata = _thread_metadata_for_source(source, "462") + + assert metadata == { + "thread_id": "20189", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + } + + +def test_base_gateway_replies_to_triggering_message_for_telegram_dm_topic(): + """Private DM topic lanes should anchor replies to the active user message.""" + event = SimpleNamespace( + message_id="463", + reply_to_message_id="462", + source=SimpleNamespace( + platform=Platform.TELEGRAM, + chat_type="dm", + thread_id="20189", + ), + ) + + assert _reply_anchor_for_event(event) == "463" + + +@pytest.mark.asyncio +async def test_gateway_runner_busy_ack_replies_to_triggering_message_for_telegram_dm_topic(monkeypatch, tmp_path): + """GatewayRunner's duplicate thread metadata must match the base helper.""" + from gateway import run as gateway_run + + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + GatewayRunner = gateway_run.GatewayRunner + + class BusyAdapter: + def __init__(self): + self._pending_messages = {} + self.calls = [] + + async def _send_with_retry(self, **kwargs): + self.calls.append(kwargs) + return SendResult(success=True, message_id="ack-1") + + class BusyAgent: + def interrupt(self, _text): + return None + + def get_activity_summary(self): + return {} + + source = SimpleNamespace( + platform=Platform.TELEGRAM, + chat_id="12345", + chat_type="dm", + thread_id="20197", + user_id="user-1", + ) + event = MessageEvent( + text="busy follow-up", + message_type=MessageType.TEXT, + source=source, + message_id="463", + reply_to_message_id="462", + ) + session_key = build_session_key(source) + adapter = BusyAdapter() + + runner = object.__new__(GatewayRunner) + runner.adapters = {Platform.TELEGRAM: adapter} + runner._running_agents = {session_key: BusyAgent()} + runner._running_agents_ts = {} + runner._pending_messages = {} + runner._busy_ack_ts = {} + runner._draining = False + runner._busy_input_mode = "interrupt" + runner._is_user_authorized = lambda _source: True + + assert await runner._handle_active_session_busy_message(event, session_key) is True + + assert adapter.calls + assert adapter.calls[0]["reply_to"] == "463" + assert adapter.calls[0]["metadata"] == { + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "463", + } + + +@pytest.mark.asyncio +async def test_send_uses_reply_fallback_for_hermes_dm_topics(): + """Hermes-created Telegram DM topics route with thread id plus reply anchor.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(kwargs) + return SimpleNamespace(message_id=777) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="test message", + reply_to="462", + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert "direct_messages_topic_id" not in call_log[0] + + +@pytest.mark.asyncio +async def test_send_uses_metadata_reply_fallback_for_streaming_dm_topics(): + """Metadata-only sends still stay in Hermes-created Telegram DM topics.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(kwargs) + return SimpleNamespace(message_id=778) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="streamed text", + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert "direct_messages_topic_id" not in call_log[0] + + +@pytest.mark.asyncio +async def test_send_reply_fallback_applies_to_every_chunk_for_dm_topics(): + """Long Telegram DM-topic fallback sends must anchor every chunk.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(dict(kwargs)) + return SimpleNamespace(message_id=len(call_log)) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="A" * 5000, + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert len(call_log) > 1 + assert all(call["reply_to_message_id"] == 462 for call in call_log) + assert all(call["message_thread_id"] == 20197 for call in call_log) + assert all("direct_messages_topic_id" not in call for call in call_log) + + +@pytest.mark.asyncio +async def test_send_model_picker_uses_metadata_reply_fallback_for_dm_topics(): + """Inline keyboard sends also consume the metadata reply fallback.""" + adapter = _make_adapter() + adapter._model_picker_state = {} + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(kwargs) + return SimpleNamespace(message_id=779) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send_model_picker( + chat_id="123", + providers=[{"name": "OpenAI", "slug": "openai", "models": [], "total_models": 0}], + current_model="gpt-test", + current_provider="openai", + session_key="telegram:123:20197", + on_model_selected=lambda *_: None, + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert "direct_messages_topic_id" not in call_log[0] + + +@pytest.mark.asyncio +async def test_send_dm_topic_fallback_without_anchor_does_not_crash(): + """DM-topic fallback without an anchor must not use message_thread_id alone.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(dict(kwargs)) + return SimpleNamespace(message_id=780) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="source-only send", + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] is None + assert "message_thread_id" not in call_log[0] + assert "direct_messages_topic_id" not in call_log[0] + + +@pytest.mark.asyncio +async def test_send_dm_topic_reply_not_found_retry_drops_thread_id(): + """If Telegram deletes the reply anchor, private-topic retry must drop thread id too.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(dict(kwargs)) + if len(call_log) == 1: + raise FakeBadRequest("Message to be replied not found") + return SimpleNamespace(message_id=781) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="123", + content="anchor disappeared", + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert call_log[1]["reply_to_message_id"] is None + assert "message_thread_id" not in call_log[1] + assert "direct_messages_topic_id" not in call_log[1] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ("method_name", "bot_method_name", "path_kw", "filename", "payload"), + [ + ("send_image_file", "send_photo", "image_path", "photo.png", b"png-data"), + ("send_document", "send_document", "file_path", "report.txt", b"report-data"), + ("send_video", "send_video", "video_path", "clip.mp4", b"video-data"), + ("send_voice", "send_voice", "audio_path", "clip.ogg", b"ogg-data"), + ("send_voice", "send_audio", "audio_path", "clip.mp3", b"mp3-data"), + ], +) +async def test_native_media_dm_topic_reply_not_found_retry_drops_thread_id( + tmp_path, + method_name, + bot_method_name, + path_kw, + filename, + payload, +): + adapter = _make_adapter() + media_path = tmp_path / filename + media_path.write_bytes(payload) + call_log = [] + + async def mock_send_media(**kwargs): + call_log.append(dict(kwargs)) + if len(call_log) == 1: + raise FakeBadRequest("Message to be replied not found") + return SimpleNamespace(message_id=782) + + adapter._bot = SimpleNamespace(**{bot_method_name: mock_send_media}) + + result = await getattr(adapter, method_name)( + chat_id="123", + **{path_kw: str(media_path)}, + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert call_log[1]["reply_to_message_id"] is None + assert "message_thread_id" not in call_log[1] + assert "direct_messages_topic_id" not in call_log[1] + + +@pytest.mark.asyncio +async def test_animation_dm_topic_reply_not_found_retry_drops_thread_id(): + adapter = _make_adapter() + call_log = [] + + async def mock_send_animation(**kwargs): + call_log.append(dict(kwargs)) + if len(call_log) == 1: + raise FakeBadRequest("Message to be replied not found") + return SimpleNamespace(message_id=786) + + adapter._bot = SimpleNamespace(send_animation=mock_send_animation) + + result = await adapter.send_animation( + chat_id="123", + animation_url="https://example.com/anim.gif", + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert call_log[1]["reply_to_message_id"] is None + assert "message_thread_id" not in call_log[1] + assert "direct_messages_topic_id" not in call_log[1] + + +@pytest.mark.asyncio +async def test_media_group_dm_topic_reply_not_found_retry_drops_thread_id(tmp_path): + adapter = _make_adapter() + image_path = tmp_path / "photo.png" + image_path.write_bytes(b"png-data") + call_log = [] + + async def mock_send_media_group(**kwargs): + call_log.append(dict(kwargs)) + if len(call_log) == 1: + raise FakeBadRequest("Message to be replied not found") + return [SimpleNamespace(message_id=783)] + + adapter._bot = SimpleNamespace(send_media_group=mock_send_media_group) + + await adapter.send_multiple_images( + chat_id="123", + images=[(f"file://{image_path}", "caption")], + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert call_log[1]["reply_to_message_id"] is None + assert "message_thread_id" not in call_log[1] + assert "direct_messages_topic_id" not in call_log[1] + + +@pytest.mark.asyncio +async def test_send_image_url_dm_topic_reply_not_found_retry_drops_thread_id(monkeypatch): + adapter = _make_adapter() + call_log = [] + + async def mock_send_photo(**kwargs): + call_log.append(dict(kwargs)) + if len(call_log) == 1: + raise FakeBadRequest("Message to be replied not found") + return SimpleNamespace(message_id=784) + + adapter._bot = SimpleNamespace(send_photo=mock_send_photo) + import tools.url_safety as url_safety + + monkeypatch.setattr(url_safety, "is_safe_url", lambda _url: True) + + result = await adapter.send_image( + chat_id="123", + image_url="https://example.com/photo.png", + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert call_log[1]["reply_to_message_id"] is None + assert "message_thread_id" not in call_log[1] + assert "direct_messages_topic_id" not in call_log[1] + + +@pytest.mark.asyncio +async def test_send_image_upload_dm_topic_reply_not_found_retry_drops_thread_id(monkeypatch): + adapter = _make_adapter() + call_log = [] + + async def mock_send_photo(**kwargs): + call_log.append(dict(kwargs)) + if len(call_log) == 1: + raise RuntimeError("URL is too large") + if len(call_log) == 2: + raise FakeBadRequest("Message to be replied not found") + return SimpleNamespace(message_id=785) + + class _FakeResponse: + content = b"image-data" + + def raise_for_status(self): + return None + + class _FakeAsyncClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + return None + + async def get(self, _url): + return _FakeResponse() + + monkeypatch.setitem( + sys.modules, + "httpx", + SimpleNamespace(AsyncClient=_FakeAsyncClient), + ) + adapter._bot = SimpleNamespace(send_photo=mock_send_photo) + import tools.url_safety as url_safety + + monkeypatch.setattr(url_safety, "is_safe_url", lambda _url: True) + + result = await adapter.send_image( + chat_id="123", + image_url="https://example.com/photo.png", + metadata={ + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + }, + ) + + assert result.success is True + assert call_log[0]["reply_to_message_id"] == 462 + assert call_log[0]["message_thread_id"] == 20197 + assert call_log[1]["reply_to_message_id"] == 462 + assert call_log[1]["message_thread_id"] == 20197 + assert call_log[2]["reply_to_message_id"] is None + assert "message_thread_id" not in call_log[2] + assert "direct_messages_topic_id" not in call_log[2] + + +@pytest.mark.asyncio +async def test_slash_confirm_private_topic_callback_followup_sends_thread_and_reply(monkeypatch): + adapter = _make_adapter() + adapter._slash_confirm_state = {"confirm-1": "session-1"} + adapter._is_callback_user_authorized = lambda *args, **kwargs: True + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(dict(kwargs)) + return SimpleNamespace(message_id=9001) + + async def resolve(_session_key, _confirm_id, _choice): + return "done" + + from tools import slash_confirm + + monkeypatch.setattr(slash_confirm, "resolve", resolve) + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + class Query: + data = "sc:once:confirm-1" + from_user = SimpleNamespace(id=42, first_name="Alice") + message = SimpleNamespace( + chat_id=12345, + chat=SimpleNamespace(type=_fake_telegram_constants.ChatType.PRIVATE), + message_thread_id=20197, + message_id=462, + ) + + async def answer(self, **kwargs): + return None + + async def edit_message_text(self, **kwargs): + return None + + await adapter._handle_callback_query(SimpleNamespace(callback_query=Query()), SimpleNamespace()) + + assert call_log + assert call_log[0]["message_thread_id"] == 20197 + assert call_log[0]["reply_to_message_id"] == 462 + + +@pytest.mark.asyncio +async def test_slash_confirm_forum_callback_followup_keeps_existing_thread_behavior(monkeypatch): + adapter = _make_adapter() + adapter._slash_confirm_state = {"confirm-1": "session-1"} + adapter._is_callback_user_authorized = lambda *args, **kwargs: True + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(dict(kwargs)) + return SimpleNamespace(message_id=9001) + + async def resolve(_session_key, _confirm_id, _choice): + return "done" + + from tools import slash_confirm + + monkeypatch.setattr(slash_confirm, "resolve", resolve) + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + class Query: + data = "sc:once:confirm-1" + from_user = SimpleNamespace(id=42, first_name="Alice") + message = SimpleNamespace( + chat_id=-100123, + chat=SimpleNamespace(type=_fake_telegram_constants.ChatType.SUPERGROUP), + message_thread_id=20197, + message_id=462, + ) + + async def answer(self, **kwargs): + return None + + async def edit_message_text(self, **kwargs): + return None + + await adapter._handle_callback_query(SimpleNamespace(callback_query=Query()), SimpleNamespace()) + + assert call_log + assert call_log[0]["message_thread_id"] == 20197 + assert "reply_to_message_id" not in call_log[0] + assert "direct_messages_topic_id" not in call_log[0] + + +@pytest.mark.asyncio +async def test_base_send_image_fallback_preserves_metadata(): + """Base image fallback should pass metadata through instead of referencing kwargs.""" + from gateway.platforms.base import BasePlatformAdapter + + class _ConcreteBaseAdapter(BasePlatformAdapter): + async def connect(self): + return True + + async def disconnect(self): + return None + + async def send(self, **kwargs): + call_log.append(kwargs) + return SendResult(success=True, message_id="781") + + async def get_chat_info(self, chat_id): + return None + + call_log = [] + adapter = _ConcreteBaseAdapter(Platform.TELEGRAM, None) + metadata = {"thread_id": "20197"} + + result = await adapter.send_image( + chat_id="123", + image_url="https://example.invalid/image.png", + metadata=metadata, + ) + + assert result.success is True + assert call_log[0]["metadata"] is metadata + + @pytest.mark.asyncio async def test_send_raises_on_other_bad_request(): """Non-thread BadRequest errors should NOT be retried — they fail immediately.""" diff --git a/tests/gateway/test_voice_command.py b/tests/gateway/test_voice_command.py index 947d4904aa8..a877730dcec 100644 --- a/tests/gateway/test_voice_command.py +++ b/tests/gateway/test_voice_command.py @@ -433,6 +433,37 @@ class TestSendVoiceReply: call_args = mock_adapter.send_voice.call_args assert call_args.kwargs.get("chat_id") == "123" + @pytest.mark.asyncio + async def test_auto_voice_reply_uses_thread_metadata_helper(self, runner): + from gateway.config import Platform + + mock_adapter = AsyncMock() + mock_adapter.send_voice = AsyncMock() + event = _make_event() + event.source.platform = Platform.TELEGRAM + event.source.chat_type = "dm" + event.source.thread_id = "20197" + event.message_id = "462" + runner.adapters[event.source.platform] = mock_adapter + + tts_result = json.dumps({"success": True, "file_path": "/tmp/test.ogg"}) + + with patch("tools.tts_tool.text_to_speech_tool", return_value=tts_result), \ + patch("tools.tts_tool._strip_markdown_for_tts", side_effect=lambda t: t), \ + patch("os.path.isfile", return_value=True), \ + patch("os.unlink"), \ + patch("os.makedirs"): + await runner._send_voice_reply(event, "Hello world") + + mock_adapter.send_voice.assert_called_once() + call_kwargs = mock_adapter.send_voice.call_args.kwargs + assert call_kwargs["reply_to"] == "462" + assert call_kwargs["metadata"] == { + "thread_id": "20197", + "telegram_dm_topic_reply_fallback": True, + "telegram_reply_to_message_id": "462", + } + @pytest.mark.asyncio async def test_empty_text_after_strip_skips(self, runner): event = _make_event()