diff --git a/tests/tools/test_notify_on_complete.py b/tests/tools/test_notify_on_complete.py index 8cf17bfbf..ff6f14922 100644 --- a/tests/tools/test_notify_on_complete.py +++ b/tests/tools/test_notify_on_complete.py @@ -120,6 +120,26 @@ class TestCompletionQueue: assert completion["exit_code"] == 1 assert "FAILED" in completion["output"] + def test_move_to_finished_idempotent_no_duplicate(self, registry): + """Calling _move_to_finished twice must NOT enqueue two notifications. + + Regression test: kill_process() and the reader thread can both call + _move_to_finished() for the same session, producing duplicate + [SYSTEM: Background process ...] messages. + """ + s = _make_session(notify_on_complete=True, output="done", exit_code=-15) + s.exited = True + s.exit_code = -15 + registry._running[s.id] = s + with patch.object(registry, "_write_checkpoint"): + registry._move_to_finished(s) # first call — should enqueue + s.exit_code = 143 # reader thread updates exit code + registry._move_to_finished(s) # second call — should be no-op + + assert registry.completion_queue.qsize() == 1 + completion = registry.completion_queue.get_nowait() + assert completion["exit_code"] == -15 # from the first (kill) call + def test_output_truncated_to_2000(self, registry): """Long output is truncated to last 2000 chars.""" long_output = "x" * 5000 diff --git a/tools/process_registry.py b/tools/process_registry.py index 7f55ae6db..39d3704b1 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -484,15 +484,21 @@ class ProcessRegistry: self._move_to_finished(session) def _move_to_finished(self, session: ProcessSession): - """Move a session from running to finished.""" + """Move a session from running to finished. + + Idempotent: if the session was already moved (e.g. kill_process raced + with the reader thread), the second call is a no-op — no duplicate + completion notification is enqueued. + """ with self._lock: - self._running.pop(session.id, None) + was_running = self._running.pop(session.id, None) is not None self._finished[session.id] = session self._write_checkpoint() - # If the caller requested agent notification, enqueue the completion - # so the CLI/gateway can auto-trigger a new agent turn. - if session.notify_on_complete: + # Only enqueue completion notification on the FIRST move. Without + # this guard, kill_process() and the reader thread can both call + # _move_to_finished(), producing duplicate [SYSTEM: ...] messages. + if was_running and session.notify_on_complete: from tools.ansi_strip import strip_ansi output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else "" self.completion_queue.put({