fix: prevent duplicate completion notifications on process kill (#7124)

When kill_process() sends SIGTERM, both it and the reader thread race
to call _move_to_finished() — kill_process sets exit_code=-15 and
enqueues a notification, then the reader thread's process.wait()
returns with exit_code=143 (128+SIGTERM) and enqueues a second one.

Fix: make _move_to_finished() idempotent by tracking whether the
session was actually removed from _running. The second call sees it
was already moved and skips the completion_queue.put().

Adds regression test: test_move_to_finished_idempotent_no_duplicate
This commit is contained in:
Teknium 2026-04-10 03:52:16 -07:00 committed by GitHub
parent 00dd5cc491
commit c8e4dcf412
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 31 additions and 5 deletions

View file

@ -120,6 +120,26 @@ class TestCompletionQueue:
assert completion["exit_code"] == 1 assert completion["exit_code"] == 1
assert "FAILED" in completion["output"] assert "FAILED" in completion["output"]
def test_move_to_finished_idempotent_no_duplicate(self, registry):
"""Calling _move_to_finished twice must NOT enqueue two notifications.
Regression test: kill_process() and the reader thread can both call
_move_to_finished() for the same session, producing duplicate
[SYSTEM: Background process ...] messages.
"""
s = _make_session(notify_on_complete=True, output="done", exit_code=-15)
s.exited = True
s.exit_code = -15
registry._running[s.id] = s
with patch.object(registry, "_write_checkpoint"):
registry._move_to_finished(s) # first call — should enqueue
s.exit_code = 143 # reader thread updates exit code
registry._move_to_finished(s) # second call — should be no-op
assert registry.completion_queue.qsize() == 1
completion = registry.completion_queue.get_nowait()
assert completion["exit_code"] == -15 # from the first (kill) call
def test_output_truncated_to_2000(self, registry): def test_output_truncated_to_2000(self, registry):
"""Long output is truncated to last 2000 chars.""" """Long output is truncated to last 2000 chars."""
long_output = "x" * 5000 long_output = "x" * 5000

View file

@ -484,15 +484,21 @@ class ProcessRegistry:
self._move_to_finished(session) self._move_to_finished(session)
def _move_to_finished(self, session: ProcessSession): def _move_to_finished(self, session: ProcessSession):
"""Move a session from running to finished.""" """Move a session from running to finished.
Idempotent: if the session was already moved (e.g. kill_process raced
with the reader thread), the second call is a no-op no duplicate
completion notification is enqueued.
"""
with self._lock: with self._lock:
self._running.pop(session.id, None) was_running = self._running.pop(session.id, None) is not None
self._finished[session.id] = session self._finished[session.id] = session
self._write_checkpoint() self._write_checkpoint()
# If the caller requested agent notification, enqueue the completion # Only enqueue completion notification on the FIRST move. Without
# so the CLI/gateway can auto-trigger a new agent turn. # this guard, kill_process() and the reader thread can both call
if session.notify_on_complete: # _move_to_finished(), producing duplicate [SYSTEM: ...] messages.
if was_running and session.notify_on_complete:
from tools.ansi_strip import strip_ansi from tools.ansi_strip import strip_ansi
output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else "" output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
self.completion_queue.put({ self.completion_queue.put({