hermes-agent/tests/gateway/test_platform_base.py
liuhao1024 3ccf4fdc6d fix(gateway): skip MEDIA: tags inside code blocks and blockquotes
extract_media() scanned the full response text without distinguishing
live delivery tags from example paths in fenced code blocks, inline code
spans, and blockquotes. This caused false positives where the agent's
explanation of MEDIA: syntax (or tool output containing example paths)
was stripped from user-visible text and the path was added to the media
delivery list.

Added _mask_protected_spans() helper that replaces protected regions
with equal-length whitespace before regex matching, preserving match
offsets. The helper skips backtick-quoted paths in MEDIA: tags to
maintain existing path extraction behavior.

Fixes #35695
2026-06-01 00:00:26 -07:00

1359 lines
57 KiB
Python

"""Tests for gateway/platforms/base.py — MessageEvent, media extraction, message truncation."""
import os
import time
from unittest.mock import patch
import pytest
from gateway.platforms.base import (
BasePlatformAdapter,
GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE,
MessageEvent,
safe_url_for_log,
utf16_len,
_log_safe_path,
_prefix_within_utf16_limit,
)
class TestSecretCaptureGuidance:
def test_gateway_secret_capture_message_points_to_local_setup(self):
message = GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE
assert "local cli" in message.lower()
assert "~/.hermes/.env" in message
class TestSafeUrlForLog:
def test_strips_query_fragment_and_userinfo(self):
url = (
"https://user:pass@example.com/private/path/image.png"
"?X-Amz-Signature=supersecret&token=abc#frag"
)
result = safe_url_for_log(url)
assert result == "https://example.com/.../image.png"
assert "supersecret" not in result
assert "token=abc" not in result
assert "user:pass@" not in result
def test_truncates_long_values(self):
long_url = "https://example.com/" + ("a" * 300)
result = safe_url_for_log(long_url, max_len=40)
assert len(result) == 40
assert result.endswith("...")
def test_handles_small_and_non_positive_max_len(self):
url = "https://example.com/very/long/path/file.png?token=secret"
assert safe_url_for_log(url, max_len=3) == "..."
assert safe_url_for_log(url, max_len=2) == ".."
assert safe_url_for_log(url, max_len=0) == ""
# ---------------------------------------------------------------------------
# MessageEvent — command parsing
# ---------------------------------------------------------------------------
class TestMessageEventIsCommand:
def test_slash_command(self):
event = MessageEvent(text="/new")
assert event.is_command() is True
def test_regular_text(self):
event = MessageEvent(text="hello world")
assert event.is_command() is False
def test_empty_text(self):
event = MessageEvent(text="")
assert event.is_command() is False
def test_slash_only(self):
event = MessageEvent(text="/")
assert event.is_command() is True
class TestMessageEventGetCommand:
def test_simple_command(self):
event = MessageEvent(text="/new")
assert event.get_command() == "new"
def test_command_with_args(self):
event = MessageEvent(text="/reset session")
assert event.get_command() == "reset"
def test_not_a_command(self):
event = MessageEvent(text="hello")
assert event.get_command() is None
def test_command_is_lowercased(self):
event = MessageEvent(text="/HELP")
assert event.get_command() == "help"
def test_slash_only_returns_empty(self):
event = MessageEvent(text="/")
assert event.get_command() == ""
def test_command_with_at_botname(self):
event = MessageEvent(text="/new@TigerNanoBot")
assert event.get_command() == "new"
def test_command_with_at_botname_and_args(self):
event = MessageEvent(text="/compress@TigerNanoBot")
assert event.get_command() == "compress"
def test_command_mixed_case_with_at_botname(self):
event = MessageEvent(text="/RESET@TigerNanoBot")
assert event.get_command() == "reset"
class TestMessageEventGetCommandArgs:
def test_command_with_args(self):
event = MessageEvent(text="/new session id 123")
assert event.get_command_args() == "session id 123"
def test_command_without_args(self):
event = MessageEvent(text="/new")
assert event.get_command_args() == ""
def test_not_a_command_returns_full_text(self):
event = MessageEvent(text="hello world")
assert event.get_command_args() == "hello world"
# ---------------------------------------------------------------------------
# extract_images
# ---------------------------------------------------------------------------
class TestExtractImages:
def test_no_images(self):
images, cleaned = BasePlatformAdapter.extract_images("Just regular text.")
assert images == []
assert cleaned == "Just regular text."
def test_markdown_image_with_image_ext(self):
content = "Here is a photo: ![cat](https://example.com/cat.png)"
images, cleaned = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/cat.png"
assert images[0][1] == "cat"
assert "![cat]" not in cleaned
def test_markdown_image_jpg(self):
content = "![photo](https://example.com/photo.jpg)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.jpg"
assert images[0][1] == "photo"
def test_markdown_image_jpeg(self):
content = "![](https://example.com/photo.jpeg)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.jpeg"
assert images[0][1] == ""
def test_markdown_image_gif(self):
content = "![anim](https://example.com/anim.gif)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/anim.gif"
assert images[0][1] == "anim"
def test_markdown_image_webp(self):
content = "![](https://example.com/img.webp)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/img.webp"
assert images[0][1] == ""
def test_fal_media_cdn(self):
content = "![gen](https://fal.media/files/abc123/output.png)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://fal.media/files/abc123/output.png"
assert images[0][1] == "gen"
def test_fal_cdn_url(self):
content = "![](https://fal-cdn.example.com/result)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://fal-cdn.example.com/result"
assert images[0][1] == ""
def test_replicate_delivery(self):
content = "![](https://replicate.delivery/pbxt/abc/output)"
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://replicate.delivery/pbxt/abc/output"
assert images[0][1] == ""
def test_non_image_ext_not_extracted(self):
"""Markdown image with non-image extension should not be extracted."""
content = "![doc](https://example.com/report.pdf)"
images, cleaned = BasePlatformAdapter.extract_images(content)
assert images == []
assert "![doc]" in cleaned # Should be preserved
def test_html_img_tag(self):
content = 'Check this: <img src="https://example.com/photo.png">'
images, cleaned = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.png"
assert images[0][1] == "" # HTML images have no alt text
assert "<img" not in cleaned
def test_html_img_self_closing(self):
content = '<img src="https://example.com/photo.png"/>'
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.png"
assert images[0][1] == ""
def test_html_img_with_closing_tag(self):
content = '<img src="https://example.com/photo.png"></img>'
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://example.com/photo.png"
assert images[0][1] == ""
def test_multiple_images(self):
content = "![a](https://example.com/a.png)\n![b](https://example.com/b.jpg)"
images, cleaned = BasePlatformAdapter.extract_images(content)
assert len(images) == 2
assert "![a]" not in cleaned
assert "![b]" not in cleaned
def test_mixed_markdown_and_html(self):
content = '![cat](https://example.com/cat.png)\n<img src="https://example.com/dog.jpg">'
images, _ = BasePlatformAdapter.extract_images(content)
assert len(images) == 2
def test_cleaned_content_trims_excess_newlines(self):
content = "Before\n\n![img](https://example.com/img.png)\n\n\n\nAfter"
_, cleaned = BasePlatformAdapter.extract_images(content)
assert "\n\n\n" not in cleaned
def test_non_http_url_not_matched(self):
content = "![file](file:///local/path.png)"
images, _ = BasePlatformAdapter.extract_images(content)
assert images == []
def test_non_image_link_preserved_when_mixed_with_images(self):
"""Regression: non-image markdown links must not be silently removed
when the response also contains real images."""
content = (
"Here is the image: ![photo](https://fal.media/cat.png)\n"
"And a doc: ![report](https://example.com/report.pdf)"
)
images, cleaned = BasePlatformAdapter.extract_images(content)
assert len(images) == 1
assert images[0][0] == "https://fal.media/cat.png"
# The PDF link must survive in cleaned content
assert "![report](https://example.com/report.pdf)" in cleaned
# ---------------------------------------------------------------------------
# extract_media
# ---------------------------------------------------------------------------
class TestExtractMedia:
def test_no_media(self):
media, cleaned = BasePlatformAdapter.extract_media("Just text.")
assert media == []
assert cleaned == "Just text."
def test_single_media_tag(self):
content = "MEDIA:/path/to/audio.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert len(media) == 1
assert media[0][0] == "/path/to/audio.ogg"
assert media[0][1] is False # no voice tag
def test_media_with_voice_directive(self):
content = "[[audio_as_voice]]\nMEDIA:/path/to/voice.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert len(media) == 1
assert media[0][0] == "/path/to/voice.ogg"
assert media[0][1] is True # voice tag present
def test_multiple_media_tags(self):
content = "MEDIA:/a.ogg\nMEDIA:/b.ogg"
media, _ = BasePlatformAdapter.extract_media(content)
assert len(media) == 2
def test_voice_directive_removed_from_content(self):
content = "[[audio_as_voice]]\nSome text\nMEDIA:/voice.ogg"
_, cleaned = BasePlatformAdapter.extract_media(content)
assert "[[audio_as_voice]]" not in cleaned
assert "MEDIA:" not in cleaned
assert "Some text" in cleaned
def test_media_with_text_before(self):
content = "Here is your audio:\nMEDIA:/output.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert len(media) == 1
assert "Here is your audio" in cleaned
def test_cleaned_content_trims_excess_newlines(self):
content = "Before\n\nMEDIA:/audio.ogg\n\n\n\nAfter"
_, cleaned = BasePlatformAdapter.extract_media(content)
assert "\n\n\n" not in cleaned
def test_media_tag_allows_optional_whitespace_after_colon(self):
content = "MEDIA: /path/to/audio.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/path/to/audio.ogg", False)]
assert cleaned == ""
def test_media_tag_strips_wrapping_quotes_and_backticks(self):
content = "MEDIA: `/path/to/file.png`\nMEDIA:\"/path/to/file2.png\"\nMEDIA:'/path/to/file3.png'"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [
("/path/to/file.png", False),
("/path/to/file2.png", False),
("/path/to/file3.png", False),
]
assert cleaned == ""
def test_media_tag_supports_quoted_paths_with_spaces(self):
content = "Here\nMEDIA: '/tmp/my image.png'\nAfter"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/tmp/my image.png", False)]
assert "Here" in cleaned
assert "After" in cleaned
def test_media_tag_supports_unquoted_flac_paths_with_spaces(self):
content = "MEDIA:/tmp/Jane Doe/speech.flac"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/tmp/Jane Doe/speech.flac", False)]
assert cleaned == ""
def test_as_document_directive_stripped_from_cleaned_text(self):
"""[[as_document]] is a routing directive — strip it from
user-visible text just like [[audio_as_voice]]. Callers detect the
directive on the original content (before extract_media)."""
content = "Here is your infographic:\n[[as_document]]\nMEDIA:/tmp/x.jpg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/tmp/x.jpg", False)]
assert "[[as_document]]" not in cleaned
assert "Here is your infographic" in cleaned
def test_as_document_directive_alone_does_not_attach_voice_flag(self):
"""[[as_document]] is independent of [[audio_as_voice]] — combining
them in the same response should not entangle the flags."""
content = "[[as_document]]\nMEDIA:/tmp/x.jpg"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == [("/tmp/x.jpg", False)] # voice flag stays False
assert "[[as_document]]" not in cleaned
def test_both_directives_can_coexist(self):
"""A response could (rarely) contain both [[audio_as_voice]] for an
ogg file AND [[as_document]] for an attached image. The voice flag
propagates per-tuple; [[as_document]] is detected at dispatch."""
content = "[[audio_as_voice]]\n[[as_document]]\nMEDIA:/tmp/x.ogg"
media, cleaned = BasePlatformAdapter.extract_media(content)
# Voice flag is propagated to every media tuple (this matches the
# existing extract_media contract)
assert media == [("/tmp/x.ogg", True)]
# Both directives stripped from cleaned text
assert "[[audio_as_voice]]" not in cleaned
assert "[[as_document]]" not in cleaned
# Windows path support — regression coverage for #34632
def test_media_tag_windows_backslash_path(self):
"""extract_media should recognise Windows backslash paths."""
media, cleaned = BasePlatformAdapter.extract_media(
r"MEDIA:C:\Users\kotsu\file.pdf"
)
assert len(media) == 1
assert media[0][0].endswith("file.pdf")
def test_media_tag_windows_forward_slash_path(self):
"""extract_media should recognise Windows forward-slash paths."""
media, cleaned = BasePlatformAdapter.extract_media(
"MEDIA:C:/Users/kotsu/file.pdf"
)
assert len(media) == 1
assert media[0][0].endswith("file.pdf")
def test_media_tag_windows_drive_root(self):
"""extract_media should recognise a path at the drive root."""
media, cleaned = BasePlatformAdapter.extract_media(
r"MEDIA:D:\report.md"
)
assert len(media) == 1
assert media[0][0].endswith("report.md")
def test_media_tag_unix_paths_still_work(self):
"""Unix absolute and tilde paths must still extract after Windows change."""
for content in ["MEDIA:/tmp/audio.ogg", r"MEDIA:~/docs/notes.md"]:
media, _ = BasePlatformAdapter.extract_media(content)
assert len(media) == 1, f"Failed for: {content}"
def test_relative_path_still_ignored(self):
"""Relative Windows-style paths (no drive letter) must not match."""
media, _ = BasePlatformAdapter.extract_media(
r"MEDIA:Users\kotsu\file.pdf"
)
assert media == []
# --- Code block / inline code / blockquote false-positive guards (#35695) ---
def test_media_in_fenced_code_block_ignored(self):
"""MEDIA: inside ``` fenced code blocks must not be extracted."""
content = "Here is an example:\n```text\nMEDIA:/path/to/example.png\n```\nDone."
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == []
assert "example" in cleaned.lower()
def test_media_in_inline_code_ignored(self):
"""MEDIA: inside backtick inline code must not be extracted."""
content = "Use `MEDIA:/path/to/file.png` in your response."
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == []
assert "MEDIA:" in cleaned # preserved as text
def test_media_in_blockquote_ignored(self):
"""MEDIA: inside a > blockquote must not be extracted."""
content = "> To send an image, include MEDIA:/path/to/image.jpg\nEnd."
media, cleaned = BasePlatformAdapter.extract_media(content)
assert media == []
assert "End." in cleaned
def test_media_outside_code_blocks_still_extracted(self):
"""Real MEDIA: tags outside protected regions must still work."""
content = "MEDIA:/real/file.png\n```code\nMEDIA:/fake/file.png\n```"
media, _ = BasePlatformAdapter.extract_media(content)
assert len(media) == 1
assert media[0][0] == "/real/file.png"
def test_media_mixed_code_and_prose(self):
"""Real MEDIA: in prose + example in code block: only prose extracted."""
content = (
"Here is your file:\n"
"MEDIA:/output/report.pdf\n"
"Example usage:\n"
"```text\nMEDIA:/example/path.pdf\n```\n"
"Done."
)
media, cleaned = BasePlatformAdapter.extract_media(content)
assert len(media) == 1
assert media[0][0] == "/output/report.pdf"
assert "Done." in cleaned
class TestMediaInsideSerializedJson:
"""Regression coverage for #34375 — MEDIA: embedded in serialized JSON
string values (e.g. a stored previous reply inside a tool result) must not
be re-delivered as a real attachment, while legitimate MEDIA: tags in prose,
at line start, indented, or as quoted-path tags keep working.
"""
def test_media_in_json_value_not_extracted(self):
content = '{"result": "MEDIA:/tmp/stale.png"}'
media, _ = BasePlatformAdapter.extract_media(content)
assert media == [], f"JSON value MEDIA: leaked: {media}"
def test_media_in_pretty_json_value_not_extracted(self):
content = '{\n "tool_result": "MEDIA:/var/old.jpg"\n}'
media, _ = BasePlatformAdapter.extract_media(content)
assert media == [], f"pretty JSON MEDIA: leaked: {media}"
def test_media_in_json_array_not_extracted(self):
content = '["MEDIA:/a/b.png", "other"]'
media, _ = BasePlatformAdapter.extract_media(content)
assert media == [], f"JSON array MEDIA: leaked: {media}"
def test_media_in_nested_json_value_not_extracted(self):
content = '{"a":{"b":"see MEDIA:/x/y.pdf here"}}'
media, _ = BasePlatformAdapter.extract_media(content)
assert media == [], f"nested JSON MEDIA: leaked: {media}"
def test_media_in_embedded_serialized_reply_not_extracted(self):
"""A serialized tool result that embeds a prior reply's MEDIA: tag."""
content = (
'{"content":"previous reply MEDIA:/Users/ex/.hermes/media/'
'generated/stale.png and more text"}'
)
media, _ = BasePlatformAdapter.extract_media(content)
assert media == [], f"embedded serialized reply leaked: {media}"
# --- Legitimate tags must still extract (no regression vs line-start anchor) ---
def test_media_at_line_start_still_extracted(self):
media, _ = BasePlatformAdapter.extract_media("MEDIA:/real/file.png")
assert len(media) == 1 and media[0][0] == "/real/file.png"
def test_media_after_prose_same_line_still_extracted(self):
media, _ = BasePlatformAdapter.extract_media(
"Here is your file: MEDIA:/out/report.pdf"
)
assert len(media) == 1 and media[0][0] == "/out/report.pdf"
def test_media_indented_still_extracted(self):
media, _ = BasePlatformAdapter.extract_media(" MEDIA:/tmp/x.png")
assert len(media) == 1 and media[0][0] == "/tmp/x.png"
def test_quoted_path_media_still_extracted(self):
"""MEDIA:"..." quoted-path form (a real LLM output) is not JSON-masked."""
media, _ = BasePlatformAdapter.extract_media(
'MEDIA:"/path/with space/file.png"'
)
assert len(media) == 1 and media[0][0] == "/path/with space/file.png"
def test_tts_two_line_still_extracted(self):
media, _ = BasePlatformAdapter.extract_media(
"[[audio_as_voice]]\nMEDIA:/tmp/v.ogg"
)
assert len(media) == 1 and media[0][0] == "/tmp/v.ogg"
assert media[0][1] is True # voice flag
# --- cleaned-text invariants: real tags stripped, JSON data kept verbatim ---
def test_json_embedded_media_kept_verbatim_in_cleaned_text(self):
"""A real tag is delivered+stripped; a JSON-embedded MEDIA: stays as
literal text (stored data must read back unchanged)."""
content = 'MEDIA:/real/r.png\nlog: {"old":"MEDIA:/stale/s.png"}'
media, cleaned = BasePlatformAdapter.extract_media(content)
assert [p for p, _ in media] == ["/real/r.png"]
# The JSON-embedded path must survive verbatim — not blanked to spaces.
assert '{"old":"MEDIA:/stale/s.png"}' in cleaned
def test_cleaned_text_after_directive_not_truncated(self):
"""Stripping a tag preceded by a [[as_document]] directive must not
shift offsets and chop the path or trailing text."""
content = "See [[as_document]] MEDIA:/d/report.pdf now"
media, cleaned = BasePlatformAdapter.extract_media(content)
assert [p for p, _ in media] == ["/d/report.pdf"]
assert "MEDIA:" not in cleaned # real tag removed
assert cleaned.endswith("now") # trailing text intact (not chopped)
class TestMediaExtensionAllowlistParity:
"""Regression coverage for issue #34517 — the MEDIA: extension black hole.
extract_media used to carry a narrow extension allowlist that omitted
.md/.json/.yaml/.xml/.html etc., while extract_local_files had a broad one.
Combined with an unconditional ``MEDIA:\\s*\\S+`` strip at the dispatch
sites, an unmatched MEDIA: tag for one of those extensions was deleted from
the body before extract_local_files could pick up the bare path — the file
was silently dropped. Both extractors now derive from the single
MEDIA_DELIVERY_EXTS source of truth, and the strip is anchored to that set.
"""
DROPPED_BEFORE = ["md", "json", "yaml", "yml", "xml", "html", "htm",
"tsv", "svg"]
def test_previously_dropped_extensions_now_extract(self):
for ext in self.DROPPED_BEFORE:
path = f"/tmp/report.{ext}"
media, _ = BasePlatformAdapter.extract_media(f"Here: MEDIA:{path}")
assert media == [(path, False)], f".{ext} should extract via MEDIA:"
def test_extract_media_and_local_files_share_one_extension_set(self):
from gateway.platforms.base import MEDIA_DELIVERY_EXTS
# Both functions reference MEDIA_DELIVERY_EXTS; assert the documents
# that motivated the bug are present in the shared set.
for ext in (".md", ".json", ".yaml", ".yml", ".xml", ".html", ".htm"):
assert ext in MEDIA_DELIVERY_EXTS
def test_unknown_extension_not_black_holed_by_cleanup(self):
"""A MEDIA: tag with an unknown extension is NOT stripped from the
body — it survives so extract_local_files can still see the bare path,
rather than vanishing entirely (the core of issue #34517)."""
from gateway.platforms.base import MEDIA_TAG_CLEANUP_RE
text = "Saved to MEDIA:/tmp/data.weirdext done"
media, _ = BasePlatformAdapter.extract_media(text)
assert media == [] # unknown extension is not a deliverable MEDIA tag
stripped = MEDIA_TAG_CLEANUP_RE.sub("", text)
assert "/tmp/data.weirdext" in stripped # path preserved, not dropped
def test_known_extension_tag_is_stripped_from_body(self):
from gateway.platforms.base import MEDIA_TAG_CLEANUP_RE
text = "Here is your report: MEDIA:/tmp/report.md"
stripped = MEDIA_TAG_CLEANUP_RE.sub("", text).strip()
assert "MEDIA:" not in stripped
assert "/tmp/report.md" not in stripped
assert "Here is your report:" in stripped
class TestMediaDeliveryPathValidation:
def _patch_roots(self, monkeypatch, *roots):
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
tuple(roots),
)
# All tests in this class cover strict-mode behavior (allowlist +
# recency window + denylist). Force strict on so they keep
# exercising the legacy path even though the public default
# flipped to off in 2026-05.
monkeypatch.setenv("HERMES_MEDIA_DELIVERY_STRICT", "1")
# Disable recency-based trust by default so the original allowlist
# tests continue to exercise the strict-allowlist path. Tests that
# specifically cover recency trust re-enable it themselves.
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "0")
def test_allows_existing_file_inside_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
media_file = root / "voice.ogg"
media_file.parent.mkdir(parents=True)
media_file.write_bytes(b"OggS")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(media_file)) == str(media_file.resolve())
def test_rejects_existing_file_outside_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
root.mkdir()
secret = tmp_path / "secrets.txt"
secret.write_text("not for upload")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(secret)) is None
def test_rejects_symlink_escape_from_safe_root(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
root.mkdir()
secret = tmp_path / "outside.png"
secret.write_bytes(b"secret")
link = root / "safe-looking.png"
try:
link.symlink_to(secret)
except OSError:
pytest.skip("symlink creation is unavailable")
self._patch_roots(monkeypatch, root)
assert BasePlatformAdapter.validate_media_delivery_path(str(link)) is None
def test_filter_keeps_safe_media_and_drops_unsafe(self, tmp_path, monkeypatch):
root = tmp_path / "media-cache"
safe = root / "speech.ogg"
unsafe = tmp_path / "outside.ogg"
safe.parent.mkdir(parents=True)
safe.write_bytes(b"OggS")
unsafe.write_bytes(b"OggS")
self._patch_roots(monkeypatch, root)
filtered = BasePlatformAdapter.filter_media_delivery_paths([
(str(unsafe), False),
(str(safe), True),
])
assert filtered == [(str(safe.resolve()), True)]
def test_allows_operator_configured_extra_root(self, tmp_path, monkeypatch):
extra_root = tmp_path / "operator-media"
media_file = extra_root / "report.pdf"
media_file.parent.mkdir(parents=True)
media_file.write_bytes(b"%PDF-1.4")
self._patch_roots(monkeypatch)
monkeypatch.setenv("HERMES_MEDIA_ALLOW_DIRS", str(extra_root))
assert BasePlatformAdapter.validate_media_delivery_path(str(media_file)) == str(media_file.resolve())
def test_recency_trust_allows_freshly_produced_file(self, tmp_path, monkeypatch):
"""A PDF the agent just wrote to /tmp should be deliverable.
Covers the natural case: agent runs ``pandoc -o /tmp/report.pdf`` or
``write_file('/home/user/report.pdf', ...)`` and asks the gateway to
send the result. With recency trust on, fresh files outside the cache
allowlist are accepted because the file's mtime is within the window.
"""
self._patch_roots(monkeypatch) # zero cache allowlist
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "600")
fresh = tmp_path / "scratch" / "report.pdf"
fresh.parent.mkdir(parents=True)
fresh.write_bytes(b"%PDF-1.4")
assert BasePlatformAdapter.validate_media_delivery_path(str(fresh)) == str(fresh.resolve())
def test_recency_trust_rejects_old_file(self, tmp_path, monkeypatch):
"""A pre-existing host file (~/.bashrc, /etc/passwd shape) is rejected.
Recency trust is the load-bearing anti-injection signal: prompt-injected
paths point at files that have existed for days or months, well outside
the trust window.
"""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "60")
stale = tmp_path / "stale.pdf"
stale.write_bytes(b"%PDF-1.4")
old_mtime = time.time() - 7200 # 2 hours ago
os.utime(stale, (old_mtime, old_mtime))
assert BasePlatformAdapter.validate_media_delivery_path(str(stale)) is None
def test_recency_trust_disabled_falls_back_to_pure_allowlist(self, tmp_path, monkeypatch):
"""Setting trust_recent_files=false reverts to pre-existing strict behavior."""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "0")
fresh = tmp_path / "report.pdf"
fresh.write_bytes(b"%PDF-1.4") # mtime = now
assert BasePlatformAdapter.validate_media_delivery_path(str(fresh)) is None
def test_recency_trust_denies_system_paths_even_when_fresh(self, tmp_path, monkeypatch):
"""A freshly-touched file under /etc must NOT be uploaded.
Belt-and-braces: even if an attacker rewrites the file's mtime
(e.g. via a separately compromised tool result that touches a system
file), the denylist refuses to deliver paths under /etc, /proc, /sys,
~/.ssh, ~/.aws, etc.
"""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "600")
# Simulate $HOME so ~/.ssh resolves into our tmp dir.
fake_home = tmp_path / "home"
ssh_dir = fake_home / ".ssh"
ssh_dir.mkdir(parents=True)
secret = ssh_dir / "id_rsa.txt"
secret.write_bytes(b"-----BEGIN ...") # mtime = now
monkeypatch.setenv("HOME", str(fake_home))
assert BasePlatformAdapter.validate_media_delivery_path(str(secret)) is None
def test_recency_trust_allows_pdf_in_project_dir(self, tmp_path, monkeypatch):
"""The motivating case: agent produces a PDF in a project directory.
Reproduces the Discord-PDF-not-delivered bug. Before recency trust,
files outside ~/.hermes/cache/* were silently dropped, leaving the
user with a raw filepath in chat instead of an attachment.
"""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "600")
project = tmp_path / "my-project"
report = project / "build" / "weekly-report.pdf"
report.parent.mkdir(parents=True)
report.write_bytes(b"%PDF-1.4")
assert BasePlatformAdapter.validate_media_delivery_path(str(report)) == str(report.resolve())
def test_filter_keeps_recently_produced_files(self, tmp_path, monkeypatch):
"""End-to-end: filter_local_delivery_paths routes a fresh PDF through."""
self._patch_roots(monkeypatch)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_SECONDS", "600")
fresh = tmp_path / "report.pdf"
fresh.write_bytes(b"%PDF-1.4")
out = BasePlatformAdapter.filter_local_delivery_paths([str(fresh)])
assert out == [str(fresh.resolve())]
class TestMediaDeliveryDefaultMode:
"""Default (non-strict) mode — denylist gates delivery, nothing else.
Symmetric with inbound delivery: Telegram/Discord/Slack accept any
document type the user uploads, and the agent can hand back any file
that isn't a credential. Strict mode is opt-in for operators running
public-facing gateways.
"""
def _patch_roots(self, monkeypatch, *roots):
# Empty cache allowlist so the only positive path through
# validate_media_delivery_path in these tests is the
# default-mode "anything not denied" branch.
monkeypatch.setattr(
"gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS",
tuple(roots),
)
# Pin strict OFF — the public default. Tests that exercise the
# strict path live in TestMediaDeliveryPathValidation.
monkeypatch.delenv("HERMES_MEDIA_DELIVERY_STRICT", raising=False)
monkeypatch.delenv("HERMES_MEDIA_ALLOW_DIRS", raising=False)
def test_accepts_stale_file_outside_allowlist(self, tmp_path, monkeypatch):
"""The motivating case — agent says ``MEDIA:/home/user/notes.md``
for an .md it has been working with for hours. Strict mode would
reject this (outside allowlist, outside recency window). Default
mode delivers it.
"""
self._patch_roots(monkeypatch)
notes = tmp_path / "notes.md"
notes.write_text("# Old notes\n")
old_mtime = time.time() - 7200 # 2 hours ago — far outside any window
os.utime(notes, (old_mtime, old_mtime))
assert BasePlatformAdapter.validate_media_delivery_path(str(notes)) == str(notes.resolve())
def test_accepts_any_extension_not_on_denylist(self, tmp_path, monkeypatch):
"""No extension allowlist — .md, .txt, .json, .py all deliver."""
self._patch_roots(monkeypatch)
for name in ("report.md", "log.txt", "data.json", "script.py", "blob.bin"):
f = tmp_path / name
f.write_bytes(b"x")
assert BasePlatformAdapter.validate_media_delivery_path(str(f)) == str(f.resolve())
def test_denylist_still_blocks_credentials(self, tmp_path, monkeypatch):
"""Default mode is permissive but not naive — credential paths
remain blocked. Simulate $HOME so ~/.ssh resolves into tmp_path.
"""
self._patch_roots(monkeypatch)
fake_home = tmp_path / "home"
ssh_dir = fake_home / ".ssh"
ssh_dir.mkdir(parents=True)
secret = ssh_dir / "id_rsa"
secret.write_bytes(b"-----BEGIN ...")
monkeypatch.setenv("HOME", str(fake_home))
assert BasePlatformAdapter.validate_media_delivery_path(str(secret)) is None
def test_denylist_blocks_system_prefixes(self, tmp_path, monkeypatch):
"""Files under /etc, /proc, /sys, /root, /boot, /var/{log,lib,run}
are denied. We construct the test by patching the denylist root
to a tmp dir so we don't need to read /etc.
"""
self._patch_roots(monkeypatch)
fake_etc = tmp_path / "fake-etc"
fake_etc.mkdir()
secret = fake_etc / "shadow"
secret.write_bytes(b"root:!:0:0::/root:/bin/sh")
monkeypatch.setattr(
"gateway.platforms.base._MEDIA_DELIVERY_DENIED_PREFIXES",
(str(fake_etc),),
)
assert BasePlatformAdapter.validate_media_delivery_path(str(secret)) is None
def test_denylist_blocks_hermes_credentials(self, tmp_path, monkeypatch):
"""~/.hermes/.env and ~/.hermes/auth.json stay blocked even in
default mode. They live under $HOME (not the system prefix list)
so this exercises the home-relative denied paths.
"""
self._patch_roots(monkeypatch)
fake_home = tmp_path / "home"
hermes_dir = fake_home / ".hermes"
hermes_dir.mkdir(parents=True)
env_file = hermes_dir / ".env"
env_file.write_text("OPENAI_API_KEY=sk-...")
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.setattr(
"gateway.platforms.base._HERMES_HOME",
hermes_dir,
)
assert BasePlatformAdapter.validate_media_delivery_path(str(env_file)) is None
def test_denylist_blocks_hermes_config_in_active_profile(self, tmp_path, monkeypatch):
"""The active profile config stays blocked in default mode."""
self._patch_roots(monkeypatch)
fake_home = tmp_path / "home"
hermes_dir = fake_home / ".hermes"
hermes_dir.mkdir(parents=True)
config_file = hermes_dir / "config.yaml"
config_file.write_text("model:\n provider: openai\n")
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.setattr(
"gateway.platforms.base._HERMES_HOME",
hermes_dir,
)
assert BasePlatformAdapter.validate_media_delivery_path(str(config_file)) is None
def test_denylist_blocks_shared_hermes_root_config_for_profiles(self, tmp_path, monkeypatch):
"""Profile-mode gateways must still block the shared Hermes root config."""
self._patch_roots(monkeypatch)
fake_home = tmp_path / "home"
profile_home = fake_home / ".hermes" / "profiles" / "work"
profile_home.mkdir(parents=True)
hermes_root = fake_home / ".hermes"
config_file = hermes_root / "config.yaml"
config_file.write_text("profiles:\n active: work\n")
monkeypatch.setenv("HOME", str(fake_home))
monkeypatch.setattr(
"gateway.platforms.base._HERMES_HOME",
profile_home,
)
monkeypatch.setattr(
"gateway.platforms.base._HERMES_ROOT",
hermes_root,
)
assert BasePlatformAdapter.validate_media_delivery_path(str(config_file)) is None
def test_strict_mode_envvar_restores_legacy_behavior(self, tmp_path, monkeypatch):
"""Setting HERMES_MEDIA_DELIVERY_STRICT=1 reactivates the older
allowlist+recency logic. A stale file outside the allowlist is
rejected.
"""
self._patch_roots(monkeypatch)
monkeypatch.setenv("HERMES_MEDIA_DELIVERY_STRICT", "1")
monkeypatch.setenv("HERMES_MEDIA_TRUST_RECENT_FILES", "0")
stale = tmp_path / "old.pdf"
stale.write_bytes(b"%PDF-1.4")
old_mtime = time.time() - 7200
os.utime(stale, (old_mtime, old_mtime))
assert BasePlatformAdapter.validate_media_delivery_path(str(stale)) is None
def test_strict_mode_truthy_aliases(self, monkeypatch, tmp_path):
"""``HERMES_MEDIA_DELIVERY_STRICT=true|yes|on|1`` all enable strict mode."""
self._patch_roots(monkeypatch)
from gateway.platforms.base import _media_delivery_strict_mode
for raw in ("1", "true", "TRUE", "yes", "on"):
monkeypatch.setenv("HERMES_MEDIA_DELIVERY_STRICT", raw)
assert _media_delivery_strict_mode() is True
for raw in ("0", "false", "no", "off", ""):
monkeypatch.setenv("HERMES_MEDIA_DELIVERY_STRICT", raw)
assert _media_delivery_strict_mode() is False
def test_filter_passes_default_files_through(self, tmp_path, monkeypatch):
"""End-to-end: filter_local_delivery_paths accepts a stale .md in
default mode where strict mode would drop it.
"""
self._patch_roots(monkeypatch)
notes = tmp_path / "notes.md"
notes.write_text("# old\n")
os.utime(notes, (time.time() - 86400, time.time() - 86400))
out = BasePlatformAdapter.filter_local_delivery_paths([str(notes)])
assert out == [str(notes.resolve())]
# ---------------------------------------------------------------------------
# should_send_media_as_audio
# ---------------------------------------------------------------------------
class TestShouldSendMediaAsAudio:
"""Audio-routing policy shared by gateway + scheduler + send_message."""
def test_unknown_extension_returns_false(self):
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio(None, ".png") is False
assert should_send_media_as_audio("telegram", ".pdf") is False
def test_non_telegram_platforms_route_all_audio(self):
from gateway.platforms.base import should_send_media_as_audio
for ext in (".mp3", ".m4a", ".wav", ".flac", ".ogg", ".opus"):
assert should_send_media_as_audio("discord", ext) is True
assert should_send_media_as_audio("slack", ext) is True
def test_telegram_mp3_and_m4a_route_to_audio(self):
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio("telegram", ".mp3") is True
assert should_send_media_as_audio("telegram", ".m4a") is True
def test_telegram_wav_and_flac_fall_through_to_document(self):
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio("telegram", ".wav") is False
assert should_send_media_as_audio("telegram", ".flac") is False
def test_telegram_ogg_opus_only_when_voice_flagged(self):
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio("telegram", ".ogg", is_voice=True) is True
assert should_send_media_as_audio("telegram", ".opus", is_voice=True) is True
assert should_send_media_as_audio("telegram", ".ogg") is False
assert should_send_media_as_audio("telegram", ".opus") is False
def test_accepts_platform_enum(self):
from gateway.config import Platform
from gateway.platforms.base import should_send_media_as_audio
assert should_send_media_as_audio(Platform.TELEGRAM, ".mp3") is True
assert should_send_media_as_audio(Platform.TELEGRAM, ".flac") is False
assert should_send_media_as_audio(Platform.DISCORD, ".flac") is True
# ---------------------------------------------------------------------------
# truncate_message
# ---------------------------------------------------------------------------
class TestTruncateMessage:
def _adapter(self):
"""Create a minimal adapter instance for testing static/instance methods."""
class StubAdapter(BasePlatformAdapter):
async def connect(self):
return True
async def disconnect(self):
pass
async def send(self, *a, **kw):
pass
async def get_chat_info(self, *a):
return {}
from gateway.config import Platform, PlatformConfig
config = PlatformConfig(enabled=True, token="test")
return StubAdapter(config=config, platform=Platform.TELEGRAM)
def test_short_message_single_chunk(self):
adapter = self._adapter()
chunks = adapter.truncate_message("Hello world", max_length=100)
assert chunks == ["Hello world"]
def test_exact_length_single_chunk(self):
adapter = self._adapter()
msg = "x" * 100
chunks = adapter.truncate_message(msg, max_length=100)
assert chunks == [msg]
def test_long_message_splits(self):
adapter = self._adapter()
msg = "word " * 200 # ~1000 chars
chunks = adapter.truncate_message(msg, max_length=200)
assert len(chunks) > 1
# Verify all original content is preserved across chunks
reassembled = "".join(chunks)
# Strip chunk indicators like (1/N) to get raw content
for word in msg.strip().split():
assert word in reassembled, f"Word '{word}' lost during truncation"
def test_chunks_have_indicators(self):
adapter = self._adapter()
msg = "word " * 200
chunks = adapter.truncate_message(msg, max_length=200)
assert "(1/" in chunks[0]
assert f"({len(chunks)}/{len(chunks)})" in chunks[-1]
def test_code_block_first_chunk_closed(self):
adapter = self._adapter()
msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
chunks = adapter.truncate_message(msg, max_length=300)
assert len(chunks) > 1
# First chunk must have a closing fence appended (code block was split)
first_fences = chunks[0].count("```")
assert first_fences == 2, "First chunk should have opening + closing fence"
def test_code_block_language_tag_carried(self):
adapter = self._adapter()
msg = "Start\n```javascript\n" + "console.log('x');\n" * 80 + "```\nEnd"
chunks = adapter.truncate_message(msg, max_length=300)
if len(chunks) > 1:
# At least one continuation chunk should reopen with ```javascript
reopened_with_lang = any("```javascript" in chunk for chunk in chunks[1:])
assert reopened_with_lang, (
"No continuation chunk reopened with language tag"
)
def test_continuation_chunks_have_balanced_fences(self):
"""Regression: continuation chunks must close reopened code blocks."""
adapter = self._adapter()
msg = "Before\n```python\n" + "x = 1\n" * 100 + "```\nAfter"
chunks = adapter.truncate_message(msg, max_length=300)
assert len(chunks) > 1
for i, chunk in enumerate(chunks):
fence_count = chunk.count("```")
assert fence_count % 2 == 0, (
f"Chunk {i} has unbalanced fences ({fence_count})"
)
def test_each_chunk_under_max_length(self):
adapter = self._adapter()
msg = "word " * 500
max_len = 200
chunks = adapter.truncate_message(msg, max_length=max_len)
for i, chunk in enumerate(chunks):
assert len(chunk) <= max_len + 20, (
f"Chunk {i} too long: {len(chunk)} > {max_len}"
)
# ---------------------------------------------------------------------------
# _get_human_delay
# ---------------------------------------------------------------------------
class TestGetHumanDelay:
def test_off_mode(self):
with patch.dict(os.environ, {"HERMES_HUMAN_DELAY_MODE": "off"}):
assert BasePlatformAdapter._get_human_delay() == 0.0
def test_default_is_off(self):
with patch.dict(os.environ, {}, clear=False):
os.environ.pop("HERMES_HUMAN_DELAY_MODE", None)
assert BasePlatformAdapter._get_human_delay() == 0.0
def test_natural_mode_range(self):
with patch.dict(os.environ, {"HERMES_HUMAN_DELAY_MODE": "natural"}):
delay = BasePlatformAdapter._get_human_delay()
assert 0.8 <= delay <= 2.5
def test_natural_mode_ignores_malformed_custom_env_vars(self):
env = {
"HERMES_HUMAN_DELAY_MODE": "natural",
"HERMES_HUMAN_DELAY_MIN_MS": "oops",
"HERMES_HUMAN_DELAY_MAX_MS": "still-bad",
}
with patch.dict(os.environ, env):
delay = BasePlatformAdapter._get_human_delay()
assert 0.8 <= delay <= 2.5
def test_custom_mode_uses_env_vars(self):
env = {
"HERMES_HUMAN_DELAY_MODE": "custom",
"HERMES_HUMAN_DELAY_MIN_MS": "100",
"HERMES_HUMAN_DELAY_MAX_MS": "200",
}
with patch.dict(os.environ, env):
delay = BasePlatformAdapter._get_human_delay()
assert 0.1 <= delay <= 0.2
def test_custom_mode_tolerates_malformed_env_vars(self):
env = {
"HERMES_HUMAN_DELAY_MODE": "custom",
"HERMES_HUMAN_DELAY_MIN_MS": "oops",
"HERMES_HUMAN_DELAY_MAX_MS": "still-bad",
}
with patch.dict(os.environ, env):
# falls back to the custom-mode defaults instead of crashing
delay = BasePlatformAdapter._get_human_delay()
assert 0.8 <= delay <= 2.5
# ---------------------------------------------------------------------------
# utf16_len / _prefix_within_utf16_limit / truncate_message with len_fn
# ---------------------------------------------------------------------------
# Ported from nearai/ironclaw#2304 — Telegram counts message length in UTF-16
# code units, not Unicode code-points. Astral-plane characters (emoji, CJK
# Extension B) are surrogate pairs: 1 Python char but 2 UTF-16 units.
class TestUtf16Len:
"""Verify the UTF-16 length helper."""
def test_ascii(self):
assert utf16_len("hello") == 5
def test_bmp_cjk(self):
# CJK ideographs in the BMP are 1 code unit each
assert utf16_len("你好") == 2
def test_emoji_surrogate_pair(self):
# 😀 (U+1F600) is outside BMP → 2 UTF-16 code units
assert utf16_len("😀") == 2
def test_mixed(self):
# "hi😀" = 2 + 2 = 4 UTF-16 units
assert utf16_len("hi😀") == 4
def test_musical_symbol(self):
# 𝄞 (U+1D11E) — Musical Symbol G Clef, surrogate pair
assert utf16_len("𝄞") == 2
def test_empty(self):
assert utf16_len("") == 0
class TestPrefixWithinUtf16Limit:
"""Verify UTF-16-aware prefix truncation."""
def test_fits_entirely(self):
assert _prefix_within_utf16_limit("hello", 10) == "hello"
def test_ascii_truncation(self):
result = _prefix_within_utf16_limit("hello world", 5)
assert result == "hello"
assert utf16_len(result) <= 5
def test_does_not_split_surrogate_pair(self):
# "a😀b" = 1 + 2 + 1 = 4 UTF-16 units; limit 2 should give "a"
result = _prefix_within_utf16_limit("a😀b", 2)
assert result == "a"
assert utf16_len(result) <= 2
def test_emoji_at_limit(self):
# "😀" = 2 UTF-16 units; limit 2 should include it
result = _prefix_within_utf16_limit("😀x", 2)
assert result == "😀"
def test_all_emoji(self):
msg = "😀" * 10 # 20 UTF-16 units
result = _prefix_within_utf16_limit(msg, 6)
assert result == "😀😀😀"
assert utf16_len(result) == 6
def test_empty(self):
assert _prefix_within_utf16_limit("", 5) == ""
class TestTruncateMessageUtf16:
"""Verify truncate_message respects UTF-16 lengths when len_fn=utf16_len."""
def test_short_emoji_message_no_split(self):
"""A short message under the UTF-16 limit should not be split."""
msg = "Hello 😀 world"
chunks = BasePlatformAdapter.truncate_message(msg, 4096, len_fn=utf16_len)
assert len(chunks) == 1
assert chunks[0] == msg
def test_emoji_near_limit_triggers_split(self):
"""A message at 4096 codepoints but >4096 UTF-16 units must split."""
# 2049 emoji = 2049 codepoints but 4098 UTF-16 units → exceeds 4096
msg = "😀" * 2049
assert len(msg) == 2049 # Python len sees 2049 chars
assert utf16_len(msg) == 4098 # but it's 4098 UTF-16 units
# Without UTF-16 awareness, this would NOT split (2049 < 4096)
chunks_naive = BasePlatformAdapter.truncate_message(msg, 4096)
assert len(chunks_naive) == 1, "Without len_fn, no split expected"
# With UTF-16 awareness, it MUST split
chunks = BasePlatformAdapter.truncate_message(msg, 4096, len_fn=utf16_len)
assert len(chunks) > 1, "With utf16_len, message should be split"
# Each chunk must fit within the UTF-16 limit
for i, chunk in enumerate(chunks):
assert utf16_len(chunk) <= 4096, (
f"Chunk {i} exceeds 4096 UTF-16 units: {utf16_len(chunk)}"
)
def test_each_utf16_chunk_within_limit(self):
"""All chunks produced with utf16_len must fit the limit."""
# Mix of BMP and astral-plane characters
msg = ("Hello 😀 world 🎵 test 𝄞 " * 200).strip()
max_len = 200
chunks = BasePlatformAdapter.truncate_message(msg, max_len, len_fn=utf16_len)
for i, chunk in enumerate(chunks):
u16_len = utf16_len(chunk)
assert u16_len <= max_len + 20, (
f"Chunk {i} UTF-16 length {u16_len} exceeds {max_len}"
)
def test_all_content_preserved(self):
"""Splitting with utf16_len must not lose content."""
words = ["emoji😀", "music🎵", "cjk你好", "plain"] * 100
msg = " ".join(words)
chunks = BasePlatformAdapter.truncate_message(msg, 200, len_fn=utf16_len)
reassembled = " ".join(chunks)
for word in words:
assert word in reassembled, f"Word '{word}' lost during UTF-16 split"
def test_code_blocks_preserved_with_utf16(self):
"""Code block fence handling should work with utf16_len too."""
msg = "Before\n```python\n" + "x = '😀'\n" * 200 + "```\nAfter"
chunks = BasePlatformAdapter.truncate_message(msg, 300, len_fn=utf16_len)
assert len(chunks) > 1
# Each chunk should have balanced fences
for i, chunk in enumerate(chunks):
fence_count = chunk.count("```")
assert fence_count % 2 == 0, (
f"Chunk {i} has unbalanced fences ({fence_count})"
)
class TestProxyKwargsForAiohttp:
"""Verify proxy_kwargs_for_aiohttp routes all schemes through ProxyConnector."""
def test_none_returns_empty(self):
from gateway.platforms.base import proxy_kwargs_for_aiohttp
sess_kw, req_kw = proxy_kwargs_for_aiohttp(None)
assert sess_kw == {}
assert req_kw == {}
def test_http_proxy_uses_connector_when_aiohttp_socks_available(self):
pytest.importorskip("aiohttp_socks")
from unittest.mock import MagicMock
from gateway.platforms.base import proxy_kwargs_for_aiohttp
sentinel = MagicMock(name="ProxyConnector")
with patch("aiohttp_socks.ProxyConnector.from_url", return_value=sentinel):
sess_kw, req_kw = proxy_kwargs_for_aiohttp("http://proxy:8080")
assert sess_kw.get("connector") is sentinel, (
"HTTP proxy must use ProxyConnector so libraries that don't "
"forward per-request proxy= kwargs still route through the proxy"
)
assert req_kw == {}
def test_socks_proxy_uses_connector(self):
pytest.importorskip("aiohttp_socks")
from unittest.mock import MagicMock
from gateway.platforms.base import proxy_kwargs_for_aiohttp
sentinel = MagicMock(name="ProxyConnector")
with patch("aiohttp_socks.ProxyConnector.from_url", return_value=sentinel):
sess_kw, req_kw = proxy_kwargs_for_aiohttp("socks5://proxy:1080")
assert sess_kw.get("connector") is sentinel
assert req_kw == {}
def test_http_proxy_falls_back_without_aiohttp_socks(self):
from gateway.platforms.base import proxy_kwargs_for_aiohttp
with patch.dict("sys.modules", {"aiohttp_socks": None}):
sess_kw, req_kw = proxy_kwargs_for_aiohttp("http://proxy:8080")
assert sess_kw == {}
assert req_kw == {"proxy": "http://proxy:8080"}
class TestMediaDeliveryDiagnosability:
"""Diagnosable rejection logging + crafted-path robustness (#33251)."""
def test_rejected_path_appears_in_log(self, tmp_path, caplog):
outside = tmp_path / "outside.ogg"
outside.write_bytes(b"OggS")
with patch.dict(os.environ, {"HERMES_MEDIA_DELIVERY_STRICT": "1",
"HERMES_MEDIA_TRUST_RECENT_FILES": "0"}), \
patch("gateway.platforms.base.MEDIA_DELIVERY_SAFE_ROOTS", ()):
with caplog.at_level("WARNING"):
out = BasePlatformAdapter.filter_media_delivery_paths([(str(outside), False)])
assert out == []
# The dropped path must be in the log so operators can diagnose it.
assert str(outside) in caplog.text
def test_crafted_null_path_does_not_abort_batch(self, tmp_path, monkeypatch):
"""One crafted ~\\x00 path must not drop every other attachment."""
good = tmp_path / "good.png"
good.write_bytes(b"\x89PNG")
monkeypatch.setenv("HERMES_MEDIA_DELIVERY_STRICT", "0")
out = BasePlatformAdapter.filter_media_delivery_paths([
("~\x00evil.png", False),
(str(good), False),
])
assert out == [(str(good.resolve()), False)]
def test_extract_media_tolerates_crafted_null_path(self):
"""extract_media must not raise on a crafted ~\\x00 MEDIA tag."""
content = "here\nMEDIA:`~\x00evil.png`\ntrailing"
# Must not raise ValueError("embedded null byte").
media, cleaned = BasePlatformAdapter.extract_media(content)
assert all("\x00" not in p for p, _ in media)
def test_log_safe_path_neutralises_line_breaks(self):
forged = "/tmp/a.png\nWARNING forged second line"
assert "\n" not in _log_safe_path(forged)
# Unicode separators that split log lines are also neutralised.
for sep in ("\u2028", "\u2029", "\x85"):
assert sep not in _log_safe_path(f"/tmp/a{sep}b.png")
def test_canonical_cache_roots_present(self):
from gateway.platforms.base import MEDIA_DELIVERY_SAFE_ROOTS
roots = {str(r) for r in MEDIA_DELIVERY_SAFE_ROOTS}
assert any(r.endswith("cache/images") for r in roots)
assert any(r.endswith("cache/documents") for r in roots)
# Legacy layout still present.
assert any(r.endswith("image_cache") for r in roots)