From 4fd8521e44e920fbf545b408ea8727423436cad4 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Fri, 29 May 2026 13:16:19 -0700 Subject: [PATCH] test(tui-gateway): isolate completion_queue in poller requeue test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit test_notification_poller_requeues_when_busy drained and reused the process-global process_registry.completion_queue, so a concurrent test in the same xdist worker could put/get on the shared singleton mid-run and empty the event the poller requeues — flaking 'assert not completion_queue.empty()' under parallel CI load only. Monkeypatch a fresh Queue onto the singleton for the test's duration so nothing external can interleave. The poller reads completion_queue by attribute at runtime, so the isolated queue is what it operates on. monkeypatch restores the original on teardown. Verified immune: 50/50 passes under a background thread hammering the global queue. --- tests/test_tui_gateway_server.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/tests/test_tui_gateway_server.py b/tests/test_tui_gateway_server.py index 2631dab3787..4524fb88cb6 100644 --- a/tests/test_tui_gateway_server.py +++ b/tests/test_tui_gateway_server.py @@ -5114,6 +5114,8 @@ def test_notification_poller_skips_consumed(monkeypatch): def test_notification_poller_requeues_when_busy(monkeypatch): """When the agent is busy, the poller requeues the event.""" + import queue as _queue_mod + from tools.process_registry import process_registry emitted = [] @@ -5122,8 +5124,13 @@ def test_notification_poller_requeues_when_busy(monkeypatch): server._sessions["sid_busy"] = sess monkeypatch.setattr(server, "_emit", lambda *a, **kw: emitted.append(a)) - while not process_registry.completion_queue.empty(): - process_registry.completion_queue.get_nowait() + # Isolate the completion queue for the duration of this test. The poller + # reads process_registry.completion_queue by attribute at runtime, so a + # fresh Queue here means no concurrently-running test in the same xdist + # worker can put/get on the shared singleton mid-run and drain the event + # we expect to be requeued. monkeypatch restores the original on teardown. + isolated_queue: _queue_mod.Queue = _queue_mod.Queue() + monkeypatch.setattr(process_registry, "completion_queue", isolated_queue) process_registry._completion_consumed.discard("proc_busy_test") evt = { @@ -5133,7 +5140,7 @@ def test_notification_poller_requeues_when_busy(monkeypatch): "exit_code": 0, "output": "ok", } - process_registry.completion_queue.put(evt) + isolated_queue.put(evt) stop = threading.Event() stop.set() @@ -5146,10 +5153,8 @@ def test_notification_poller_requeues_when_busy(monkeypatch): assert len(status_calls) == 1 # Event was requeued (agent was busy, no turn triggered) - assert not process_registry.completion_queue.empty() - requeued = process_registry.completion_queue.get_nowait() + assert not isolated_queue.empty() + requeued = isolated_queue.get_nowait() assert requeued["session_id"] == "proc_busy_test" finally: server._sessions.pop("sid_busy", None) - while not process_registry.completion_queue.empty(): - process_registry.completion_queue.get_nowait()