test(tui-gateway): isolate completion_queue in poller requeue test

test_notification_poller_requeues_when_busy drained and reused the
process-global process_registry.completion_queue, so a concurrent test
in the same xdist worker could put/get on the shared singleton mid-run
and empty the event the poller requeues — flaking 'assert not
completion_queue.empty()' under parallel CI load only.

Monkeypatch a fresh Queue onto the singleton for the test's duration so
nothing external can interleave. The poller reads completion_queue by
attribute at runtime, so the isolated queue is what it operates on.
monkeypatch restores the original on teardown. Verified immune: 50/50
passes under a background thread hammering the global queue.
This commit is contained in:
teknium1 2026-05-29 13:16:19 -07:00 committed by Teknium
parent edfdc77664
commit 4fd8521e44

View file

@ -5114,6 +5114,8 @@ def test_notification_poller_skips_consumed(monkeypatch):
def test_notification_poller_requeues_when_busy(monkeypatch):
"""When the agent is busy, the poller requeues the event."""
import queue as _queue_mod
from tools.process_registry import process_registry
emitted = []
@ -5122,8 +5124,13 @@ def test_notification_poller_requeues_when_busy(monkeypatch):
server._sessions["sid_busy"] = sess
monkeypatch.setattr(server, "_emit", lambda *a, **kw: emitted.append(a))
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()
# Isolate the completion queue for the duration of this test. The poller
# reads process_registry.completion_queue by attribute at runtime, so a
# fresh Queue here means no concurrently-running test in the same xdist
# worker can put/get on the shared singleton mid-run and drain the event
# we expect to be requeued. monkeypatch restores the original on teardown.
isolated_queue: _queue_mod.Queue = _queue_mod.Queue()
monkeypatch.setattr(process_registry, "completion_queue", isolated_queue)
process_registry._completion_consumed.discard("proc_busy_test")
evt = {
@ -5133,7 +5140,7 @@ def test_notification_poller_requeues_when_busy(monkeypatch):
"exit_code": 0,
"output": "ok",
}
process_registry.completion_queue.put(evt)
isolated_queue.put(evt)
stop = threading.Event()
stop.set()
@ -5146,10 +5153,8 @@ def test_notification_poller_requeues_when_busy(monkeypatch):
assert len(status_calls) == 1
# Event was requeued (agent was busy, no turn triggered)
assert not process_registry.completion_queue.empty()
requeued = process_registry.completion_queue.get_nowait()
assert not isolated_queue.empty()
requeued = isolated_queue.get_nowait()
assert requeued["session_id"] == "proc_busy_test"
finally:
server._sessions.pop("sid_busy", None)
while not process_registry.completion_queue.empty():
process_registry.completion_queue.get_nowait()