mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
test(gateway): add compression session_id rotation integration tests (#34089)
This commit is contained in:
parent
39c4ac3af1
commit
648706936d
4 changed files with 338 additions and 0 deletions
|
|
@ -507,6 +507,8 @@ def compress_context(
|
|||
agent._session_db.end_session(agent.session_id, "compression")
|
||||
old_session_id = agent.session_id
|
||||
agent.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
|
||||
# Ordering contract: the agent thread updates the contextvar here;
|
||||
# the gateway propagates to SessionEntry after run_in_executor returns.
|
||||
try:
|
||||
from gateway.session_context import set_current_session_id
|
||||
|
||||
|
|
|
|||
|
|
@ -9672,6 +9672,8 @@ class GatewayRunner:
|
|||
)
|
||||
response = _sanitize_gateway_final_response(source.platform, response)
|
||||
|
||||
# Ordering contract: the agent thread already updated the contextvar
|
||||
# in conversation_compression.py; propagate to SessionEntry + _save().
|
||||
# If the agent's session_id changed during compression, update
|
||||
# session_entry so transcript writes below go to the right session.
|
||||
if agent_result.get("session_id") and agent_result["session_id"] != session_entry.session_id:
|
||||
|
|
|
|||
201
tests/gateway/test_compression_concurrent_sessions.py
Normal file
201
tests/gateway/test_compression_concurrent_sessions.py
Normal file
|
|
@ -0,0 +1,201 @@
|
|||
"""Behavioral tests for concurrent compression across distinct and shared sessions.
|
||||
|
||||
Complements ``test_compression_concurrent_fork.py`` (which tests the
|
||||
agent-level lock against a real ``SessionDB``) by focusing on gateway-level
|
||||
isolation guarantees:
|
||||
|
||||
1. Five distinct sessions compressing in parallel must not alias each other's
|
||||
session_ids (no cross-session contamination).
|
||||
2. Two agents sharing the same session_id must serialize: exactly one rotates,
|
||||
the other returns its input unchanged (the no-op / lock-loser contract).
|
||||
|
||||
The stub-compressor pattern mirrors ``test_compression_concurrent_fork.py``:
|
||||
the compressor returns deterministic output and sleeps briefly so threads
|
||||
actually overlap at the OS level, making the absence of aliasing a genuine
|
||||
stress test rather than a timing accident.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_state import SessionDB
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shared helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _build_agent_with_db(db: SessionDB, session_id: str):
    """Return an ``AIAgent`` bound to *db* and *session_id* with a stub compressor.

    The agent is imported and constructed while a dummy ``OPENROUTER_API_KEY``
    is patched into the environment. Its ``context_compressor`` is then
    replaced by a ``MagicMock`` whose ``compress`` sleeps briefly and returns
    a fixed two-message transcript, so that threads calling it concurrently
    overlap in time while still producing deterministic output.

    Same shape as the helper in test_compression_concurrent_fork.py, so the
    two modules can be compared side by side.
    """
    agent_kwargs = {
        "api_key": "test-key",
        "base_url": "https://openrouter.ai/api/v1",
        "model": "test/model",
        "quiet_mode": True,
        "session_db": db,
        "session_id": session_id,
        "skip_context_files": True,
        "skip_memory": True,
    }
    with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}):
        from run_agent import AIAgent

        agent = AIAgent(**agent_kwargs)

    def _slow_fixed_compress(*_args, **_kwargs):
        # Same delay as the fork test, long enough for threads to overlap.
        time.sleep(0.25)
        return [
            {"role": "user", "content": "[CONTEXT COMPACTION] summary"},
            {"role": "user", "content": "tail"},
        ]

    stub = MagicMock()
    stub.compress.side_effect = _slow_fixed_compress

    # Plain attribute values set on the stub so that reading them yields
    # real values instead of auto-created child mocks.
    stub_state = {
        "compression_count": 1,
        "last_prompt_tokens": 0,
        "last_completion_tokens": 0,
        "_last_summary_error": None,
        "_last_compress_aborted": False,
        "_last_aux_model_failure_model": None,
        "_last_aux_model_failure_error": None,
    }
    for attr_name, attr_value in stub_state.items():
        setattr(stub, attr_name, attr_value)

    agent.context_compressor = stub
    return agent
|
||||
|
||||
|
||||
# Shared 20-message, user-only transcript handed to ``_compress_context`` by
# the tests below. The stub compressor returns only two messages, so a result
# shorter than this list means compression actually ran.
_MESSAGES = [{"role": "user", "content": f"m{i}"} for i in range(20)]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_concurrent_compressions_do_not_alias_sessions(tmp_path: Path) -> None:
    """Five distinct sessions compressing in parallel must each end up with a
    unique post-compression session_id; no two agents may share an id.

    Each agent generates its own timestamp + uuid suffix, so cross-session
    aliasing is not expected even without per-session locking. This test makes
    that invariant explicit and would catch a regression where session_id
    generation became shared state (e.g. a module-level counter or a shared
    random seed).
    """
    db = SessionDB(db_path=tmp_path / "state.db")

    n = 5
    join_timeout = 15
    parent_ids = [f"DISTINCT_PARENT_{i:02d}" for i in range(n)]
    for sid in parent_ids:
        db.create_session(sid, source="discord")

    agents = [_build_agent_with_db(db, sid) for sid in parent_ids]
    errors: list[Exception] = []

    def run(agent):
        # Exceptions raised in a worker thread would otherwise be lost;
        # collect them so the main thread can fail the test with details.
        try:
            agent._compress_context(_MESSAGES, "sys", approx_tokens=120_000)
        except Exception as exc:
            errors.append(exc)

    # daemon=True: a worker that is still stuck after the join timeout must
    # not keep the test process alive at interpreter exit.
    threads = [
        threading.Thread(target=run, args=(a,), name=f"session-{i}", daemon=True)
        for i, a in enumerate(agents)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=join_timeout)

    # join() returns None whether or not it timed out, so a hang has to be
    # detected explicitly. Otherwise it shows up below as a misleading
    # "did not rotate" failure.
    stuck = [t.name for t in threads if t.is_alive()]
    assert not stuck, (
        f"Compression threads did not finish within {join_timeout}s: {stuck}"
    )

    assert not errors, f"Compression raised exceptions: {errors}"

    # Every agent must have rotated to a new, unique session_id.
    new_ids = [a.session_id for a in agents]
    assert all(sid not in parent_ids for sid in new_ids), (
        "At least one agent did not rotate its session_id during compression. "
        f"parent_ids={parent_ids} new_ids={new_ids}"
    )
    assert len(set(new_ids)) == n, (
        f"Post-compression session_ids are not unique: {new_ids}. "
        "Two agents aliased to the same id — cross-session contamination."
    )
|
||||
|
||||
|
||||
def test_concurrent_compressions_same_session_serialize(tmp_path: Path) -> None:
    """Two agents sharing a session_id must not both rotate it.

    The per-session compression lock (added in #34351) serializes concurrent
    compress() calls keyed on the same session_id. Exactly one agent must
    rotate (the lock winner); the other must return its messages unchanged
    (the lock loser, observed here as ``len(returned) == len(input)``).

    This is the gateway analogue of the fork test in
    ``test_compression_concurrent_fork.py``, scoped to the two-agent /
    same-session shape most likely to occur in practice: the main-turn agent
    and its background-review fork both hitting the compression threshold.
    """
    db = SessionDB(db_path=tmp_path / "state.db")
    shared_sid = "SHARED_SESSION_CONCURRENT"
    join_timeout = 15
    db.create_session(shared_sid, source="discord")

    agent_a = _build_agent_with_db(db, shared_sid)
    agent_b = _build_agent_with_db(db, shared_sid)

    results: dict[str, list | None] = {"a": None, "b": None}
    errors: list[Exception] = []

    def run(key, agent):
        # Exceptions raised in a worker thread would otherwise be lost;
        # collect them so the main thread can fail the test with details.
        try:
            compressed, _sp = agent._compress_context(_MESSAGES, "sys", approx_tokens=120_000)
            results[key] = compressed
        except Exception as exc:
            errors.append(exc)

    # daemon=True: a worker that is still stuck after the join timeout must
    # not keep the test process alive at interpreter exit.
    threads = [
        threading.Thread(target=run, args=("a", agent_a), name="main_turn", daemon=True),
        threading.Thread(target=run, args=("b", agent_b), name="review_fork", daemon=True),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=join_timeout)

    # join() returns None whether or not it timed out. A hung thread would
    # leave its results slot as None and be misreported below as a lock
    # failure, so detect it explicitly.
    stuck = [t.name for t in threads if t.is_alive()]
    assert not stuck, (
        f"Compression threads did not finish within {join_timeout}s: {stuck}"
    )

    assert not errors, f"Compression raised exceptions: {errors}"

    # Count which agents actually compressed (returned fewer messages than input)
    compressed_count = sum(
        1 for msgs in results.values()
        if msgs is not None and len(msgs) < len(_MESSAGES)
    )
    unchanged_count = sum(
        1 for msgs in results.values()
        if msgs is not None and len(msgs) == len(_MESSAGES)
    )

    assert compressed_count == 1, (
        f"Expected exactly one agent to compress, got {compressed_count}. "
        "If both compressed, the lock failed to serialize. "
        "If neither compressed, both lost the lock (check lock logic)."
    )
    assert unchanged_count == 1, (
        f"Expected exactly one agent to return messages unchanged (lock loser), "
        f"got {unchanged_count}."
    )

    # Exactly one session_id rotation must have occurred.
    rotated = sum(
        1 for a in (agent_a, agent_b) if a.session_id != shared_sid
    )
    assert rotated == 1, (
        f"Expected exactly one agent to rotate session_id, got {rotated}. "
        "Both agents rotating produces a session fork (Damien's incident shape)."
    )

    # The lock must be released so future compression on the NEW session_id works.
    assert db.get_compression_lock_holder(shared_sid) is None, (
        "Compression lock leaked: still held on the parent session_id after both "
        "threads joined. Future compression on the child session would deadlock."
    )
|
||||
|
|
@ -11,6 +11,10 @@ re-triggers compression forever.
|
|||
Three sites in ``gateway/run.py`` mutate ``session_entry.session_id`` after
|
||||
a compression-induced session split. All three MUST be followed by a
|
||||
``_save()`` call. This test pins that invariant.
|
||||
|
||||
``TestCompressionSessionPropagation`` adds behavioral tests that exercise the
|
||||
actual propagation path inline, verifying that the mock session_entry update
|
||||
and _save() semantics are correct without requiring a live gateway.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
|
@ -18,8 +22,10 @@ from __future__ import annotations
|
|||
import ast
|
||||
import inspect
|
||||
import textwrap
|
||||
from unittest.mock import MagicMock, call
|
||||
|
||||
from gateway import run as gateway_run
|
||||
from gateway.session_context import set_current_session_id, get_session_env
|
||||
|
||||
|
||||
def _session_id_assignments_followed_by_save(source: str) -> list[tuple[int, bool]]:
|
||||
|
|
@ -109,3 +115,130 @@ def test_every_post_compression_session_id_assignment_persists():
|
|||
f"or the next turn loads the pre-compression transcript and triggers an "
|
||||
f"infinite compression loop. See issue #29335."
|
||||
)
|
||||
|
||||
|
||||
class TestCompressionSessionPropagation:
    """Behavioral tests for post-compression session_id propagation.

    The structural AST test in this module pins that every
    ``session_entry.session_id`` assignment in gateway/run.py is followed by
    ``_save()``. These tests exercise the *behavior* of that propagation path,
    using mocks that stand in for the objects gateway/run.py works with
    (``session_entry`` and ``session_store``), without requiring a live
    gateway instance.

    Ordering contract (from the comments in the source):
      1. The agent thread updates the contextvar in
         ``conversation_compression.py`` via
         ``set_current_session_id(agent.session_id)``.
      2. After ``run_in_executor`` returns, the gateway propagates the new id
         to ``session_entry.session_id`` and calls ``session_store._save()``.
    Both halves must agree for the next turn to route correctly.
    """

    @staticmethod
    def _propagate_rotated_session_id(agent_result, session_entry, session_store) -> None:
        """Replicate the propagation block from gateway/run.py.

        When *agent_result* carries a truthy ``"session_id"`` that differs
        from ``session_entry.session_id``, copy it onto *session_entry* and
        persist via ``session_store._save()``. Otherwise do nothing.

        Kept in one place so every test exercises the same condition instead
        of three hand-copied variants that could drift apart.
        """
        if agent_result.get("session_id") and agent_result["session_id"] != session_entry.session_id:
            session_entry.session_id = agent_result["session_id"]
            session_store._save()

    def test_gateway_session_entry_follows_compression_rotation(self) -> None:
        """A rotated session_id in the agent result must update session_entry
        and trigger exactly one ``_save()``.

        This is the minimal contract that prevents the restart-loop bug: the
        new session mapping must be both applied and persisted.
        """
        old_sid = "20260101_000000_aaaaaa"
        new_sid = "20260101_000001_bbbbbb"

        session_entry = MagicMock()
        session_entry.session_id = old_sid

        session_store = MagicMock()

        agent_result = {"session_id": new_sid, "response": "hello"}

        self._propagate_rotated_session_id(agent_result, session_entry, session_store)

        assert session_entry.session_id == new_sid, (
            "session_entry.session_id was not updated to the compressed session id. "
            "The next turn would load the old transcript and re-trigger compression."
        )
        # A trailing ", (message)" after assert_called_once_with() would only
        # build a discarded tuple, so the explanatory message is attached to a
        # real assert and the mock helper then checks the call arguments.
        assert session_store._save.call_count == 1, (
            "session_store._save() was not called exactly once after session_entry "
            "update. The new session mapping would not survive a gateway restart."
        )
        session_store._save.assert_called_once_with()

    def test_no_update_when_session_id_unchanged(self) -> None:
        """The propagation block must be a no-op when the agent did not compress.

        Covers both shapes of a normal turn: the result has no ``session_id``
        key at all, and the result echoes the current session_id. In either
        case session_entry must keep its id and ``_save`` must not be called,
        avoiding spurious writes on every turn.
        """
        same_sid = "20260101_000000_aaaaaa"

        normal_turn_results = (
            {"response": "hello"},  # no "session_id" key
            {"session_id": same_sid, "response": "hello"},  # unchanged id
        )

        for agent_result in normal_turn_results:
            session_entry = MagicMock()
            session_entry.session_id = same_sid

            session_store = MagicMock()

            self._propagate_rotated_session_id(agent_result, session_entry, session_store)

            assert session_entry.session_id == same_sid, (
                f"session_entry.session_id changed on a non-compression turn "
                f"(agent_result={agent_result})."
            )
            session_store._save.assert_not_called()

    def test_contextvar_and_session_entry_agree_after_compression(self) -> None:
        """After compression, the contextvar and session_entry must carry the
        same session_id.

        The agent thread calls ``set_current_session_id(new_sid)`` inside
        ``conversation_compression.py`` (step 1). The gateway then propagates
        ``new_sid`` to ``session_entry.session_id`` (step 2). If either step
        is missing, tool calls and transcript writes will disagree on which
        session is active.

        This test simulates both steps and asserts agreement.
        """
        import contextvars

        old_sid = "20260101_000000_cccccc"
        new_sid = "20260101_000002_dddddd"

        def _simulate_rotation() -> None:
            # Step 1: agent thread updates the contextvar
            # (mirrors conversation_compression.py).
            set_current_session_id(new_sid)

            # Step 2: gateway propagates to session_entry
            # (mirrors the propagation block in gateway/run.py).
            session_entry = MagicMock()
            session_entry.session_id = old_sid
            session_store = MagicMock()
            agent_result = {"session_id": new_sid}

            self._propagate_rotated_session_id(agent_result, session_entry, session_store)

            contextvar_sid = get_session_env("HERMES_SESSION_ID", "")
            assert contextvar_sid == new_sid, (
                f"Contextvar still holds old session_id '{contextvar_sid}' after "
                f"set_current_session_id('{new_sid}'). Tool calls in the next turn "
                "will read stale routing state."
            )
            assert session_entry.session_id == new_sid, (
                f"session_entry.session_id is '{session_entry.session_id}' but contextvar "
                f"says '{contextvar_sid}'. The two routing paths disagree after compression."
            )
            assert contextvar_sid == session_entry.session_id, (
                "Contextvar and session_entry disagree on the active session_id "
                "after compression rotation. Exactly one of the two ordering steps "
                "was skipped."
            )

        # Run inside a copied context so the session id set in step 1 does not
        # leak into tests that execute later in the same thread.
        # NOTE(review): assumes set_current_session_id stores the id in a
        # contextvars.ContextVar, as the source comments describe. Any other
        # global state it touches is not rolled back here; confirm.
        contextvars.copy_context().run(_simulate_rotation)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue