mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-24 10:52:21 +00:00
fix(gateway): preserve transcript when hygiene auto-compress can't rotate
Gateway Session Hygiene auto-compression destroyed the original transcript when the throwaway hygiene agent couldn't rotate the session (#21301, P1). The _hyg_agent is built WITHOUT a session_db, so _compress_context cannot end-and-fork the session (its rotate block is gated on agent._session_db). The session_id stays unchanged, and the rewrite_transcript() call ran UNCONDITIONALLY — replacing the full original transcript with just the head+summary list. Permanent data loss on every hygiene compaction. Guard the rewrite behind 'rotated OR in-place' exactly like the /compress path already does (#44794/#39704): only overwrite when a new session id was minted or in-place compaction succeeded; otherwise preserve the original transcript and log a warning. The token/count bookkeeping that followed the rewrite is moved inside the guard, with no-change values in the preserve branch. Co-authored-by: SandroHub013 <sandrohub013@gmail.com> Co-authored-by: WuTianyi123 <wtyopenclaw@gmail.com> Co-authored-by: kyssta-exe <kyssta-exe@users.noreply.github.com>
This commit is contained in:
parent
79f297834a
commit
4c349e85f8
2 changed files with 139 additions and 11 deletions
|
|
@ -9134,7 +9134,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
# the NEW session so the old transcript stays intact
|
||||
# and searchable via session_search.
|
||||
_hyg_new_sid = _hyg_agent.session_id
|
||||
if _hyg_new_sid != session_entry.session_id:
|
||||
_hyg_rotated = _hyg_new_sid != session_entry.session_id
|
||||
_hyg_in_place = bool(
|
||||
getattr(_hyg_agent, "compression_in_place", False)
|
||||
)
|
||||
if _hyg_rotated:
|
||||
session_entry.session_id = _hyg_new_sid
|
||||
self.session_store._save()
|
||||
self._sync_telegram_topic_binding(
|
||||
|
|
@ -9142,16 +9146,41 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
reason="hygiene-compression",
|
||||
)
|
||||
|
||||
self.session_store.rewrite_transcript(
|
||||
session_entry.session_id, _compressed
|
||||
)
|
||||
# Reset stored token count — transcript was rewritten
|
||||
session_entry.last_prompt_tokens = 0
|
||||
history = _compressed
|
||||
_new_count = len(_compressed)
|
||||
_new_tokens = estimate_messages_tokens_rough(
|
||||
_compressed
|
||||
)
|
||||
# Only rewrite the transcript when rotation produced
|
||||
# a NEW session id OR in-place compaction succeeded.
|
||||
# The danger this guards against (mirrors the
|
||||
# /compress fix #44794/#39704): the hygiene agent is
|
||||
# built WITHOUT a session_db, so _compress_context
|
||||
# cannot rotate — if it also wasn't in-place, the
|
||||
# session_id is unchanged for a FAILURE reason, and an
|
||||
# unconditional rewrite_transcript() would DELETE the
|
||||
# original messages and replace them with only the
|
||||
# compressed summary (permanent data loss, #21301).
|
||||
if _hyg_rotated or _hyg_in_place:
|
||||
self.session_store.rewrite_transcript(
|
||||
session_entry.session_id, _compressed
|
||||
)
|
||||
# Reset stored token count — transcript rewritten
|
||||
session_entry.last_prompt_tokens = 0
|
||||
history = _compressed
|
||||
_new_count = len(_compressed)
|
||||
_new_tokens = estimate_messages_tokens_rough(
|
||||
_compressed
|
||||
)
|
||||
else:
|
||||
# No rewrite happened — transcript preserved
|
||||
# unchanged, so the post-compression counts equal
|
||||
# the pre-compression ones.
|
||||
_new_count = _msg_count
|
||||
_new_tokens = _approx_tokens
|
||||
logger.warning(
|
||||
"Gateway hygiene compression for session %s "
|
||||
"did not rotate or compact in place "
|
||||
"(no session_db on the hygiene agent) — "
|
||||
"preserving the original transcript instead "
|
||||
"of overwriting it with the summary (#21301).",
|
||||
session_entry.session_id,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Session hygiene: compressed %s → %s msgs, "
|
||||
|
|
|
|||
|
|
@ -395,6 +395,105 @@ async def test_session_hygiene_messages_stay_in_originating_topic(monkeypatch, t
|
|||
FakeCompressAgent.last_instance.close.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_hygiene_preserves_transcript_when_no_rotation(monkeypatch, tmp_path):
|
||||
"""Regression for #21301: the hygiene agent is built without a session_db,
|
||||
so _compress_context cannot rotate. When it neither rotates NOR compacts
|
||||
in place, the transcript MUST be preserved — an unconditional
|
||||
rewrite_transcript() would replace the original messages with only the
|
||||
summary (permanent data loss). Mirrors the /compress guard (#44794)."""
|
||||
fake_dotenv = types.ModuleType("dotenv")
|
||||
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
|
||||
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
|
||||
|
||||
class NonRotatingCompressAgent:
|
||||
last_instance = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.model = kwargs.get("model")
|
||||
self.session_id = kwargs.get("session_id", "fake-session")
|
||||
self.compression_in_place = False # not in-place either
|
||||
self._print_fn = None
|
||||
self.shutdown_memory_provider = MagicMock()
|
||||
self.close = MagicMock()
|
||||
type(self).last_instance = self
|
||||
|
||||
def _compress_context(self, messages, *_args, **_kwargs):
|
||||
# No session_db → cannot rotate: session_id is UNCHANGED, and this
|
||||
# is a failure-to-rotate, not an in-place success.
|
||||
return ([{"role": "assistant", "content": "summary only"}], None)
|
||||
|
||||
fake_run_agent = types.ModuleType("run_agent")
|
||||
fake_run_agent.AIAgent = NonRotatingCompressAgent
|
||||
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
|
||||
|
||||
gateway_run = importlib.import_module("gateway.run")
|
||||
GatewayRunner = gateway_run.GatewayRunner
|
||||
|
||||
adapter = HygieneCaptureAdapter()
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner.config = GatewayConfig(
|
||||
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="fake-token")}
|
||||
)
|
||||
runner.adapters = {Platform.TELEGRAM: adapter}
|
||||
runner._voice_mode = {}
|
||||
runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False)
|
||||
runner.session_store = MagicMock()
|
||||
runner.session_store.get_or_create_session.return_value = SessionEntry(
|
||||
session_key="agent:main:telegram:group:-1001:17585",
|
||||
session_id="sess-1",
|
||||
created_at=datetime.now(),
|
||||
updated_at=datetime.now(),
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="group",
|
||||
)
|
||||
runner.session_store.load_transcript.return_value = _make_history(6, content_size=400)
|
||||
runner.session_store.has_any_sessions.return_value = True
|
||||
runner.session_store.rewrite_transcript = MagicMock()
|
||||
runner.session_store.append_to_transcript = MagicMock()
|
||||
runner._running_agents = {}
|
||||
runner._pending_messages = {}
|
||||
runner._pending_approvals = {}
|
||||
runner._session_db = None
|
||||
runner._is_user_authorized = lambda _source: True
|
||||
runner._set_session_env = lambda _context: None
|
||||
runner._run_agent = AsyncMock(
|
||||
return_value={
|
||||
"final_response": "ok",
|
||||
"messages": [],
|
||||
"tools": [],
|
||||
"history_offset": 0,
|
||||
"last_prompt_tokens": 0,
|
||||
}
|
||||
)
|
||||
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "fake"})
|
||||
monkeypatch.setattr(
|
||||
"agent.model_metadata.get_model_context_length",
|
||||
lambda *_args, **_kwargs: 100,
|
||||
)
|
||||
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "795544298")
|
||||
|
||||
event = MessageEvent(
|
||||
text="hello",
|
||||
source=SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id="-1001",
|
||||
chat_type="group",
|
||||
thread_id="17585",
|
||||
user_id="12345",
|
||||
),
|
||||
message_id="1",
|
||||
)
|
||||
|
||||
result = await runner._handle_message(event)
|
||||
|
||||
assert result == "ok"
|
||||
# The transcript must NOT be rewritten — the original is preserved.
|
||||
runner.session_store.rewrite_transcript.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_session_hygiene_warns_user_when_compression_aborts(monkeypatch, tmp_path):
|
||||
"""When auxiliary compression's summary LLM call fails, the compressor
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue