From a30480bd2b15ffd942ae3a24f1f993f575c89af2 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 28 May 2026 21:40:39 -0700 Subject: [PATCH] fix(compression): prevent session-id fork from concurrent compressions (#34351) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(compression): prevent session-id fork from concurrent compressions When two AIAgent instances share the same session_id (most commonly the parent-turn agent and its background-review fork, which inherits session_id verbatim via background_review.py L451), both can call compress_context() on overlapping snapshots of the same conversation. Each ends the parent and creates its own NEW child session in state.db, both parented to the same old id. The gateway SessionEntry only catches one rotation; the other becomes an orphan that silently accumulates writes — Damien's incident shape (parent 20260527_234659_e65f0e → two children, only one visible). Adds a state.db-backed per-session compression lock. Acquired before the rotation in conversation_compression.compress_context(); on failure, the caller returns messages unchanged so the auto-compress retry loop stops cleanly. TTL (5min default) reclaims locks abandoned by crashed compressors. Lock holder identity (pid:tid:agent:nonce) is preserved for diagnostics via get_compression_lock_holder(). Schema bumped 13 -> 14 to track the new compression_locks table. Reconciled additively via the existing declarative-column pattern; no data migration needed for existing DBs. Regression test reproduces Damien's shape: two threads racing _compress_context on a shared parent_sid. Without the lock the test deterministically produces 2 child sessions; with the lock, exactly 1. Covers all six compression entry points (preflight in conversation_loop, mid-turn fallback, hygiene compression in gateway, /compact, CLI /compress, TUI /compress). ACP /compress was already protected by nulling out _session_db before its compress call. * ci: trigger rerun (transient GitHub API rate limit on CodeQL workflow) --- agent/conversation_compression.py | 91 +++++++++ hermes_state.py | 137 +++++++++++++- .../agent/test_compression_concurrent_fork.py | 173 ++++++++++++++++++ tests/test_hermes_state_compression_locks.py | 149 +++++++++++++++ 4 files changed, 549 insertions(+), 1 deletion(-) create mode 100644 tests/agent/test_compression_concurrent_fork.py create mode 100644 tests/test_hermes_state_compression_locks.py diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index e11dc7c171d..1ab861bbec9 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -41,6 +41,26 @@ from agent.model_metadata import estimate_request_tokens_rough logger = logging.getLogger(__name__) +def _compression_lock_holder(agent: Any) -> str: + """Build a unique holder id for the lock: pid:tid:agent-instance:uuid. + + The pid+tid prefix lets ops tell crashed/abandoned holders apart from + live ones (expiry-based recovery uses the timestamp, but ``holder`` + is what shows up in diagnostics + log lines). The agent instance id + and a per-acquire uuid disambiguate two co-resident agents on the + same thread (background_review forks run on a worker thread, but + on machines where compression itself dispatches to a thread pool + we want each acquire to be unique). + """ + import threading + return ( + f"pid={os.getpid()}" + f":tid={threading.get_ident()}" + f":agent={id(agent):x}" + f":nonce={uuid.uuid4().hex[:8]}" + ) + + def check_compression_model_feasibility(agent: Any) -> None: """Warn at session start if the auxiliary compression model's context window is smaller than the main model's compression threshold. @@ -305,6 +325,65 @@ def compress_context( "🗜️ Compacting context — summarizing earlier conversation so I can continue..." ) + # ── Compression lock ──────────────────────────────────────────────── + # Atomic, state.db-backed lock per session_id. Without this, two + # AIAgent instances that share the same session_id (most commonly the + # parent-turn agent and its background-review fork — see + # ``agent/background_review.py``: ``review_agent.session_id = + # agent.session_id``) can each call compress() on overlapping + # snapshots of the same conversation. Both succeed, both rotate + # ``agent.session_id`` to a fresh id, both create child sessions in + # state.db parented to the same old id. The gateway's SessionEntry + # only catches one rotation, so the other child becomes an orphan + # that silently accumulates writes — Damien's repro shape. + # + # Acquire keyed on the OLD session_id (the rotation target's parent), + # because that's the id that competing paths see and read from + # SessionEntry at the start of their own compression attempt. + # + # If we can't acquire the lock, another path is mid-compression on + # this session. Aborting is correct: the messages are unchanged, the + # other path's rotation will produce the canonical new session_id, + # and our caller's auto-compress loop sees ``len(returned) == len(input)`` + # and stops retrying for this cycle. The session is NOT corrupted — + # we just sit out this round and let the winner finish. + _lock_db = getattr(agent, "_session_db", None) + _lock_sid = agent.session_id or "" + _lock_holder: Optional[str] = None + if _lock_db is not None and _lock_sid: + _lock_holder = _compression_lock_holder(agent) + if not _lock_db.try_acquire_compression_lock(_lock_sid, _lock_holder): + existing = _lock_db.get_compression_lock_holder(_lock_sid) + logger.warning( + "compression skipped: another path is compressing session=%s " + "(holder=%s) — returning messages unchanged to avoid session fork", + _lock_sid, existing, + ) + _lock_holder = None # don't release a lock we don't own + # Surface to the user once — quiet for downstream auto-compress loops + if getattr(agent, "_last_compression_lock_warning_sid", None) != _lock_sid: + agent._last_compression_lock_warning_sid = _lock_sid + try: + agent._emit_warning( + "⚠ Skipping concurrent compression — another path " + "is already compressing this session. Will retry " + "after it finishes." + ) + except Exception: + pass + _existing_sp = getattr(agent, "_cached_system_prompt", None) + if not _existing_sp: + _existing_sp = agent._build_system_prompt(system_message) + return messages, _existing_sp + + def _release_lock() -> None: + """Release the lock keyed on the OLD session_id (before rotation).""" + if _lock_db is not None and _lock_sid and _lock_holder: + try: + _lock_db.release_compression_lock(_lock_sid, _lock_holder) + except Exception as _rel_err: + logger.debug("compression lock release failed: %s", _rel_err) + # Notify external memory provider before compression discards context if agent._memory_manager: try: @@ -318,6 +397,11 @@ def compress_context( # Plugin context engine with strict signature that doesn't accept # focus_topic / force — fall back to calling without them. compressed = agent.context_compressor.compress(messages, current_tokens=approx_tokens) + except BaseException: + # ANY exception during compress() must release the lock so the + # session isn't permanently blocked from future compression. + _release_lock() + raise # If compression aborted (aux LLM failed to produce a usable summary) # the compressor returns the input messages unchanged. Surface the @@ -336,6 +420,7 @@ def compress_context( _existing_sp = getattr(agent, "_cached_system_prompt", None) if not _existing_sp: _existing_sp = agent._build_system_prompt(system_message) + _release_lock() # compression aborted — no rotation will happen return messages, _existing_sp summary_error = getattr(agent.context_compressor, "_last_summary_error", None) @@ -480,6 +565,12 @@ def compress_context( agent.session_id or "none", _pre_msg_count, len(compressed), f"{_compressed_est:,}", ) + # Release the lock on the OLD session_id only AFTER rotation completed + # and all post-rotation bookkeeping (memory manager, context engine, + # file dedup) ran. A concurrent path that wakes up the moment we + # release will see the NEW session_id in state.db / SessionEntry and + # acquire on that — no race against our just-finished work. + _release_lock() return compressed, new_system_prompt diff --git a/hermes_state.py b/hermes_state.py index ba33598b91e..ced77563357 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -33,7 +33,7 @@ T = TypeVar("T") DEFAULT_DB_PATH = get_hermes_home() / "state.db" -SCHEMA_VERSION = 13 +SCHEMA_VERSION = 14 # --------------------------------------------------------------------------- # WAL-compatibility fallback @@ -281,10 +281,18 @@ CREATE TABLE IF NOT EXISTS state_meta ( value TEXT ); +CREATE TABLE IF NOT EXISTS compression_locks ( + session_id TEXT PRIMARY KEY, + holder TEXT NOT NULL, + acquired_at REAL NOT NULL, + expires_at REAL NOT NULL +); + CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source); CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id); CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC); CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp); +CREATE INDEX IF NOT EXISTS idx_compression_locks_expires ON compression_locks(expires_at); """ FTS_SQL = """ @@ -791,6 +799,133 @@ class SessionDB: ) self._execute_write(_do) + # ────────────────────────────────────────────────────────────────────── + # Compression locks + # ────────────────────────────────────────────────────────────────────── + # Atomic per-session locks that prevent two compression paths from + # racing on the same session_id and producing orphan child sessions. + # + # The race: ``conversation_compression.py`` rotates ``agent.session_id`` + # as a side effect of a successful compression (end old session, create + # new). That mutation is local to the AIAgent instance — but ``state.db`` + # is shared across all instances. Two AIAgents that share the same + # ``session_id`` at the moment they both decide to compress (most + # commonly the parent turn's agent + a background-review fork started + # right after the turn ended) each end the parent and create their own + # NEW session, parented to the same old id. The gateway SessionEntry + # only catches one rotation; the other child silently accumulates + # writes — Damien's "parent → two orphan children" repro shape. + # + # The lock is keyed by ``session_id`` and is held for the duration of + # the compress() call plus the rotation. ``holder`` identifies the + # current owner (pid:tid:nonce) for diagnostics; the lock is recovered + # via ``expires_at`` if the holder process crashed without releasing. + def try_acquire_compression_lock( + self, + session_id: str, + holder: str, + ttl_seconds: float = 300.0, + ) -> bool: + """Try to atomically acquire the compression lock for ``session_id``. + + Returns ``True`` on success (caller now owns the lock and must + release via :meth:`release_compression_lock`). Returns ``False`` + if another holder already owns a non-expired lock — the caller + MUST NOT proceed with compression in that case (its rotation would + race against the holder's, splitting the session lineage). + + Expired locks (``expires_at < now``) are reclaimed transparently: + the stale row is deleted and the new holder acquires it. This + prevents a crashed compressor from permanently blocking the + session. + + Implementation: single-transaction DELETE-expired + INSERT-or-IGNORE, + followed by a SELECT to confirm we got the row. SQLite serialises + writes, so the whole sequence is atomic against other writers. + """ + if not session_id: + return False + now = time.time() + expires_at = now + ttl_seconds + + def _do(conn): + # First: reclaim any expired lock for this session_id. + conn.execute( + "DELETE FROM compression_locks " + "WHERE session_id = ? AND expires_at < ?", + (session_id, now), + ) + # Then: try to insert. INSERT OR IGNORE returns no rowcount + # difference — verify ownership via SELECT. + conn.execute( + "INSERT OR IGNORE INTO compression_locks " + "(session_id, holder, acquired_at, expires_at) " + "VALUES (?, ?, ?, ?)", + (session_id, holder, now, expires_at), + ) + row = conn.execute( + "SELECT holder FROM compression_locks WHERE session_id = ?", + (session_id,), + ).fetchone() + return row is not None and ( + row["holder"] if isinstance(row, sqlite3.Row) else row[0] + ) == holder + + try: + return bool(self._execute_write(_do)) + except sqlite3.Error as exc: + logger.warning( + "try_acquire_compression_lock(%s) failed: %s", + session_id, exc, + ) + # Fail open: returning False makes the caller skip compression, + # which is the safe behaviour when the lock subsystem is broken. + return False + + def release_compression_lock(self, session_id: str, holder: str) -> None: + """Release the compression lock for ``session_id`` iff we own it. + + Idempotent: no-op when the lock has already expired and been + reclaimed by a different holder, or when no lock exists. The + ``holder`` check prevents a late-returning compressor from + clobbering a fresh lock held by someone else. + """ + if not session_id: + return + + def _do(conn): + conn.execute( + "DELETE FROM compression_locks " + "WHERE session_id = ? AND holder = ?", + (session_id, holder), + ) + + try: + self._execute_write(_do) + except sqlite3.Error as exc: + logger.warning( + "release_compression_lock(%s) failed: %s", + session_id, exc, + ) + + def get_compression_lock_holder(self, session_id: str) -> Optional[str]: + """Return the current (non-expired) holder for ``session_id``, or None. + + Diagnostic helper — not used by the locking protocol itself. + """ + if not session_id: + return None + now = time.time() + row = self._conn.execute( + "SELECT holder FROM compression_locks " + "WHERE session_id = ? AND expires_at >= ?", + (session_id, now), + ).fetchone() + if row is None: + return None + return row["holder"] if isinstance(row, sqlite3.Row) else row[0] + + def update_system_prompt(self, session_id: str, system_prompt: str) -> None: """Store the full assembled system prompt snapshot.""" def _do(conn): diff --git a/tests/agent/test_compression_concurrent_fork.py b/tests/agent/test_compression_concurrent_fork.py new file mode 100644 index 00000000000..5147f7146ab --- /dev/null +++ b/tests/agent/test_compression_concurrent_fork.py @@ -0,0 +1,173 @@ +"""Regression: prevent transcript fork when two paths compress the same session_id. + +Damien's incident (Discord, 2026-05-28): a long Hermes session in a Discord +gateway hit the compression threshold at the end of a turn. The parent agent +finished delivering the response and ``conversation_loop.py`` fired +``_spawn_background_review(...)`` — which builds a forked ``AIAgent`` that +inherits ``agent.session_id`` (see ``agent/background_review.py``:: +``review_agent.session_id = agent.session_id``). Roughly two seconds later +a synthetic ``Background process proc_… completed`` event arrived and +started a fresh turn on the same parent ``session_id`` (still cached in the +gateway's ``SessionEntry``). Both paths hit preflight compression on the +same parent transcript and called ``_compress_context`` concurrently. Each +ended the parent and created its own CHILD session in ``state.db``, both +parented to the same old id. The gateway's ``SessionEntry`` only caught one +rotation; the other child became an orphan that silently accumulated writes. + +Repro shape on Damien's machine: + + parent 20260527_234659_e65f0e ended_at=set end_reason='compression' + child 20260528_113619_fc80e1 parent=20260527_234659_e65f0e (in SessionEntry) + child parent=20260527_234659_e65f0e (silent writes) + +This regression simulates the two concurrent ``compress_context`` calls +against a shared ``state.db`` and asserts that the per-session compression +lock added in this PR prevents the orphan child. Without the lock the +fixture deterministically produces 2 children; with the lock, exactly 1. +""" + +from __future__ import annotations + +import os +import threading +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from hermes_state import SessionDB + + +def _build_agent_with_db(db: SessionDB, session_id: str): + """Build an AIAgent that's wired to ``db`` and pinned to ``session_id``.""" + with patch.dict(os.environ, {"OPENROUTER_API_KEY": "test-key"}): + from run_agent import AIAgent + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + session_db=db, + session_id=session_id, + skip_context_files=True, + skip_memory=True, + ) + + # Stub the compressor so it returns deterministic output and DOESN'T make + # an LLM call. Sleep inside compress() so the two threads' rotations + # actually overlap — without that the OS could happen to serialize them + # and hide the bug. + compressor = MagicMock() + + def _compress_with_overlap(*_a, **_kw): + time.sleep(0.25) + return [ + {"role": "user", "content": "[CONTEXT COMPACTION] summary"}, + {"role": "user", "content": "tail"}, + ] + + compressor.compress.side_effect = _compress_with_overlap + compressor.compression_count = 1 + compressor.last_prompt_tokens = 0 + compressor.last_completion_tokens = 0 + compressor._last_summary_error = None + compressor._last_compress_aborted = False + compressor._last_aux_model_failure_model = None + compressor._last_aux_model_failure_error = None + agent.context_compressor = compressor + return agent + + +def _count_children(db: SessionDB, parent_sid: str) -> int: + """Count rows in state.db whose parent_session_id == parent_sid.""" + rows = db._conn.execute( + "SELECT id FROM sessions WHERE parent_session_id = ?", + (parent_sid,), + ).fetchall() + return len(rows) + + +def test_concurrent_compression_does_not_fork_session(tmp_path: Path) -> None: + """Two AIAgents that share a session_id MUST NOT both rotate it. + + Without the per-session compression lock this fixture deterministically + produces 2 child sessions (transcript fork). With the lock the second + path aborts cleanly, leaving exactly 1 canonical child. + """ + db = SessionDB(db_path=tmp_path / "state.db") + + parent_sid = "PARENT_TEST_SESSION" + db.create_session(parent_sid, source="discord") + + # Two agents on the same session_id, both wired to the same db — + # mirrors the parent-turn agent + the background-review fork right + # after a turn ends. + agent_a = _build_agent_with_db(db, parent_sid) + agent_b = _build_agent_with_db(db, parent_sid) + messages = [{"role": "user", "content": f"m{i}"} for i in range(20)] + + def run(agent): + try: + agent._compress_context(messages, "sys", approx_tokens=120_000) + except Exception: + # Surface to the test if either raises — should not happen. + raise + + t_a = threading.Thread(target=run, args=(agent_a,), name="main_turn") + t_b = threading.Thread(target=run, args=(agent_b,), name="review_fork") + t_a.start() + t_b.start() + t_a.join(timeout=10) + t_b.join(timeout=10) + + # Exactly one canonical child — not two orphans. + assert _count_children(db, parent_sid) == 1, ( + "Compression lock failed: parent session has multiple children in state.db " + "(transcript fork). This is Damien's incident shape — see the test docstring." + ) + + # And exactly one of the two agents actually rotated its session_id; the + # other should still hold the parent_sid (its compression was skipped). + rotated = sum( + 1 for a in (agent_a, agent_b) if a.session_id != parent_sid + ) + assert rotated == 1, ( + f"Expected exactly one agent to rotate session_id, got {rotated}. " + "Both agents rotating means the lock didn't serialize them." + ) + + # The lock must be released after the winner finished. + assert db.get_compression_lock_holder(parent_sid) is None, ( + "Compression lock leaked: still held after both rotations completed." + ) + + +def test_skipped_compression_returns_messages_unchanged(tmp_path: Path) -> None: + """The loser of the lock race must return its input messages verbatim. + + Callers (preflight compression in ``conversation_loop.py``) detect the + no-op via ``len(returned) == len(input)`` and stop the auto-compress + retry loop. If the skipped path returned the compressed view, that + detection would break and the caller would mutate the conversation + without going through state.db rotation. + """ + db = SessionDB(db_path=tmp_path / "state.db") + parent_sid = "LOSER_TEST" + db.create_session(parent_sid, source="discord") + + # Pre-acquire the lock so the agent's compress_context sees it held. + held = db.try_acquire_compression_lock(parent_sid, "external_holder") + assert held is True + + agent = _build_agent_with_db(db, parent_sid) + messages = [{"role": "user", "content": "m1"}, {"role": "user", "content": "m2"}] + + compressed, _sp = agent._compress_context(messages, "sys", approx_tokens=120_000) + + # Skipped: messages returned verbatim, no rotation + assert compressed is messages or compressed == messages + assert agent.session_id == parent_sid + # Compressor was never called (the skip happens before .compress()) + agent.context_compressor.compress.assert_not_called() diff --git a/tests/test_hermes_state_compression_locks.py b/tests/test_hermes_state_compression_locks.py new file mode 100644 index 00000000000..53e3bc0dec4 --- /dev/null +++ b/tests/test_hermes_state_compression_locks.py @@ -0,0 +1,149 @@ +"""Tests for ``SessionDB`` compression-lock primitives. + +These cover the atomic per-session lock that prevents two compression +paths from racing on the same ``session_id`` and producing orphan child +sessions (Damien's "parent → two orphan children" repro shape, see +``tests/agent/test_compression_concurrent_fork.py`` for the +behavioural regression test). + +Focus here: the lock primitives themselves (acquire, release, TTL, +diagnostic accessor) — not the wiring into compression. +""" + +from __future__ import annotations + +import threading +import time +from pathlib import Path + +import pytest + +from hermes_state import SessionDB + + +@pytest.fixture +def db(tmp_path: Path) -> SessionDB: + return SessionDB(tmp_path / "state.db") + + +# ---------------------------------------------------------------------- +# Single-holder semantics +# ---------------------------------------------------------------------- + + +def test_acquire_succeeds_when_unlocked(db: SessionDB) -> None: + assert db.try_acquire_compression_lock("sess1", "holder1") is True + assert db.get_compression_lock_holder("sess1") == "holder1" + + +def test_acquire_blocks_second_holder(db: SessionDB) -> None: + assert db.try_acquire_compression_lock("sess1", "holder1") is True + assert db.try_acquire_compression_lock("sess1", "holder2") is False + # First holder still owns it + assert db.get_compression_lock_holder("sess1") == "holder1" + + +def test_release_allows_reacquire(db: SessionDB) -> None: + db.try_acquire_compression_lock("sess1", "holder1") + db.release_compression_lock("sess1", "holder1") + assert db.get_compression_lock_holder("sess1") is None + assert db.try_acquire_compression_lock("sess1", "holder2") is True + + +def test_release_with_wrong_holder_is_noop(db: SessionDB) -> None: + db.try_acquire_compression_lock("sess1", "holder1") + # Late-returning compressor must not release a lock it doesn't own + db.release_compression_lock("sess1", "holder_other") + assert db.get_compression_lock_holder("sess1") == "holder1" + + +def test_release_when_unlocked_is_noop(db: SessionDB) -> None: + # No exception, no state change + db.release_compression_lock("never_locked", "holder1") + assert db.get_compression_lock_holder("never_locked") is None + + +# ---------------------------------------------------------------------- +# Per-session isolation +# ---------------------------------------------------------------------- + + +def test_locks_are_per_session(db: SessionDB) -> None: + assert db.try_acquire_compression_lock("sess1", "holder1") is True + # Different session: independent lock + assert db.try_acquire_compression_lock("sess2", "holder2") is True + assert db.get_compression_lock_holder("sess1") == "holder1" + assert db.get_compression_lock_holder("sess2") == "holder2" + + +# ---------------------------------------------------------------------- +# TTL / expiry recovery +# ---------------------------------------------------------------------- + + +def test_expired_lock_is_reclaimable(db: SessionDB) -> None: + """A crashed compressor must not permanently block the session.""" + # Acquire with a very short TTL + db.try_acquire_compression_lock("sess1", "crashed_holder", ttl_seconds=0.05) + time.sleep(0.1) + # Holder check honours expiry + assert db.get_compression_lock_holder("sess1") is None + # New holder can claim it + assert db.try_acquire_compression_lock("sess1", "fresh_holder") is True + assert db.get_compression_lock_holder("sess1") == "fresh_holder" + + +def test_non_expired_lock_is_held(db: SessionDB) -> None: + db.try_acquire_compression_lock("sess1", "holder1", ttl_seconds=60) + # Immediately after, still held + assert db.try_acquire_compression_lock("sess1", "holder2") is False + + +# ---------------------------------------------------------------------- +# Empty / invalid input +# ---------------------------------------------------------------------- + + +def test_acquire_empty_session_id_returns_false(db: SessionDB) -> None: + assert db.try_acquire_compression_lock("", "holder1") is False + + +def test_release_empty_session_id_is_noop(db: SessionDB) -> None: + # No exception + db.release_compression_lock("", "holder1") + + +def test_holder_empty_session_id_returns_none(db: SessionDB) -> None: + assert db.get_compression_lock_holder("") is None + + +# ---------------------------------------------------------------------- +# Concurrency: real threads racing on the same session_id +# ---------------------------------------------------------------------- + + +def test_concurrent_acquire_only_one_winner(db: SessionDB) -> None: + """Damien's race shape: N threads call acquire on the same session_id; + exactly one must win, the rest must be cleanly rejected.""" + results: list[bool] = [] + barrier = threading.Barrier(8) + lock = threading.Lock() + + def try_acquire(idx: int) -> None: + holder = f"thread_{idx}" + barrier.wait() # synchronize start + got = db.try_acquire_compression_lock("contended_session", holder) + with lock: + results.append(got) + + threads = [threading.Thread(target=try_acquire, args=(i,)) for i in range(8)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Exactly one thread acquired + assert sum(1 for r in results if r is True) == 1 + assert sum(1 for r in results if r is False) == 7 + # The single winner still owns it + assert db.get_compression_lock_holder("contended_session") is not None