diff --git a/gateway/run.py b/gateway/run.py index 73700e3b529..08415eb8629 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -9134,7 +9134,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # the NEW session so the old transcript stays intact # and searchable via session_search. _hyg_new_sid = _hyg_agent.session_id - if _hyg_new_sid != session_entry.session_id: + _hyg_rotated = _hyg_new_sid != session_entry.session_id + _hyg_in_place = bool( + getattr(_hyg_agent, "compression_in_place", False) + ) + if _hyg_rotated: session_entry.session_id = _hyg_new_sid self.session_store._save() self._sync_telegram_topic_binding( @@ -9142,16 +9146,41 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew reason="hygiene-compression", ) - self.session_store.rewrite_transcript( - session_entry.session_id, _compressed - ) - # Reset stored token count — transcript was rewritten - session_entry.last_prompt_tokens = 0 - history = _compressed - _new_count = len(_compressed) - _new_tokens = estimate_messages_tokens_rough( - _compressed - ) + # Only rewrite the transcript when rotation produced + # a NEW session id OR in-place compaction succeeded. + # The danger this guards against (mirrors the + # /compress fix #44794/#39704): the hygiene agent is + # built WITHOUT a session_db, so _compress_context + # cannot rotate — if it also wasn't in-place, the + # session_id is unchanged for a FAILURE reason, and an + # unconditional rewrite_transcript() would DELETE the + # original messages and replace them with only the + # compressed summary (permanent data loss, #21301). + if _hyg_rotated or _hyg_in_place: + self.session_store.rewrite_transcript( + session_entry.session_id, _compressed + ) + # Reset stored token count — transcript rewritten + session_entry.last_prompt_tokens = 0 + history = _compressed + _new_count = len(_compressed) + _new_tokens = estimate_messages_tokens_rough( + _compressed + ) + else: + # No rewrite happened — transcript preserved + # unchanged, so the post-compression counts equal + # the pre-compression ones. + _new_count = _msg_count + _new_tokens = _approx_tokens + logger.warning( + "Gateway hygiene compression for session %s " + "did not rotate or compact in place " + "(no session_db on the hygiene agent) — " + "preserving the original transcript instead " + "of overwriting it with the summary (#21301).", + session_entry.session_id, + ) logger.info( "Session hygiene: compressed %s → %s msgs, " diff --git a/tests/gateway/test_session_hygiene.py b/tests/gateway/test_session_hygiene.py index b54f588cb10..fee815d2203 100644 --- a/tests/gateway/test_session_hygiene.py +++ b/tests/gateway/test_session_hygiene.py @@ -395,6 +395,105 @@ async def test_session_hygiene_messages_stay_in_originating_topic(monkeypatch, t FakeCompressAgent.last_instance.close.assert_called_once() +@pytest.mark.asyncio +async def test_session_hygiene_preserves_transcript_when_no_rotation(monkeypatch, tmp_path): + """Regression for #21301: the hygiene agent is built without a session_db, + so _compress_context cannot rotate. When it neither rotates NOR compacts + in place, the transcript MUST be preserved — an unconditional + rewrite_transcript() would replace the original messages with only the + summary (permanent data loss). Mirrors the /compress guard (#44794).""" + fake_dotenv = types.ModuleType("dotenv") + fake_dotenv.load_dotenv = lambda *args, **kwargs: None + monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv) + + class NonRotatingCompressAgent: + last_instance = None + + def __init__(self, **kwargs): + self.model = kwargs.get("model") + self.session_id = kwargs.get("session_id", "fake-session") + self.compression_in_place = False # not in-place either + self._print_fn = None + self.shutdown_memory_provider = MagicMock() + self.close = MagicMock() + type(self).last_instance = self + + def _compress_context(self, messages, *_args, **_kwargs): + # No session_db → cannot rotate: session_id is UNCHANGED, and this + # is a failure-to-rotate, not an in-place success. + return ([{"role": "assistant", "content": "summary only"}], None) + + fake_run_agent = types.ModuleType("run_agent") + fake_run_agent.AIAgent = NonRotatingCompressAgent + monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent) + + gateway_run = importlib.import_module("gateway.run") + GatewayRunner = gateway_run.GatewayRunner + + adapter = HygieneCaptureAdapter() + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig( + platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake-token")} + ) + runner.adapters = {Platform.TELEGRAM: adapter} + runner._voice_mode = {} + runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False) + runner.session_store = MagicMock() + runner.session_store.get_or_create_session.return_value = SessionEntry( + session_key="agent:main:telegram:group:-1001:17585", + session_id="sess-1", + created_at=datetime.now(), + updated_at=datetime.now(), + platform=Platform.TELEGRAM, + chat_type="group", + ) + runner.session_store.load_transcript.return_value = _make_history(6, content_size=400) + runner.session_store.has_any_sessions.return_value = True + runner.session_store.rewrite_transcript = MagicMock() + runner.session_store.append_to_transcript = MagicMock() + runner._running_agents = {} + runner._pending_messages = {} + runner._pending_approvals = {} + runner._session_db = None + runner._is_user_authorized = lambda _source: True + runner._set_session_env = lambda _context: None + runner._run_agent = AsyncMock( + return_value={ + "final_response": "ok", + "messages": [], + "tools": [], + "history_offset": 0, + "last_prompt_tokens": 0, + } + ) + + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"}) + monkeypatch.setattr( + "agent.model_metadata.get_model_context_length", + lambda *_args, **_kwargs: 100, + ) + monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "795544298") + + event = MessageEvent( + text="hello", + source=SessionSource( + platform=Platform.TELEGRAM, + chat_id="-1001", + chat_type="group", + thread_id="17585", + user_id="12345", + ), + message_id="1", + ) + + result = await runner._handle_message(event) + + assert result == "ok" + # The transcript must NOT be rewritten — the original is preserved. + runner.session_store.rewrite_transcript.assert_not_called() + + @pytest.mark.asyncio async def test_session_hygiene_warns_user_when_compression_aborts(monkeypatch, tmp_path): """When auxiliary compression's summary LLM call fails, the compressor