Fix unsafe gateway media path delivery

This commit is contained in:
Eugeniusz Gilewski 2026-05-22 14:47:23 +02:00 committed by Teknium
parent 4a91e36495
commit 41d2c758c3
10 changed files with 371 additions and 60 deletions

View file

@ -529,7 +529,9 @@ def _send_media_via_adapter(
"""
from pathlib import Path
from gateway.platforms.base import should_send_media_as_audio
from gateway.platforms.base import BasePlatformAdapter, should_send_media_as_audio
media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files)
for media_path, _is_voice in media_files:
try:
@ -614,6 +616,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
# Extract MEDIA: tags so attachments are forwarded as files, not raw text
from gateway.platforms.base import BasePlatformAdapter
media_files, cleaned_delivery_content = BasePlatformAdapter.extract_media(delivery_content)
media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files)
try:
config = load_gateway_config()

View file

@ -472,7 +472,7 @@ sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
from gateway.session import SessionSource, build_session_key
from hermes_constants import get_hermes_dir
from hermes_constants import get_hermes_dir, get_hermes_home
GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE = (
@ -813,6 +813,86 @@ def cache_video_from_bytes(data: bytes, ext: str = ".mp4") -> str:
# ---------------------------------------------------------------------------
DOCUMENT_CACHE_DIR = get_hermes_dir("cache/documents", "document_cache")
SCREENSHOT_CACHE_DIR = get_hermes_dir("cache/screenshots", "browser_screenshots")
_HERMES_HOME = get_hermes_home()
MEDIA_DELIVERY_ALLOW_DIRS_ENV = "HERMES_MEDIA_ALLOW_DIRS"
MEDIA_DELIVERY_SAFE_ROOTS = (
IMAGE_CACHE_DIR,
AUDIO_CACHE_DIR,
VIDEO_CACHE_DIR,
DOCUMENT_CACHE_DIR,
SCREENSHOT_CACHE_DIR,
_HERMES_HOME / "image_cache",
_HERMES_HOME / "audio_cache",
_HERMES_HOME / "video_cache",
_HERMES_HOME / "document_cache",
_HERMES_HOME / "browser_screenshots",
)
def _media_delivery_allowed_roots() -> List[Path]:
"""Return roots from which model-emitted local media may be delivered."""
roots = [Path(root) for root in MEDIA_DELIVERY_SAFE_ROOTS]
extra_roots = os.environ.get(MEDIA_DELIVERY_ALLOW_DIRS_ENV, "")
for chunk in extra_roots.split(os.pathsep):
for raw_root in chunk.split(","):
raw_root = raw_root.strip()
if not raw_root:
continue
root = Path(os.path.expanduser(raw_root))
if root.is_absolute():
roots.append(root)
return roots
def _path_is_within(path: Path, root: Path) -> bool:
try:
path.relative_to(root)
return True
except ValueError:
return False
def validate_media_delivery_path(path: str) -> Optional[str]:
"""Return a safe absolute file path for native media delivery, else None.
MEDIA tags and bare local paths in model output are untrusted text. Only
existing regular files under Hermes-managed media caches, or roots the
operator explicitly allowlists, may be uploaded as native attachments.
Symlinks are resolved before the containment check.
"""
if not path:
return None
candidate = str(path).strip()
if len(candidate) >= 2 and candidate[0] == candidate[-1] and candidate[0] in "`\"'":
candidate = candidate[1:-1].strip()
candidate = candidate.lstrip("`\"'").rstrip("`\"',.;:)}]")
if not candidate:
return None
expanded = Path(os.path.expanduser(candidate))
if not expanded.is_absolute():
return None
try:
resolved = expanded.resolve(strict=True)
except (OSError, RuntimeError, ValueError):
return None
if not resolved.is_file():
return None
for root in _media_delivery_allowed_roots():
try:
resolved_root = root.expanduser().resolve(strict=False)
except (OSError, RuntimeError, ValueError):
continue
if _path_is_within(resolved, resolved_root):
return str(resolved)
return None
SUPPORTED_DOCUMENT_TYPES = {
".pdf": "application/pdf",
@ -2119,6 +2199,35 @@ class BasePlatformAdapter(ABC):
text = f"{caption}\n{text}"
return await self.send(chat_id=chat_id, content=text, reply_to=reply_to, metadata=metadata)
@staticmethod
def validate_media_delivery_path(path: str) -> Optional[str]:
"""Return a resolved path if it is safe for native attachment upload."""
return validate_media_delivery_path(path)
@staticmethod
def filter_media_delivery_paths(media_files) -> List[Tuple[str, bool]]:
"""Drop unsafe MEDIA paths and normalize accepted paths."""
safe_media: List[Tuple[str, bool]] = []
for media_path, is_voice in media_files or []:
safe_path = validate_media_delivery_path(str(media_path))
if safe_path:
safe_media.append((safe_path, bool(is_voice)))
else:
logger.warning("Skipping unsafe MEDIA directive path outside allowed roots")
return safe_media
@staticmethod
def filter_local_delivery_paths(file_paths) -> List[str]:
"""Drop unsafe bare local file paths and normalize accepted paths."""
safe_paths: List[str] = []
for file_path in file_paths or []:
safe_path = validate_media_delivery_path(str(file_path))
if safe_path:
safe_paths.append(safe_path)
else:
logger.warning("Skipping unsafe local file path outside allowed roots")
return safe_paths
@staticmethod
def extract_media(content: str) -> Tuple[List[Tuple[str, bool]], str]:
"""
@ -3166,6 +3275,7 @@ class BasePlatformAdapter(ABC):
# Extract MEDIA:<path> tags (from TTS tool) before other processing
media_files, response = self.extract_media(response)
media_files = self.filter_media_delivery_paths(media_files)
# Extract image URLs and send them as native platform attachments
images, text_content = self.extract_images(response)
@ -3179,6 +3289,7 @@ class BasePlatformAdapter(ABC):
# Auto-detect bare local file paths for native media delivery
# (helps small models that don't use MEDIA: syntax)
local_files, text_content = self.extract_local_files(text_content)
local_files = self.filter_local_delivery_paths(local_files)
if local_files:
logger.info("[%s] extract_local_files found %d file(s) in response", self.name, len(local_files))

View file

@ -1679,8 +1679,10 @@ class WeixinAdapter(BasePlatformAdapter):
# Extract MEDIA: tags and bare local file paths before text delivery.
media_files, cleaned_content = self.extract_media(content)
media_files = self.filter_media_delivery_paths(media_files)
_, image_cleaned = self.extract_images(cleaned_content)
local_files, final_content = self.extract_local_files(image_cleaned)
local_files = self.filter_local_delivery_paths(local_files)
_AUDIO_EXTS = {".ogg", ".opus", ".mp3", ".wav", ".m4a", ".flac"}
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".3gp"}

