diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 19eb72e2ec..1bda152f5b 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -134,6 +134,7 @@ class TelegramAdapter(BasePlatformAdapter): # When a chunk is near this limit, a continuation is almost certain. _SPLIT_THRESHOLD = 4000 MEDIA_GROUP_WAIT_SECONDS = 0.8 + _GENERAL_TOPIC_THREAD_ID = "1" def __init__(self, config: PlatformConfig): super().__init__(config, Platform.TELEGRAM) @@ -178,6 +179,29 @@ class TelegramAdapter(BasePlatformAdapter): allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()} return "*" in allowed_ids or user_id in allowed_ids + @classmethod + def _metadata_thread_id(cls, metadata: Optional[Dict[str, Any]]) -> Optional[str]: + if not metadata: + return None + thread_id = metadata.get("thread_id") or metadata.get("message_thread_id") + return str(thread_id) if thread_id is not None else None + + @classmethod + def _message_thread_id_for_send(cls, thread_id: Optional[str]) -> Optional[int]: + if not thread_id or str(thread_id) == cls._GENERAL_TOPIC_THREAD_ID: + return None + return int(thread_id) + + @classmethod + def _message_thread_id_for_typing(cls, thread_id: Optional[str]) -> Optional[int]: + if not thread_id: + return None + return int(thread_id) + + @staticmethod + def _is_thread_not_found_error(error: Exception) -> bool: + return "thread not found" in str(error).lower() + def _fallback_ips(self) -> list[str]: """Return validated fallback IPs from config (populated by _apply_env_overrides).""" configured = self.config.extra.get("fallback_ips", []) if getattr(self.config, "extra", None) else [] @@ -849,7 +873,7 @@ class TelegramAdapter(BasePlatformAdapter): ] message_ids = [] - thread_id = metadata.get("thread_id") if metadata else None + thread_id = self._metadata_thread_id(metadata) try: from telegram.error import NetworkError as _NetErr @@ -869,7 +893,7 @@ class TelegramAdapter(BasePlatformAdapter): for i, chunk in enumerate(chunks): should_thread = self._should_thread_reply(reply_to, i) reply_to_id = int(reply_to) if should_thread else None - effective_thread_id = int(thread_id) if thread_id else None + effective_thread_id = self._message_thread_id_for_send(thread_id) msg = None for _send_attempt in range(3): @@ -906,8 +930,7 @@ class TelegramAdapter(BasePlatformAdapter): # (not transient network issues). Detect and handle # specific cases instead of blindly retrying. if _BadReq and isinstance(send_err, _BadReq): - err_lower = str(send_err).lower() - if "thread not found" in err_lower and effective_thread_id is not None: + if self._is_thread_not_found_error(send_err) and effective_thread_id is not None: # Thread doesn't exist — retry without # message_thread_id so the message still # reaches the chat. @@ -917,6 +940,7 @@ class TelegramAdapter(BasePlatformAdapter): ) effective_thread_id = None continue + err_lower = str(send_err).lower() if "message to be replied not found" in err_lower and reply_to_id is not None: # Original message was deleted before we # could reply — clear reply target and retry @@ -1115,9 +1139,7 @@ class TelegramAdapter(BasePlatformAdapter): ) # Resolve thread context for thread replies - thread_id = None - if metadata: - thread_id = metadata.get("thread_id") or metadata.get("message_thread_id") + thread_id = self._metadata_thread_id(metadata) # We'll use the message_id as part of callback_data to look up session_key # Send a placeholder first, then update — or use a counter. @@ -1145,8 +1167,9 @@ class TelegramAdapter(BasePlatformAdapter): "reply_markup": keyboard, **self._link_preview_kwargs(), } - if thread_id: - kwargs["message_thread_id"] = int(thread_id) + message_thread_id = self._message_thread_id_for_send(thread_id) + if message_thread_id is not None: + kwargs["message_thread_id"] = message_thread_id msg = await self._bot.send_message(**kwargs) @@ -1579,23 +1602,23 @@ class TelegramAdapter(BasePlatformAdapter): with open(audio_path, "rb") as audio_file: # .ogg files -> send as voice (round playable bubble) if audio_path.endswith((".ogg", ".opus")): - _voice_thread = metadata.get("thread_id") if metadata else None + _voice_thread = self._metadata_thread_id(metadata) msg = await self._bot.send_voice( chat_id=int(chat_id), voice=audio_file, caption=caption[:1024] if caption else None, reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=int(_voice_thread) if _voice_thread else None, + message_thread_id=self._message_thread_id_for_send(_voice_thread), ) else: # .mp3 and others -> send as audio file - _audio_thread = metadata.get("thread_id") if metadata else None + _audio_thread = self._metadata_thread_id(metadata) msg = await self._bot.send_audio( chat_id=int(chat_id), audio=audio_file, caption=caption[:1024] if caption else None, reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=int(_audio_thread) if _audio_thread else None, + message_thread_id=self._message_thread_id_for_send(_audio_thread), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -1625,14 +1648,14 @@ class TelegramAdapter(BasePlatformAdapter): if not os.path.exists(image_path): return SendResult(success=False, error=f"Image file not found: {image_path}") - _thread = metadata.get("thread_id") if metadata else None + _thread = self._metadata_thread_id(metadata) with open(image_path, "rb") as image_file: msg = await self._bot.send_photo( chat_id=int(chat_id), photo=image_file, caption=caption[:1024] if caption else None, reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=int(_thread) if _thread else None, + message_thread_id=self._message_thread_id_for_send(_thread), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -1663,7 +1686,7 @@ class TelegramAdapter(BasePlatformAdapter): return SendResult(success=False, error=f"File not found: {file_path}") display_name = file_name or os.path.basename(file_path) - _thread = metadata.get("thread_id") if metadata else None + _thread = self._metadata_thread_id(metadata) with open(file_path, "rb") as f: msg = await self._bot.send_document( @@ -1672,7 +1695,7 @@ class TelegramAdapter(BasePlatformAdapter): filename=display_name, caption=caption[:1024] if caption else None, reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=int(_thread) if _thread else None, + message_thread_id=self._message_thread_id_for_send(_thread), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -1696,14 +1719,14 @@ class TelegramAdapter(BasePlatformAdapter): if not os.path.exists(video_path): return SendResult(success=False, error=f"Video file not found: {video_path}") - _thread = metadata.get("thread_id") if metadata else None + _thread = self._metadata_thread_id(metadata) with open(video_path, "rb") as f: msg = await self._bot.send_video( chat_id=int(chat_id), video=f, caption=caption[:1024] if caption else None, reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=int(_thread) if _thread else None, + message_thread_id=self._message_thread_id_for_send(_thread), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -1733,13 +1756,13 @@ class TelegramAdapter(BasePlatformAdapter): try: # Telegram can send photos directly from URLs (up to ~5MB) - _photo_thread = metadata.get("thread_id") if metadata else None + _photo_thread = self._metadata_thread_id(metadata) msg = await self._bot.send_photo( chat_id=int(chat_id), photo=image_url, caption=caption[:1024] if caption else None, # Telegram caption limit reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=int(_photo_thread) if _photo_thread else None, + message_thread_id=self._message_thread_id_for_send(_photo_thread), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -1762,6 +1785,7 @@ class TelegramAdapter(BasePlatformAdapter): photo=image_data, caption=caption[:1024] if caption else None, reply_to_message_id=int(reply_to) if reply_to else None, + message_thread_id=self._message_thread_id_for_send(_photo_thread), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e2: @@ -1787,13 +1811,13 @@ class TelegramAdapter(BasePlatformAdapter): return SendResult(success=False, error="Not connected") try: - _anim_thread = metadata.get("thread_id") if metadata else None + _anim_thread = self._metadata_thread_id(metadata) msg = await self._bot.send_animation( chat_id=int(chat_id), animation=animation_url, caption=caption[:1024] if caption else None, reply_to_message_id=int(reply_to) if reply_to else None, - message_thread_id=int(_anim_thread) if _anim_thread else None, + message_thread_id=self._message_thread_id_for_send(_anim_thread), ) return SendResult(success=True, message_id=str(msg.message_id)) except Exception as e: @@ -1810,12 +1834,23 @@ class TelegramAdapter(BasePlatformAdapter): """Send typing indicator.""" if self._bot: try: - _typing_thread = metadata.get("thread_id") if metadata else None - await self._bot.send_chat_action( - chat_id=int(chat_id), - action="typing", - message_thread_id=int(_typing_thread) if _typing_thread else None, - ) + _typing_thread = self._metadata_thread_id(metadata) + message_thread_id = self._message_thread_id_for_typing(_typing_thread) + try: + await self._bot.send_chat_action( + chat_id=int(chat_id), + action="typing", + message_thread_id=message_thread_id, + ) + except Exception as e: + if message_thread_id is not None and self._is_thread_not_found_error(e): + await self._bot.send_chat_action( + chat_id=int(chat_id), + action="typing", + message_thread_id=None, + ) + else: + raise except Exception as e: # Typing failures are non-fatal; log at debug level only. logger.debug( @@ -2760,7 +2795,9 @@ class TelegramAdapter(BasePlatformAdapter): # Resolve DM topic name and skill binding thread_id_raw = message.message_thread_id - thread_id_str = str(thread_id_raw) if thread_id_raw else None + thread_id_str = str(thread_id_raw) if thread_id_raw is not None else None + if chat_type == "group" and thread_id_str is None and getattr(chat, "is_forum", False): + thread_id_str = self._GENERAL_TOPIC_THREAD_ID chat_topic = None topic_skill = None diff --git a/tests/gateway/test_telegram_thread_fallback.py b/tests/gateway/test_telegram_thread_fallback.py index fee1dcc806..4930467bfe 100644 --- a/tests/gateway/test_telegram_thread_fallback.py +++ b/tests/gateway/test_telegram_thread_fallback.py @@ -45,6 +45,11 @@ class FakeRetryAfter(Exception): # Build a fake telegram module tree so the adapter's internal imports work _fake_telegram = types.ModuleType("telegram") +_fake_telegram.Update = object +_fake_telegram.Bot = object +_fake_telegram.Message = object +_fake_telegram.InlineKeyboardButton = object +_fake_telegram.InlineKeyboardMarkup = object _fake_telegram_error = types.ModuleType("telegram.error") _fake_telegram_error.NetworkError = FakeNetworkError _fake_telegram_error.BadRequest = FakeBadRequest @@ -52,7 +57,21 @@ _fake_telegram_error.TimedOut = FakeTimedOut _fake_telegram.error = _fake_telegram_error _fake_telegram_constants = types.ModuleType("telegram.constants") _fake_telegram_constants.ParseMode = SimpleNamespace(MARKDOWN_V2="MarkdownV2") +_fake_telegram_constants.ChatType = SimpleNamespace( + GROUP="group", + SUPERGROUP="supergroup", + CHANNEL="channel", +) _fake_telegram.constants = _fake_telegram_constants +_fake_telegram_ext = types.ModuleType("telegram.ext") +_fake_telegram_ext.Application = object +_fake_telegram_ext.CommandHandler = object +_fake_telegram_ext.CallbackQueryHandler = object +_fake_telegram_ext.MessageHandler = object +_fake_telegram_ext.ContextTypes = SimpleNamespace(DEFAULT_TYPE=object) +_fake_telegram_ext.filters = object +_fake_telegram_request = types.ModuleType("telegram.request") +_fake_telegram_request.HTTPXRequest = object @pytest.fixture(autouse=True) @@ -61,6 +80,8 @@ def _inject_fake_telegram(monkeypatch): monkeypatch.setitem(sys.modules, "telegram", _fake_telegram) monkeypatch.setitem(sys.modules, "telegram.error", _fake_telegram_error) monkeypatch.setitem(sys.modules, "telegram.constants", _fake_telegram_constants) + monkeypatch.setitem(sys.modules, "telegram.ext", _fake_telegram_ext) + monkeypatch.setitem(sys.modules, "telegram.request", _fake_telegram_request) def _make_adapter(): @@ -68,6 +89,7 @@ def _make_adapter(): config = PlatformConfig(enabled=True, token="fake-token") adapter = object.__new__(TelegramAdapter) + adapter.config = config adapter._config = config adapter._platform = Platform.TELEGRAM adapter._connected = True @@ -82,6 +104,81 @@ def _make_adapter(): return adapter +def test_forum_general_topic_without_message_thread_id_keeps_thread_context(): + """Forum General-topic messages should keep synthetic thread context.""" + from gateway.platforms import telegram as telegram_mod + + adapter = _make_adapter() + message = SimpleNamespace( + text="hello from General", + caption=None, + chat=SimpleNamespace( + id=-100123, + type=telegram_mod.ChatType.SUPERGROUP, + is_forum=True, + title="Forum group", + ), + from_user=SimpleNamespace(id=456, full_name="Alice"), + message_thread_id=None, + reply_to_message=None, + message_id=10, + date=None, + ) + + event = adapter._build_message_event(message, msg_type=SimpleNamespace(value="text")) + + assert event.source.chat_id == "-100123" + assert event.source.chat_type == "group" + assert event.source.thread_id == "1" + + +@pytest.mark.asyncio +async def test_send_omits_general_topic_thread_id(): + """Telegram sends to forum General should omit message_thread_id=1.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_message(**kwargs): + call_log.append(dict(kwargs)) + return SimpleNamespace(message_id=42) + + adapter._bot = SimpleNamespace(send_message=mock_send_message) + + result = await adapter.send( + chat_id="-100123", + content="test message", + metadata={"thread_id": "1"}, + ) + + assert result.success is True + assert len(call_log) == 1 + assert call_log[0]["chat_id"] == -100123 + assert call_log[0]["text"] == "test message" + assert call_log[0]["reply_to_message_id"] is None + assert call_log[0]["message_thread_id"] is None + + +@pytest.mark.asyncio +async def test_send_typing_retries_without_general_thread_when_not_found(): + """Typing for forum General should fall back if Telegram rejects thread 1.""" + adapter = _make_adapter() + call_log = [] + + async def mock_send_chat_action(**kwargs): + call_log.append(dict(kwargs)) + if kwargs.get("message_thread_id") == 1: + raise FakeBadRequest("Message thread not found") + + adapter._bot = SimpleNamespace(send_chat_action=mock_send_chat_action) + + await adapter.send_typing("-100123", metadata={"thread_id": "1"}) + + assert call_log == [ + {"chat_id": -100123, "action": "typing", "message_thread_id": 1}, + {"chat_id": -100123, "action": "typing", "message_thread_id": None}, + ] + + @pytest.mark.asyncio async def test_send_retries_without_thread_on_thread_not_found(): """When message_thread_id causes 'thread not found', retry without it."""