"""Tests for gateway /compress user-facing messaging.""" from datetime import datetime from unittest.mock import MagicMock, patch import pytest from gateway.config import GatewayConfig, Platform, PlatformConfig from gateway.platforms.base import MessageEvent from gateway.session import SessionEntry, SessionSource, build_session_key def _make_source() -> SessionSource: return SessionSource( platform=Platform.TELEGRAM, user_id="u1", chat_id="c1", user_name="tester", chat_type="dm", ) def _make_event(text: str = "/compress") -> MessageEvent: return MessageEvent(text=text, source=_make_source(), message_id="m1") def _make_history() -> list[dict[str, str]]: return [ {"role": "user", "content": "one"}, {"role": "assistant", "content": "two"}, {"role": "user", "content": "three"}, {"role": "assistant", "content": "four"}, ] def _make_runner(history: list[dict[str, str]]): from gateway.run import GatewayRunner runner = object.__new__(GatewayRunner) runner.config = GatewayConfig( platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")} ) session_entry = SessionEntry( session_key=build_session_key(_make_source()), session_id="sess-1", created_at=datetime.now(), updated_at=datetime.now(), platform=Platform.TELEGRAM, chat_type="dm", ) runner.session_store = MagicMock() runner.session_store.get_or_create_session.return_value = session_entry runner.session_store.load_transcript.return_value = history runner.session_store.rewrite_transcript = MagicMock() runner.session_store.update_session = MagicMock() runner.session_store._save = MagicMock() return runner @pytest.mark.asyncio async def test_compress_command_reports_noop_without_success_banner(): history = _make_history() runner = _make_runner(history) agent_instance = MagicMock() agent_instance.shutdown_memory_provider = MagicMock() agent_instance.close = MagicMock() agent_instance.context_compressor.has_content_to_compress.return_value = True agent_instance.session_id = "sess-1" agent_instance._compress_context.return_value = (list(history), "") def _estimate(messages): assert messages == history return 100 with ( patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "test-key"}), patch("gateway.run._resolve_gateway_model", return_value="test-model"), patch("run_agent.AIAgent", return_value=agent_instance), patch("agent.model_metadata.estimate_messages_tokens_rough", side_effect=_estimate), ): result = await runner._handle_compress_command(_make_event()) assert "No changes from compression" in result assert "Compressed:" not in result assert "Rough transcript estimate: ~100 tokens (unchanged)" in result agent_instance.shutdown_memory_provider.assert_called_once() agent_instance.close.assert_called_once() @pytest.mark.asyncio async def test_compress_command_explains_when_token_estimate_rises(): history = _make_history() compressed = [ history[0], {"role": "assistant", "content": "Dense summary that still counts as more tokens."}, history[-1], ] runner = _make_runner(history) agent_instance = MagicMock() agent_instance.shutdown_memory_provider = MagicMock() agent_instance.close = MagicMock() agent_instance.context_compressor.has_content_to_compress.return_value = True agent_instance.session_id = "sess-1" agent_instance._compress_context.return_value = (compressed, "") def _estimate(messages): if messages == history: return 100 if messages == compressed: return 120 raise AssertionError(f"unexpected transcript: {messages!r}") with ( patch("gateway.run._resolve_runtime_agent_kwargs", return_value={"api_key": "test-key"}), patch("gateway.run._resolve_gateway_model", return_value="test-model"), patch("run_agent.AIAgent", return_value=agent_instance), patch("agent.model_metadata.estimate_messages_tokens_rough", side_effect=_estimate), ): result = await runner._handle_compress_command(_make_event()) assert "Compressed: 4 → 3 messages" in result assert "Rough transcript estimate: ~100 → ~120 tokens" in result assert "denser summaries" in result agent_instance.shutdown_memory_provider.assert_called_once() agent_instance.close.assert_called_once()