View file

@ -5058,6 +5058,11 @@ class GatewayRunner:
if not candidates:
return
from gateway.platforms.base import BasePlatformAdapter
candidates = BasePlatformAdapter.filter_local_delivery_paths(candidates)
if not candidates:
return
_IMAGE_EXTS = {".png", ".jpg", ".jpeg", ".gif", ".webp"}
_VIDEO_EXTS = {".mp4", ".mov", ".avi", ".mkv", ".webm", ".3gp"}
@ -11264,14 +11269,16 @@ class GatewayRunner:
# send_multiple_images (Telegram sendPhoto recompresses to ~1280px).
force_document_attachments = "[[as_document]]" in response
from gateway.platforms.base import BasePlatformAdapter, should_send_media_as_audio
media_files, _ = adapter.extract_media(response)
media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files)
_, cleaned = adapter.extract_images(response)
local_files, _ = adapter.extract_local_files(cleaned)
local_files = BasePlatformAdapter.filter_local_delivery_paths(local_files)
_thread_meta = self._thread_metadata_for_source(event.source, self._reply_anchor_for_event(event))
from gateway.platforms.base import should_send_media_as_audio
_VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'}
_IMAGE_EXTS = {'.jpg', '.jpeg', '.png', '.webp', '.gif'}
@ -11563,6 +11570,8 @@ class GatewayRunner:
# Extract media files from the response
if response:
media_files, response = adapter.extract_media(response)
from gateway.platforms.base import BasePlatformAdapter
media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files)
images, text_content = adapter.extract_images(response)
preview = prompt[:60] + ("..." if len(prompt) > 60 else "")

