"""Tests for GatewayStreamConsumer — media directive stripping in streaming.""" import asyncio from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock import pytest from gateway.stream_consumer import GatewayStreamConsumer, StreamConsumerConfig # ── _clean_for_display unit tests ──────────────────────────────────────── class TestCleanForDisplay: """Verify MEDIA: directives and internal markers are stripped from display text.""" def test_no_media_passthrough(self): """Text without MEDIA: passes through unchanged.""" text = "Here is your analysis of the image." assert GatewayStreamConsumer._clean_for_display(text) == text def test_media_tag_stripped(self): """Basic MEDIA: tag is removed.""" text = "Here is the image\nMEDIA:/tmp/hermes/image.png" result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result assert "Here is the image" in result def test_media_tag_with_space(self): """MEDIA: tag with space after colon is removed.""" text = "Audio generated\nMEDIA: /home/user/.hermes/audio_cache/voice.mp3" result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result assert "Audio generated" in result def test_media_tag_with_quotes(self): """MEDIA: tags wrapped in quotes or backticks are removed.""" for wrapper in ['`MEDIA:/path/file.png`', '"MEDIA:/path/file.png"', "'MEDIA:/path/file.png'"]: text = f"Result: {wrapper}" result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result, f"Failed for wrapper: {wrapper}" def test_audio_as_voice_stripped(self): """[[audio_as_voice]] directive is removed.""" text = "[[audio_as_voice]]\nMEDIA:/tmp/voice.ogg" result = GatewayStreamConsumer._clean_for_display(text) assert "[[audio_as_voice]]" not in result assert "MEDIA:" not in result def test_multiple_media_tags(self): """Multiple MEDIA: tags are all removed.""" text = "Here are two files:\nMEDIA:/tmp/a.png\nMEDIA:/tmp/b.jpg" result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result assert "Here are two files:" in result def test_excessive_newlines_collapsed(self): """Blank lines left by removed tags are collapsed.""" text = "Before\n\n\nMEDIA:/tmp/file.png\n\n\nAfter" result = GatewayStreamConsumer._clean_for_display(text) # Should not have 3+ consecutive newlines assert "\n\n\n" not in result def test_media_only_response(self): """Response that is entirely MEDIA: tags returns empty/whitespace.""" text = "MEDIA:/tmp/image.png" result = GatewayStreamConsumer._clean_for_display(text) assert result.strip() == "" def test_media_mid_sentence(self): """MEDIA: tag embedded in prose is stripped cleanly.""" text = "I generated this image MEDIA:/tmp/art.png for you." result = GatewayStreamConsumer._clean_for_display(text) assert "MEDIA:" not in result assert "generated" in result assert "for you." in result def test_preserves_non_media_colons(self): """Normal colons and text with 'MEDIA' as a word aren't stripped.""" text = "The media: files are stored in /tmp. Use social MEDIA carefully." result = GatewayStreamConsumer._clean_for_display(text) # "MEDIA:" in upper case without a path won't match \S+ (space follows) # But "media:" is lowercase so won't match either assert result == text # ── Integration: _send_or_edit strips MEDIA: ───────────────────────────── class TestSendOrEditMediaStripping: """Verify _send_or_edit strips MEDIA: before sending to the platform.""" @pytest.mark.asyncio async def test_first_send_strips_media(self): """Initial send removes MEDIA: tags from visible text.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") adapter.send = AsyncMock(return_value=send_result) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer(adapter, "chat_123") await consumer._send_or_edit("Here is your image\nMEDIA:/tmp/test.png") adapter.send.assert_called_once() sent_text = adapter.send.call_args[1]["content"] assert "MEDIA:" not in sent_text assert "Here is your image" in sent_text @pytest.mark.asyncio async def test_edit_strips_media(self): """Edit call removes MEDIA: tags from visible text.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") edit_result = SimpleNamespace(success=True) adapter.send = AsyncMock(return_value=send_result) adapter.edit_message = AsyncMock(return_value=edit_result) adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer(adapter, "chat_123") # First send await consumer._send_or_edit("Starting response...") # Edit with MEDIA: tag await consumer._send_or_edit("Here is the result\nMEDIA:/tmp/image.png") adapter.edit_message.assert_called_once() edited_text = adapter.edit_message.call_args[1]["content"] assert "MEDIA:" not in edited_text @pytest.mark.asyncio async def test_media_only_skips_send(self): """If text is entirely MEDIA: tags, the send is skipped.""" adapter = MagicMock() adapter.send = AsyncMock() adapter.MAX_MESSAGE_LENGTH = 4096 consumer = GatewayStreamConsumer(adapter, "chat_123") await consumer._send_or_edit("MEDIA:/tmp/image.png") adapter.send.assert_not_called() # ── Integration: full stream run ───────────────────────────────────────── class TestStreamRunMediaStripping: """End-to-end: deltas with MEDIA: produce clean visible text.""" @pytest.mark.asyncio async def test_stream_with_media_tag(self): """Full stream run strips MEDIA: from the final visible message.""" adapter = MagicMock() send_result = SimpleNamespace(success=True, message_id="msg_1") edit_result = SimpleNamespace(success=True) adapter.send = AsyncMock(return_value=send_result) adapter.edit_message = AsyncMock(return_value=edit_result) adapter.MAX_MESSAGE_LENGTH = 4096 config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=5) consumer = GatewayStreamConsumer(adapter, "chat_123", config) # Feed deltas consumer.on_delta("Here is your generated image\n") consumer.on_delta("MEDIA:/home/user/.hermes/cache/images/abc123.png") consumer.finish() await consumer.run() # Verify the final text sent/edited doesn't contain MEDIA: all_calls = [] for call in adapter.send.call_args_list: all_calls.append(call[1].get("content", "")) for call in adapter.edit_message.call_args_list: all_calls.append(call[1].get("content", "")) for sent_text in all_calls: assert "MEDIA:" not in sent_text, f"MEDIA: leaked into display: {sent_text!r}" assert consumer.already_sent