fix(compression): prevent session-id fork from concurrent compressions (#34351)

* fix(compression): prevent session-id fork from concurrent compressions

When two AIAgent instances share the same session_id (most commonly the
parent-turn agent and its background-review fork, which inherits
session_id verbatim via background_review.py L451), both can call
compress_context() on overlapping snapshots of the same conversation.
Each ends the parent and creates its own NEW child session in state.db,
both parented to the same old id. The gateway SessionEntry only catches
one rotation; the other becomes an orphan that silently accumulates
writes — Damien's incident shape (parent 20260527_234659_e65f0e → two
children, only one visible).

Adds a state.db-backed per-session compression lock. Acquired before
the rotation in conversation_compression.compress_context(); on
failure, the caller returns messages unchanged so the auto-compress
retry loop stops cleanly. TTL (5min default) reclaims locks abandoned
by crashed compressors. Lock holder identity (pid:tid:agent:nonce) is
preserved for diagnostics via get_compression_lock_holder().

Schema bumped 13 -> 14 to track the new compression_locks table.
Reconciled additively via the existing declarative-column pattern;
no data migration needed for existing DBs.

Regression test reproduces Damien's shape: two threads racing
_compress_context on a shared parent_sid. Without the lock the test
deterministically produces 2 child sessions; with the lock, exactly 1.

Covers all six compression entry points (preflight in conversation_loop,
mid-turn fallback, hygiene compression in gateway, /compact, CLI
/compress, TUI /compress). ACP /compress was already protected by
nulling out _session_db before its compress call.

* ci: trigger rerun (transient GitHub API rate limit on CodeQL workflow)
This commit is contained in:
Teknium 2026-05-28 21:40:39 -07:00 committed by GitHub
parent 28bb7e0a8e
commit a30480bd2b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 549 additions and 1 deletions

View file

@ -41,6 +41,26 @@ from agent.model_metadata import estimate_request_tokens_rough
logger = logging.getLogger(__name__)
def _compression_lock_holder(agent: Any) -> str:
"""Build a unique holder id for the lock: pid:tid:agent-instance:uuid.
The pid+tid prefix lets ops tell crashed/abandoned holders apart from
live ones (expiry-based recovery uses the timestamp, but ``holder``
is what shows up in diagnostics + log lines). The agent instance id
and a per-acquire uuid disambiguate two co-resident agents on the
same thread (background_review forks run on a worker thread, but
on machines where compression itself dispatches to a thread pool
we want each acquire to be unique).
"""
import threading
return (
f"pid={os.getpid()}"
f":tid={threading.get_ident()}"
f":agent={id(agent):x}"
f":nonce={uuid.uuid4().hex[:8]}"
)
def check_compression_model_feasibility(agent: Any) -> None:
"""Warn at session start if the auxiliary compression model's context
window is smaller than the main model's compression threshold.
@ -305,6 +325,65 @@ def compress_context(
"🗜️ Compacting context — summarizing earlier conversation so I can continue..."
)
# ── Compression lock ────────────────────────────────────────────────
# Atomic, state.db-backed lock per session_id. Without this, two
# AIAgent instances that share the same session_id (most commonly the
# parent-turn agent and its background-review fork — see
# ``agent/background_review.py``: ``review_agent.session_id =
# agent.session_id``) can each call compress() on overlapping
# snapshots of the same conversation. Both succeed, both rotate
# ``agent.session_id`` to a fresh id, both create child sessions in
# state.db parented to the same old id. The gateway's SessionEntry
# only catches one rotation, so the other child becomes an orphan
# that silently accumulates writes — Damien's repro shape.
#
# Acquire keyed on the OLD session_id (the rotation target's parent),
# because that's the id that competing paths see and read from
# SessionEntry at the start of their own compression attempt.
#
# If we can't acquire the lock, another path is mid-compression on
# this session. Aborting is correct: the messages are unchanged, the
# other path's rotation will produce the canonical new session_id,
# and our caller's auto-compress loop sees ``len(returned) == len(input)``
# and stops retrying for this cycle. The session is NOT corrupted —
# we just sit out this round and let the winner finish.
_lock_db = getattr(agent, "_session_db", None)
_lock_sid = agent.session_id or ""
_lock_holder: Optional[str] = None
if _lock_db is not None and _lock_sid:
_lock_holder = _compression_lock_holder(agent)
if not _lock_db.try_acquire_compression_lock(_lock_sid, _lock_holder):
existing = _lock_db.get_compression_lock_holder(_lock_sid)
logger.warning(
"compression skipped: another path is compressing session=%s "
"(holder=%s) — returning messages unchanged to avoid session fork",
_lock_sid, existing,
)
_lock_holder = None # don't release a lock we don't own
# Surface to the user once — quiet for downstream auto-compress loops
if getattr(agent, "_last_compression_lock_warning_sid", None) != _lock_sid:
agent._last_compression_lock_warning_sid = _lock_sid
try:
agent._emit_warning(
"⚠ Skipping concurrent compression — another path "
"is already compressing this session. Will retry "
"after it finishes."
)
except Exception:
pass
_existing_sp = getattr(agent, "_cached_system_prompt", None)
if not _existing_sp:
_existing_sp = agent._build_system_prompt(system_message)
return messages, _existing_sp
def _release_lock() -> None:
"""Release the lock keyed on the OLD session_id (before rotation)."""
if _lock_db is not None and _lock_sid and _lock_holder:
try:
_lock_db.release_compression_lock(_lock_sid, _lock_holder)
except Exception as _rel_err:
logger.debug("compression lock release failed: %s", _rel_err)
# Notify external memory provider before compression discards context
if agent._memory_manager:
try:
@ -318,6 +397,11 @@ def compress_context(
# Plugin context engine with strict signature that doesn't accept
# focus_topic / force — fall back to calling without them.
compressed = agent.context_compressor.compress(messages, current_tokens=approx_tokens)
except BaseException:
# ANY exception during compress() must release the lock so the
# session isn't permanently blocked from future compression.
_release_lock()
raise
# If compression aborted (aux LLM failed to produce a usable summary)
# the compressor returns the input messages unchanged. Surface the
@ -336,6 +420,7 @@ def compress_context(
_existing_sp = getattr(agent, "_cached_system_prompt", None)
if not _existing_sp:
_existing_sp = agent._build_system_prompt(system_message)
_release_lock() # compression aborted — no rotation will happen
return messages, _existing_sp
summary_error = getattr(agent.context_compressor, "_last_summary_error", None)
@ -480,6 +565,12 @@ def compress_context(
agent.session_id or "none", _pre_msg_count, len(compressed),
f"{_compressed_est:,}",
)
# Release the lock on the OLD session_id only AFTER rotation completed
# and all post-rotation bookkeeping (memory manager, context engine,
# file dedup) ran. A concurrent path that wakes up the moment we
# release will see the NEW session_id in state.db / SessionEntry and
# acquire on that — no race against our just-finished work.
_release_lock()
return compressed, new_system_prompt