fix: route Telegram image documents through photo handling

This commit is contained in:
mrcoferland 2026-05-02 23:51:13 +00:00 committed by Teknium
parent 51f9953e69
commit bd0c54d171
2 changed files with 105 additions and 14 deletions

View file

@ -86,6 +86,22 @@ from gateway.platforms.telegram_network import (
)
from utils import atomic_replace
_TELEGRAM_IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".gif"}
_TELEGRAM_IMAGE_MIME_TO_EXT = {
"image/png": ".png",
"image/jpeg": ".jpg",
"image/jpg": ".jpg",
"image/webp": ".webp",
"image/gif": ".gif",
}
_TELEGRAM_IMAGE_EXT_TO_MIME = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".webp": "image/webp",
".gif": "image/gif",
}
def check_telegram_requirements() -> bool:
"""Check if Telegram dependencies are available."""
@ -3239,10 +3255,59 @@ class TelegramAdapter(BasePlatformAdapter):
_, ext = os.path.splitext(original_filename)
ext = ext.lower()
# Normalize mime_type for robust comparisons (some clients send
# uppercase like "IMAGE/PNG").
doc_mime = (doc.mime_type or "").lower()
# If no extension from filename, reverse-lookup from MIME type
if not ext and doc.mime_type:
mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()}
ext = mime_to_ext.get(doc.mime_type, "")
if not ext and doc_mime:
ext = _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, "")
if not ext:
mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()}
ext = mime_to_ext.get(doc_mime, "")
# Check file size early so image documents cannot bypass the
# document size limit by taking the image path.
MAX_DOC_BYTES = 20 * 1024 * 1024
if not doc.file_size or doc.file_size > MAX_DOC_BYTES:
event.text = (
"The document is too large or its size could not be verified. "
"Maximum: 20 MB."
)
logger.info("[Telegram] Document too large: %s bytes", doc.file_size)
await self.handle_message(event)
return
# Telegram may deliver screenshots/photos as documents. If the
# payload is actually an image, route it through the image cache
# and batching path instead of rejecting it as a document.
if ext in _TELEGRAM_IMAGE_EXTENSIONS or doc_mime.startswith("image/"):
file_obj = await doc.get_file()
image_bytes = await file_obj.download_as_bytearray()
image_ext = ext if ext in _TELEGRAM_IMAGE_EXTENSIONS else _TELEGRAM_IMAGE_MIME_TO_EXT.get(doc_mime, ".jpg")
try:
cached_path = cache_image_from_bytes(bytes(image_bytes), ext=image_ext)
except ValueError as e:
logger.warning("[Telegram] Failed to cache image document: %s", e, exc_info=True)
event.text = (
f"Image document '{original_filename or doc_mime or ext or 'unknown'}' "
"could not be read as an image."
)
await self.handle_message(event)
return
event.message_type = MessageType.PHOTO
event.media_urls = [cached_path]
event.media_types = [doc_mime if doc_mime.startswith("image/") else _TELEGRAM_IMAGE_EXT_TO_MIME.get(image_ext, "image/jpeg")]
logger.info("[Telegram] Cached user image-document at %s", cached_path)
media_group_id = getattr(msg, "media_group_id", None)
if media_group_id:
await self._queue_media_group_event(str(media_group_id), event)
else:
batch_key = self._photo_batch_key(event, msg)
self._enqueue_photo_event(batch_key, event)
return
if not ext and doc.mime_type:
video_mime_to_ext = {v: k for k, v in SUPPORTED_VIDEO_TYPES.items()}
@ -3270,17 +3335,6 @@ class TelegramAdapter(BasePlatformAdapter):
await self.handle_message(event)
return
# Check file size (Telegram Bot API limit: 20 MB)
MAX_DOC_BYTES = 20 * 1024 * 1024
if not doc.file_size or doc.file_size > MAX_DOC_BYTES:
event.text = (
"The document is too large or its size could not be verified. "
"Maximum: 20 MB."
)
logger.info("[Telegram] Document too large: %s bytes", doc.file_size)
await self.handle_message(event)
return
# Download and cache
file_obj = await doc.get_file()
doc_bytes = await file_obj.download_as_bytearray()

View file

@ -257,6 +257,43 @@ class TestDocumentDownloadBlock:
assert event.media_urls and event.media_urls[0].endswith("archive.zip")
assert event.media_types == ["application/zip"]
@pytest.mark.asyncio
async def test_png_document_is_routed_as_image(self, adapter):
"""Telegram documents that are really PNGs should use the image path."""
file_obj = _make_file_obj(b"\x89PNG\r\n\x1a\n" + b"\x00" * 16)
doc = _make_document(file_name="screenshot.png", mime_type="image/png", file_size=9, file_obj=file_obj)
msg = _make_message(document=doc)
update = _make_update(msg)
with patch.object(adapter, "_photo_batch_key", return_value="batch-1"), patch.object(
adapter, "_enqueue_photo_event"
) as enqueue_mock:
await adapter._handle_media_message(update, MagicMock())
enqueue_mock.assert_called_once()
event = enqueue_mock.call_args.args[1]
assert event.message_type == MessageType.PHOTO
assert event.media_urls and event.media_urls[0].endswith(".png")
assert event.media_types == ["image/png"]
assert adapter.handle_message.call_count == 0
@pytest.mark.asyncio
async def test_spoofed_png_document_falls_back_with_error(self, adapter):
"""A .png filename with non-image bytes should fail clearly, not disappear."""
file_obj = _make_file_obj(b"not-a-real-image")
doc = _make_document(file_name="spoofed.png", mime_type="image/png", file_size=16, file_obj=file_obj)
msg = _make_message(document=doc)
update = _make_update(msg)
with patch.object(adapter, "_photo_batch_key", return_value="batch-2"), patch.object(
adapter, "_enqueue_photo_event"
) as enqueue_mock:
await adapter._handle_media_message(update, MagicMock())
enqueue_mock.assert_not_called()
event = adapter.handle_message.call_args[0][0]
assert "could not be read as an image" in event.text
@pytest.mark.asyncio
async def test_oversized_file_rejected(self, adapter):
doc = _make_document(file_name="huge.pdf", file_size=25 * 1024 * 1024)