diff --git a/gateway/run.py b/gateway/run.py index 0b5e3a1b4d..c094fddd63 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5739,7 +5739,9 @@ class GatewayRunner: # If so, update the session store entry so the NEXT message loads # the compressed transcript, not the stale pre-compression one. agent = agent_holder[0] + _session_was_split = False if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id: + _session_was_split = True logger.info( "Session split detected: %s → %s (compression)", session_id, agent.session_id, @@ -5751,6 +5753,13 @@ class GatewayRunner: effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id + # When compression created a new session, the messages list was + # shortened. Using the original history offset would produce an + # empty new_messages slice, causing the gateway to write only a + # user/assistant pair — losing the compressed summary and tail. + # Reset to 0 so the gateway writes ALL compressed messages. + _effective_history_offset = 0 if _session_was_split else len(agent_history) + # Auto-generate session title after first exchange (non-blocking) if final_response and self._session_db: try: @@ -5772,7 +5781,7 @@ class GatewayRunner: "messages": result_holder[0].get("messages", []) if result_holder[0] else [], "api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0, "tools": tools_holder[0] or [], - "history_offset": len(agent_history), + "history_offset": _effective_history_offset, "last_prompt_tokens": _last_prompt_toks, "input_tokens": _input_toks, "output_tokens": _output_toks, diff --git a/run_agent.py b/run_agent.py index 794c9f67ab..fad7fca5a0 100644 --- a/run_agent.py +++ b/run_agent.py @@ -6250,6 +6250,12 @@ class AIAgent: ) if len(messages) >= _orig_len: break # Cannot compress further + # Compression created a new session — clear the history + # reference so _flush_messages_to_session_db writes ALL + # compressed messages to the new session's SQLite, not + # skipping them because conversation_history is still the + # pre-compression length. + conversation_history = None # Re-estimate after compression _preflight_tokens = estimate_request_tokens_rough( messages, @@ -7765,6 +7771,10 @@ class AIAgent: approx_tokens=self.context_compressor.last_prompt_tokens, task_id=effective_task_id, ) + # Compression created a new session — clear history so + # _flush_messages_to_session_db writes compressed messages + # to the new session (see preflight compression comment). + conversation_history = None # Save session log incrementally (so progress is visible even if interrupted) self._session_messages = messages diff --git a/tests/test_compression_persistence.py b/tests/test_compression_persistence.py new file mode 100644 index 0000000000..272b39bfea --- /dev/null +++ b/tests/test_compression_persistence.py @@ -0,0 +1,202 @@ +"""Tests for context compression persistence in the gateway. + +Verifies that when context compression fires during run_conversation(), +the compressed messages are properly persisted to both SQLite (via the +agent) and JSONL (via the gateway). + +Bug scenario (pre-fix): + 1. Gateway loads 200-message history, passes to agent + 2. Agent's run_conversation() compresses to ~30 messages mid-run + 3. _compress_context() resets _last_flushed_db_idx = 0 + 4. On exit, _flush_messages_to_session_db() calculates: + flush_from = max(len(conversation_history=200), _last_flushed_db_idx=0) = 200 + 5. messages[200:] is empty (only ~30 messages after compression) + 6. Nothing written to new session's SQLite — compressed context lost + 7. Gateway's history_offset was still 200, producing empty new_messages + 8. Fallback wrote only user/assistant pair — summary lost +""" + +import os +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# --------------------------------------------------------------------------- +# Part 1: Agent-side — _flush_messages_to_session_db after compression +# --------------------------------------------------------------------------- + +class TestFlushAfterCompression: + """Verify that compressed messages are flushed to the new session's SQLite + even when conversation_history (from the original session) is longer than + the compressed messages list.""" + + def _make_agent(self, session_db): + with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}): + from run_agent import AIAgent + agent = AIAgent( + model="test/model", + quiet_mode=True, + session_db=session_db, + session_id="original-session", + skip_context_files=True, + skip_memory=True, + ) + return agent + + def test_flush_after_compression_with_long_history(self): + """The actual bug: conversation_history longer than compressed messages. + + Before the fix, flush_from = max(len(conversation_history), 0) = 200, + but messages only has ~30 entries, so messages[200:] is empty. + After the fix, conversation_history is cleared to None after compression, + so flush_from = max(0, 0) = 0, and ALL compressed messages are written. + """ + from hermes_state import SessionDB + + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + db = SessionDB(db_path=db_path) + + agent = self._make_agent(db) + + # Simulate the original long history (200 messages) + original_history = [ + {"role": "user" if i % 2 == 0 else "assistant", + "content": f"message {i}"} + for i in range(200) + ] + + # First, flush original messages to the original session + agent._flush_messages_to_session_db(original_history, []) + original_rows = db.get_messages("original-session") + assert len(original_rows) == 200 + + # Now simulate compression: new session, reset idx, shorter messages + agent.session_id = "compressed-session" + db.create_session(session_id="compressed-session", source="test") + agent._last_flushed_db_idx = 0 + + # The compressed messages (summary + tail + new turn) + compressed_messages = [ + {"role": "user", "content": "[CONTEXT COMPACTION] Summary of work..."}, + {"role": "user", "content": "What should we do next?"}, + {"role": "assistant", "content": "Let me check..."}, + {"role": "user", "content": "new question"}, + {"role": "assistant", "content": "new answer"}, + ] + + # THE BUG: passing the original history as conversation_history + # causes flush_from = max(200, 0) = 200, skipping everything. + # After the fix, conversation_history should be None. + agent._flush_messages_to_session_db(compressed_messages, None) + + new_rows = db.get_messages("compressed-session") + assert len(new_rows) == 5, ( + f"Expected 5 compressed messages in new session, got {len(new_rows)}. " + f"Compression persistence bug: messages not written to SQLite." + ) + + def test_flush_with_stale_history_loses_messages(self): + """Demonstrates the bug condition: stale conversation_history causes data loss.""" + from hermes_state import SessionDB + + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test.db" + db = SessionDB(db_path=db_path) + + agent = self._make_agent(db) + + # Simulate compression reset + agent.session_id = "new-session" + db.create_session(session_id="new-session", source="test") + agent._last_flushed_db_idx = 0 + + compressed = [ + {"role": "user", "content": "summary"}, + {"role": "assistant", "content": "continuing..."}, + ] + + # Bug: passing a conversation_history longer than compressed messages + stale_history = [{"role": "user", "content": f"msg{i}"} for i in range(100)] + agent._flush_messages_to_session_db(compressed, stale_history) + + rows = db.get_messages("new-session") + # With the stale history, flush_from = max(100, 0) = 100 + # But compressed only has 2 entries → messages[100:] = empty + assert len(rows) == 0, ( + "Expected 0 messages with stale conversation_history " + "(this test verifies the bug condition exists)" + ) + + +# --------------------------------------------------------------------------- +# Part 2: Gateway-side — history_offset after session split +# --------------------------------------------------------------------------- + +class TestGatewayHistoryOffsetAfterSplit: + """Verify that when the agent creates a new session during compression, + the gateway uses history_offset=0 so all compressed messages are written + to the JSONL transcript.""" + + def test_history_offset_zero_on_session_split(self): + """When agent.session_id differs from the original, history_offset must be 0.""" + # This tests the logic in gateway/run.py run_sync(): + # _session_was_split = agent.session_id != session_id + # _effective_history_offset = 0 if _session_was_split else len(agent_history) + + original_session_id = "session-abc" + agent_session_id = "session-compressed-xyz" # Different = compression happened + agent_history_len = 200 + + # Simulate the gateway's offset calculation (post-fix) + _session_was_split = (agent_session_id != original_session_id) + _effective_history_offset = 0 if _session_was_split else agent_history_len + + assert _session_was_split is True + assert _effective_history_offset == 0 + + def test_history_offset_preserved_without_split(self): + """When no compression happened, history_offset is the original length.""" + session_id = "session-abc" + agent_session_id = "session-abc" # Same = no compression + agent_history_len = 200 + + _session_was_split = (agent_session_id != session_id) + _effective_history_offset = 0 if _session_was_split else agent_history_len + + assert _session_was_split is False + assert _effective_history_offset == 200 + + def test_new_messages_extraction_after_split(self): + """After compression with offset=0, new_messages should be ALL agent messages.""" + # Simulates the gateway's new_messages calculation + agent_messages = [ + {"role": "user", "content": "[CONTEXT COMPACTION] Summary..."}, + {"role": "user", "content": "recent question"}, + {"role": "assistant", "content": "recent answer"}, + {"role": "user", "content": "new question"}, + {"role": "assistant", "content": "new answer"}, + ] + history_offset = 0 # After fix: 0 on session split + + new_messages = agent_messages[history_offset:] if len(agent_messages) > history_offset else [] + assert len(new_messages) == 5, ( + f"Expected all 5 messages with offset=0, got {len(new_messages)}" + ) + + def test_new_messages_empty_with_stale_offset(self): + """Demonstrates the bug: stale offset produces empty new_messages.""" + agent_messages = [ + {"role": "user", "content": "summary"}, + {"role": "assistant", "content": "answer"}, + ] + # Bug: offset is the pre-compression history length + history_offset = 200 + + new_messages = agent_messages[history_offset:] if len(agent_messages) > history_offset else [] + assert len(new_messages) == 0, ( + "Expected 0 messages with stale offset=200 (demonstrates the bug)" + )