From 58d8e25e671ead6d3ae4f30d6fa2f193bab44059 Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Tue, 30 Jun 2026 13:20:31 +0530 Subject: [PATCH] fix(agent): make compression lock-lease refresher tolerate transient DB blips MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up hardening on the salvaged #54465 backoff persistence work. The lease refresher's loop treated ANY falsy refresh as a permanent stop (`if not refreshed: break`), conflating two distinct cases: - genuine lost-ownership (rowcount 0) — correct to stop, and - a one-off transient DB error (write contention that escapes _execute_write's retry budget) — which returned False identically. A single transient blip therefore killed the lease for the rest of a multi-minute compression call, silently reintroducing the exact 300s-TTL < ~361s-call expiry wedge the PR set out to fix. Changes: - _CompressionLockLeaseRefresher._run now tolerates a bounded run of consecutive failures (_max_consecutive_failures = max(1, int(ttl / refresh_interval)), computed per instance) before giving up the lease; a recovered tick resets the counter. Worst-case extra hold is cap * refresh_interval, still bounded by the acquirer's TTL. - Replace the two remaining silent `except Exception: pass` arms in the compression-failure-cooldown persist/clear helpers with debug logging, for parity with their sqlite3.Error sibling arms (a non-sqlite bug was invisible). - Document the join(timeout=1.0) quiesce bound in stop(). - Add 5 regression tests: single-blip tolerance, TTL-bounded stop on persistent failure, the floor-of-one cap, recovery after a raising refresh, and a bounded stop when refresh raises on every tick. 
--- agent/context_compressor.py | 8 +- agent/conversation_compression.py | 36 ++++- .../agent/test_compression_concurrent_fork.py | 152 ++++++++++++++++++ 3 files changed, 190 insertions(+), 6 deletions(-) diff --git a/agent/context_compressor.py b/agent/context_compressor.py index 5943eaa9c98..1cf3bb1e358 100644 --- a/agent/context_compressor.py +++ b/agent/context_compressor.py @@ -737,8 +737,8 @@ class ContextCompressor(ContextEngine): recorder(session_id, cooldown_until, error) except sqlite3.Error as exc: logger.debug("compression failure cooldown persist failed: %s", exc) - except Exception: - pass + except Exception as exc: + logger.debug("compression failure cooldown persist failed (non-sqlite): %s", exc) def _clear_compression_failure_cooldown(self) -> None: self._summary_failure_cooldown_until = 0.0 @@ -756,8 +756,8 @@ class ContextCompressor(ContextEngine): clearer(session_id) except sqlite3.Error as exc: logger.debug("compression failure cooldown clear failed: %s", exc) - except Exception: - pass + except Exception as exc: + logger.debug("compression failure cooldown clear failed (non-sqlite): %s", exc) def update_model( self, diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index b78e24b729c..74e9feda2e3 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -88,6 +88,14 @@ class _CompressionLockLeaseRefresher: if refresh_interval_seconds is None: refresh_interval_seconds = max(1.0, min(60.0, ttl_seconds / 2.0)) self._refresh_interval_seconds = max(0.1, float(refresh_interval_seconds)) + # Tolerate transient refresh failures for at most one lease's worth of + # time, so the give-up window is genuinely bounded by the TTL the + # acquirer set (a single blip recovers on the next tick; a persistent + # failure stops before the lease could outlive its TTL). With the floor of + # 1, a degenerate interval >= ttl stops on the first failed refresh. 
+ self._max_consecutive_failures = max( + 1, int(self._ttl_seconds / self._refresh_interval_seconds) + ) self._stop = threading.Event() self._thread = threading.Thread( target=self._run, @@ -101,10 +109,25 @@ class _CompressionLockLeaseRefresher: def stop(self) -> None: self._stop.set() + # join() may time out while the refresher is mid-UPDATE; that's safe — + # it's a daemon thread, and a late refresh on an already-released lock + # matches rowcount 0 (a no-op). stop() returning does not guarantee the + # thread has fully quiesced, only that we've signalled it and waited + # briefly. if self._thread.is_alive() and threading.current_thread() is not self._thread: self._thread.join(timeout=1.0) def _run(self) -> None: + # A single falsy refresh must NOT permanently kill the lease: a + # transient DB blip (write contention escaping _execute_write's retry + # budget, a momentary "database is locked") returns False just like a + # genuine lost-ownership, but only the latter should stop the loop. + # Tolerate consecutive failures for at most one lease's worth of time + # (_max_consecutive_failures = ttl / interval), so a one-off blip + # recovers on the next tick while the total give-up window stays bounded + # by the TTL the acquirer set — the lock can never be held past its TTL + # by a stuck refresher. 
+ consecutive_failures = 0 while not self._stop.wait(self._refresh_interval_seconds): try: refreshed = self._db.refresh_compression_lock( @@ -113,9 +136,18 @@ class _CompressionLockLeaseRefresher: ttl_seconds=self._ttl_seconds, ) except Exception as exc: - logger.debug("compression lock refresh failed: %s", exc) + logger.debug("compression lock refresh raised: %s", exc) refreshed = False - if not refreshed: + if refreshed: + consecutive_failures = 0 + continue + consecutive_failures += 1 + if consecutive_failures >= self._max_consecutive_failures: + logger.debug( + "compression lock refresh failed %d times in a row; " + "stopping lease refresher for session %s", + consecutive_failures, self._session_id, + ) break diff --git a/tests/agent/test_compression_concurrent_fork.py b/tests/agent/test_compression_concurrent_fork.py index d02f2845273..88d4f0911d2 100644 --- a/tests/agent/test_compression_concurrent_fork.py +++ b/tests/agent/test_compression_concurrent_fork.py @@ -461,3 +461,155 @@ def test_review_fork_disables_compression_to_prevent_stale_parent_fork(tmp_path: "False — this flag MUST be cleared on the review fork." ) db.close() + + +# ── Lease-refresher bounded-failure tolerance (salvage follow-up, #54465) ──── +# A single falsy refresh (transient DB blip) must NOT permanently kill the +# lease — only a *persistent* failure (genuine lost-ownership) should stop the +# refresher after a bounded number of consecutive failures. Without this, one +# escaped lock-contention error silently reintroduces the TTL-expiry wedge the +# PR set out to fix. 
+ + +class _FlakyRefreshDB: + """A db whose refresh_compression_lock returns a scripted sequence.""" + + def __init__(self, results): + self._results = list(results) + self.calls = 0 + + def refresh_compression_lock(self, session_id, holder, ttl_seconds=300.0): + self.calls += 1 + if self._results: + return self._results.pop(0) + return True # steady-state success after the scripted prefix + + +def _no_sleep(refresher) -> None: + """Make the refresher loop iterate without real wall-clock sleeps. + + ``_stop.wait(interval)`` returns False (keep looping) instantly instead of + blocking for the (clamped) interval, so count-based tests stay fast and + deterministic — the loop's termination is driven by the failure cap / the + scripted db, not by timing. + """ + refresher._stop.wait = lambda _interval: False # type: ignore[assignment] + + +def test_lease_refresher_survives_single_transient_failure() -> None: + """One False (transient blip) followed by success must NOT stop the loop. + + Regression for the W1/W2 finding: the original ``if not refreshed: break`` + treated a one-off failure identically to genuine lost-ownership, killing + the lease on the first hiccup. + """ + from agent.conversation_compression import _CompressionLockLeaseRefresher + + # Script: success, FAILURE (blip), success, then stop the loop externally. + db = _FlakyRefreshDB([True, False, True]) + refresher = _CompressionLockLeaseRefresher( + db, "sess", "holder", ttl_seconds=10.0, refresh_interval_seconds=0.001 + ) + # Stop after exactly 4 ticks (3 scripted + 1 steady success), no real sleep. + refresher._stop.wait = lambda _i: db.calls >= 4 # type: ignore[assignment] + refresher._run() + + # The single False at call 2 must NOT have ended the loop — we keep going + # past it (calls reach >= 4), proving the blip was tolerated. + assert db.calls >= 4, ( + "Lease refresher stopped after a single transient failure — the " + "bounded-tolerance fix regressed (one blip must not kill the lease)." 
+ ) + + +def test_lease_refresher_failure_window_is_bounded_by_ttl() -> None: + """Persistent failure stops within one lease's worth of time, not forever. + + The contract (not a magic count): the give-up window + ``cap * refresh_interval`` must be <= the TTL, so a stuck refresher can + never hold the lock past its TTL. We assert that relationship directly + rather than freezing a literal cap (behavior contract over snapshot). + """ + from agent.conversation_compression import _CompressionLockLeaseRefresher + + ttl, interval = 10.0, 2.0 # cap should be int(10/2) = 5 + db = _FlakyRefreshDB([False] * 50) # never recovers (lost ownership) + refresher = _CompressionLockLeaseRefresher( + db, "sess", "holder", ttl_seconds=ttl, refresh_interval_seconds=interval + ) + _no_sleep(refresher) + refresher._run() + + cap = refresher._max_consecutive_failures + assert cap == int(ttl / interval), "cap must derive from ttl/interval" + # Stops at the cap — not on the first failure, not forever. + assert db.calls == cap + # The invariant that makes the cap honest: total tolerance <= one TTL. 
+ assert cap * interval <= ttl, ( + f"give-up window {cap * interval}s must not exceed the lease TTL {ttl}s" + ) + + +def test_lease_refresher_failure_cap_has_floor_of_one() -> None: + """A degenerate interval >= ttl floors the cap at 1: stop on the first failure.""" + from agent.conversation_compression import _CompressionLockLeaseRefresher + + db = _FlakyRefreshDB([False] * 10) + refresher = _CompressionLockLeaseRefresher( + db, "sess", "holder", ttl_seconds=1.0, refresh_interval_seconds=5.0 + ) + _no_sleep(refresher) + refresher._run() + assert refresher._max_consecutive_failures == 1 + assert db.calls == 1 + + +def test_lease_refresher_recovers_after_raise() -> None: + """A raise treated as a failure tick must RESET on a later success — the + exception arm gets the same blip-tolerance as a falsy return, not just a + 'doesn't crash' guarantee.""" + from agent.conversation_compression import _CompressionLockLeaseRefresher + + class _RaiseThenOKDB: + """Raise once, then succeed forever — the transient-blip analog.""" + + def __init__(self): + self.calls = 0 + + def refresh_compression_lock(self, *a, **k): + self.calls += 1 + if self.calls == 1: + raise RuntimeError("simulated DB hiccup") + return True + + db = _RaiseThenOKDB() + refresher = _CompressionLockLeaseRefresher( + db, "sess", "holder", ttl_seconds=10.0, refresh_interval_seconds=2.0 + ) + # Run a handful of ticks past the raise, then stop. + refresher._stop.wait = lambda _i: db.calls >= 4 # type: ignore[assignment] + refresher._run() # must not propagate the RuntimeError + # Survived the raise and kept refreshing — the counter reset on recovery. 
+ assert db.calls >= 4 + + +def test_lease_refresher_stops_on_persistent_raise() -> None: + """A refresh that raises every tick is bounded by the same TTL-derived cap, + never propagates, and never loops forever.""" + from agent.conversation_compression import _CompressionLockLeaseRefresher + + class _AlwaysRaiseDB: + def __init__(self): + self.calls = 0 + + def refresh_compression_lock(self, *a, **k): + self.calls += 1 + raise RuntimeError("simulated DB hiccup") + + db = _AlwaysRaiseDB() + refresher = _CompressionLockLeaseRefresher( + db, "sess", "holder", ttl_seconds=10.0, refresh_interval_seconds=2.0 + ) + _no_sleep(refresher) + refresher._run() # must not propagate + assert db.calls == refresher._max_consecutive_failures