mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
test(gateway): de-flake concurrent-compression lock test with a barrier
test_concurrent_compressions_same_session_serialize relied on a time.sleep(0.25) inside the stubbed compressor to make the two threads overlap inside the per-session lock window. Under CI CPU starvation that sleep is insufficient: one thread can acquire -> compress -> rotate -> RELEASE the lock before the other reaches try_acquire, so both acquire on the shared session_id and both compress (the recurring 'Expected exactly one agent to compress, got 2' failure on shard test (1)). Replace the timing dependency with a threading.Barrier(2) wrapped around the shared db's try_acquire_compression_lock: both threads rendezvous immediately before the real (atomic) acquire, guaranteeing genuine simultaneous contention regardless of scheduling. The real lock logic is unchanged and still picks exactly one winner — this only fixes the test's overlap guarantee. Restored after join so the post-join lock-leak assertion hits the unwrapped method. Verified: 20/20 plain + 15/15 under all-core CPU stress (load avg ~4.6), where the old version flaked.
This commit is contained in:
parent
b99c6c4277
commit
400e6e43ca
1 changed files with 28 additions and 0 deletions
|
|
@ -146,6 +146,30 @@ def test_concurrent_compressions_same_session_serialize(tmp_path: Path) -> None:
|
|||
agent_a = _build_agent_with_db(db, shared_sid)
|
||||
agent_b = _build_agent_with_db(db, shared_sid)
|
||||
|
||||
# Force genuine simultaneous lock contention instead of relying on a
|
||||
# ``time.sleep`` inside the compressor stub to make the threads overlap.
|
||||
# Under CI CPU starvation that sleep is not enough: one thread could
|
||||
# acquire → compress → rotate → RELEASE the lock before the other even
|
||||
# reaches ``try_acquire``, so both would acquire on the shared id and
|
||||
# both would compress (the historical "got 2" flake). A two-party
|
||||
# barrier in front of the real acquire guarantees both threads are
|
||||
# contending for the lock at the same instant, which is exactly the
|
||||
# condition this test means to assert — with zero timing dependency.
|
||||
barrier = threading.Barrier(2, timeout=15)
|
||||
_real_acquire = db.try_acquire_compression_lock
|
||||
|
||||
def _barriered_acquire(*args, **kwargs):
|
||||
# Rendezvous both callers, then let the real (atomic) acquire decide
|
||||
# the single winner. Tolerate a broken barrier so a test-side timeout
|
||||
# never masquerades as a lock-logic failure.
|
||||
try:
|
||||
barrier.wait()
|
||||
except threading.BrokenBarrierError:
|
||||
pass
|
||||
return _real_acquire(*args, **kwargs)
|
||||
|
||||
db.try_acquire_compression_lock = _barriered_acquire
|
||||
|
||||
results: dict[str, list | None] = {"a": None, "b": None}
|
||||
errors: list[Exception] = []
|
||||
|
||||
|
|
@ -163,6 +187,10 @@ def test_concurrent_compressions_same_session_serialize(tmp_path: Path) -> None:
|
|||
t_a.join(timeout=15)
|
||||
t_b.join(timeout=15)
|
||||
|
||||
# Restore the real method so the post-join lock-leak assertion below
|
||||
# (and any future call) hits the unwrapped implementation.
|
||||
db.try_acquire_compression_lock = _real_acquire
|
||||
|
||||
assert not errors, f"Compression raised exceptions: {errors}"
|
||||
|
||||
# Count which agents actually compressed (returned fewer messages than input)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue