fix(telegram): reject unauthorized users before event construction (#40863)

Removed/unauthorized Telegram users could inject prompt content before the
per-user auth gate fired. The adapter ran `_should_process_message`,
`_build_message_event`, and text/photo batching — and dispatched to the
runner — before `_is_user_authorized()` (gateway/authz_mixin.py) rejected
the sender. Unmentioned group chatter from a removed user was also
persisted into the session transcript via `_observe_unmentioned_group_message`,
leaking into the agent's observed context independent of dispatch.

Add `_is_user_authorized_from_message()` as an intake prefilter that runs
in `_handle_text_message`, `_handle_command`, `_handle_location_message`,
and `_handle_media_message` BEFORE batching, event construction, and the
unmentioned-group observe branch. It reuses the runner's
`_is_user_authorized()` with a correctly-shaped SessionSource (group vs
forum vs dm, real chat_id for TELEGRAM_GROUP_ALLOWED_* allowlists),
falls back to env allowlists, and only rejects when an allowlist actually
exists — unknown DMs with no allowlist still reach the pairing flow.
Channel posts authorize via `sender_chat` identity when `from_user` is
absent.

Co-authored-by: liuhao1024 <sunsky.lau@gmail.com>
Co-authored-by: Carlos Manuel Cejas <carlosmcejas@gmail.com>
This commit is contained in:
teknium1 2026-06-28 02:54:24 -07:00 committed by Teknium
parent 61210097a5
commit c648ecdca5
3 changed files with 569 additions and 0 deletions

View file

@ -559,6 +559,146 @@ class TelegramAdapter(BasePlatformAdapter):
allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()}
return "*" in allowed_ids or normalized_user_id in allowed_ids
def _source_from_message_for_auth(self, message: Message):
"""Build the same Telegram source shape the gateway auth path expects.
Resolves the identity to authorize from ``from_user`` for normal
messages, falling back to ``sender_chat`` for channel posts (which
carry no ``from_user``) so a removed/unauthorized channel cannot
inject content via the broadcast path either.
"""
from gateway.session import SessionSource
user = getattr(message, "from_user", None)
chat = getattr(message, "chat", None)
user_id = str(getattr(user, "id", "")).strip() or None
user_name = (
str(getattr(user, "username", "") or getattr(user, "full_name", "") or "").strip()
or None
)
# Channel posts have no from_user — authorize the sender chat instead.
if not user_id:
sender_chat = getattr(message, "sender_chat", None)
if sender_chat is not None:
user_id = str(getattr(sender_chat, "id", "")).strip() or None
if not user_name:
user_name = (
str(getattr(sender_chat, "title", "") or "").strip() or None
)
chat_id = str(getattr(chat, "id", "")).strip() or user_id
chat_type = str(getattr(chat, "type", "dm")).strip().lower() or "dm"
if chat_type == "private":
chat_type = "dm"
elif chat_type == "supergroup":
thread_id_raw = getattr(message, "message_thread_id", None)
is_topic_message = bool(getattr(message, "is_topic_message", False))
is_forum_group = getattr(chat, "is_forum", False) is True
chat_type = (
"forum"
if thread_id_raw is not None and (is_topic_message or is_forum_group)
else "group"
)
thread_id = None
thread_id_raw = getattr(message, "message_thread_id", None)
if thread_id_raw is not None:
is_topic_message = bool(getattr(message, "is_topic_message", False))
is_forum_group = getattr(chat, "is_forum", False) is True
if chat_type == "forum" and (is_topic_message or is_forum_group):
thread_id = str(thread_id_raw)
elif chat_type == "dm" and is_topic_message:
thread_id = str(thread_id_raw)
return SessionSource(
platform=Platform.TELEGRAM,
chat_id=chat_id or "",
chat_type=chat_type,
user_id=user_id,
user_name=user_name,
thread_id=thread_id,
)
def _telegram_auth_env_configured(self) -> bool:
"""Return True when Telegram auth env vars make an early decision safe."""
keys = (
"TELEGRAM_ALLOWED_USERS",
"TELEGRAM_GROUP_ALLOWED_USERS",
"TELEGRAM_GROUP_ALLOWED_CHATS",
"TELEGRAM_ALLOW_ALL_USERS",
"GATEWAY_ALLOWED_USERS",
"GATEWAY_ALLOW_ALL_USERS",
)
return any(os.getenv(key, "").strip() for key in keys)
def _is_user_authorized_from_message(self, message: Message) -> bool:
"""Check if the sender of a Telegram message is authorized.
Intake prefilter that runs BEFORE text batching, event construction,
and unmentioned-group observation, so a removed/unauthorized user
cannot inject prompt content into the agent path or the observed
transcript (fixes #40863). It only rejects when it can make the same
context-aware decision the runner would make. Unknown DMs with no
allowlist still pass through so the normal pairing flow can run.
"""
source = self._source_from_message_for_auth(message)
user_id = source.user_id
# No identity at all → genuine group service message (pin, delete,
# new_chat_members, etc.). Defer to the cold path. Channel posts
# without sender_chat already resolved to None above and fall here;
# they carry no authorizable identity, so let the normal
# _should_process_message gating handle them.
if not user_id:
return True
# Adapter-level allow_from: when set, it is the sole authority.
adapter_allow_from = self.config.extra.get("allow_from")
if adapter_allow_from is not None:
allowed = {str(u).strip() for u in adapter_allow_from if str(u).strip()}
return user_id in allowed or "*" in allowed
# Test/custom injection only. The class method named
# _is_callback_user_authorized is for inline button callbacks and must
# not be treated as a user-id-only shortcut for real messages — only
# honor an instance-level override (set in tests).
callback_auth = self.__dict__.get("_is_callback_user_authorized")
if callable(callback_auth):
try:
return bool(
callback_auth(
user_id,
chat_id=source.chat_id,
chat_type=source.chat_type,
thread_id=source.thread_id,
user_name=source.user_name,
)
)
except Exception:
pass
runner = getattr(getattr(self, "_message_handler", None), "__self__", None)
auth_fn = getattr(runner, "_is_user_authorized", None)
if callable(auth_fn):
# Only make an early decision via the runner when an allowlist
# actually exists; otherwise unknown DMs must reach the pairing
# flow rather than being default-denied here.
if not self._telegram_auth_env_configured():
return True
try:
return bool(auth_fn(source))
except Exception:
logger.debug(
"[Telegram] Falling back to env-only auth for user %s",
user_id,
exc_info=True,
)
allowed_csv = os.getenv("TELEGRAM_ALLOWED_USERS", "").strip()
if not allowed_csv:
return True
allowed_ids = {uid.strip() for uid in allowed_csv.split(",") if uid.strip()}
return "*" in allowed_ids or user_id in allowed_ids
@classmethod
def _metadata_thread_id(cls, metadata: Optional[Dict[str, Any]]) -> Optional[str]:
if not metadata:
@ -6567,6 +6707,17 @@ class TelegramAdapter(BasePlatformAdapter):
msg = self._effective_update_message(update)
if not msg or not msg.text:
return
# Early user-level auth check: reject unauthorized users before any
# text batching, observe-buffer persistence, event building, or response
# generation. This prevents removed/blocked users from injecting prompts
# into the agent path or the observed transcript context (#40863).
if not self._is_user_authorized_from_message(msg):
logger.warning(
"[Telegram] Blocked unauthorized user %s in chat %s",
getattr(getattr(msg, "from_user", None), "id", None),
getattr(getattr(msg, "chat", None), "id", None),
)
return
if not self._should_process_message(msg):
if self._should_observe_unmentioned_group_message(msg):
self._observe_unmentioned_group_message(msg, MessageType.TEXT, update_id=update.update_id)
@ -6586,6 +6737,13 @@ class TelegramAdapter(BasePlatformAdapter):
return
if not self._should_process_message(msg, is_command=True):
return
if not self._is_user_authorized_from_message(msg):
logger.warning(
"[Telegram] Blocked unauthorized user %s in chat %s",
getattr(getattr(msg, "from_user", None), "id", None),
getattr(getattr(msg, "chat", None), "id", None),
)
return
await self._ensure_forum_commands(msg)
event = self._build_message_event(msg, MessageType.COMMAND, update_id=update.update_id)
@ -6599,6 +6757,13 @@ class TelegramAdapter(BasePlatformAdapter):
msg = self._effective_update_message(update)
if not msg:
return
if not self._is_user_authorized_from_message(msg):
logger.warning(
"[Telegram] Blocked unauthorized user %s in chat %s",
getattr(getattr(msg, "from_user", None), "id", None),
getattr(getattr(msg, "chat", None), "id", None),
)
return
if not self._should_process_message(msg):
if self._should_observe_unmentioned_group_message(msg):
self._observe_unmentioned_group_message(msg, MessageType.LOCATION, update_id=update.update_id)
@ -6781,6 +6946,13 @@ class TelegramAdapter(BasePlatformAdapter):
"""Handle incoming media messages, downloading images to local cache."""
if not update.message:
return
if not self._is_user_authorized_from_message(update.message):
logger.info(
"[Telegram] Blocked media from unauthorized user %s in chat %s",
getattr(getattr(update.message, "from_user", None), "id", None),
getattr(getattr(update.message, "chat", None), "id", None),
)
return
if not self._should_process_message(update.message):
if self._should_observe_unmentioned_group_message(update.message):
_m = update.message

