From cafe9d9261db96f322ae2fa553a36fd4b62f2b9f Mon Sep 17 00:00:00 2001 From: Rod Boev Date: Sun, 28 Jun 2026 20:14:24 -0400 Subject: [PATCH] fix(agent): prevent stale lock leases after early compression exits (#54465) --- agent/conversation_compression.py | 34 ++++++---- .../agent/test_compression_concurrent_fork.py | 67 +++++++++++++++++++ 2 files changed, 87 insertions(+), 14 deletions(-) diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index 4ff81d2cd3d..b78e24b729c 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -552,7 +552,11 @@ def compress_context( except TypeError: # Plugin context engine with strict signature that doesn't accept # focus_topic / force — fall back to calling without them. - compressed = agent.context_compressor.compress(messages, current_tokens=approx_tokens) + try: + compressed = agent.context_compressor.compress(messages, current_tokens=approx_tokens) + except BaseException: + _release_lock() + raise except BaseException: # ANY exception during compress() must release the lock so the # session isn't permanently blocked from future compression. @@ -565,19 +569,21 @@ def compress_context( # session has logically ended), and let auto-compress callers detect # the no-op via len(returned) == len(input). if getattr(agent.context_compressor, "_last_compress_aborted", False): - _err = getattr(agent.context_compressor, "_last_summary_error", None) or "unknown error" - if getattr(agent, "_last_compression_summary_warning", None) != _err: - agent._last_compression_summary_warning = _err - agent._emit_warning( - f"⚠ Compression aborted: {_err}. " - "No messages were dropped — conversation continues unchanged. " - "Run /compress to retry, or /new to start a fresh session." - ) - _existing_sp = getattr(agent, "_cached_system_prompt", None) - if not _existing_sp: - _existing_sp = agent._build_system_prompt(system_message) - _release_lock() # compression aborted — no rotation will happen - return messages, _existing_sp + try: + _err = getattr(agent.context_compressor, "_last_summary_error", None) or "unknown error" + if getattr(agent, "_last_compression_summary_warning", None) != _err: + agent._last_compression_summary_warning = _err + agent._emit_warning( + f"⚠ Compression aborted: {_err}. " + "No messages were dropped — conversation continues unchanged. " + "Run /compress to retry, or /new to start a fresh session." + ) + _existing_sp = getattr(agent, "_cached_system_prompt", None) + if not _existing_sp: + _existing_sp = agent._build_system_prompt(system_message) + return messages, _existing_sp + finally: + _release_lock() try: summary_error = getattr(agent.context_compressor, "_last_summary_error", None) diff --git a/tests/agent/test_compression_concurrent_fork.py b/tests/agent/test_compression_concurrent_fork.py index 18101fa811b..d02f2845273 100644 --- a/tests/agent/test_compression_concurrent_fork.py +++ b/tests/agent/test_compression_concurrent_fork.py @@ -253,6 +253,73 @@ def test_post_compress_exception_stops_lock_refresher(tmp_path: Path, monkeypatc assert db.try_acquire_compression_lock(parent_sid, "probe", ttl_seconds=1.0) is True +def test_abort_warning_exception_stops_lock_refresher(tmp_path: Path, monkeypatch) -> None: + """An abort-path warning exception must still release the refreshed lock.""" + real_try_acquire = SessionDB.try_acquire_compression_lock + + def _short_ttl(self, session_id: str, holder: str, ttl_seconds: float = 300.0) -> bool: + return real_try_acquire(self, session_id, holder, ttl_seconds=1.0) + + monkeypatch.setattr(SessionDB, "try_acquire_compression_lock", _short_ttl) + + db = SessionDB(db_path=tmp_path / "state.db") + parent_sid = "REFRESH_ABORT_TEST" + db.create_session(parent_sid, source="discord") + + agent = _build_agent_with_db(db, parent_sid) + agent._compression_lock_ttl_seconds = 1.0 + agent._compression_lock_refresh_interval = 0.1 + + def _aborting_compress(*_a, **_kw): + agent.context_compressor._last_compress_aborted = True + agent.context_compressor._last_summary_error = "summary failed" + return [{"role": "user", "content": "tail"}] + + agent.context_compressor.compress.side_effect = _aborting_compress + agent._emit_warning = lambda *_a, **_k: (_ for _ in ()).throw(RuntimeError("abort boom")) + + messages = [{"role": "user", "content": f"m{i}"} for i in range(20)] + + with pytest.raises(RuntimeError, match="abort boom"): + agent._compress_context(messages, "sys", approx_tokens=120_000) + + time.sleep(1.3) + assert db.try_acquire_compression_lock(parent_sid, "probe", ttl_seconds=1.0) is True + + +def test_typeerror_fallback_exception_stops_lock_refresher(tmp_path: Path, monkeypatch) -> None: + """A strict-signature fallback failure must still release the refreshed lock.""" + real_try_acquire = SessionDB.try_acquire_compression_lock + + def _short_ttl(self, session_id: str, holder: str, ttl_seconds: float = 300.0) -> bool: + return real_try_acquire(self, session_id, holder, ttl_seconds=1.0) + + monkeypatch.setattr(SessionDB, "try_acquire_compression_lock", _short_ttl) + + db = SessionDB(db_path=tmp_path / "state.db") + parent_sid = "REFRESH_TYPEERROR_TEST" + db.create_session(parent_sid, source="discord") + + agent = _build_agent_with_db(db, parent_sid) + agent._compression_lock_ttl_seconds = 1.0 + agent._compression_lock_refresh_interval = 0.1 + + def _strict_signature(*_a, **_kw): + if "focus_topic" in _kw or "force" in _kw: + raise TypeError("strict signature") + raise RuntimeError("fallback boom") + + agent.context_compressor.compress.side_effect = _strict_signature + + messages = [{"role": "user", "content": f"m{i}"} for i in range(20)] + + with pytest.raises(RuntimeError, match="fallback boom"): + agent._compress_context(messages, "sys", approx_tokens=120_000) + + time.sleep(1.3) + assert db.try_acquire_compression_lock(parent_sid, "probe", ttl_seconds=1.0) is True + + class _NoLockSubsystemDB: """Wraps a real SessionDB but simulates a pre-#34351 version skew.