From c648ecdca526f8c421d8c8c00e1aa9b330bbcb5e Mon Sep 17 00:00:00 2001
From: teknium1 <127238744+teknium1@users.noreply.github.com>
Date: Sun, 28 Jun 2026 02:54:24 -0700
Subject: [PATCH] fix(telegram): reject unauthorized users before event construction (#40863)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Removed/unauthorized Telegram users could inject prompt content before
the per-user auth gate fired. The adapter ran `_should_process_message`,
`_build_message_event`, and text/photo batching — and dispatched to the
runner — before `_is_user_authorized()` (gateway/authz_mixin.py)
rejected the sender. Unmentioned group chatter from a removed user was
also persisted into the session transcript via
`_observe_unmentioned_group_message`, leaking into the agent's observed
context independent of dispatch.

Add `_is_user_authorized_from_message()` as an intake prefilter that
runs in `_handle_text_message`, `_handle_command`,
`_handle_location_message`, and `_handle_media_message` BEFORE batching,
event construction, and the unmentioned-group observe branch. It reuses
the runner's `_is_user_authorized()` with a correctly-shaped
SessionSource (group vs forum vs dm, real chat_id for
TELEGRAM_GROUP_ALLOWED_* allowlists), falls back to env allowlists, and
only rejects when an allowlist actually exists — unknown DMs with no
allowlist still reach the pairing flow. Channel posts authorize via
`sender_chat` identity when `from_user` is absent.
Co-authored-by: liuhao1024 Co-authored-by: Carlos Manuel Cejas --- plugins/platforms/telegram/adapter.py | 172 ++++++++++ scripts/release.py | 1 + tests/gateway/test_telegram_auth_check.py | 396 ++++++++++++++++++++++ 3 files changed, 569 insertions(+) create mode 100644 tests/gateway/test_telegram_auth_check.py diff --git a/plugins/platforms/telegram/adapter.py b/plugins/platforms/telegram/adapter.py index 9e456bc67bd..ae711604005 100644 --- a/plugins/platforms/telegram/adapter.py +++ b/plugins/platforms/telegram/adapter.py @@ -559,6 +559,146 @@ class TelegramAdapter(BasePlatformAdapter): allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()} return "*" in allowed_ids or normalized_user_id in allowed_ids + def _source_from_message_for_auth(self, message: Message): + """Build the same Telegram source shape the gateway auth path expects. + + Resolves the identity to authorize from ``from_user`` for normal + messages, falling back to ``sender_chat`` for channel posts (which + carry no ``from_user``) so a removed/unauthorized channel cannot + inject content via the broadcast path either. + """ + from gateway.session import SessionSource + + user = getattr(message, "from_user", None) + chat = getattr(message, "chat", None) + user_id = str(getattr(user, "id", "")).strip() or None + user_name = ( + str(getattr(user, "username", "") or getattr(user, "full_name", "") or "").strip() + or None + ) + # Channel posts have no from_user — authorize the sender chat instead. 
+ if not user_id: + sender_chat = getattr(message, "sender_chat", None) + if sender_chat is not None: + user_id = str(getattr(sender_chat, "id", "")).strip() or None + if not user_name: + user_name = ( + str(getattr(sender_chat, "title", "") or "").strip() or None + ) + + chat_id = str(getattr(chat, "id", "")).strip() or user_id + chat_type = str(getattr(chat, "type", "dm")).strip().lower() or "dm" + if chat_type == "private": + chat_type = "dm" + elif chat_type == "supergroup": + thread_id_raw = getattr(message, "message_thread_id", None) + is_topic_message = bool(getattr(message, "is_topic_message", False)) + is_forum_group = getattr(chat, "is_forum", False) is True + chat_type = ( + "forum" + if thread_id_raw is not None and (is_topic_message or is_forum_group) + else "group" + ) + + thread_id = None + thread_id_raw = getattr(message, "message_thread_id", None) + if thread_id_raw is not None: + is_topic_message = bool(getattr(message, "is_topic_message", False)) + is_forum_group = getattr(chat, "is_forum", False) is True + if chat_type == "forum" and (is_topic_message or is_forum_group): + thread_id = str(thread_id_raw) + elif chat_type == "dm" and is_topic_message: + thread_id = str(thread_id_raw) + + return SessionSource( + platform=Platform.TELEGRAM, + chat_id=chat_id or "", + chat_type=chat_type, + user_id=user_id, + user_name=user_name, + thread_id=thread_id, + ) + + def _telegram_auth_env_configured(self) -> bool: + """Return True when Telegram auth env vars make an early decision safe.""" + keys = ( + "TELEGRAM_ALLOWED_USERS", + "TELEGRAM_GROUP_ALLOWED_USERS", + "TELEGRAM_GROUP_ALLOWED_CHATS", + "TELEGRAM_ALLOW_ALL_USERS", + "GATEWAY_ALLOWED_USERS", + "GATEWAY_ALLOW_ALL_USERS", + ) + return any(os.getenv(key, "").strip() for key in keys) + + def _is_user_authorized_from_message(self, message: Message) -> bool: + """Check if the sender of a Telegram message is authorized. 
+ + Intake prefilter that runs BEFORE text batching, event construction, + and unmentioned-group observation, so a removed/unauthorized user + cannot inject prompt content into the agent path or the observed + transcript (fixes #40863). It only rejects when it can make the same + context-aware decision the runner would make. Unknown DMs with no + allowlist still pass through so the normal pairing flow can run. + """ + source = self._source_from_message_for_auth(message) + user_id = source.user_id + # No identity at all → genuine group service message (pin, delete, + # new_chat_members, etc.). Defer to the cold path. Channel posts + # without sender_chat already resolved to None above and fall here; + # they carry no authorizable identity, so let the normal + # _should_process_message gating handle them. + if not user_id: + return True + + # Adapter-level allow_from: when set, it is the sole authority. + adapter_allow_from = self.config.extra.get("allow_from") + if adapter_allow_from is not None: + allowed = {str(u).strip() for u in adapter_allow_from if str(u).strip()} + return user_id in allowed or "*" in allowed + + # Test/custom injection only. The class method named + # _is_callback_user_authorized is for inline button callbacks and must + # not be treated as a user-id-only shortcut for real messages — only + # honor an instance-level override (set in tests). 
+ callback_auth = self.__dict__.get("_is_callback_user_authorized") + if callable(callback_auth): + try: + return bool( + callback_auth( + user_id, + chat_id=source.chat_id, + chat_type=source.chat_type, + thread_id=source.thread_id, + user_name=source.user_name, + ) + ) + except Exception: + pass + + runner = getattr(getattr(self, "_message_handler", None), "__self__", None) + auth_fn = getattr(runner, "_is_user_authorized", None) + if callable(auth_fn): + # Only make an early decision via the runner when an allowlist + # actually exists; otherwise unknown DMs must reach the pairing + # flow rather than being default-denied here. + if not self._telegram_auth_env_configured(): + return True + try: + return bool(auth_fn(source)) + except Exception: + logger.debug( + "[Telegram] Falling back to env-only auth for user %s", + user_id, + exc_info=True, + ) + + allowed_csv = os.getenv("TELEGRAM_ALLOWED_USERS", "").strip() + if not allowed_csv: + return True + allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()} + return "*" in allowed_ids or user_id in allowed_ids + @classmethod def _metadata_thread_id(cls, metadata: Optional[Dict[str, Any]]) -> Optional[str]: if not metadata: @@ -6567,6 +6707,17 @@ class TelegramAdapter(BasePlatformAdapter): msg = self._effective_update_message(update) if not msg or not msg.text: return + # Early user-level auth check: reject unauthorized users before any + # text batching, observe-buffer persistence, event building, or response + # generation. This prevents removed/blocked users from injecting prompts + # into the agent path or the observed transcript context (#40863). 
+ if not self._is_user_authorized_from_message(msg): + logger.warning( + "[Telegram] Blocked unauthorized user %s in chat %s", + getattr(getattr(msg, "from_user", None), "id", None), + getattr(getattr(msg, "chat", None), "id", None), + ) + return if not self._should_process_message(msg): if self._should_observe_unmentioned_group_message(msg): self._observe_unmentioned_group_message(msg, MessageType.TEXT, update_id=update.update_id) @@ -6586,6 +6737,13 @@ class TelegramAdapter(BasePlatformAdapter): return if not self._should_process_message(msg, is_command=True): return + if not self._is_user_authorized_from_message(msg): + logger.warning( + "[Telegram] Blocked unauthorized user %s in chat %s", + getattr(getattr(msg, "from_user", None), "id", None), + getattr(getattr(msg, "chat", None), "id", None), + ) + return await self._ensure_forum_commands(msg) event = self._build_message_event(msg, MessageType.COMMAND, update_id=update.update_id) @@ -6599,6 +6757,13 @@ class TelegramAdapter(BasePlatformAdapter): msg = self._effective_update_message(update) if not msg: return + if not self._is_user_authorized_from_message(msg): + logger.warning( + "[Telegram] Blocked unauthorized user %s in chat %s", + getattr(getattr(msg, "from_user", None), "id", None), + getattr(getattr(msg, "chat", None), "id", None), + ) + return if not self._should_process_message(msg): if self._should_observe_unmentioned_group_message(msg): self._observe_unmentioned_group_message(msg, MessageType.LOCATION, update_id=update.update_id) @@ -6781,6 +6946,13 @@ class TelegramAdapter(BasePlatformAdapter): """Handle incoming media messages, downloading images to local cache.""" if not update.message: return + if not self._is_user_authorized_from_message(update.message): + logger.info( + "[Telegram] Blocked media from unauthorized user %s in chat %s", + getattr(getattr(update.message, "from_user", None), "id", None), + getattr(getattr(update.message, "chat", None), "id", None), + ) + return if not 
self._should_process_message(update.message): if self._should_observe_unmentioned_group_message(update.message): _m = update.message diff --git a/scripts/release.py b/scripts/release.py index d4c69a57ef8..eb23e2e53a2 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -45,6 +45,7 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json" # Auto-extracted from noreply emails + manual overrides AUTHOR_MAP = { + "carlosmcejas@gmail.com": "cmcejas", # PR #41188 salvage (early Telegram auth gate before event build/observe; #40863) "ha-agent@homelab.4410.us": "oreoluwa", # PR #49845 salvage (skip preflight content-type probe for OAuth MCP servers so OAuth discovery runs; Akiflow/Hospitable) "prathamesh290504@gmail.com": "PRATHAMESH75", # PR #37550 salvage (ExecStopPost cgroup-orphan reaper to unblock systemd restart; #37454) "der@konsi.org": "konsisumer", # PR #19608 salvage (read-modify-write merge in write_credential_pool to preserve concurrently-added credentials; #19566) diff --git a/tests/gateway/test_telegram_auth_check.py b/tests/gateway/test_telegram_auth_check.py new file mode 100644 index 00000000000..bc309462f13 --- /dev/null +++ b/tests/gateway/test_telegram_auth_check.py @@ -0,0 +1,396 @@ +"""Tests for Telegram adapter early authorization check. + +Verifies that unauthorized users are blocked before any text batching, +event building, or response generation occurs. 
+""" +import asyncio +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +import pytest + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import MessageType + + +def _make_adapter(allow_from=None, allowed_chats=None, group_allowed_chats=None, callback_auth=None, **extra_overrides): + try: + from plugins.platforms.telegram.adapter import TelegramAdapter + except ModuleNotFoundError: # PR branch before Telegram plugin extraction + from gateway.platforms.telegram import TelegramAdapter + + extra = {} + if allow_from is not None: + extra["allow_from"] = allow_from + if allowed_chats is not None: + extra["allowed_chats"] = allowed_chats + if group_allowed_chats is not None: + extra["group_allowed_chats"] = group_allowed_chats + extra.update(extra_overrides) + + adapter = object.__new__(TelegramAdapter) + adapter.platform = Platform.TELEGRAM + adapter.config = PlatformConfig(enabled=True, token="fake-token", extra=extra) + adapter._bot = SimpleNamespace(id=999, username="test_bot") + adapter._message_handler = AsyncMock() + adapter._pending_text_batches = {} + adapter._pending_text_batch_tasks = {} + adapter._text_batch_delay_seconds = 0.01 + adapter._text_batch_split_delay_seconds = 0.01 + adapter._mention_patterns = adapter._compile_mention_patterns() + adapter._forum_lock = asyncio.Lock() + adapter._forum_command_registered = set() + adapter._active_sessions = {} + adapter._pending_messages = {} + if callback_auth is not None: + adapter._is_callback_user_authorized = callback_auth + return adapter + + +def _make_message(text="hello", *, from_user_id=111, chat_id=-100, chat_type="group"): + return SimpleNamespace( + message_id=42, + text=text, + caption=None, + entities=[], + caption_entities=[], + message_thread_id=None, + is_topic_message=False, + chat=SimpleNamespace(id=chat_id, type=chat_type, title="Test", is_forum=False), + from_user=SimpleNamespace(id=from_user_id, full_name="Test User", 
first_name="Test"), + reply_to_message=None, + date=None, + location=None, + photo=None, + video=None, + audio=None, + voice=None, + document=None, + sticker=None, + media_group_id=None, + ) + + +@pytest.mark.asyncio +async def test_unauthorized_user_blocked_before_event_building(): + """Unauthorized user's message should be blocked before _build_message_event.""" + adapter = _make_adapter(allow_from=["222"]) # Only user 222 allowed + + build_called = False + original_build = adapter._build_message_event + + def track_build(*a, **kw): + nonlocal build_called + build_called = True + return original_build(*a, **kw) + + adapter._build_message_event = track_build + + update = SimpleNamespace( + update_id=1, + message=_make_message(from_user_id=111), # User 111 NOT in allow_from + effective_message=None, + ) + + await adapter._handle_text_message(update, SimpleNamespace()) + + assert build_called is False, "build_message_event should not be called for unauthorized user" + + +@pytest.mark.asyncio +async def test_authorized_user_processed_normally(): + """Authorized user's message should pass the auth check and build an event.""" + adapter = _make_adapter(allow_from=["111"]) + + build_called = False + original_build = adapter._build_message_event + + def track_build(*a, **kw): + nonlocal build_called + build_called = True + return original_build(*a, **kw) + + adapter._build_message_event = track_build + + update = SimpleNamespace( + update_id=1, + message=_make_message(from_user_id=111), + effective_message=None, + ) + + await adapter._handle_text_message(update, SimpleNamespace()) + + assert build_called is True, "build_message_event should be called for authorized user" + + +@pytest.mark.asyncio +async def test_channel_post_passes_auth(): + """Messages with no from_user (channel posts) should pass user-level auth.""" + adapter = _make_adapter(allow_from=["111"]) + + build_called = False + original_build = adapter._build_message_event + + def track_build(*a, **kw): + 
nonlocal build_called + build_called = True + return original_build(*a, **kw) + + adapter._build_message_event = track_build + + msg = _make_message() + msg.from_user = None # Channel post has no sender + + update = SimpleNamespace( + update_id=1, + message=msg, + effective_message=None, + ) + + await adapter._handle_text_message(update, SimpleNamespace()) + + assert build_called is True, "Channel posts should pass user-level auth" + + +@pytest.mark.asyncio +async def test_command_from_unauthorized_user_blocked(): + """Commands from unauthorized users should be blocked.""" + adapter = _make_adapter(allow_from=["222"]) + adapter.handle_message = AsyncMock() + + update = SimpleNamespace( + update_id=1, + message=_make_message(text="/start", from_user_id=111), + effective_message=None, + ) + + await adapter._handle_command(update, SimpleNamespace()) + + adapter.handle_message.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_command_from_authorized_user_processed(): + """Commands from authorized users should be processed.""" + adapter = _make_adapter(allow_from=["111"]) + adapter.handle_message = AsyncMock() + + update = SimpleNamespace( + update_id=1, + message=_make_message(text="/start", from_user_id=111), + effective_message=None, + ) + + await adapter._handle_command(update, SimpleNamespace()) + + adapter.handle_message.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_location_from_unauthorized_user_blocked(): + """Location messages from unauthorized users should be blocked.""" + adapter = _make_adapter(allow_from=["222"]) + + msg = _make_message(from_user_id=111) + msg.text = None + msg.location = SimpleNamespace(latitude=53.3498, longitude=-6.2603) + + update = SimpleNamespace( + update_id=1, + message=msg, + effective_message=None, + ) + + # Should not raise — just silently return + await adapter._handle_location_message(update, SimpleNamespace()) + + +def test_is_user_authorized_from_message_allow_from(): + 
"""_is_user_authorized_from_message should respect adapter-level allow_from.""" + adapter = _make_adapter(allow_from=["111", "222"]) + + msg = _make_message(from_user_id=111) + assert adapter._is_user_authorized_from_message(msg) is True + + msg = _make_message(from_user_id=333) + assert adapter._is_user_authorized_from_message(msg) is False + + +def test_is_user_authorized_from_message_wildcard(): + """_is_user_authorized_from_message should accept wildcard '*'.""" + adapter = _make_adapter(allow_from=["*"]) + + msg = _make_message(from_user_id=999) + assert adapter._is_user_authorized_from_message(msg) is True + + +def test_is_user_authorized_from_message_no_from_user(): + """_is_user_authorized_from_message should return True for messages without from_user.""" + adapter = _make_adapter(allow_from=["111"]) + + msg = _make_message() + msg.from_user = None + assert adapter._is_user_authorized_from_message(msg) is True + + +def test_is_user_authorized_from_message_callback(): + """_is_user_authorized_from_message should use _is_callback_user_authorized.""" + adapter = _make_adapter(callback_auth=lambda uid, **_kw: uid == "555") + + msg = _make_message(from_user_id=555) + assert adapter._is_user_authorized_from_message(msg) is True + + msg = _make_message(from_user_id=666) + assert adapter._is_user_authorized_from_message(msg) is False + + +def test_unknown_dm_with_no_allowlist_passes_to_pairing(monkeypatch): + """Unknown DMs must still reach the gateway pairing flow when no allowlist exists.""" + for key in ( + "TELEGRAM_ALLOWED_USERS", + "TELEGRAM_GROUP_ALLOWED_USERS", + "TELEGRAM_GROUP_ALLOWED_CHATS", + "TELEGRAM_ALLOW_ALL_USERS", + "GATEWAY_ALLOWED_USERS", + "GATEWAY_ALLOW_ALL_USERS", + ): + monkeypatch.delenv(key, raising=False) + + adapter = _make_adapter() + msg = _make_message(from_user_id=111, chat_id=111, chat_type="private") + + assert adapter._is_user_authorized_from_message(msg) is True + + +def 
test_runner_auth_gets_group_user_allowlist_context(monkeypatch): + """Group user allowlists need a group-shaped source, not a DM-shaped one.""" + monkeypatch.setenv("TELEGRAM_GROUP_ALLOWED_USERS", "111") + seen_sources = [] + + class Runner: + def _is_user_authorized(self, source): + seen_sources.append(source) + return source.chat_type == "group" and source.chat_id == "-100" and source.user_id == "111" + + async def handle(self, event): + return None + + runner = Runner() + adapter = _make_adapter() + adapter._message_handler = runner.handle + msg = _make_message(from_user_id=111, chat_id=-100, chat_type="group") + + assert adapter._is_user_authorized_from_message(msg) is True + assert seen_sources + assert seen_sources[0].chat_type == "group" + assert seen_sources[0].chat_id == "-100" + + +def test_runner_auth_gets_group_chat_allowlist_context(monkeypatch): + """Group chat allowlists need the real chat id before intake drops updates.""" + monkeypatch.setenv("TELEGRAM_GROUP_ALLOWED_CHATS", "-222") + seen_sources = [] + + class Runner: + def _is_user_authorized(self, source): + seen_sources.append(source) + return source.chat_type == "group" and source.chat_id == "-222" + + async def handle(self, event): + return None + + runner = Runner() + adapter = _make_adapter() + adapter._message_handler = runner.handle + msg = _make_message(from_user_id=111, chat_id=-222, chat_type="group") + + assert adapter._is_user_authorized_from_message(msg) is True + assert seen_sources + assert seen_sources[0].chat_type == "group" + assert seen_sources[0].chat_id == "-222" + + +def test_removed_dm_user_blocked_before_pairing_when_allowlist_exists(monkeypatch): + """A user removed from TELEGRAM_ALLOWED_USERS should be blocked at intake.""" + monkeypatch.setenv("TELEGRAM_ALLOWED_USERS", "222") + adapter = _make_adapter() + msg = _make_message(from_user_id=111, chat_id=111, chat_type="private") + + assert adapter._is_user_authorized_from_message(msg) is False + + +@pytest.mark.asyncio 
+async def test_media_from_removed_user_blocked_before_event_building(monkeypatch): + """Removed users must not inject prompt-bearing documents via media handlers.""" + monkeypatch.setenv("TELEGRAM_ALLOWED_USERS", "222") + adapter = _make_adapter() + adapter.handle_message = AsyncMock() + + build_called = False + + def track_build(*_args, **_kwargs): + nonlocal build_called + build_called = True + raise AssertionError("media handler built an event for an unauthorized user") + + adapter._build_message_event = track_build + document = SimpleNamespace( + file_name="payload.txt", + mime_type="text/plain", + file_size=42, + get_file=AsyncMock(side_effect=AssertionError("unauthorized document was downloaded")), + ) + msg = _make_message(text=None, from_user_id=111, chat_id=111, chat_type="private") + msg.caption = "please process this caption" + msg.document = document + + update = SimpleNamespace(update_id=1, message=msg, effective_message=None) + + await adapter._handle_media_message(update, SimpleNamespace()) + + assert build_called is False + adapter.handle_message.assert_not_awaited() + document.get_file.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_unmentioned_group_text_from_removed_user_not_observed(): + """Removed users must not persist unmentioned group text into observed context.""" + adapter = _make_adapter( + allow_from=["222"], + allowed_chats=["-100"], + group_allowed_chats=["-100"], + require_mention=True, + observe_unmentioned_group_messages=True, + ) + observed = [] + adapter._observe_unmentioned_group_message = lambda *args, **kwargs: observed.append((args, kwargs)) + + msg = _make_message(text="side chatter", from_user_id=111, chat_id=-100, chat_type="group") + update = SimpleNamespace(update_id=1, message=msg, effective_message=None) + + await adapter._handle_text_message(update, SimpleNamespace()) + + assert observed == [] + + +@pytest.mark.asyncio +async def test_unmentioned_group_location_from_removed_user_not_observed(): + 
"""Removed users must not persist unmentioned group locations into observed context.""" + adapter = _make_adapter( + allow_from=["222"], + allowed_chats=["-100"], + group_allowed_chats=["-100"], + require_mention=True, + observe_unmentioned_group_messages=True, + ) + observed = [] + adapter._observe_unmentioned_group_message = lambda *args, **kwargs: observed.append((args, kwargs)) + + msg = _make_message(text=None, from_user_id=111, chat_id=-100, chat_type="group") + msg.location = SimpleNamespace(latitude=53.3498, longitude=-6.2603) + update = SimpleNamespace(update_id=1, message=msg, effective_message=None) + + await adapter._handle_location_message(update, SimpleNamespace()) + + assert observed == []