mirror of https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix: persist compressed context to gateway session after mid-run compression
When context compression fires during run_conversation() in the gateway, the
compressed messages were silently lost on the next turn. Two bugs:

1. Agent-side: _flush_messages_to_session_db() calculated
   flush_from = max(len(conversation_history), _last_flushed_db_idx).
   After compression, _last_flushed_db_idx was correctly reset to 0, but
   conversation_history still had its original pre-compression length
   (e.g. 200). Since compressed messages are shorter (~30), messages[200:]
   was empty — nothing written to the new session's SQLite.
   Fix: set conversation_history = None after each _compress_context() call
   so start_idx = 0 and all compressed messages are flushed.

2. Gateway-side: history_offset was always len(agent_history) — the original
   pre-compression length. After compression shortened the message list,
   agent_messages[200:] was empty, causing the gateway to fall back to
   writing only a user/assistant pair, losing the compressed summary and
   tail context.
   Fix: detect session splits (agent.session_id != original) and set
   history_offset = 0 so all compressed messages are written to JSONL.
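A minimal sketch of the agent-side failure mode, using hypothetical stand-in
values that mirror the commit message (not the repository's actual code):

    # Hypothetical values: ~30 messages survive compression of a 200-message history.
    compressed = [{"role": "user", "content": "summary"}] * 30
    pre_compression_len = 200      # stale len(conversation_history)
    last_flushed_db_idx = 0        # correctly reset by _compress_context()

    # Pre-fix: the stale length wins the max(), the slice is empty,
    # and nothing is persisted to the new session.
    flush_from = max(pre_compression_len, last_flushed_db_idx)   # = 200
    assert compressed[flush_from:] == []

    # Post-fix: with conversation_history cleared to None, the start
    # index is 0 and every compressed message is flushed.
    flush_from = max(0, last_flushed_db_idx)                     # = 0
    assert len(compressed[flush_from:]) == 30

The gateway-side bug is the same arithmetic applied to history_offset when
slicing agent_messages for the JSONL transcript.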
parent 44d02f35d2
commit f776191650

3 changed files with 222 additions and 1 deletion
gateway/run.py (+10, -1)

@@ -5739,7 +5739,9 @@ class GatewayRunner:
             # If so, update the session store entry so the NEXT message loads
             # the compressed transcript, not the stale pre-compression one.
             agent = agent_holder[0]
+            _session_was_split = False
             if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id:
+                _session_was_split = True
                 logger.info(
                     "Session split detected: %s → %s (compression)",
                     session_id, agent.session_id,
@@ -5751,6 +5753,13 @@ class GatewayRunner:
 
             effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
 
+            # When compression created a new session, the messages list was
+            # shortened. Using the original history offset would produce an
+            # empty new_messages slice, causing the gateway to write only a
+            # user/assistant pair — losing the compressed summary and tail.
+            # Reset to 0 so the gateway writes ALL compressed messages.
+            _effective_history_offset = 0 if _session_was_split else len(agent_history)
+
             # Auto-generate session title after first exchange (non-blocking)
             if final_response and self._session_db:
                 try:
@@ -5772,7 +5781,7 @@ class GatewayRunner:
                 "messages": result_holder[0].get("messages", []) if result_holder[0] else [],
                 "api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
                 "tools": tools_holder[0] or [],
-                "history_offset": len(agent_history),
+                "history_offset": _effective_history_offset,
                 "last_prompt_tokens": _last_prompt_toks,
                 "input_tokens": _input_toks,
                 "output_tokens": _output_toks,
run_agent.py (+10)
@@ -6250,6 +6250,12 @@ class AIAgent:
                 )
                 if len(messages) >= _orig_len:
                     break  # Cannot compress further
+                # Compression created a new session — clear the history
+                # reference so _flush_messages_to_session_db writes ALL
+                # compressed messages to the new session's SQLite, not
+                # skipping them because conversation_history is still the
+                # pre-compression length.
+                conversation_history = None
                 # Re-estimate after compression
                 _preflight_tokens = estimate_request_tokens_rough(
                     messages,
@@ -7765,6 +7771,10 @@ class AIAgent:
                     approx_tokens=self.context_compressor.last_prompt_tokens,
                     task_id=effective_task_id,
                 )
+                # Compression created a new session — clear history so
+                # _flush_messages_to_session_db writes compressed messages
+                # to the new session (see preflight compression comment).
+                conversation_history = None
 
             # Save session log incrementally (so progress is visible even if interrupted)
             self._session_messages = messages
tests/test_compression_persistence.py (new file, +202)
@@ -0,0 +1,202 @@
"""Tests for context compression persistence in the gateway.

Verifies that when context compression fires during run_conversation(),
the compressed messages are properly persisted to both SQLite (via the
agent) and JSONL (via the gateway).

Bug scenario (pre-fix):
1. Gateway loads 200-message history, passes to agent
2. Agent's run_conversation() compresses to ~30 messages mid-run
3. _compress_context() resets _last_flushed_db_idx = 0
4. On exit, _flush_messages_to_session_db() calculates:
   flush_from = max(len(conversation_history=200), _last_flushed_db_idx=0) = 200
5. messages[200:] is empty (only ~30 messages after compression)
6. Nothing written to new session's SQLite — compressed context lost
7. Gateway's history_offset was still 200, producing empty new_messages
8. Fallback wrote only user/assistant pair — summary lost
"""

import os
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest


# ---------------------------------------------------------------------------
# Part 1: Agent-side — _flush_messages_to_session_db after compression
# ---------------------------------------------------------------------------

class TestFlushAfterCompression:
    """Verify that compressed messages are flushed to the new session's SQLite
    even when conversation_history (from the original session) is longer than
    the compressed messages list."""

    def _make_agent(self, session_db):
        with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}):
            from run_agent import AIAgent
            agent = AIAgent(
                model="test/model",
                quiet_mode=True,
                session_db=session_db,
                session_id="original-session",
                skip_context_files=True,
                skip_memory=True,
            )
        return agent

    def test_flush_after_compression_with_long_history(self):
        """The actual bug: conversation_history longer than compressed messages.

        Before the fix, flush_from = max(len(conversation_history), 0) = 200,
        but messages only has ~30 entries, so messages[200:] is empty.
        After the fix, conversation_history is cleared to None after compression,
        so flush_from = max(0, 0) = 0, and ALL compressed messages are written.
        """
        from hermes_state import SessionDB

        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"
            db = SessionDB(db_path=db_path)

            agent = self._make_agent(db)

            # Simulate the original long history (200 messages)
            original_history = [
                {"role": "user" if i % 2 == 0 else "assistant",
                 "content": f"message {i}"}
                for i in range(200)
            ]

            # First, flush original messages to the original session
            agent._flush_messages_to_session_db(original_history, [])
            original_rows = db.get_messages("original-session")
            assert len(original_rows) == 200

            # Now simulate compression: new session, reset idx, shorter messages
            agent.session_id = "compressed-session"
            db.create_session(session_id="compressed-session", source="test")
            agent._last_flushed_db_idx = 0

            # The compressed messages (summary + tail + new turn)
            compressed_messages = [
                {"role": "user", "content": "[CONTEXT COMPACTION] Summary of work..."},
                {"role": "user", "content": "What should we do next?"},
                {"role": "assistant", "content": "Let me check..."},
                {"role": "user", "content": "new question"},
                {"role": "assistant", "content": "new answer"},
            ]

            # THE BUG: passing the original history as conversation_history
            # causes flush_from = max(200, 0) = 200, skipping everything.
            # After the fix, conversation_history should be None.
            agent._flush_messages_to_session_db(compressed_messages, None)

            new_rows = db.get_messages("compressed-session")
            assert len(new_rows) == 5, (
                f"Expected 5 compressed messages in new session, got {len(new_rows)}. "
                f"Compression persistence bug: messages not written to SQLite."
            )

    def test_flush_with_stale_history_loses_messages(self):
        """Demonstrates the bug condition: stale conversation_history causes data loss."""
        from hermes_state import SessionDB

        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = Path(tmpdir) / "test.db"
            db = SessionDB(db_path=db_path)

            agent = self._make_agent(db)

            # Simulate compression reset
            agent.session_id = "new-session"
            db.create_session(session_id="new-session", source="test")
            agent._last_flushed_db_idx = 0

            compressed = [
                {"role": "user", "content": "summary"},
                {"role": "assistant", "content": "continuing..."},
            ]

            # Bug: passing a conversation_history longer than compressed messages
            stale_history = [{"role": "user", "content": f"msg{i}"} for i in range(100)]
            agent._flush_messages_to_session_db(compressed, stale_history)

            rows = db.get_messages("new-session")
            # With the stale history, flush_from = max(100, 0) = 100
            # But compressed only has 2 entries → messages[100:] = empty
            assert len(rows) == 0, (
                "Expected 0 messages with stale conversation_history "
                "(this test verifies the bug condition exists)"
            )


# ---------------------------------------------------------------------------
# Part 2: Gateway-side — history_offset after session split
# ---------------------------------------------------------------------------

class TestGatewayHistoryOffsetAfterSplit:
    """Verify that when the agent creates a new session during compression,
    the gateway uses history_offset=0 so all compressed messages are written
    to the JSONL transcript."""

    def test_history_offset_zero_on_session_split(self):
        """When agent.session_id differs from the original, history_offset must be 0."""
        # This tests the logic in gateway/run.py run_sync():
        #   _session_was_split = agent.session_id != session_id
        #   _effective_history_offset = 0 if _session_was_split else len(agent_history)

        original_session_id = "session-abc"
        agent_session_id = "session-compressed-xyz"  # Different = compression happened
        agent_history_len = 200

        # Simulate the gateway's offset calculation (post-fix)
        _session_was_split = (agent_session_id != original_session_id)
        _effective_history_offset = 0 if _session_was_split else agent_history_len

        assert _session_was_split is True
        assert _effective_history_offset == 0

    def test_history_offset_preserved_without_split(self):
        """When no compression happened, history_offset is the original length."""
        session_id = "session-abc"
        agent_session_id = "session-abc"  # Same = no compression
        agent_history_len = 200

        _session_was_split = (agent_session_id != session_id)
        _effective_history_offset = 0 if _session_was_split else agent_history_len

        assert _session_was_split is False
        assert _effective_history_offset == 200

    def test_new_messages_extraction_after_split(self):
        """After compression with offset=0, new_messages should be ALL agent messages."""
        # Simulates the gateway's new_messages calculation
        agent_messages = [
            {"role": "user", "content": "[CONTEXT COMPACTION] Summary..."},
            {"role": "user", "content": "recent question"},
            {"role": "assistant", "content": "recent answer"},
            {"role": "user", "content": "new question"},
            {"role": "assistant", "content": "new answer"},
        ]
        history_offset = 0  # After fix: 0 on session split

        new_messages = agent_messages[history_offset:] if len(agent_messages) > history_offset else []
        assert len(new_messages) == 5, (
            f"Expected all 5 messages with offset=0, got {len(new_messages)}"
        )

    def test_new_messages_empty_with_stale_offset(self):
        """Demonstrates the bug: stale offset produces empty new_messages."""
        agent_messages = [
            {"role": "user", "content": "summary"},
            {"role": "assistant", "content": "answer"},
        ]
        # Bug: offset is the pre-compression history length
        history_offset = 200

        new_messages = agent_messages[history_offset:] if len(agent_messages) > history_offset else []
        assert len(new_messages) == 0, (
            "Expected 0 messages with stale offset=200 (demonstrates the bug)"
        )
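Assuming a standard pytest setup at the repository root, the new tests can be
run directly:

    pytest tests/test_compression_persistence.py -v

Part 1 exercises the real _flush_messages_to_session_db path against a
temporary SQLite database; Part 2 replays the gateway's offset arithmetic in
isolation, so both the fixed behavior and the original bug condition stay
covered.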