View file

@ -45,6 +45,7 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json"
# Auto-extracted from noreply emails + manual overrides
AUTHOR_MAP = {
"carlosmcejas@gmail.com": "cmcejas", # PR #41188 salvage (early Telegram auth gate before event build/observe; #40863)
"ha-agent@homelab.4410.us": "oreoluwa", # PR #49845 salvage (skip preflight content-type probe for OAuth MCP servers so OAuth discovery runs; Akiflow/Hospitable)
"prathamesh290504@gmail.com": "PRATHAMESH75", # PR #37550 salvage (ExecStopPost cgroup-orphan reaper to unblock systemd restart; #37454)
"der@konsi.org": "konsisumer", # PR #19608 salvage (read-modify-write merge in write_credential_pool to preserve concurrently-added credentials; #19566)

View file

@ -0,0 +1,396 @@
"""Tests for Telegram adapter early authorization check.
Verifies that unauthorized users are blocked before any text batching,
event building, or response generation occurs.
"""
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageType
def _make_adapter(allow_from=None, allowed_chats=None, group_allowed_chats=None, callback_auth=None, **extra_overrides):
try:
from plugins.platforms.telegram.adapter import TelegramAdapter
except ModuleNotFoundError: # PR branch before Telegram plugin extraction
from gateway.platforms.telegram import TelegramAdapter
extra = {}
if allow_from is not None:
extra["allow_from"] = allow_from
if allowed_chats is not None:
extra["allowed_chats"] = allowed_chats
if group_allowed_chats is not None:
extra["group_allowed_chats"] = group_allowed_chats
extra.update(extra_overrides)
adapter = object.__new__(TelegramAdapter)
adapter.platform = Platform.TELEGRAM
adapter.config = PlatformConfig(enabled=True, token="fake-token", extra=extra)
adapter._bot = SimpleNamespace(id=999, username="test_bot")
adapter._message_handler = AsyncMock()
adapter._pending_text_batches = {}
adapter._pending_text_batch_tasks = {}
adapter._text_batch_delay_seconds = 0.01
adapter._text_batch_split_delay_seconds = 0.01
adapter._mention_patterns = adapter._compile_mention_patterns()
adapter._forum_lock = asyncio.Lock()
adapter._forum_command_registered = set()
adapter._active_sessions = {}
adapter._pending_messages = {}
if callback_auth is not None:
adapter._is_callback_user_authorized = callback_auth
return adapter
def _make_message(text="hello", *, from_user_id=111, chat_id=-100, chat_type="group"):
return SimpleNamespace(
message_id=42,
text=text,
caption=None,
entities=[],
caption_entities=[],
message_thread_id=None,
is_topic_message=False,
chat=SimpleNamespace(id=chat_id, type=chat_type, title="Test", is_forum=False),
from_user=SimpleNamespace(id=from_user_id, full_name="Test User", first_name="Test"),
reply_to_message=None,
date=None,
location=None,
photo=None,
video=None,
audio=None,
voice=None,
document=None,
sticker=None,
media_group_id=None,
)
@pytest.mark.asyncio
async def test_unauthorized_user_blocked_before_event_building():
"""Unauthorized user's message should be blocked before _build_message_event."""
adapter = _make_adapter(allow_from=["222"]) # Only user 222 allowed
build_called = False
original_build = adapter._build_message_event
def track_build(*a, **kw):
nonlocal build_called
build_called = True
return original_build(*a, **kw)
adapter._build_message_event = track_build
update = SimpleNamespace(
update_id=1,
message=_make_message(from_user_id=111), # User 111 NOT in allow_from
effective_message=None,
)
await adapter._handle_text_message(update, SimpleNamespace())
assert build_called is False, "build_message_event should not be called for unauthorized user"
@pytest.mark.asyncio
async def test_authorized_user_processed_normally():
"""Authorized user's message should pass the auth check and build an event."""
adapter = _make_adapter(allow_from=["111"])
build_called = False
original_build = adapter._build_message_event
def track_build(*a, **kw):
nonlocal build_called
build_called = True
return original_build(*a, **kw)
adapter._build_message_event = track_build
update = SimpleNamespace(
update_id=1,
message=_make_message(from_user_id=111),
effective_message=None,
)
await adapter._handle_text_message(update, SimpleNamespace())
assert build_called is True, "build_message_event should be called for authorized user"
@pytest.mark.asyncio
async def test_channel_post_passes_auth():
"""Messages with no from_user (channel posts) should pass user-level auth."""
adapter = _make_adapter(allow_from=["111"])
build_called = False
original_build = adapter._build_message_event
def track_build(*a, **kw):
nonlocal build_called
build_called = True
return original_build(*a, **kw)
adapter._build_message_event = track_build
msg = _make_message()
msg.from_user = None # Channel post has no sender
update = SimpleNamespace(
update_id=1,
message=msg,
effective_message=None,
)
await adapter._handle_text_message(update, SimpleNamespace())
assert build_called is True, "Channel posts should pass user-level auth"
@pytest.mark.asyncio
async def test_command_from_unauthorized_user_blocked():
"""Commands from unauthorized users should be blocked."""
adapter = _make_adapter(allow_from=["222"])
adapter.handle_message = AsyncMock()
update = SimpleNamespace(
update_id=1,
message=_make_message(text="/start", from_user_id=111),
effective_message=None,
)
await adapter._handle_command(update, SimpleNamespace())
adapter.handle_message.assert_not_awaited()
@pytest.mark.asyncio
async def test_command_from_authorized_user_processed():
"""Commands from authorized users should be processed."""
adapter = _make_adapter(allow_from=["111"])
adapter.handle_message = AsyncMock()
update = SimpleNamespace(
update_id=1,
message=_make_message(text="/start", from_user_id=111),
effective_message=None,
)
await adapter._handle_command(update, SimpleNamespace())
adapter.handle_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_location_from_unauthorized_user_blocked():
"""Location messages from unauthorized users should be blocked."""
adapter = _make_adapter(allow_from=["222"])
msg = _make_message(from_user_id=111)
msg.text = None
msg.location = SimpleNamespace(latitude=53.3498, longitude=-6.2603)
update = SimpleNamespace(
update_id=1,
message=msg,
effective_message=None,
)
# Should not raise — just silently return
await adapter._handle_location_message(update, SimpleNamespace())
def test_is_user_authorized_from_message_allow_from():
"""_is_user_authorized_from_message should respect adapter-level allow_from."""
adapter = _make_adapter(allow_from=["111", "222"])
msg = _make_message(from_user_id=111)
assert adapter._is_user_authorized_from_message(msg) is True
msg = _make_message(from_user_id=333)
assert adapter._is_user_authorized_from_message(msg) is False
def test_is_user_authorized_from_message_wildcard():
"""_is_user_authorized_from_message should accept wildcard '*'."""
adapter = _make_adapter(allow_from=["*"])
msg = _make_message(from_user_id=999)
assert adapter._is_user_authorized_from_message(msg) is True
def test_is_user_authorized_from_message_no_from_user():
"""_is_user_authorized_from_message should return True for messages without from_user."""
adapter = _make_adapter(allow_from=["111"])
msg = _make_message()
msg.from_user = None
assert adapter._is_user_authorized_from_message(msg) is True
def test_is_user_authorized_from_message_callback():
"""_is_user_authorized_from_message should use _is_callback_user_authorized."""
adapter = _make_adapter(callback_auth=lambda uid, **_kw: uid == "555")
msg = _make_message(from_user_id=555)
assert adapter._is_user_authorized_from_message(msg) is True
msg = _make_message(from_user_id=666)
assert adapter._is_user_authorized_from_message(msg) is False
def test_unknown_dm_with_no_allowlist_passes_to_pairing(monkeypatch):
"""Unknown DMs must still reach the gateway pairing flow when no allowlist exists."""
for key in (
"TELEGRAM_ALLOWED_USERS",
"TELEGRAM_GROUP_ALLOWED_USERS",
"TELEGRAM_GROUP_ALLOWED_CHATS",
"TELEGRAM_ALLOW_ALL_USERS",
"GATEWAY_ALLOWED_USERS",
"GATEWAY_ALLOW_ALL_USERS",
):
monkeypatch.delenv(key, raising=False)
adapter = _make_adapter()
msg = _make_message(from_user_id=111, chat_id=111, chat_type="private")
assert adapter._is_user_authorized_from_message(msg) is True
def test_runner_auth_gets_group_user_allowlist_context(monkeypatch):
"""Group user allowlists need a group-shaped source, not a DM-shaped one."""
monkeypatch.setenv("TELEGRAM_GROUP_ALLOWED_USERS", "111")
seen_sources = []
class Runner:
def _is_user_authorized(self, source):
seen_sources.append(source)
return source.chat_type == "group" and source.chat_id == "-100" and source.user_id == "111"
async def handle(self, event):
return None
runner = Runner()
adapter = _make_adapter()
adapter._message_handler = runner.handle
msg = _make_message(from_user_id=111, chat_id=-100, chat_type="group")
assert adapter._is_user_authorized_from_message(msg) is True
assert seen_sources
assert seen_sources[0].chat_type == "group"
assert seen_sources[0].chat_id == "-100"
def test_runner_auth_gets_group_chat_allowlist_context(monkeypatch):
"""Group chat allowlists need the real chat id before intake drops updates."""
monkeypatch.setenv("TELEGRAM_GROUP_ALLOWED_CHATS", "-222")
seen_sources = []
class Runner:
def _is_user_authorized(self, source):
seen_sources.append(source)
return source.chat_type == "group" and source.chat_id == "-222"
async def handle(self, event):
return None
runner = Runner()
adapter = _make_adapter()
adapter._message_handler = runner.handle
msg = _make_message(from_user_id=111, chat_id=-222, chat_type="group")
assert adapter._is_user_authorized_from_message(msg) is True
assert seen_sources
assert seen_sources[0].chat_type == "group"
assert seen_sources[0].chat_id == "-222"
def test_removed_dm_user_blocked_before_pairing_when_allowlist_exists(monkeypatch):
"""A user removed from TELEGRAM_ALLOWED_USERS should be blocked at intake."""
monkeypatch.setenv("TELEGRAM_ALLOWED_USERS", "222")
adapter = _make_adapter()
msg = _make_message(from_user_id=111, chat_id=111, chat_type="private")
assert adapter._is_user_authorized_from_message(msg) is False
@pytest.mark.asyncio
async def test_media_from_removed_user_blocked_before_event_building(monkeypatch):
"""Removed users must not inject prompt-bearing documents via media handlers."""
monkeypatch.setenv("TELEGRAM_ALLOWED_USERS", "222")
adapter = _make_adapter()
adapter.handle_message = AsyncMock()
build_called = False
def track_build(*_args, **_kwargs):
nonlocal build_called
build_called = True
raise AssertionError("media handler built an event for an unauthorized user")
adapter._build_message_event = track_build
document = SimpleNamespace(
file_name="payload.txt",
mime_type="text/plain",
file_size=42,
get_file=AsyncMock(side_effect=AssertionError("unauthorized document was downloaded")),
)
msg = _make_message(text=None, from_user_id=111, chat_id=111, chat_type="private")
msg.caption = "please process this caption"
msg.document = document
update = SimpleNamespace(update_id=1, message=msg, effective_message=None)
await adapter._handle_media_message(update, SimpleNamespace())
assert build_called is False
adapter.handle_message.assert_not_awaited()
document.get_file.assert_not_awaited()
@pytest.mark.asyncio
async def test_unmentioned_group_text_from_removed_user_not_observed():
"""Removed users must not persist unmentioned group text into observed context."""
adapter = _make_adapter(
allow_from=["222"],
allowed_chats=["-100"],
group_allowed_chats=["-100"],
require_mention=True,
observe_unmentioned_group_messages=True,
)
observed = []
adapter._observe_unmentioned_group_message = lambda *args, **kwargs: observed.append((args, kwargs))
msg = _make_message(text="side chatter", from_user_id=111, chat_id=-100, chat_type="group")
update = SimpleNamespace(update_id=1, message=msg, effective_message=None)
await adapter._handle_text_message(update, SimpleNamespace())
assert observed == []
@pytest.mark.asyncio
async def test_unmentioned_group_location_from_removed_user_not_observed():
"""Removed users must not persist unmentioned group locations into observed context."""
adapter = _make_adapter(
allow_from=["222"],
allowed_chats=["-100"],
group_allowed_chats=["-100"],
require_mention=True,
observe_unmentioned_group_messages=True,
)
observed = []
adapter._observe_unmentioned_group_message = lambda *args, **kwargs: observed.append((args, kwargs))
msg = _make_message(text=None, from_user_id=111, chat_id=-100, chat_type="group")
msg.location = SimpleNamespace(latitude=53.3498, longitude=-6.2603)
update = SimpleNamespace(update_id=1, message=msg, effective_message=None)
await adapter._handle_location_message(update, SimpleNamespace())
assert observed == []