diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index e11dc7c171d..1ab861bbec9 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -41,6 +41,26 @@ from agent.model_metadata import estimate_request_tokens_rough logger = logging.getLogger(__name__) +def _compression_lock_holder(agent: Any) -> str: + """Build a unique holder id for the lock: pid:tid:agent-instance:uuid. + + The pid+tid prefix lets ops tell crashed/abandoned holders apart from + live ones (expiry-based recovery uses the timestamp, but ``holder`` + is what shows up in diagnostics + log lines). The agent instance id + and a per-acquire uuid disambiguate two co-resident agents on the + same thread (background_review forks run on a worker thread, but + on machines where compression itself dispatches to a thread pool + we want each acquire to be unique). + """ + import threading + return ( + f"pid={os.getpid()}" + f":tid={threading.get_ident()}" + f":agent={id(agent):x}" + f":nonce={uuid.uuid4().hex[:8]}" + ) + + def check_compression_model_feasibility(agent: Any) -> None: """Warn at session start if the auxiliary compression model's context window is smaller than the main model's compression threshold. @@ -305,6 +325,65 @@ def compress_context( "🗜️ Compacting context — summarizing earlier conversation so I can continue..." ) + # ── Compression lock ──────────────────────────────────────────────── + # Atomic, state.db-backed lock per session_id. Without this, two + # AIAgent instances that share the same session_id (most commonly the + # parent-turn agent and its background-review fork — see + # ``agent/background_review.py``: ``review_agent.session_id = + # agent.session_id``) can each call compress() on overlapping + # snapshots of the same conversation. Both succeed, both rotate + # ``agent.session_id`` to a fresh id, both create child sessions in + # state.db parented to the same old id. The gateway's SessionEntry + # only catches one rotation, so the other child becomes an orphan + # that silently accumulates writes — Damien's repro shape. + # + # Acquire keyed on the OLD session_id (the rotation target's parent), + # because that's the id that competing paths see and read from + # SessionEntry at the start of their own compression attempt. + # + # If we can't acquire the lock, another path is mid-compression on + # this session. Aborting is correct: the messages are unchanged, the + # other path's rotation will produce the canonical new session_id, + # and our caller's auto-compress loop sees ``len(returned) == len(input)`` + # and stops retrying for this cycle. The session is NOT corrupted — + # we just sit out this round and let the winner finish. + _lock_db = getattr(agent, "_session_db", None) + _lock_sid = agent.session_id or "" + _lock_holder: Optional[str] = None + if _lock_db is not None and _lock_sid: + _lock_holder = _compression_lock_holder(agent) + if not _lock_db.try_acquire_compression_lock(_lock_sid, _lock_holder): + existing = _lock_db.get_compression_lock_holder(_lock_sid) + logger.warning( + "compression skipped: another path is compressing session=%s " + "(holder=%s) — returning messages unchanged to avoid session fork", + _lock_sid, existing, + ) + _lock_holder = None # don't release a lock we don't own + # Surface to the user once — quiet for downstream auto-compress loops + if getattr(agent, "_last_compression_lock_warning_sid", None) != _lock_sid: + agent._last_compression_lock_warning_sid = _lock_sid + try: + agent._emit_warning( + "⚠ Skipping concurrent compression — another path " + "is already compressing this session. Will retry " + "after it finishes." + ) + except Exception: + pass + _existing_sp = getattr(agent, "_cached_system_prompt", None) + if not _existing_sp: + _existing_sp = agent._build_system_prompt(system_message) + return messages, _existing_sp + + def _release_lock() -> None: + """Release the lock keyed on the OLD session_id (before rotation).""" + if _lock_db is not None and _lock_sid and _lock_holder: + try: + _lock_db.release_compression_lock(_lock_sid, _lock_holder) + except Exception as _rel_err: + logger.debug("compression lock release failed: %s", _rel_err) + # Notify external memory provider before compression discards context if agent._memory_manager: try: @@ -318,6 +397,11 @@ def compress_context( # Plugin context engine with strict signature that doesn't accept # focus_topic / force — fall back to calling without them. compressed = agent.context_compressor.compress(messages, current_tokens=approx_tokens) + except BaseException: + # ANY exception during compress() must release the lock so the + # session isn't permanently blocked from future compression. + _release_lock() + raise # If compression aborted (aux LLM failed to produce a usable summary) # the compressor returns the input messages unchanged. Surface the @@ -336,6 +420,7 @@ def compress_context( _existing_sp = getattr(agent, "_cached_system_prompt", None) if not _existing_sp: _existing_sp = agent._build_system_prompt(system_message) + _release_lock() # compression aborted — no rotation will happen return messages, _existing_sp summary_error = getattr(agent.context_compressor, "_last_summary_error", None) @@ -480,6 +565,12 @@ def compress_context( agent.session_id or "none", _pre_msg_count, len(compressed), f"{_compressed_est:,}", ) + # Release the lock on the OLD session_id only AFTER rotation completed + # and all post-rotation bookkeeping (memory manager, context engine, + # file dedup) ran. A concurrent path that wakes up the moment we + # release will see the NEW session_id in state.db / SessionEntry and + # acquire on that — no race against our just-finished work. + _release_lock() return compressed, new_system_prompt diff --git a/hermes_state.py b/hermes_state.py index ba33598b91e..ced77563357 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -33,7 +33,7 @@ T = TypeVar("T") DEFAULT_DB_PATH = get_hermes_home() / "state.db" -SCHEMA_VERSION = 13 +SCHEMA_VERSION = 14 # --------------------------------------------------------------------------- # WAL-compatibility fallback @@ -281,10 +281,18 @@ CREATE TABLE IF NOT EXISTS state_meta ( value TEXT ); +CREATE TABLE IF NOT EXISTS compression_locks ( + session_id TEXT PRIMARY KEY, + holder TEXT NOT NULL, + acquired_at REAL NOT NULL, + expires_at REAL NOT NULL +); + CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source); CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id); CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC); CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp); +CREATE INDEX IF NOT EXISTS idx_compression_locks_expires ON compression_locks(expires_at); """ FTS_SQL = """ @@ -791,6 +799,133 @@ class SessionDB: ) self._execute_write(_do) + # ────────────────────────────────────────────────────────────────────── + # Compression locks + # ────────────────────────────────────────────────────────────────────── + # Atomic per-session locks that prevent two compression paths from + # racing on the same session_id and producing orphan child sessions. + # + # The race: ``conversation_compression.py`` rotates ``agent.session_id`` + # as a side effect of a successful compression (end old session, create + # new). That mutation is local to the AIAgent instance — but ``state.db`` + # is shared across all instances. Two AIAgents that share the same + # ``session_id`` at the moment they both decide to compress (most + # commonly the parent turn's agent + a background-review fork started + # right after the turn ended) each end the parent and create their own + # NEW session, parented to the same old id. The gateway SessionEntry + # only catches one rotation; the other child silently accumulates + # writes — Damien's "parent → two orphan children" repro shape. + # + # The lock is keyed by ``session_id`` and is held for the duration of + # the compress() call plus the rotation. ``holder`` identifies the + # current owner (pid:tid:nonce) for diagnostics; the lock is recovered + # via ``expires_at`` if the holder process crashed without releasing. + def try_acquire_compression_lock( + self, + session_id: str, + holder: str, + ttl_seconds: float = 300.0, + ) -> bool: + """Try to atomically acquire the compression lock for ``session_id``. + + Returns ``True`` on success (caller now owns the lock and must + release via :meth:`release_compression_lock`). Returns ``False`` + if another holder already owns a non-expired lock — the caller + MUST NOT proceed with compression in that case (its rotation would + race against the holder's, splitting the session lineage). + + Expired locks (``expires_at < now``) are reclaimed transparently: + the stale row is deleted and the new holder acquires it. This + prevents a crashed compressor from permanently blocking the + session. + + Implementation: single-transaction DELETE-expired + INSERT-or-IGNORE, + followed by a SELECT to confirm we got the row. SQLite serialises + writes, so the whole sequence is atomic against other writers. + """ + if not session_id: + return False + now = time.time() + expires_at = now + ttl_seconds + + def _do(conn): + # First: reclaim any expired lock for this session_id. + conn.execute( + "DELETE FROM compression_locks " + "WHERE session_id = ? AND expires_at < ?", + (session_id, now), + ) + # Then: try to insert. INSERT OR IGNORE returns no rowcount + # difference — verify ownership via SELECT. + conn.execute( + "INSERT OR IGNORE INTO compression_locks " + "(session_id, holder, acquired_at, expires_at) " + "VALUES (?, ?, ?, ?)", + (session_id, holder, now, expires_at), + ) + row = conn.execute( + "SELECT holder FROM compression_locks WHERE session_id = ?", + (session_id,), + ).fetchone() + return row is not None and ( + row["holder"] if isinstance(row, sqlite3.Row) else row[0] + ) == holder + + try: + return bool(self._execute_write(_do)) + except sqlite3.Error as exc: + logger.warning( + "try_acquire_compression_lock(%s) failed: %s", + session_id, exc, + ) + # Fail open: returning False makes the caller skip compression, + # which is the safe behaviour when the lock subsystem is broken. + return False + + def release_compression_lock(self, session_id: str, holder: str) -> None: + """Release the compression lock for ``session_id`` iff we own it. + + Idempotent: no-op when the lock has already expired and been + reclaimed by a different holder, or when no lock exists. The + ``holder`` check prevents a late-returning compressor from + clobbering a fresh lock held by someone else. + """ + if not session_id: + return + + def _do(conn): + conn.execute( + "DELETE FROM compression_locks " + "WHERE session_id = ? AND holder = ?", + (session_id, holder), + ) + + try: + self._execute_write(_do) + except sqlite3.Error as exc: + logger.warning( + "release_compression_lock(%s) failed: %s", + session_id, exc, + ) + + def get_compression_lock_holder(self, session_id: str) -> Optional[str]: + """Return the current (non-expired) holder for ``session_id``, or None. + + Diagnostic helper — not used by the locking protocol itself. + """ + if not session_id: + return None + now = time.time() + row = self._conn.execute( + "SELECT holder FROM compression_locks " + "WHERE session_id = ? AND expires_at >= ?", + (session_id, now), + ).fetchone() + if row is None: + return None + return row["holder"] if isinstance(row, sqlite3.Row) else row[0] + + def update_system_prompt(self, session_id: str, system_prompt: str) -> None: """Store the full assembled system prompt snapshot.""" def _do(conn): diff --git a/tests/agent/test_compression_concurrent_fork.py b/tests/agent/test_compression_concurrent_fork.py new file mode 100644 index 00000000000..5147f7146ab --- /dev/null +++ b/tests/agent/test_compression_concurrent_fork.py @@ -0,0 +1,173 @@ +"""Regression: prevent transcript fork when two paths compress the same session_id. + +Damien's incident (Discord, 2026-05-28): a long Hermes session in a Discord +gateway hit the compression threshold at the end of a turn. The parent agent +finished delivering the response and ``conversation_loop.py`` fired +``_spawn_background_review(...)`` — which builds a forked ``AIAgent`` that +inherits ``agent.session_id`` (see ``agent/background_review.py``:: +``review_agent.session_id = agent.session_id``). Roughly two seconds later +a synthetic ``Background process proc_… completed`` event arrived and +started a fresh turn on the same parent ``session_id`` (still cached in the +gateway's ``SessionEntry``). Both paths hit preflight compression on the +same parent transcript and called ``_compress_context`` concurrently. Each +ended the parent and created its own CHILD session in ``state.db``, both +parented to the same old id. The gateway's ``SessionEntry`` only caught one +rotation; the other child became an orphan that silently accumulated writes. + +Repro shape on Damien's machine: + + parent 20260527_234659_e65f0e ended_at=set end_reason='compression' + child 20260528_113619_fc80e1 parent=20260527_234659_e65f0e (in SessionEntry) + child parent=20260527_234659_e65f0e (silent writes) + +This regression simulates the two concurrent ``compress_context`` calls +against a shared ``state.db`` and asserts that the per-session compression +lock added in this PR prevents the orphan child. Without the lock the +fixture deterministically produces 2 children; with the lock, exactly 1. +""" + +from __future__ import annotations + +import os +import threading +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from hermes_state import SessionDB + + +def _build_agent_with_db(db: SessionDB, session_id: str): + """Build an AIAgent that's wired to ``db`` and pinned to ``session_id``.""" + with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}): + from run_agent import AIAgent + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + session_db=db, + session_id=session_id, + skip_context_files=True, + skip_memory=True, + ) + + # Stub the compressor so it returns deterministic output and DOESN'T make + # an LLM call. Sleep inside compress() so the two threads' rotations + # actually overlap — without that the OS could happen to serialize them + # and hide the bug. + compressor = MagicMock() + + def _compress_with_overlap(*_a, **_kw): + time.sleep(0.25) + return [ + {"role": "user", "content": "[CONTEXT COMPACTION] summary"}, + {"role": "user", "content": "tail"}, + ] + + compressor.compress.side_effect = _compress_with_overlap + compressor.compression_count = 1 + compressor.last_prompt_tokens = 0 + compressor.last_completion_tokens = 0 + compressor._last_summary_error = None + compressor._last_compress_aborted = False + compressor._last_aux_model_failure_model = None + compressor._last_aux_model_failure_error = None + agent.context_compressor = compressor + return agent + + +def _count_children(db: SessionDB, parent_sid: str) -> int: + """Count rows in state.db whose parent_session_id == parent_sid.""" + rows = db._conn.execute( + "SELECT id FROM sessions WHERE parent_session_id = ?", + (parent_sid,), + ).fetchall() + return len(rows) + + +def test_concurrent_compression_does_not_fork_session(tmp_path: Path) -> None: + """Two AIAgents that share a session_id MUST NOT both rotate it. + + Without the per-session compression lock this fixture deterministically + produces 2 child sessions (transcript fork). With the lock the second + path aborts cleanly, leaving exactly 1 canonical child. + """ + db = SessionDB(db_path=tmp_path / "state.db") + + parent_sid = "PARENT_TEST_SESSION" + db.create_session(parent_sid, source="discord") + + # Two agents on the same session_id, both wired to the same db — + # mirrors the parent-turn agent + the background-review fork right + # after a turn ends. + agent_a = _build_agent_with_db(db, parent_sid) + agent_b = _build_agent_with_db(db, parent_sid) + messages = [{"role": "user", "content": f"m{i}"} for i in range(20)] + + def run(agent): + try: + agent._compress_context(messages, "sys", approx_tokens=120_000) + except Exception: + # Surface to the test if either raises — should not happen. + raise + + t_a = threading.Thread(target=run, args=(agent_a,), name="main_turn") + t_b = threading.Thread(target=run, args=(agent_b,), name="review_fork") + t_a.start() + t_b.start() + t_a.join(timeout=10) + t_b.join(timeout=10) + + # Exactly one canonical child — not two orphans. + assert _count_children(db, parent_sid) == 1, ( + "Compression lock failed: parent session has multiple children in state.db " + "(transcript fork). This is Damien's incident shape — see the test docstring." + ) + + # And exactly one of the two agents actually rotated its session_id; the + # other should still hold the parent_sid (its compression was skipped). + rotated = sum( + 1 for a in (agent_a, agent_b) if a.session_id != parent_sid + ) + assert rotated == 1, ( + f"Expected exactly one agent to rotate session_id, got {rotated}. " + "Both agents rotating means the lock didn't serialize them." + ) + + # The lock must be released after the winner finished. + assert db.get_compression_lock_holder(parent_sid) is None, ( + "Compression lock leaked: still held after both rotations completed." + ) + + +def test_skipped_compression_returns_messages_unchanged(tmp_path: Path) -> None: + """The loser of the lock race must return its input messages verbatim. + + Callers (preflight compression in ``conversation_loop.py``) detect the + no-op via ``len(returned) == len(input)`` and stop the auto-compress + retry loop. If the skipped path returned the compressed view, that + detection would break and the caller would mutate the conversation + without going through state.db rotation. + """ + db = SessionDB(db_path=tmp_path / "state.db") + parent_sid = "LOSER_TEST" + db.create_session(parent_sid, source="discord") + + # Pre-acquire the lock so the agent's compress_context sees it held. + held = db.try_acquire_compression_lock(parent_sid, "external_holder") + assert held is True + + agent = _build_agent_with_db(db, parent_sid) + messages = [{"role": "user", "content": "m1"}, {"role": "user", "content": "m2"}] + + compressed, _sp = agent._compress_context(messages, "sys", approx_tokens=120_000) + + # Skipped: messages returned verbatim, no rotation + assert compressed is messages or compressed == messages + assert agent.session_id == parent_sid + # Compressor was never called (the skip happens before .compress()) + agent.context_compressor.compress.assert_not_called() diff --git a/tests/test_hermes_state_compression_locks.py b/tests/test_hermes_state_compression_locks.py new file mode 100644 index 00000000000..53e3bc0dec4 --- /dev/null +++ b/tests/test_hermes_state_compression_locks.py @@ -0,0 +1,149 @@ +"""Tests for ``SessionDB`` compression-lock primitives. + +These cover the atomic per-session lock that prevents two compression +paths from racing on the same ``session_id`` and producing orphan child +sessions (Damien's "parent → two orphan children" repro shape, see +``tests/agent/test_compression_concurrent_fork.py`` for the +behavioural regression test). + +Focus here: the lock primitives themselves (acquire, release, TTL, +diagnostic accessor) — not the wiring into compression. +""" + +from __future__ import annotations + +import threading +import time +from pathlib import Path + +import pytest + +from hermes_state import SessionDB + + +@pytest.fixture +def db(tmp_path: Path) -> SessionDB: + return SessionDB(tmp_path / "state.db") + + +# ---------------------------------------------------------------------- +# Single-holder semantics +# ---------------------------------------------------------------------- + + +def test_acquire_succeeds_when_unlocked(db: SessionDB) -> None: + assert db.try_acquire_compression_lock("sess1", "holder1") is True + assert db.get_compression_lock_holder("sess1") == "holder1" + + +def test_acquire_blocks_second_holder(db: SessionDB) -> None: + assert db.try_acquire_compression_lock("sess1", "holder1") is True + assert db.try_acquire_compression_lock("sess1", "holder2") is False + # First holder still owns it + assert db.get_compression_lock_holder("sess1") == "holder1" + + +def test_release_allows_reacquire(db: SessionDB) -> None: + db.try_acquire_compression_lock("sess1", "holder1") + db.release_compression_lock("sess1", "holder1") + assert db.get_compression_lock_holder("sess1") is None + assert db.try_acquire_compression_lock("sess1", "holder2") is True + + +def test_release_with_wrong_holder_is_noop(db: SessionDB) -> None: + db.try_acquire_compression_lock("sess1", "holder1") + # Late-returning compressor must not release a lock it doesn't own + db.release_compression_lock("sess1", "holder_other") + assert db.get_compression_lock_holder("sess1") == "holder1" + + +def test_release_when_unlocked_is_noop(db: SessionDB) -> None: + # No exception, no state change + db.release_compression_lock("never_locked", "holder1") + assert db.get_compression_lock_holder("never_locked") is None + + +# ---------------------------------------------------------------------- +# Per-session isolation +# ---------------------------------------------------------------------- + + +def test_locks_are_per_session(db: SessionDB) -> None: + assert db.try_acquire_compression_lock("sess1", "holder1") is True + # Different session: independent lock + assert db.try_acquire_compression_lock("sess2", "holder2") is True + assert db.get_compression_lock_holder("sess1") == "holder1" + assert db.get_compression_lock_holder("sess2") == "holder2" + + +# ---------------------------------------------------------------------- +# TTL / expiry recovery +# ---------------------------------------------------------------------- + + +def test_expired_lock_is_reclaimable(db: SessionDB) -> None: + """A crashed compressor must not permanently block the session.""" + # Acquire with a very short TTL + db.try_acquire_compression_lock("sess1", "crashed_holder", ttl_seconds=0.05) + time.sleep(0.1) + # Holder check honours expiry + assert db.get_compression_lock_holder("sess1") is None + # New holder can claim it + assert db.try_acquire_compression_lock("sess1", "fresh_holder") is True + assert db.get_compression_lock_holder("sess1") == "fresh_holder" + + +def test_non_expired_lock_is_held(db: SessionDB) -> None: + db.try_acquire_compression_lock("sess1", "holder1", ttl_seconds=60) + # Immediately after, still held + assert db.try_acquire_compression_lock("sess1", "holder2") is False + + +# ---------------------------------------------------------------------- +# Empty / invalid input +# ---------------------------------------------------------------------- + + +def test_acquire_empty_session_id_returns_false(db: SessionDB) -> None: + assert db.try_acquire_compression_lock("", "holder1") is False + + +def test_release_empty_session_id_is_noop(db: SessionDB) -> None: + # No exception + db.release_compression_lock("", "holder1") + + +def test_holder_empty_session_id_returns_none(db: SessionDB) -> None: + assert db.get_compression_lock_holder("") is None + + +# ---------------------------------------------------------------------- +# Concurrency: real threads racing on the same session_id +# ---------------------------------------------------------------------- + + +def test_concurrent_acquire_only_one_winner(db: SessionDB) -> None: + """Damien's race shape: N threads call acquire on the same session_id; + exactly one must win, the rest must be cleanly rejected.""" + results: list[bool] = [] + barrier = threading.Barrier(8) + lock = threading.Lock() + + def try_acquire(idx: int) -> None: + holder = f"thread_{idx}" + barrier.wait() # synchronize start + got = db.try_acquire_compression_lock("contended_session", holder) + with lock: + results.append(got) + + threads = [threading.Thread(target=try_acquire, args=(i,)) for i in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Exactly one thread acquired + assert sum(1 for r in results if r is True) == 1 + assert sum(1 for r in results if r is False) == 7 + # The single winner still owns it + assert db.get_compression_lock_holder("contended_session") is not None