test(gateway): assert interleaving safety of concurrent offloaded DB calls

This commit is contained in:
yoniebans 2026-06-29 14:22:46 +02:00 committed by Teknium
parent 6735162531
commit d2ce2c852d

View file

@ -370,3 +370,33 @@ def test_offloaded_helpers_never_called_bare_on_loop():
"Offloaded sync helper called bare on the gateway loop — wrap in "
"await asyncio.to_thread(self.<helper>, ...):\n " + "\n ".join(violations)
)
# --------------------------------------------------------------------------
# Interleaving safety: offloading opens await points where coroutines can
# interleave against the same session rows. The gateway relies on SessionDB's
# atomic operations (compare-and-set, INSERT OR IGNORE) to stay single-winner.
# These pin that the defenses hold when driven concurrently through the facade.
# --------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_concurrent_claim_handoff_single_winner(tmp_path):
db = AsyncSessionDB(hermes_state.SessionDB(db_path=tmp_path / "state.db"))
sid = "s-handoff"
await db.create_session(sid, "test")
await db.request_handoff(sid, "telegram")
results = await asyncio.gather(*(db.claim_handoff(sid) for _ in range(20)))
assert sum(results) == 1, f"exactly one claim must win, got {sum(results)}"
@pytest.mark.asyncio
async def test_concurrent_create_session_idempotent(tmp_path):
db = AsyncSessionDB(hermes_state.SessionDB(db_path=tmp_path / "state.db"))
sid = "s-create"
await asyncio.gather(*(db.create_session(sid, "test") for _ in range(20)))
rows = await db.list_sessions_rich(limit=100)
assert sum(1 for r in rows if r["id"] == sid) == 1