fix(agent): make compression lock-lease refresher tolerate transient DB blips

Follow-up hardening on the salvaged #54465 backoff persistence work.

The lease refresher's loop treated ANY falsy refresh as a permanent stop
(`if not refreshed: break`), conflating two distinct cases:
  - genuine lost-ownership (rowcount 0) — correct to stop, and
  - a one-off transient DB error (write contention that escapes
    _execute_write's retry budget) — which returned False identically.

A single transient blip therefore killed the lease for the rest of a
multi-minute compression call, silently reintroducing the exact 300s-TTL <
~361s-call expiry wedge the PR set out to fix.

Changes:
- _CompressionLockLeaseRefresher._run now tolerates a bounded run of
  consecutive failures (_max_consecutive_failures =
  max(1, int(ttl_seconds / refresh_interval_seconds))) before giving up the
  lease; a recovered tick resets the counter. Worst-case extra hold is
  cap * refresh_interval, still bounded by the acquirer's TTL.
- Replace the two remaining silent `except Exception: pass` arms in the
  compression-failure-cooldown persist/clear helpers with debug logging, for
  parity with their sqlite3.Error sibling arms (a non-sqlite bug was invisible).
- Document the join(timeout=1.0) quiesce bound in stop().
- Add 5 regression tests: single-blip tolerance, TTL-bounded stop on
  persistent failure, the cap's floor of one, recovery after a raising
  refresh, and a bounded stop when refresh raises on every tick.
This commit is contained in:
kshitijk4poor 2026-06-30 13:20:31 +05:30 committed by kshitij
parent 7479f26b3f
commit 58d8e25e67
3 changed files with 190 additions and 6 deletions

View file

@ -737,8 +737,8 @@ class ContextCompressor(ContextEngine):
recorder(session_id, cooldown_until, error)
except sqlite3.Error as exc:
logger.debug("compression failure cooldown persist failed: %s", exc)
except Exception:
pass
except Exception as exc:
logger.debug("compression failure cooldown persist failed (non-sqlite): %s", exc)
def _clear_compression_failure_cooldown(self) -> None:
self._summary_failure_cooldown_until = 0.0
@ -756,8 +756,8 @@ class ContextCompressor(ContextEngine):
clearer(session_id)
except sqlite3.Error as exc:
logger.debug("compression failure cooldown clear failed: %s", exc)
except Exception:
pass
except Exception as exc:
logger.debug("compression failure cooldown clear failed (non-sqlite): %s", exc)
def update_model(
self,

View file

@ -88,6 +88,14 @@ class _CompressionLockLeaseRefresher:
if refresh_interval_seconds is None:
refresh_interval_seconds = max(1.0, min(60.0, ttl_seconds / 2.0))
self._refresh_interval_seconds = max(0.1, float(refresh_interval_seconds))
# Tolerate transient refresh failures for at most one lease's worth of
# time, so the give-up window is genuinely bounded by the TTL the
# acquirer set (a single blip recovers on the next tick; a persistent
# failure stops before the lease could outlive its TTL). Floor of 1 keeps the
# cap positive when interval >= ttl; with a cap of 1 the loop gives up on the
# first failed refresh (no blip is tolerated in that degenerate case).
self._max_consecutive_failures = max(
1, int(self._ttl_seconds / self._refresh_interval_seconds)
)
self._stop = threading.Event()
self._thread = threading.Thread(
target=self._run,
@ -101,10 +109,25 @@ class _CompressionLockLeaseRefresher:
def stop(self) -> None:
    """Signal the refresher loop to exit and wait briefly for its thread.

    Returning from this method means the stop event has been set and the
    thread was given up to one second to finish; it does NOT guarantee the
    thread has fully quiesced.
    """
    self._stop.set()
    worker = self._thread
    # Nothing to wait for if the thread already exited, and a thread must
    # never join itself.
    if threading.current_thread() is worker or not worker.is_alive():
        return
    # The join can time out while the refresher is in the middle of its
    # UPDATE. That is acceptable: the thread is a daemon, and a refresh that
    # lands after the lock was released matches rowcount 0, i.e. a no-op.
    worker.join(timeout=1.0)
def _run(self) -> None:
# A single falsy refresh must NOT permanently kill the lease: a
# transient DB blip (write contention escaping _execute_write's retry
# budget, a momentary "database is locked") returns False just like a
# genuine lost-ownership, but only the latter should stop the loop.
# Tolerate consecutive failures for at most one lease's worth of time
# (_max_consecutive_failures = ttl / interval), so a one-off blip
# recovers on the next tick while the total give-up window stays bounded
# by the TTL the acquirer set — the lock can never be held past its TTL
# by a stuck refresher.
consecutive_failures = 0
while not self._stop.wait(self._refresh_interval_seconds):
try:
refreshed = self._db.refresh_compression_lock(
@ -113,9 +136,18 @@ class _CompressionLockLeaseRefresher:
ttl_seconds=self._ttl_seconds,
)
except Exception as exc:
logger.debug("compression lock refresh failed: %s", exc)
logger.debug("compression lock refresh raised: %s", exc)
refreshed = False
if not refreshed:
if refreshed:
consecutive_failures = 0
continue
consecutive_failures += 1
if consecutive_failures >= self._max_consecutive_failures:
logger.debug(
"compression lock refresh failed %d times in a row; "
"stopping lease refresher for session %s",
consecutive_failures, self._session_id,
)
break

View file

@ -461,3 +461,155 @@ def test_review_fork_disables_compression_to_prevent_stale_parent_fork(tmp_path:
"False — this flag MUST be cleared on the review fork."
)
db.close()
# ── Lease-refresher bounded-failure tolerance (salvage follow-up, #54465) ────
# A single falsy refresh (transient DB blip) must NOT permanently kill the
# lease — only a *persistent* failure (genuine lost-ownership) should stop the
# refresher after a bounded number of consecutive failures. Without this, one
# escaped lock-contention error silently reintroduces the TTL-expiry wedge the
# PR set out to fix.
class _FlakyRefreshDB:
    """Stub db that replays scripted refresh outcomes, then always succeeds.

    ``results`` is copied on construction; each call to
    ``refresh_compression_lock`` consumes the next scripted value. ``calls``
    counts every invocation, scripted or not.
    """

    def __init__(self, results):
        self._results = list(results)
        self.calls = 0

    def refresh_compression_lock(self, session_id, holder, ttl_seconds=300.0):
        """Return the next scripted outcome, or True once the script is spent."""
        self.calls += 1
        if not self._results:
            # Script exhausted: steady-state success.
            return True
        return self._results.pop(0)
def _no_sleep(refresher) -> None:
    """Patch the refresher so its loop ticks without wall-clock waiting.

    Replaces ``refresher._stop.wait`` with a stub that immediately returns
    False ("not stopped, keep looping") instead of blocking for the interval.
    Count-based tests therefore stay fast and deterministic: the loop ends
    because of the failure cap or the scripted db, never because of timing.
    """

    def _never_stopped(_interval):
        return False

    refresher._stop.wait = _never_stopped  # type: ignore[assignment]
def test_lease_refresher_survives_single_transient_failure() -> None:
    """A lone falsy refresh between successes must not end the refresher loop.

    Guards against the earlier ``if not refreshed: break`` behavior, which
    handled a one-off failure exactly like genuine lost-ownership and dropped
    the lease on the first hiccup.
    """
    from agent.conversation_compression import _CompressionLockLeaseRefresher

    # Scripted outcomes: success, one blip, success; the stub then keeps
    # returning True until the loop is stopped from outside.
    scripted_outcomes = [True, False, True]
    db = _FlakyRefreshDB(scripted_outcomes)
    refresher = _CompressionLockLeaseRefresher(
        db, "sess", "holder", ttl_seconds=10.0, refresh_interval_seconds=0.001
    )

    def _stop_once_four_calls_made(_interval):
        # Stands in for Event.wait: no real sleep, reports "stopped" once the
        # three scripted ticks plus one steady-state tick have happened.
        return db.calls >= 4

    refresher._stop.wait = _stop_once_four_calls_made  # type: ignore[assignment]
    refresher._run()

    # Had the False on call 2 ended the loop, the count would sit at 2.
    assert db.calls >= 4, (
        "Lease refresher stopped after a single transient failure — the "
        "bounded-tolerance fix regressed (one blip must not kill the lease)."
    )
def test_lease_refresher_failure_window_is_bounded_by_ttl() -> None:
    """A refresh that never succeeds makes the loop give up within one TTL.

    The contract under test is the relationship, not a literal count: the
    give-up window ``cap * refresh_interval`` must not exceed the TTL, so a
    stuck refresher cannot keep the lock alive past its lease.
    """
    from agent.conversation_compression import _CompressionLockLeaseRefresher

    ttl = 10.0
    interval = 2.0  # expected cap: int(10 / 2) = 5
    always_failing = [False] * 50  # more failures than the cap could consume
    db = _FlakyRefreshDB(always_failing)
    refresher = _CompressionLockLeaseRefresher(
        db, "sess", "holder", ttl_seconds=ttl, refresh_interval_seconds=interval
    )
    _no_sleep(refresher)

    refresher._run()

    cap = refresher._max_consecutive_failures
    assert cap == int(ttl / interval), "cap must derive from ttl/interval"
    # One refresh attempt per tolerated failure, then the loop exits.
    assert db.calls == cap
    # Total tolerated time never exceeds a single lease.
    assert cap * interval <= ttl, (
        f"give-up window {cap * interval}s must not exceed the lease TTL {ttl}s"
    )
def test_lease_refresher_failure_cap_has_floor_of_one() -> None:
    """With interval >= ttl the cap floors at 1 instead of collapsing to 0.

    A cap of 1 means the loop gives up on the very first failed refresh, so
    exactly one refresh attempt is made against an always-failing db.
    """
    from agent.conversation_compression import _CompressionLockLeaseRefresher

    always_failing = [False] * 10
    db = _FlakyRefreshDB(always_failing)
    # Degenerate configuration: the refresh interval is longer than the lease.
    refresher = _CompressionLockLeaseRefresher(
        db, "sess", "holder", ttl_seconds=1.0, refresh_interval_seconds=5.0
    )
    _no_sleep(refresher)

    refresher._run()

    assert refresher._max_consecutive_failures == 1
    assert db.calls == 1
def test_lease_refresher_recovers_after_raise() -> None:
    """A raising refresh counts as a failure tick that a later success RESETS.

    The db raises in two bursts of ``cap - 1`` consecutive failures separated
    by a single success. Each burst stays under the cap, but together they
    reach or exceed it, so the loop only survives the second burst if the
    success in between reset the consecutive-failure counter. A refresher that
    merely "doesn't crash" on a raise, without resetting, stops early and
    fails this test.
    """
    from agent.conversation_compression import _CompressionLockLeaseRefresher

    class _RaiseBurstsThenOKDB:
        """Raise for two bursts split by one success, then succeed forever."""

        def __init__(self):
            self.calls = 0
            self.raises = 0
            # Consecutive raises per burst; set once the refresher's cap is known.
            self.burst = 1

        def refresh_compression_lock(self, *a, **k):
            self.calls += 1
            # Calls 1..burst and burst+2..2*burst+1 raise; call burst+1 and
            # everything after the second burst succeed.
            in_scripted_prefix = self.calls <= 2 * self.burst + 1
            if in_scripted_prefix and self.calls != self.burst + 1:
                self.raises += 1
                raise RuntimeError("simulated DB hiccup")
            return True

    db = _RaiseBurstsThenOKDB()
    refresher = _CompressionLockLeaseRefresher(
        db, "sess", "holder", ttl_seconds=10.0, refresh_interval_seconds=2.0
    )
    cap = refresher._max_consecutive_failures
    assert cap >= 2, "scenario needs a cap that tolerates at least one failure"
    db.burst = cap - 1

    # Both bursts, the success between them, and two steady-state successes.
    target_calls = 2 * db.burst + 3
    refresher._stop.wait = lambda _i: db.calls >= target_calls  # type: ignore[assignment]
    refresher._run()  # must not propagate the RuntimeError

    # Cumulative failures reached the cap, yet the loop kept refreshing — only
    # possible if the counter reset on the intervening success.
    assert db.raises >= cap
    assert db.calls >= target_calls
def test_lease_refresher_stops_on_persistent_raise() -> None:
    """A refresh that raises on every tick is cut off at the TTL-derived cap.

    The exception must not escape ``_run`` and the loop must terminate after
    exactly ``_max_consecutive_failures`` attempts.
    """
    from agent.conversation_compression import _CompressionLockLeaseRefresher

    class _AlwaysRaiseDB:
        """Stub db whose refresh raises every time while counting the calls."""

        def __init__(self):
            self.calls = 0

        def refresh_compression_lock(self, *a, **k):
            self.calls += 1
            raise RuntimeError("simulated DB hiccup")

    db = _AlwaysRaiseDB()
    refresher = _CompressionLockLeaseRefresher(
        db, "sess", "holder", ttl_seconds=10.0, refresh_interval_seconds=2.0
    )
    _no_sleep(refresher)

    refresher._run()  # must not propagate

    attempts_allowed = refresher._max_consecutive_failures
    assert db.calls == attempts_allowed