hermes-agent/tests/gateway/test_platform_base.py
Teknium a989a79c0c
fix(gateway): allow native delivery of freshly-produced agent files (#32060)
The gateway's media delivery allowlist required files live inside
`~/.hermes/cache/{documents,images,...}`, which is the wrong shape for
real agent usage. Agents naturally produce artifacts via terminal tools
(`pandoc -o /tmp/report.pdf`, `matplotlib savefig`, etc.) or
write_file into project directories — these never land under the cache.
Result: users got a raw file path in chat instead of an attachment.

This is doubly bad in deployment shapes where the cache directories
aren't writable by the agent at all: Hermes running in Docker with a
read-only mount, or with a Docker/Modal/SSH terminal backend whose
filesystem isn't the gateway host's filesystem.

Layered trust model:

1. Cache-dir allowlist (unchanged) — Hermes-managed roots always trusted.
2. Operator allowlist — `HERMES_MEDIA_ALLOW_DIRS` env var, now also
   surfaced as `gateway.media_delivery_allow_dirs` in config.yaml.
3. Recency-based trust (new, default on) — files whose mtime is within
   `gateway.trust_recent_files_seconds` (default 600s) of "now" are
   trusted even outside the cache/operator allowlist. Old host files
   (`/etc/passwd`, `~/.bashrc`, `~/.ssh/id_rsa`) have mtimes measured
   in days/months, well outside the window — prompt-injection paths
   pointing at pre-existing files are still rejected.
4. Hard denylist — `/etc`, `/proc`, `/sys`, `/dev`, `/root`, `/boot`,
   `/var/{log,lib,run}`, plus `$HOME/.{ssh,aws,gnupg,kube,docker,config,
   azure,gcloud}` and `Library/Keychains`. Denylist blocks delivery
   even when recency would trust the file, in case an attacker
   somehow refreshes a sensitive file's mtime.

Operators who want strict-allowlist behavior set
`gateway.trust_recent_files: false` and the system reverts to
pre-existing behavior.

Tests: 6 new cases in test_platform_base.py cover the recency window,
disabled mode, system-path denylist, and the motivating PDF-in-project
scenario. 3 existing tests (test_platform_base, test_tts_media_routing,
test_send_message_tool) that exercised the strict-allowlist path are
updated to disable recency trust explicitly.

E2E validation: real `validate_media_delivery_path()` accepts fresh
PDFs in /tmp and project dirs, rejects /etc/passwd, ~/.ssh/id_rsa, and
files older than the window; config.yaml `gateway.*` keys bridge
correctly to the env vars the validator reads.
2026-05-25 05:34:31 -07:00

905 lines
37 KiB
Python

"""Tests for gateway/platforms/base.py — MessageEvent, media extraction, message truncation."""
import os
import time
from unittest.mock import patch
import pytest
from gateway.platforms.base import (
BasePlatformAdapter,
GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE,
MessageEvent,
MessageType,
safe_url_for_log,
utf16_len,
_prefix_within_utf16_limit,
)
class TestSecretCaptureGuidance:
def test_gateway_secret_capture_message_points_to_local_setup(self):
message = GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE
assert "local cli" in message.lower()
assert "~/.hermes/.env" in message
class TestSafeUrlForLog:
def test_strips_query_fragment_and_userinfo(self):
url = (
"https://user:pass@example.com/private/path/image.png"
"?X-Amz-Signature=supersecret&token=abc#frag"
)
result = safe_url_for_log(url)
assert result == "https://example.com/.../image.png"
assert "supersecret" not in result
assert "token=abc" not in result
assert "user:pass@" not in result
def test_truncates_long_values(self):
long_url = "https://example.com/" + ("a" * 300)
result = safe_url_for_log(long_url, max_len=40)
assert len(result) == 40
assert result.endswith("...")
def test_handles_small_and_non_positive_max_len(self):
url = "https://example.com/very/long/path/file.png?token=secret"
assert safe_url_for_log(url, max_len=3) == "..."
assert safe_url_for_log(url, max_len=2) == ".."
assert safe_url_for_log(url, max_len=0) == ""
# ---------------------------------------------------------------------------
# MessageEvent — command parsing
# ---------------------------------------------------------------------------
class TestMessageEventIsCommand:
def test_slash_command(self):
event = MessageEvent(text="/new")
assert event.is_command() is True
def test_regular_text(self):
event = MessageEvent(text="hello world")
assert event.is_command() is False
def test_empty_text(self):
event = MessageEvent(text="")
assert event.is_command() is False
def test_slash_only(self):
event = MessageEvent(text="/")
assert event.is_command() is True
class TestMessageEventGetCommand:
def test_simple_command(self):
event = MessageEvent(text="/new")
assert event.get_command() == "new"
def test_command_with_args(self):
event = MessageEvent(text="/reset session")
assert event.get_command() == "reset"
def test_not_a_command(self):
event = MessageEvent(text="hello")
assert event.get_command() is None
def test_command_is_lowercased(self):
event = MessageEvent(text="/HELP")
assert event.get_command() == "help"
def test_slash_only_returns_empty(self):
event = MessageEvent(text="/")
assert event.get_command() == ""
def test_command_with_at_botname(self):
event = MessageEvent(text="/new@TigerNanoBot")
assert event.get_command() == "new"
def test_command_with_at_botname_and_args(self):
event = MessageEvent(text="/compress@TigerNanoBot")
assert event.get_command() == "compress"
def test_command_mixed_case_with_at_botname(self):
event = MessageEvent(text="/RESET@TigerNanoBot")
assert event.get_command() == "reset"
class TestMessageEventGetCommandArgs:
def test_command_with_args(self):
event = MessageEvent(text="/new session id 123")
assert event.get_command_args() == "session id 123"
def test_command_without_args(self):
event = MessageEvent(text="/new")
assert event.get_command_args() == ""
def test_not_a_command_returns_full_text(self):
event = MessageEvent(text="hello world")
assert event.get_command_args() == "hello world"
# ---------------------------------------------------------------------------
# extract_images
# ---------------------------------------------------------------------------
class TestExtractImages:
def test_no_images(self):
images, cleaned = BasePlatformAdapter.extract_images("Just regular text.")
assert images == []
assert cleaned == "Just regular text."
def test_markdown_image_with_image_ext(self):
content = "Here is a photo: ![cat](https://example.com/cat.png)"
images, cleaned = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/cat.png"
assert images[0][1] == "cat"
assert "![cat]" not in cleaned
def test_markdown_image_jpg(self):
content = "![photo](https://example.com/photo.jpg)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.jpg"
assert images[0][1] == "photo"
def test_markdown_image_jpeg(self):
content = "![](https://example.com/photo.jpeg)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.jpeg"
assert images[0][1] == ""
def test_markdown_image_gif(self):
content = "![anim](https://example.com/anim.gif)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/anim.gif"
assert images[0][1] == "anim"
def test_markdown_image_webp(self):
content = "![](https://example.com/img.webp)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/img.webp"
assert images[0][1] == ""
def test_fal_media_cdn(self):
content = "![gen](https://fal.media/files/abc123/output.png)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://fal.media/files/abc123/output.png"
assert images[0][1] == "gen"
def test_fal_cdn_url(self):
content = "![](https://fal-cdn.example.com/result)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://fal-cdn.example.com/result"
assert images[0][1] == ""
def test_replicate_delivery(self):
content = "![](https://replicate.delivery/pbxt/abc/output)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://replicate.delivery/pbxt/abc/output"
assert images[0][1] == ""
def test_non_image_ext_not_extracted(self):
"""Markdown image with non-image extension should not be extracted."""
content = "![doc](https://example.com/report.pdf)"
images, cleaned = BasePlatformAdapter.extract_images(content)
assert images == []
assert "![doc]" in cleaned # Should be preserved
def test_html_img_tag(self):
content = 'Check this: <img src="https://example.com/photo.png">'
images, cleaned = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.png"
assert images[0][1] == "" # HTML images have no alt text
assert "<img" not in cleaned
def test_html_img_self_closing(self):
content = '<img src="https://example.com/photo.png"/>'
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.png"
assert images[0][1] == ""
def test_html_img_with_closing_tag(self):
content = '<img src="https://example.com/photo.png"></img>'
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.png"
assert images[0][1] == ""
def test_multiple_images(self):
content = "![a](https://example.com/a.png)\n![b](https://example.com/b.jpg)"
images, cleaned = BasePlatformAdapter.extract_images(content)
assert len(images) == 2
assert "![a]" not in cleaned
assert "![b]" not in cleaned
def test_mixed_markdown_and_html(self):
content = '![cat](https://example.com/cat.png)\n<img src="https://example.com/dog.jpg">'
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 2
def test_cleaned_content_trims_excess_newlines(self):
content = "Before\n\n![img](https://example.com/img.png)\n\n\n\nAfter"
_, cleaned = BasePlatformAdapter.extract_images(content)
assert "\n\n\n" not in cleaned
def test_non_http_url_not_matched(self):
content = "![file](file:///local/path.png)"
images, _ = BasePlatformAdapter.extract_images(content)
assert images == []
def test_non_image_link_preserved_when_mixed_with_images(self):
"""Regression: non-image markdown links must not be silently removed
when the response also contains real images."""
content = (
"Here is the image: ![photo](https://fal.media/cat.png)\n"
"And a doc: ![report](https://example.com/report.pdf)"
)
images, cleaned = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://fal.media/cat.png"
# The PDF link must survive in cleaned content
assert "![report](https://example.com/report.pdf)" in cleaned
# ---------------------------------------------------------------------------
# extract_media
# ---------------------------------------------------------------------------
class TestExtractMedia:
def test_no_media(self):
media, cleaned = BasePlatformAdapter.extract_media("Just text.")
assert media == []
assert cleaned == "Just text."
def test_single_media_tag(self):
content = "MEDIA:/path/to/audio.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert len(media) == 1
assert media[0][0] == "/path/to/audio.ogg"
assert media[0][1] is False # no voice tag
def test_media_with_voice_directive(self):
content = "[[audio_as_voice]]\nMEDIA:/path/to/voice.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert len(media) == 1
assert media[0][0] == "/path/to/voice.ogg"
assert media[0][1] is True # voice tag present
def test_multiple_media_tags(self):
content = "MEDIA:/a.ogg\nMEDIA:/b.ogg"
media, _ = BasePlatformAdapter.extract_media(content)
assert len(media) == 2
def test_voice_directive_removed_from_content(self):
content = "[[audio_as_voice]]\nSome text\nMEDIA:/voice.ogg"
_, cleaned = BasePlatformAdapter.extract_media(content)
assert "[[audio_as_voice]]" not in cleaned
assert "MEDIA:" not in cleaned
assert "Some text" in cleaned
def test_media_with_text_before(self):
content = "Here is your audio:\nMEDIA:/output.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert len(media) == 1
assert "Here is your audio" in cleaned
def test_cleaned_content_trims_excess_newlines(self):
content = "Before\n\nMEDIA:/audio.ogg\n\n\n\nAfter"
_, cleaned = BasePlatformAdapter.extract_media(content)
assert "\n\n\n" not in cleaned
def test_media_tag_allows_optional_whitespace_after_colon(self):
content = "MEDIA: /path/to/audio.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/path/to/audio.ogg", False)]
assert cleaned == ""
def test_media_tag_strips_wrapping_quotes_and_backticks(self):
content = "MEDIA: `/path/to/file.png`\nMEDIA:\"/path/to/file2.png\"\nMEDIA:'/path/to/file3.png'"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [
("/path/to/file.png", False),
("/path/to/file2.png", False),
("/path/to/file3.png", False),
]
assert cleaned == ""
def test_media_tag_supports_quoted_paths_with_spaces(self):
content = "Here\nMEDIA: '/tmp/my image.png'\nAfter"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/tmp/my image.png", False)]
assert "Here" in cleaned
assert "After" in cleaned
def test_media_tag_supports_unquoted_flac_paths_with_spaces(self):
content = "MEDIA:/tmp/Jane Doe/speech.flac"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/tmp/Jane Doe/speech.flac", False)]
assert cleaned == ""
def test_as_document_directive_stripped_from_cleaned_text(self):
"""[[as_document]] is a routing directive — strip it from
user-visible text just like [[audio_as_voice]]. Callers detect the
directive on the original content (before extract_media)."""
content = "Here is your infographic:\n[[as_document]]\nMEDIA:/tmp/x.jpg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/tmp/x.jpg", False)]
assert "[[as_document]]" not in cleaned
assert "Here is your infographic" in cleaned
def test_as_document_directive_alone_does_not_attach_voice_flag(self):
"""[[as_document]] is independent of [[audio_as_voice]] — combining
them in the same response should not entangle the flags."""
content = "[[as_document]]\nMEDIA:/tmp/x.jpg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/tmp/x.jpg", False)] # voice flag stays False
assert "[[as_document]]" not in cleaned
def test_both_directives_can_coexist(self):
"""A response could (rarely) contain both [[audio_as_voice]] for an
ogg file AND [[as_document]] for an attached image. The voice flag
propagates per-tuple; [[as_document]] is detected at dispatch."""
content = "[[audio_as_voice]]\n[[as_document]]\nMEDIA:/tmp/x.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
# Voice flag is propagated to every media tuple (this matches the
# existing extract_media contract)
assert media == [("/tmp/x.ogg", True)]
# Both directives stripped from cleaned text
assert "[[audio_as_voice]]" not in cleaned
assert "[[as_document]]" not in cleaned
class TestMediaDeliveryPathValidation:
def _patch_roots(self, monkeypatch, *roots):
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
tuple(roots),
)
# Disable recency-based trust by default so the original allowlist
# tests continue to exercise the strict-allowlist path. Tests that
# specifically cover recency trust re-enable it themselves.
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "0")
def test_allows_existing_file_inside_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
media_file = root / "voice.ogg"
media_file.parent.mkdir(parents=True)
media_file.write_bytes(b"OggS")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(media_file)) == str(media_file.resolve())
def test_rejects_existing_file_outside_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
root.mkdir()
secret = tmp_path / "secrets.txt"
secret.write_text("not for upload")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(secret)) is None
def test_rejects_symlink_escape_from_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
root.mkdir()
secret = tmp_path / "outside.png"
secret.write_bytes(b"secret")
link = root / "safe-looking.png"
try:
link.symlink_to(secret)
except OSError:
pytest.skip("symlink creation is unavailable")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(link)) is None
def test_filter_keeps_safe_media_and_drops_unsafe(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
safe = root / "speech.ogg"
unsafe = tmp_path / "outside.ogg"
safe.parent.mkdir(parents=True)
safe.write_bytes(b"OggS")
unsafe.write_bytes(b"OggS")
self._patch_roots(monkeypatch, root)
filtered = BasePlatformAdapter.filter_media_delivery_paths([
(str(unsafe), False),
(str(safe), True),
])
assert filtered == [(str(safe.resolve()), True)]
def test_allows_operator_configured_extra_root(self, tmp_path, monkeypatch):
extra_root = tmp_path / "operator-media"
media_file = extra_root / "report.pdf"
media_file.parent.mkdir(parents=True)
media_file.write_bytes(b"%PDF-1.4")
self._patch_roots(monkeypatch)
monkeypatch.setenv("HERMES_MEDIA_ALLOW_DIRS", str(extra_root))
assert BasePlatformAdapter.validate_media_delivery_path(str(media_file)) == str(media_file.resolve())
def test_recency_trust_allows_freshly_produced_file(self, tmp_path, monkeypatch):
"""A PDF the agent just wrote to /tmp should be deliverable.
Covers the natural case: agent runs ``pandoc -o /tmp/report.pdf`` or
``write_file('/home/user/report.pdf', ...)`` and asks the gateway to
send the result. With recency trust on, fresh files outside the cache
allowlist are accepted because the file's mtime is within the window.
"""
self._patch_roots(monkeypatch) # zero cache allowlist
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "600")
fresh = tmp_path / "scratch" / "report.pdf"
fresh.parent.mkdir(parents=True)
fresh.write_bytes(b"%PDF-1.4")
assert BasePlatformAdapter.validate_media_delivery_path(str(fresh)) == str(fresh.resolve())
def test_recency_trust_rejects_old_file(self, tmp_path, monkeypatch):
"""A pre-existing host file (~/.bashrc, /etc/passwd shape) is rejected.
Recency trust is the load-bearing anti-injection signal: prompt-injected
paths point at files that have existed for days or months, well outside
the trust window.
"""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "60")
stale = tmp_path / "stale.pdf"
stale.write_bytes(b"%PDF-1.4")
old_mtime = time.time() - 7200 # 2 hours ago
os.utime(stale, (old_mtime, old_mtime))
assert BasePlatformAdapter.validate_media_delivery_path(str(stale)) is None
def test_recency_trust_disabled_falls_back_to_pure_allowlist(self, tmp_path, monkeypatch):
"""Setting trust_recent_files=false reverts to pre-existing strict behavior."""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "0")
fresh = tmp_path / "report.pdf"
fresh.write_bytes(b"%PDF-1.4") # mtime = now
assert BasePlatformAdapter.validate_media_delivery_path(str(fresh)) is None
def test_recency_trust_denies_system_paths_even_when_fresh(self, tmp_path, monkeypatch):
"""A freshly-touched file under /etc must NOT be uploaded.
Belt-and-braces: even if an attacker rewrites the file's mtime
(e.g. via a separately compromised tool result that touches a system
file), the denylist refuses to deliver paths under /etc, /proc, /sys,
~/.ssh, ~/.aws, etc.
"""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "600")
# Simulate $HOME so ~/.ssh resolves into our tmp dir.
fake_home = tmp_path / "home"
ssh_dir = fake_home / ".ssh"
ssh_dir.mkdir(parents=True)
secret = ssh_dir / "id_rsa.txt"
secret.write_bytes(b"-----BEGIN ...") # mtime = now
monkeypatch.setenv("HOME", str(fake_home))
assert BasePlatformAdapter.validate_media_delivery_path(str(secret)) is None
def test_recency_trust_allows_pdf_in_project_dir(self, tmp_path, monkeypatch):
"""The motivating case: agent produces a PDF in a project directory.
Reproduces the Discord-PDF-not-delivered bug. Before recency trust,
files outside ~/.hermes/cache/* were silently dropped, leaving the
user with a raw filepath in chat instead of an attachment.
"""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "600")
project = tmp_path / "my-project"
report = project / "build" / "weekly-report.pdf"
report.parent.mkdir(parents=True)
report.write_bytes(b"%PDF-1.4")
assert BasePlatformAdapter.validate_media_delivery_path(str(report)) == str(report.resolve())
def test_filter_keeps_recently_produced_files(self, tmp_path, monkeypatch):
"""End-to-end: filter_local_delivery_paths routes a fresh PDF through."""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "600")
fresh = tmp_path / "report.pdf"
fresh.write_bytes(b"%PDF-1.4")
out = BasePlatformAdapter.filter_local_delivery_paths([str(fresh)])
assert out == [str(fresh.resolve())]
# ---------------------------------------------------------------------------
# should_send_media_as_audio
# ---------------------------------------------------------------------------
class TestShouldSendMediaAsAudio:
"""Audio-routing policy shared by gateway + scheduler + send_message."""
def test_unknown_extension_returns_false(self):
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio(None, ".png") is False
assert should_send_media_as_audio("telegram", ".pdf") is False
def test_non_telegram_platforms_route_all_audio(self):
from gateway.platforms.base import should_send_media_as_audio
for ext in (".mp3", ".m4a", ".wav", ".flac", ".ogg", ".opus"):
assert should_send_media_as_audio("discord", ext) is True
assert should_send_media_as_audio("slack", ext) is True
def test_telegram_mp3_and_m4a_route_to_audio(self):
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio("telegram", ".mp3") is True
assert should_send_media_as_audio("telegram", ".m4a") is True
def test_telegram_wav_and_flac_fall_through_to_document(self):
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio("telegram", ".wav") is False
assert should_send_media_as_audio("telegram", ".flac") is False
def test_telegram_ogg_opus_only_when_voice_flagged(self):
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio("telegram", ".ogg", is_voice=True) is True
assert should_send_media_as_audio("telegram", ".opus", is_voice=True) is True
assert should_send_media_as_audio("telegram", ".ogg") is False
assert should_send_media_as_audio("telegram", ".opus") is False
def test_accepts_platform_enum(self):
from gateway.config import Platform
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio(Platform.TELEGRAM, ".mp3") is True
assert should_send_media_as_audio(Platform.TELEGRAM, ".flac") is False
assert should_send_media_as_audio(Platform.DISCORD, ".flac") is True
# ---------------------------------------------------------------------------
# truncate_message
# ---------------------------------------------------------------------------
class TestTruncateMessage:
def _adapter(self):
"""Create a minimal adapter instance for testing static/instance methods."""
class StubAdapter(BasePlatformAdapter):
async def connect(self):
return True
async def disconnect(self):
pass
async def send(self, *a, **kw):
pass
async def get_chat_info(self, *a):
return {}
from gateway.config import Platform, PlatformConfig
config = PlatformConfig(enabled=True, token="test")
return StubAdapter(config=config, platform=Platform.TELEGRAM)
def test_short_message_single_chunk(self):
adapter = self._adapter()
chunks = adapter.truncate_message("Hello world", max_length=100)
assert chunks == ["Hello world"]
def test_exact_length_single_chunk(self):
adapter = self._adapter()
msg = "x" * 100
chunks = adapter.truncate_message(msg, max_length=100)
assert chunks == [msg]
def test_long_message_splits(self):
adapter = self._adapter()
msg = "word " * 200 # ~1000 chars
chunks = adapter.truncate_message(msg, max_length=200)
assert len(chunks) > 1
# Verify all original content is preserved across chunks
reassembled = "".join(chunks)
# Strip chunk indicators like (1/N) to get raw content
for word in msg.strip().split():
assert word in reassembled, f"Word '{word}' lost during truncation"
def test_chunks_have_indicators(self):
adapter = self._adapter()
msg = "word " * 200
chunks = adapter.truncate_message(msg, max_length=200)
assert "(1/" in chunks[0]
assert f"({len(chunks)}/{len(chunks)})" in chunks[-1]
def test_code_block_first_chunk_closed(self):
adapter = self._adapter()
msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
chunks = adapter.truncate_message(msg, max_length=300)
assert len(chunks) > 1
# First chunk must have a closing fence appended (code block was split)
first_fences = chunks[0].count("```")
assert first_fences == 2, "First chunk should have opening + closing fence"
def test_code_block_language_tag_carried(self):
adapter = self._adapter()
msg = "Start\n```javascript\n" + "console.log('x');\n" * 80 + "```\nEnd"
chunks = adapter.truncate_message(msg, max_length=300)
if len(chunks) > 1:
# At least one continuation chunk should reopen with ```javascript
reopened_with_lang = any("```javascript" in chunk for chunk in chunks[1:])
assert reopened_with_lang, (
"No continuation chunk reopened with language tag"
)
def test_continuation_chunks_have_balanced_fences(self):
"""Regression: continuation chunks must close reopened code blocks."""
adapter = self._adapter()
msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
chunks = adapter.truncate_message(msg, max_length=300)
assert len(chunks) > 1
for i, chunk in enumerate(chunks):
fence_count = chunk.count("```")
assert fence_count % 2 == 0, (
f"Chunk {i} has unbalanced fences ({fence_count})"
)
def test_each_chunk_under_max_length(self):
adapter = self._adapter()
msg = "word " * 500
max_len = 200
chunks = adapter.truncate_message(msg, max_length=max_len)
for i, chunk in enumerate(chunks):
assert len(chunk) <= max_len + 20, (
f"Chunk {i} too long: {len(chunk)} > {max_len}"
)
# ---------------------------------------------------------------------------
# _get_human_delay
# ---------------------------------------------------------------------------
class TestGetHumanDelay:
def test_off_mode(self):
with patch.dict(os.environ, {"HERMES_HUMAN_DELAY_MODE": "off"}):
assert BasePlatformAdapter._get_human_delay() == 0.0
def test_default_is_off(self):
with patch.dict(os.environ, {}, clear=False):
os.environ.pop("HERMES_HUMAN_DELAY_MODE", None)
assert BasePlatformAdapter._get_human_delay() == 0.0
def test_natural_mode_range(self):
with patch.dict(os.environ, {"HERMES_HUMAN_DELAY_MODE": "natural"}):
delay = BasePlatformAdapter._get_human_delay()
assert 0.8 <= delay <= 2.5
def test_natural_mode_ignores_malformed_custom_env_vars(self):
env = {
"HERMES_HUMAN_DELAY_MODE": "natural",
"HERMES_HUMAN_DELAY_MIN_MS": "oops",
"HERMES_HUMAN_DELAY_MAX_MS": "still-bad",
}
with patch.dict(os.environ, env):
delay = BasePlatformAdapter._get_human_delay()
assert 0.8 <= delay <= 2.5
def test_custom_mode_uses_env_vars(self):
env = {
"HERMES_HUMAN_DELAY_MODE": "custom",
"HERMES_HUMAN_DELAY_MIN_MS": "100",
"HERMES_HUMAN_DELAY_MAX_MS": "200",
}
with patch.dict(os.environ, env):
delay = BasePlatformAdapter._get_human_delay()
assert 0.1 <= delay <= 0.2
def test_custom_mode_tolerates_malformed_env_vars(self):
env = {
"HERMES_HUMAN_DELAY_MODE": "custom",
"HERMES_HUMAN_DELAY_MIN_MS": "oops",
"HERMES_HUMAN_DELAY_MAX_MS": "still-bad",
}
with patch.dict(os.environ, env):
# falls back to the custom-mode defaults instead of crashing
delay = BasePlatformAdapter._get_human_delay()
assert 0.8 <= delay <= 2.5
# ---------------------------------------------------------------------------
# utf16_len / _prefix_within_utf16_limit / truncate_message with len_fn
# ---------------------------------------------------------------------------
# Ported from nearai/ironclaw#2304 — Telegram counts message length in UTF-16
# code units, not Unicode code-points. Astral-plane characters (emoji, CJK
# Extension B) are surrogate pairs: 1 Python char but 2 UTF-16 units.
class TestUtf16Len:
"""Verify the UTF-16 length helper."""
def test_ascii(self):
assert utf16_len("hello") == 5
def test_bmp_cjk(self):
# CJK ideographs in the BMP are 1 code unit each
assert utf16_len("你好") == 2
def test_emoji_surrogate_pair(self):
# 😀 (U+1F600) is outside BMP → 2 UTF-16 code units
assert utf16_len("😀") == 2
def test_mixed(self):
# "hi😀" = 2 + 2 = 4 UTF-16 units
assert utf16_len("hi😀") == 4
def test_musical_symbol(self):
# 𝄞 (U+1D11E) — Musical Symbol G Clef, surrogate pair
assert utf16_len("𝄞") == 2
def test_empty(self):
assert utf16_len("") == 0
class TestPrefixWithinUtf16Limit:
"""Verify UTF-16-aware prefix truncation."""
def test_fits_entirely(self):
assert _prefix_within_utf16_limit("hello", 10) == "hello"
def test_ascii_truncation(self):
result = _prefix_within_utf16_limit("hello world", 5)
assert result == "hello"
assert utf16_len(result) <= 5
def test_does_not_split_surrogate_pair(self):
# "a😀b" = 1 + 2 + 1 = 4 UTF-16 units; limit 2 should give "a"
result = _prefix_within_utf16_limit("a😀b", 2)
assert result == "a"
assert utf16_len(result) <= 2
def test_emoji_at_limit(self):
# "😀" = 2 UTF-16 units; limit 2 should include it
result = _prefix_within_utf16_limit("😀x", 2)
assert result == "😀"
def test_all_emoji(self):
msg = "😀" * 10 # 20 UTF-16 units
result = _prefix_within_utf16_limit(msg, 6)
assert result == "😀😀😀"
assert utf16_len(result) == 6
def test_empty(self):
assert _prefix_within_utf16_limit("", 5) == ""
class TestTruncateMessageUtf16:
"""Verify truncate_message respects UTF-16 lengths when len_fn=utf16_len."""
def test_short_emoji_message_no_split(self):
"""A short message under the UTF-16 limit should not be split."""
msg = "Hello 😀 world"
chunks = BasePlatformAdapter.truncate_message(msg, 4096, len_fn=utf16_len)
assert len(chunks) == 1
assert chunks[0] == msg
def test_emoji_near_limit_triggers_split(self):
"""A message at 4096 codepoints but >4096 UTF-16 units must split."""
# 2049 emoji = 2049 codepoints but 4098 UTF-16 units → exceeds 4096
msg = "😀" * 2049
assert len(msg) == 2049 # Python len sees 2049 chars
assert utf16_len(msg) == 4098 # but it's 4098 UTF-16 units
# Without UTF-16 awareness, this would NOT split (2049 < 4096)
chunks_naive = BasePlatformAdapter.truncate_message(msg, 4096)
assert len(chunks_naive) == 1, "Without len_fn, no split expected"
# With UTF-16 awareness, it MUST split
chunks = BasePlatformAdapter.truncate_message(msg, 4096, len_fn=utf16_len)
assert len(chunks) > 1, "With utf16_len, message should be split"
# Each chunk must fit within the UTF-16 limit
for i, chunk in enumerate(chunks):
assert utf16_len(chunk) <= 4096, (
f"Chunk {i} exceeds 4096 UTF-16 units: {utf16_len(chunk)}"
)
def test_each_utf16_chunk_within_limit(self):
"""All chunks produced with utf16_len must fit the limit."""
# Mix of BMP and astral-plane characters
msg = ("Hello 😀 world 🎵 test 𝄞 " * 200).strip()
max_len = 200
chunks = BasePlatformAdapter.truncate_message(msg, max_len, len_fn=utf16_len)
for i, chunk in enumerate(chunks):
u16_len = utf16_len(chunk)
assert u16_len <= max_len + 20, (
f"Chunk {i} UTF-16 length {u16_len} exceeds {max_len}"
)
def test_all_content_preserved(self):
"""Splitting with utf16_len must not lose content."""
words = ["emoji😀", "music🎵", "cjk你好", "plain"] * 100
msg = " ".join(words)
chunks = BasePlatformAdapter.truncate_message(msg, 200, len_fn=utf16_len)
reassembled = " ".join(chunks)
for word in words:
assert word in reassembled, f"Word '{word}' lost during UTF-16 split"
def test_code_blocks_preserved_with_utf16(self):
"""Code block fence handling should work with utf16_len too."""
msg = "Before\n```python\n" + "x = '😀'\n" * 200 + "```\nAfter"
chunks = BasePlatformAdapter.truncate_message(msg, 300, len_fn=utf16_len)
assert len(chunks) > 1
# Each chunk should have balanced fences
for i, chunk in enumerate(chunks):
fence_count = chunk.count("```")
assert fence_count % 2 == 0, (
f"Chunk {i} has unbalanced fences ({fence_count})"
)
class TestProxyKwargsForAiohttp:
"""Verify proxy_kwargs_for_aiohttp routes all schemes through ProxyConnector."""
def test_none_returns_empty(self):
from gateway.platforms.base import proxy_kwargs_for_aiohttp
sess_kw, req_kw = proxy_kwargs_for_aiohttp(None)
assert sess_kw == {}
assert req_kw == {}
def test_http_proxy_uses_connector_when_aiohttp_socks_available(self):
pytest.importorskip("aiohttp_socks")
from unittest.mock import MagicMock
from gateway.platforms.base import proxy_kwargs_for_aiohttp
sentinel = MagicMock(name="ProxyConnector")
with patch("aiohttp_socks.ProxyConnector.from_url", return_value=sentinel):
sess_kw, req_kw = proxy_kwargs_for_aiohttp("http://proxy:8080")
assert sess_kw.get("connector") is sentinel, (
"HTTP proxy must use ProxyConnector so libraries that don't "
"forward per-request proxy= kwargs still route through the proxy"
)
assert req_kw == {}
def test_socks_proxy_uses_connector(self):
pytest.importorskip("aiohttp_socks")
from unittest.mock import MagicMock
from gateway.platforms.base import proxy_kwargs_for_aiohttp
sentinel = MagicMock(name="ProxyConnector")
with patch("aiohttp_socks.ProxyConnector.from_url", return_value=sentinel):
sess_kw, req_kw = proxy_kwargs_for_aiohttp("socks5://proxy:1080")
assert sess_kw.get("connector") is sentinel
assert req_kw == {}
def test_http_proxy_falls_back_without_aiohttp_socks(self):
from gateway.platforms.base import proxy_kwargs_for_aiohttp
with patch.dict("sys.modules", {"aiohttp_socks": None}):
sess_kw, req_kw = proxy_kwargs_for_aiohttp("http://proxy:8080")
assert sess_kw == {}
assert req_kw == {"proxy": "http://proxy:8080"}