View file

@ -490,6 +490,17 @@ class TestRoutingIntents:
class TestDeliverResultWrapping:
"""Verify that cron deliveries are wrapped with header/footer and no longer mirrored."""
def _safe_media_path(self, tmp_path, monkeypatch, name, data=b"media"):
root = tmp_path / "media-cache"
media_file = root / name
media_file.parent.mkdir(parents=True, exist_ok=True)
media_file.write_bytes(data)
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
(root,),
)
return media_file.resolve()
def test_delivery_wraps_content_with_header_and_footer(self):
"""Delivered content should include task name header and agent-invisible note."""
from gateway.config import Platform
@ -564,9 +575,10 @@ class TestDeliverResultWrapping:
assert "Cronjob Response" not in sent_content
assert "The agent cannot see" not in sent_content
def test_delivery_extracts_media_tags_before_send(self):
def test_delivery_extracts_media_tags_before_send(self, tmp_path, monkeypatch):
"""Cron delivery should pass MEDIA attachments separately to the send helper."""
from gateway.config import Platform
media_path = self._safe_media_path(tmp_path, monkeypatch, "test-voice.ogg")
pconfig = MagicMock()
pconfig.enabled = True
@ -581,7 +593,7 @@ class TestDeliverResultWrapping:
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Title\nMEDIA:/tmp/test-voice.ogg")
_deliver_result(job, f"Title\nMEDIA:{media_path}")
send_mock.assert_called_once()
args, kwargs = send_mock.call_args
@ -589,14 +601,15 @@ class TestDeliverResultWrapping:
assert "MEDIA:" not in args[3]
assert "Title" in args[3]
# Media files should be forwarded separately
assert kwargs["media_files"] == [("/tmp/test-voice.ogg", False)]
assert kwargs["media_files"] == [(str(media_path), False)]
def test_live_adapter_sends_media_as_attachments(self):
def test_live_adapter_sends_media_as_attachments(self, tmp_path, monkeypatch):
"""When a live adapter is available, MEDIA files should be sent as native
platform attachments (e.g., Discord voice, Telegram audio) rather than
as literal 'MEDIA:/path' text."""
from gateway.config import Platform
from concurrent.futures import Future
media_path = self._safe_media_path(tmp_path, monkeypatch, "cron-voice.mp3")
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
@ -628,7 +641,7 @@ class TestDeliverResultWrapping:
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"Here is TTS\nMEDIA:/tmp/cron-voice.mp3",
f"Here is TTS\nMEDIA:{media_path}",
adapters={Platform.DISCORD: adapter},
loop=loop,
)
@ -642,12 +655,13 @@ class TestDeliverResultWrapping:
# Audio file should be sent as a voice attachment
adapter.send_voice.assert_called_once()
voice_call = adapter.send_voice.call_args
assert voice_call[1]["audio_path"] == "/tmp/cron-voice.mp3"
assert voice_call[1]["audio_path"] == str(media_path)
def test_live_adapter_routes_image_to_send_image_file(self):
def test_live_adapter_routes_image_to_send_image_file(self, tmp_path, monkeypatch):
"""Image MEDIA files should be routed to send_image_file, not send_voice."""
from gateway.config import Platform
from concurrent.futures import Future
media_path = self._safe_media_path(tmp_path, monkeypatch, "chart.png")
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)
@ -678,19 +692,20 @@ class TestDeliverResultWrapping:
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"Chart attached\nMEDIA:/tmp/chart.png",
f"Chart attached\nMEDIA:{media_path}",
adapters={Platform.DISCORD: adapter},
loop=loop,
)
adapter.send_image_file.assert_called_once()
assert adapter.send_image_file.call_args[1]["image_path"] == "/tmp/chart.png"
assert adapter.send_image_file.call_args[1]["image_path"] == str(media_path)
adapter.send_voice.assert_not_called()
def test_live_adapter_media_only_no_text(self):
def test_live_adapter_media_only_no_text(self, tmp_path, monkeypatch):
"""When content is ONLY a MEDIA tag with no text, media should still be sent."""
from gateway.config import Platform
from concurrent.futures import Future
media_path = self._safe_media_path(tmp_path, monkeypatch, "voice.ogg")
adapter = AsyncMock()
adapter.send_voice.return_value = MagicMock(success=True)
@ -720,7 +735,7 @@ class TestDeliverResultWrapping:
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_deliver_result(
job,
"[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg",
f"[[audio_as_voice]]\nMEDIA:{media_path}",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)
@ -2164,43 +2179,56 @@ class TestBuildJobPromptBumpUse:
class TestSendMediaViaAdapter:
"""Unit tests for _send_media_via_adapter — routes files to typed adapter methods."""
def _safe_media_path(self, tmp_path, monkeypatch, name, data=b"media"):
root = tmp_path / "media-cache"
media_file = root / name
media_file.parent.mkdir(parents=True, exist_ok=True)
media_file.write_bytes(data)
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
(root,),
)
return media_file.resolve()
@staticmethod
def _run_with_loop(adapter, chat_id, media_files, metadata, job):
"""Helper: run _send_media_via_adapter with a real running event loop."""
import asyncio
import threading
"""Helper: run _send_media_via_adapter with immediate scheduling."""
from concurrent.futures import Future
loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever, daemon=True)
t.start()
try:
_send_media_via_adapter(adapter, chat_id, media_files, metadata, loop, job)
finally:
loop.call_soon_threadsafe(loop.stop)
t.join(timeout=5)
loop.close()
def fake_run_coro(coro, _loop):
coro.close()
completed = Future()
completed.set_result(MagicMock(success=True))
return completed
def test_video_dispatched_to_send_video(self):
with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
_send_media_via_adapter(adapter, chat_id, media_files, metadata, MagicMock(), job)
def test_video_dispatched_to_send_video(self, tmp_path, monkeypatch):
adapter = MagicMock()
adapter.send_video = AsyncMock()
media_files = [("/tmp/clip.mp4", False)]
media_path = self._safe_media_path(tmp_path, monkeypatch, "clip.mp4")
media_files = [(str(media_path), False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j1"})
adapter.send_video.assert_called_once()
assert adapter.send_video.call_args[1]["video_path"] == "/tmp/clip.mp4"
assert adapter.send_video.call_args[1]["video_path"] == str(media_path)
def test_unknown_ext_dispatched_to_send_document(self):
def test_unknown_ext_dispatched_to_send_document(self, tmp_path, monkeypatch):
adapter = MagicMock()
adapter.send_document = AsyncMock()
media_files = [("/tmp/report.pdf", False)]
media_path = self._safe_media_path(tmp_path, monkeypatch, "report.pdf")
media_files = [(str(media_path), False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j2"})
adapter.send_document.assert_called_once()
assert adapter.send_document.call_args[1]["file_path"] == "/tmp/report.pdf"
assert adapter.send_document.call_args[1]["file_path"] == str(media_path)
def test_multiple_media_files_all_delivered(self):
def test_multiple_media_files_all_delivered(self, tmp_path, monkeypatch):
adapter = MagicMock()
adapter.send_voice = AsyncMock()
adapter.send_image_file = AsyncMock()
media_files = [("/tmp/voice.mp3", False), ("/tmp/photo.jpg", False)]
voice_path = self._safe_media_path(tmp_path, monkeypatch, "voice.mp3")
photo_path = self._safe_media_path(tmp_path, monkeypatch, "photo.jpg")
media_files = [(str(voice_path), False), (str(photo_path), False)]
self._run_with_loop(adapter, "123", media_files, None, {"id": "j3"})
adapter.send_voice.assert_called_once()
adapter.send_image_file.assert_called_once()
@ -2462,7 +2490,7 @@ class TestSendMediaTimeoutCancelsFuture:
in-flight coroutine must be cancelled before the next file is tried.
"""
def test_media_send_timeout_cancels_future_and_continues(self):
def test_media_send_timeout_cancels_future_and_continues(self, tmp_path, monkeypatch):
"""End-to-end: _send_media_via_adapter with a future whose .result()
raises TimeoutError. Assert cancel() fires and the loop proceeds
to the next file rather than hanging or crashing."""
@ -2493,9 +2521,19 @@ class TestSendMediaTimeoutCancelsFuture:
coro.close()
return next(futures_iter)
root = tmp_path / "media-cache"
slow = root / "slow.png"
fast = root / "fast.mp4"
slow.parent.mkdir(parents=True)
slow.write_bytes(b"slow")
fast.write_bytes(b"fast")
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
(root,),
)
media_files = [
("/tmp/slow.png", False), # times out
("/tmp/fast.mp4", False), # succeeds
(str(slow), False), # times out
(str(fast), False), # succeeds
]
loop = MagicMock()
@ -2509,4 +2547,4 @@ class TestSendMediaTimeoutCancelsFuture:
assert timeout_cancel_calls == [True], "future.cancel() must fire on TimeoutError"
# 2. Second file still got dispatched — one timeout doesn't abort the batch
adapter.send_video.assert_called_once()
assert adapter.send_video.call_args[1]["video_path"] == "/tmp/fast.mp4"
assert adapter.send_video.call_args[1]["video_path"] == str(fast.resolve())

View file

@ -361,6 +361,72 @@ class TestExtractMedia:
assert "[[as_document]]" not in cleaned
class TestMediaDeliveryPathValidation:
def _patch_roots(self, monkeypatch, *roots):
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
tuple(roots),
)
def test_allows_existing_file_inside_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
media_file = root / "voice.ogg"
media_file.parent.mkdir(parents=True)
media_file.write_bytes(b"OggS")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(media_file)) == str(media_file.resolve())
def test_rejects_existing_file_outside_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
root.mkdir()
secret = tmp_path / "secrets.txt"
secret.write_text("not for upload")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(secret)) is None
def test_rejects_symlink_escape_from_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
root.mkdir()
secret = tmp_path / "outside.png"
secret.write_bytes(b"secret")
link = root / "safe-looking.png"
try:
link.symlink_to(secret)
except OSError:
pytest.skip("symlink creation is unavailable")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(link)) is None
def test_filter_keeps_safe_media_and_drops_unsafe(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
safe = root / "speech.ogg"
unsafe = tmp_path / "outside.ogg"
safe.parent.mkdir(parents=True)
safe.write_bytes(b"OggS")
unsafe.write_bytes(b"OggS")
self._patch_roots(monkeypatch, root)
filtered = BasePlatformAdapter.filter_media_delivery_paths([
(str(unsafe), False),
(str(safe), True),
])
assert filtered == [(str(safe.resolve()), True)]
def test_allows_operator_configured_extra_root(self, tmp_path, monkeypatch):
extra_root = tmp_path / "operator-media"
media_file = extra_root / "report.pdf"
media_file.parent.mkdir(parents=True)
media_file.write_bytes(b"%PDF-1.4")
self._patch_roots(monkeypatch)
monkeypatch.setenv("HERMES_MEDIA_ALLOW_DIRS", str(extra_root))
assert BasePlatformAdapter.validate_media_delivery_path(str(media_file)) == str(media_file.resolve())
# ---------------------------------------------------------------------------
# should_send_media_as_audio
# ---------------------------------------------------------------------------
@ -728,4 +794,3 @@ class TestProxyKwargsForAiohttp:
sess_kw, req_kw = proxy_kwargs_for_aiohttp("http://proxy:8080")
assert sess_kw == {}
assert req_kw == {"proxy": "http://proxy:8080"}

View file

@ -50,11 +50,24 @@ def _event(thread_id=None):
)
def _allowed_media_path(tmp_path, monkeypatch, name):
root = tmp_path / "media-cache"
media_file = root / name
media_file.parent.mkdir(parents=True, exist_ok=True)
media_file.write_bytes(b"media")
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
(root,),
)
return media_file.resolve()
@pytest.mark.asyncio
async def test_base_adapter_routes_telegram_flac_media_tag_to_document_sender():
async def test_base_adapter_routes_telegram_flac_media_tag_to_document_sender(tmp_path, monkeypatch):
adapter = _MediaRoutingAdapter()
event = _event()
adapter._message_handler = AsyncMock(return_value="MEDIA:/tmp/speech.flac")
media_file = _allowed_media_path(tmp_path, monkeypatch, "speech.flac")
adapter._message_handler = AsyncMock(return_value=f"MEDIA:{media_file}")
adapter.send_voice = AsyncMock(return_value=SendResult(success=True, message_id="voice"))
adapter.send_document = AsyncMock(return_value=SendResult(success=True, message_id="doc"))
@ -62,17 +75,18 @@ async def test_base_adapter_routes_telegram_flac_media_tag_to_document_sender():
adapter.send_document.assert_awaited_once_with(
chat_id="chat-1",
file_path="/tmp/speech.flac",
file_path=str(media_file),
metadata=None,
)
adapter.send_voice.assert_not_awaited()
@pytest.mark.asyncio
async def test_base_adapter_routes_non_voice_telegram_ogg_media_tag_to_document_sender():
async def test_base_adapter_routes_non_voice_telegram_ogg_media_tag_to_document_sender(tmp_path, monkeypatch):
adapter = _MediaRoutingAdapter()
event = _event()
adapter._message_handler = AsyncMock(return_value="MEDIA:/tmp/speech.ogg")
media_file = _allowed_media_path(tmp_path, monkeypatch, "speech.ogg")
adapter._message_handler = AsyncMock(return_value=f"MEDIA:{media_file}")
adapter.send_voice = AsyncMock(return_value=SendResult(success=True, message_id="voice"))
adapter.send_document = AsyncMock(return_value=SendResult(success=True, message_id="doc"))
@ -80,18 +94,19 @@ async def test_base_adapter_routes_non_voice_telegram_ogg_media_tag_to_document_
adapter.send_document.assert_awaited_once_with(
chat_id="chat-1",
file_path="/tmp/speech.ogg",
file_path=str(media_file),
metadata=None,
)
adapter.send_voice.assert_not_awaited()
@pytest.mark.asyncio
async def test_base_adapter_routes_voice_tagged_telegram_ogg_media_tag_to_voice_sender():
async def test_base_adapter_routes_voice_tagged_telegram_ogg_media_tag_to_voice_sender(tmp_path, monkeypatch):
adapter = _MediaRoutingAdapter()
event = _event()
media_file = _allowed_media_path(tmp_path, monkeypatch, "speech.ogg")
adapter._message_handler = AsyncMock(
return_value="[[audio_as_voice]]\nMEDIA:/tmp/speech.ogg"
return_value=f"[[audio_as_voice]]\nMEDIA:{media_file}"
)
adapter.send_voice = AsyncMock(return_value=SendResult(success=True, message_id="voice"))
adapter.send_document = AsyncMock(return_value=SendResult(success=True, message_id="doc"))
@ -100,7 +115,7 @@ async def test_base_adapter_routes_voice_tagged_telegram_ogg_media_tag_to_voice_
adapter.send_voice.assert_awaited_once_with(
chat_id="chat-1",
audio_path="/tmp/speech.ogg",
audio_path=str(media_file),
metadata=None,
)
adapter.send_document.assert_not_awaited()
@ -117,8 +132,9 @@ def _fake_runner(thread_meta):
@pytest.mark.asyncio
async def test_streaming_delivery_routes_telegram_flac_media_tag_to_document_sender():
async def test_streaming_delivery_routes_telegram_flac_media_tag_to_document_sender(tmp_path, monkeypatch):
event = _event(thread_id="topic-1")
media_file = _allowed_media_path(tmp_path, monkeypatch, "speech.flac")
adapter = SimpleNamespace(
name="test",
extract_media=BasePlatformAdapter.extract_media,
@ -132,22 +148,23 @@ async def test_streaming_delivery_routes_telegram_flac_media_tag_to_document_sen
await GatewayRunner._deliver_media_from_response(
_fake_runner({"thread_id": "topic-1"}),
"MEDIA:/tmp/speech.flac",
f"MEDIA:{media_file}",
event,
adapter,
)
adapter.send_document.assert_awaited_once_with(
chat_id="chat-1",
file_path="/tmp/speech.flac",
file_path=str(media_file),
metadata={"thread_id": "topic-1"},
)
adapter.send_voice.assert_not_awaited()
@pytest.mark.asyncio
async def test_streaming_delivery_routes_non_voice_telegram_ogg_media_tag_to_document_sender():
async def test_streaming_delivery_routes_non_voice_telegram_ogg_media_tag_to_document_sender(tmp_path, monkeypatch):
event = _event(thread_id="topic-1")
media_file = _allowed_media_path(tmp_path, monkeypatch, "speech.ogg")
adapter = SimpleNamespace(
name="test",
extract_media=BasePlatformAdapter.extract_media,
@ -161,24 +178,25 @@ async def test_streaming_delivery_routes_non_voice_telegram_ogg_media_tag_to_doc
await GatewayRunner._deliver_media_from_response(
_fake_runner({"thread_id": "topic-1"}),
"MEDIA:/tmp/speech.ogg",
f"MEDIA:{media_file}",
event,
adapter,
)
adapter.send_document.assert_awaited_once_with(
chat_id="chat-1",
file_path="/tmp/speech.ogg",
file_path=str(media_file),
metadata={"thread_id": "topic-1"},
)
adapter.send_voice.assert_not_awaited()
@pytest.mark.asyncio
async def test_streaming_delivery_routes_telegram_mp3_media_tag_to_voice_sender():
async def test_streaming_delivery_routes_telegram_mp3_media_tag_to_voice_sender(tmp_path, monkeypatch):
"""MP3 audio on Telegram must go through send_voice (which routes to
sendAudio internally); Telegram accepts MP3 for the audio player."""
event = _event(thread_id="topic-1")
media_file = _allowed_media_path(tmp_path, monkeypatch, "speech.mp3")
adapter = SimpleNamespace(
name="test",
extract_media=BasePlatformAdapter.extract_media,
@ -192,14 +210,47 @@ async def test_streaming_delivery_routes_telegram_mp3_media_tag_to_voice_sender(
await GatewayRunner._deliver_media_from_response(
_fake_runner({"thread_id": "topic-1"}),
"MEDIA:/tmp/speech.mp3",
f"MEDIA:{media_file}",
event,
adapter,
)
adapter.send_voice.assert_awaited_once_with(
chat_id="chat-1",
audio_path="/tmp/speech.mp3",
audio_path=str(media_file),
metadata={"thread_id": "topic-1"},
)
adapter.send_document.assert_not_awaited()
@pytest.mark.asyncio
async def test_streaming_delivery_blocks_media_path_outside_allowed_roots(tmp_path, monkeypatch):
event = _event(thread_id="topic-1")
allowed_root = tmp_path / "media-cache"
allowed_root.mkdir()
secret = tmp_path / "outside.pdf"
secret.write_bytes(b"%PDF secret")
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
(allowed_root,),
)
adapter = SimpleNamespace(
name="test",
extract_media=BasePlatformAdapter.extract_media,
extract_images=BasePlatformAdapter.extract_images,
extract_local_files=BasePlatformAdapter.extract_local_files,
send_voice=AsyncMock(return_value=SendResult(success=True, message_id="voice")),
send_document=AsyncMock(return_value=SendResult(success=True, message_id="doc")),
send_image_file=AsyncMock(return_value=SendResult(success=True, message_id="image")),
send_video=AsyncMock(return_value=SendResult(success=True, message_id="video")),
)
await GatewayRunner._deliver_media_from_response(
_fake_runner({"thread_id": "topic-1"}),
f"MEDIA:{secret}",
event,
adapter,
)
adapter.send_document.assert_not_awaited()
adapter.send_voice.assert_not_awaited()

View file

@ -377,6 +377,37 @@ class TestSendMessageTool:
user_id="user-123",
)
def test_media_tag_outside_allowed_roots_is_not_sent(self, tmp_path):
config, telegram_cfg = _make_config()
secret = tmp_path / "secret.pdf"
secret.write_bytes(b"%PDF secret")
with patch("gateway.config.load_gateway_config", return_value=config), \
patch("tools.interrupt.is_interrupted", return_value=False), \
patch("model_tools._run_async", side_effect=_run_async_immediately), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("gateway.mirror.mirror_to_session", return_value=True):
result = json.loads(
send_message_tool(
{
"action": "send",
"target": "telegram:12345",
"message": f"hello\nMEDIA:{secret}",
}
)
)
assert result["success"] is True
send_mock.assert_awaited_once_with(
Platform.TELEGRAM,
telegram_cfg,
"12345",
"hello",
thread_id=None,
media_files=[],
force_document=False,
)
def test_top_level_send_failure_redacts_query_token(self):
config, _telegram_cfg = _make_config()
leaked = "very-secret-query-token-123456"
@ -2652,4 +2683,3 @@ class TestSendTelegramThreadNotFoundRetry:
finally:
if media_path and os.path.exists(media_path):
os.unlink(media_path)

View file

@ -139,7 +139,7 @@ SEND_MESSAGE_SCHEMA = {
},
"message": {
"type": "string",
"description": "The message text to send. To send an image or file, include MEDIA:<local_path> (e.g. 'MEDIA:/tmp/hermes/cache/img_xxx.jpg') in the message — the platform will deliver it as a native media attachment."
"description": "The message text to send. To send an image or file, include MEDIA:<local_path> for a file under a Hermes media cache or HERMES_MEDIA_ALLOW_DIRS — the platform will deliver it as a native media attachment."
}
},
"required": []
@ -251,6 +251,7 @@ def _handle_send(args):
force_document_attachments = "[[as_document]]" in message
media_files, cleaned_message = BasePlatformAdapter.extract_media(message)
media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files)
mirror_text = cleaned_message.strip() or _describe_media_for_mirror(media_files)
used_home_channel = False

View file

@ -472,6 +472,7 @@ async def _handle_yb_send_dm(args, **kw):
embedded_media, message = BasePlatformAdapter.extract_media(message)
if embedded_media:
media_files.extend(embedded_media)
media_files = BasePlatformAdapter.filter_media_delivery_paths(media_files)
return tool_result(await send_dm(
group_code=group_code, name=args.get("name", ""),