From 400e6e43cade2a5f5863c592f4d923d9468db565 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Mon, 8 Jun 2026 02:20:45 -0700 Subject: [PATCH] test(gateway): de-flake concurrent-compression lock test with a barrier MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_concurrent_compressions_same_session_serialize relied on a time.sleep(0.25) inside the stubbed compressor to make the two threads overlap inside the per-session lock window. Under CI CPU starvation that sleep is insufficient: one thread can acquire -> compress -> rotate -> RELEASE the lock before the other reaches try_acquire, so both acquire on the shared session_id and both compress (the recurring 'Expected exactly one agent to compress, got 2' failure on shard test (1)). Replace the timing dependency with a threading.Barrier(2) wrapped around the shared db's try_acquire_compression_lock: both threads rendezvous immediately before the real (atomic) acquire, guaranteeing genuine simultaneous contention regardless of scheduling. The real lock logic is unchanged and still picks exactly one winner — this only fixes the test's overlap guarantee. The real try_acquire_compression_lock method is restored after join so the post-join lock-leak assertion hits the unwrapped method. Verified: 20/20 plain + 15/15 under all-core CPU stress (load avg ~4.6), where the old version flaked. 
--- .../test_compression_concurrent_sessions.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/gateway/test_compression_concurrent_sessions.py b/tests/gateway/test_compression_concurrent_sessions.py index 5cb18b2e229..d6fd26deb35 100644 --- a/tests/gateway/test_compression_concurrent_sessions.py +++ b/tests/gateway/test_compression_concurrent_sessions.py @@ -146,6 +146,30 @@ def test_concurrent_compressions_same_session_serialize(tmp_path: Path) -> None: agent_a = _build_agent_with_db(db, shared_sid) agent_b = _build_agent_with_db(db, shared_sid) + # Force genuine simultaneous lock contention instead of relying on a + # ``time.sleep`` inside the compressor stub to make the threads overlap. + # Under CI CPU starvation that sleep is not enough: one thread could + # acquire → compress → rotate → RELEASE the lock before the other even + # reaches ``try_acquire``, so both would acquire on the shared id and + # both would compress (the historical "got 2" flake). A two-party + # barrier in front of the real acquire guarantees both threads are + # contending for the lock at the same instant, which is exactly the + # condition this test means to assert — with zero timing dependency. + barrier = threading.Barrier(2, timeout=15) + _real_acquire = db.try_acquire_compression_lock + + def _barriered_acquire(*args, **kwargs): + # Rendezvous both callers, then let the real (atomic) acquire decide + # the single winner. Tolerate a broken barrier so a test-side timeout + # never masquerades as a lock-logic failure. 
+ try: + barrier.wait() + except threading.BrokenBarrierError: + pass + return _real_acquire(*args, **kwargs) + + db.try_acquire_compression_lock = _barriered_acquire + results: dict[str, list | None] = {"a": None, "b": None} errors: list[Exception] = [] @@ -163,6 +187,10 @@ def test_concurrent_compressions_same_session_serialize(tmp_path: Path) -> None: t_a.join(timeout=15) t_b.join(timeout=15) + # Restore the real method so the post-join lock-leak assertion below + # (and any future call) hits the unwrapped implementation. + db.try_acquire_compression_lock = _real_acquire + assert not errors, f"Compression raised exceptions: {errors}" # Count which agents actually compressed (returned fewer messages than input)