fix(agent): prevent stale lock leases after early compression exits (#54465)

This commit is contained in:
Rod Boev 2026-06-28 20:14:24 -04:00 committed by kshitij
parent f2ace45286
commit cafe9d9261
2 changed files with 87 additions and 14 deletions

View file

@ -552,7 +552,11 @@ def compress_context(
except TypeError:
# Plugin context engine with strict signature that doesn't accept
# focus_topic / force — fall back to calling without them.
compressed = agent.context_compressor.compress(messages, current_tokens=approx_tokens)
try:
compressed = agent.context_compressor.compress(messages, current_tokens=approx_tokens)
except BaseException:
_release_lock()
raise
except BaseException:
# ANY exception during compress() must release the lock so the
# session isn't permanently blocked from future compression.
@ -565,19 +569,21 @@ def compress_context(
# session has logically ended), and let auto-compress callers detect
# the no-op via len(returned) == len(input).
if getattr(agent.context_compressor, "_last_compress_aborted", False):
_err = getattr(agent.context_compressor, "_last_summary_error", None) or "unknown error"
if getattr(agent, "_last_compression_summary_warning", None) != _err:
agent._last_compression_summary_warning = _err
agent._emit_warning(
f"⚠ Compression aborted: {_err}. "
"No messages were dropped — conversation continues unchanged. "
"Run /compress to retry, or /new to start a fresh session."
)
_existing_sp = getattr(agent, "_cached_system_prompt", None)
if not _existing_sp:
_existing_sp = agent._build_system_prompt(system_message)
_release_lock() # compression aborted — no rotation will happen
return messages, _existing_sp
try:
_err = getattr(agent.context_compressor, "_last_summary_error", None) or "unknown error"
if getattr(agent, "_last_compression_summary_warning", None) != _err:
agent._last_compression_summary_warning = _err
agent._emit_warning(
f"⚠ Compression aborted: {_err}. "
"No messages were dropped — conversation continues unchanged. "
"Run /compress to retry, or /new to start a fresh session."
)
_existing_sp = getattr(agent, "_cached_system_prompt", None)
if not _existing_sp:
_existing_sp = agent._build_system_prompt(system_message)
return messages, _existing_sp
finally:
_release_lock()
try:
summary_error = getattr(agent.context_compressor, "_last_summary_error", None)

View file

@ -253,6 +253,73 @@ def test_post_compress_exception_stops_lock_refresher(tmp_path: Path, monkeypatc
assert db.try_acquire_compression_lock(parent_sid, "probe", ttl_seconds=1.0) is True
def test_abort_warning_exception_stops_lock_refresher(tmp_path: Path, monkeypatch) -> None:
"""An abort-path warning exception must still release the refreshed lock."""
real_try_acquire = SessionDB.try_acquire_compression_lock
def _short_ttl(self, session_id: str, holder: str, ttl_seconds: float = 300.0) -> bool:
return real_try_acquire(self, session_id, holder, ttl_seconds=1.0)
monkeypatch.setattr(SessionDB, "try_acquire_compression_lock", _short_ttl)
db = SessionDB(db_path=tmp_path / "state.db")
parent_sid = "REFRESH_ABORT_TEST"
db.create_session(parent_sid, source="discord")
agent = _build_agent_with_db(db, parent_sid)
agent._compression_lock_ttl_seconds = 1.0
agent._compression_lock_refresh_interval = 0.1
def _aborting_compress(*_a, **_kw):
agent.context_compressor._last_compress_aborted = True
agent.context_compressor._last_summary_error = "summary failed"
return [{"role": "user", "content": "tail"}]
agent.context_compressor.compress.side_effect = _aborting_compress
agent._emit_warning = lambda *_a, **_k: (_ for _ in ()).throw(RuntimeError("abort boom"))
messages = [{"role": "user", "content": f"m{i}"} for i in range(20)]
with pytest.raises(RuntimeError, match="abort boom"):
agent._compress_context(messages, "sys", approx_tokens=120_000)
time.sleep(1.3)
assert db.try_acquire_compression_lock(parent_sid, "probe", ttl_seconds=1.0) is True
def test_typeerror_fallback_exception_stops_lock_refresher(tmp_path: Path, monkeypatch) -> None:
"""A strict-signature fallback failure must still release the refreshed lock."""
real_try_acquire = SessionDB.try_acquire_compression_lock
def _short_ttl(self, session_id: str, holder: str, ttl_seconds: float = 300.0) -> bool:
return real_try_acquire(self, session_id, holder, ttl_seconds=1.0)
monkeypatch.setattr(SessionDB, "try_acquire_compression_lock", _short_ttl)
db = SessionDB(db_path=tmp_path / "state.db")
parent_sid = "REFRESH_TYPEERROR_TEST"
db.create_session(parent_sid, source="discord")
agent = _build_agent_with_db(db, parent_sid)
agent._compression_lock_ttl_seconds = 1.0
agent._compression_lock_refresh_interval = 0.1
def _strict_signature(*_a, **_kw):
if "focus_topic" in _kw or "force" in _kw:
raise TypeError("strict signature")
raise RuntimeError("fallback boom")
agent.context_compressor.compress.side_effect = _strict_signature
messages = [{"role": "user", "content": f"m{i}"} for i in range(20)]
with pytest.raises(RuntimeError, match="fallback boom"):
agent._compress_context(messages, "sys", approx_tokens=120_000)
time.sleep(1.3)
assert db.try_acquire_compression_lock(parent_sid, "probe", ttl_seconds=1.0) is True
class _NoLockSubsystemDB:
"""Wraps a real SessionDB but simulates a pre-#34351 version skew.