From be7c919bf9773bd2d6c0f2b090575fd8159a4a02 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 15 Jun 2026 07:08:24 -0700 Subject: [PATCH] fix(process): label background completion causes (#46659) Track why a background process finished and include that source in notify-on-complete messages so SIGTERM from process.kill, kill_all, backend loss, and ordinary exits are distinguishable. --- gateway/run.py | 19 +++++--- tests/test_tui_gateway_server.py | 2 +- tests/tools/test_notify_on_complete.py | 33 +++++++++++++- tests/tools/test_process_registry.py | 33 +++++++++++++- tools/process_registry.py | 60 ++++++++++++++++++++++---- 5 files changed, 128 insertions(+), 19 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index f469647be45..475320c65a7 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -12318,7 +12318,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if session.exited: # --- Agent-triggered completion: inject synthetic message --- # Skip if the agent already consumed the result via wait/poll/log - from tools.process_registry import process_registry as _pr_check + from tools.process_registry import format_process_notification, process_registry as _pr_check if agent_notify and not _pr_check.is_completion_consumed(session_id): from tools.ansi_strip import strip_ansi _raw = strip_ansi(session.output_buffer) if session.output_buffer else "" @@ -12334,12 +12334,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew _out = f"[… output truncated — showing last {len(_tail)} chars]\n{_tail}" else: _out = _raw - synth_text = ( - f"[IMPORTANT: Background process {session_id} completed " - f"(exit code {session.exit_code}).\n" - f"Command: {session.command}\n" - f"Output:\n{_out}]" - ) + synth_text = format_process_notification({ + "type": "completion", + "session_id": session_id, + "command": session.command, + "exit_code": session.exit_code, + "completion_reason": getattr(session, "completion_reason", "exited"), + "termination_source": getattr(session, "termination_source", ""), + "output": _out, + }) + if not synth_text: + break source = self._build_process_event_source({ "session_id": session_id, "session_key": session_key, diff --git a/tests/test_tui_gateway_server.py b/tests/test_tui_gateway_server.py index 4379d80aeb3..90a7f200255 100644 --- a/tests/test_tui_gateway_server.py +++ b/tests/test_tui_gateway_server.py @@ -6652,7 +6652,7 @@ def test_notification_poller_delivers_completion(monkeypatch): # Should have triggered an agent turn assert len(turns) == 1 - assert "[IMPORTANT: Background process proc_poller_test completed" in turns[0] + assert "[IMPORTANT: Background process proc_poller_test completed normally" in turns[0] finally: server._sessions.pop("sid_poll", None) while not process_registry.completion_queue.empty(): diff --git a/tests/tools/test_notify_on_complete.py b/tests/tools/test_notify_on_complete.py index 84bf5f1f6b5..5c2af09441d 100644 --- a/tests/tools/test_notify_on_complete.py +++ b/tests/tools/test_notify_on_complete.py @@ -12,7 +12,7 @@ import json import os import time import pytest -from unittest.mock import patch +from unittest.mock import MagicMock, patch from tools.process_registry import ( ProcessRegistry, @@ -99,6 +99,8 @@ class TestCompletionQueue: assert completion["session_id"] == s.id assert completion["command"] == "echo hello" assert completion["exit_code"] == 0 + assert completion["completion_reason"] == "exited" + assert completion["termination_source"] == "" assert "build succeeded" in completion["output"] def test_move_to_finished_nonzero_exit(self, registry): @@ -138,6 +140,35 @@ class TestCompletionQueue: completion = registry.completion_queue.get_nowait() assert completion["exit_code"] == -15 # from the first (kill) call + def test_kill_process_sets_completion_reason_and_source(self, registry): + s = _make_session(notify_on_complete=True, output="stopping") + s.process = MagicMock() + s.process.pid = 4242 + registry._running[s.id] = s + + class FakeProcess: + def __init__(self, pid): + self.pid = pid + + def children(self, recursive=False): + return [] + + def terminate(self): + pass + + import psutil as _psutil + + with patch.object(_psutil, "Process", side_effect=lambda pid: FakeProcess(pid)), \ + patch.object(registry, "_write_checkpoint"): + result = registry.kill_process(s.id) + + assert result["status"] == "killed" + assert result["completion_reason"] == "killed" + assert result["termination_source"] == "process.kill" + completion = registry.completion_queue.get_nowait() + assert completion["completion_reason"] == "killed" + assert completion["termination_source"] == "process.kill" + def test_output_truncated_to_2000(self, registry): """Long output is truncated to last 2000 chars.""" long_output = "x" * 5000 diff --git a/tests/tools/test_process_registry.py b/tests/tools/test_process_registry.py index da48183d462..967849a194a 100644 --- a/tests/tools/test_process_registry.py +++ b/tests/tools/test_process_registry.py @@ -1012,12 +1012,41 @@ def test_format_completion_event(): "output": "done", } result = format_process_notification(evt) - assert "[IMPORTANT: Background process proc_abc completed" in result + assert "[IMPORTANT: Background process proc_abc completed normally" in result assert "exit code 0" in result assert "Command: sleep 5" in result assert "Output:\ndone]" in result +def test_format_killed_completion_event_names_source_and_signal(): + evt = { + "type": "completion", + "session_id": "proc_killed", + "command": "sleep 5", + "exit_code": -15, + "completion_reason": "killed", + "termination_source": "process.kill", + "output": "", + } + result = format_process_notification(evt) + assert "proc_killed terminated by process.kill" in result + assert "exit code -15, SIGTERM" in result + + +def test_format_external_sigterm_does_not_claim_agent_kill(): + evt = { + "type": "completion", + "session_id": "proc_external", + "command": "sleep 5", + "exit_code": 143, + "output": "", + } + result = format_process_notification(evt) + assert "proc_external exited" in result + assert "terminated by" not in result + assert "exit code 143, SIGTERM" in result + + def test_format_watch_match_event(): evt = { "type": "watch_match", @@ -1087,7 +1116,7 @@ def test_drain_notifications_returns_pending_events(): results = process_registry.drain_notifications() assert len(results) == 2 assert results[0][0]["session_id"] == "proc_drain1" - assert "proc_drain1 completed" in results[0][1] + assert "proc_drain1 completed normally" in results[0][1] assert results[1][0]["session_id"] == "proc_drain2" assert "watch pattern" in results[1][1] finally: diff --git a/tools/process_registry.py b/tools/process_registry.py index 93266969983..6c3d61ce5f4 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -100,6 +100,8 @@ class ProcessSession: started_at: float = 0.0 # time.time() of spawn exited: bool = False # Whether the process has finished exit_code: Optional[int] = None # Exit code (None if still running) + completion_reason: str = "exited" # exited|killed|lost|failed_start|already_exited + termination_source: str = "" # process.kill|kill_all|backend_lost|failed_start output_buffer: str = "" # Rolling output (last MAX_OUTPUT_CHARS) max_output_chars: int = MAX_OUTPUT_CHARS detached: bool = False # True if recovered from crash (no pipe) @@ -720,10 +722,14 @@ class ProcessRegistry: session.exit_code = int(result.get("returncode", -1)) if session.exit_code == 0: session.exit_code = -1 + session.completion_reason = "failed_start" + session.termination_source = "failed_start" session.output_buffer = result.get("output", "").strip() except Exception as e: session.exited = True session.exit_code = -1 + session.completion_reason = "failed_start" + session.termination_source = "failed_start" session.output_buffer = f"Failed to start: {e}" if not session.exited: @@ -774,7 +780,9 @@ class ProcessRegistry: except Exception as e: logger.debug("Process wait timed out or failed: %s", e) session.exited = True - session.exit_code = session.process.returncode + if session.completion_reason != "killed": + session.exit_code = session.process.returncode + session.completion_reason = "exited" self._move_to_finished(session) def _env_poller_loop( @@ -820,6 +828,8 @@ class ProcessRegistry: except (ValueError, IndexError): session.exit_code = -1 session.exited = True + if session.completion_reason != "killed": + session.completion_reason = "exited" self._move_to_finished(session) return @@ -827,6 +837,8 @@ class ProcessRegistry: # Environment might be gone (sandbox reaped, etc.) session.exited = True session.exit_code = -1 + session.completion_reason = "lost" + session.termination_source = "backend_lost" self._move_to_finished(session) return @@ -858,7 +870,9 @@ class ProcessRegistry: except Exception as e: logger.debug("PTY wait timed out or failed: %s", e) session.exited = True - session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1 + if session.completion_reason != "killed": + session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1 + session.completion_reason = "exited" self._move_to_finished(session) def _move_to_finished(self, session: ProcessSession): @@ -886,6 +900,8 @@ class ProcessRegistry: "session_key": session.session_key, "command": session.command, "exit_code": session.exit_code, + "completion_reason": session.completion_reason, + "termination_source": session.termination_source, "output": output_tail, }) @@ -985,7 +1001,9 @@ class ProcessRegistry: if len(session.output_buffer) > session.max_output_chars: session.output_buffer = session.output_buffer[-session.max_output_chars:] session.exited = True - session.exit_code = rc + if session.completion_reason != "killed": + session.exit_code = rc + session.completion_reason = "exited" logger.info( "Reconciled session %s: direct child exited with code %s but reader " "was still blocked (orphaned pipe). Flipped to exited.", @@ -1018,6 +1036,8 @@ class ProcessRegistry: } if session.exited: result["exit_code"] = session.exit_code + result["completion_reason"] = session.completion_reason + result["termination_source"] = session.termination_source self._completion_consumed.add(session_id) if session.detached: result["detached"] = True @@ -1106,6 +1126,8 @@ class ProcessRegistry: result = { "status": "exited", "exit_code": session.exit_code, + "completion_reason": session.completion_reason, + "termination_source": session.termination_source, "output": strip_ansi(session.output_buffer[-2000:]), } if timeout_note: @@ -1137,7 +1159,7 @@ class ProcessRegistry: result["timeout_note"] = f"Waited {effective_timeout}s, process still running" return result - def kill_process(self, session_id: str) -> dict: + def kill_process(self, session_id: str, *, source: str = "process.kill") -> dict: """Kill a background process.""" session = self.get(session_id) if session is None: @@ -1201,9 +1223,16 @@ class ProcessRegistry: } session.exited = True session.exit_code = -15 # SIGTERM + session.completion_reason = "killed" + session.termination_source = source self._move_to_finished(session) self._write_checkpoint() - return {"status": "killed", "session_id": session.id} + return { + "status": "killed", + "session_id": session.id, + "completion_reason": session.completion_reason, + "termination_source": session.termination_source, + } except Exception as e: return {"status": "error", "error": str(e)} @@ -1347,7 +1376,7 @@ class ProcessRegistry: killed = 0 for session in targets: - result = self.kill_process(session.id) + result = self.kill_process(session.id, source="kill_all") if result.get("status") in {"killed", "already_exited"}: killed += 1 return killed @@ -1532,9 +1561,24 @@ def format_process_notification(evt: dict) -> "str | None": _exit = evt.get("exit_code", "?") _out = evt.get("output", "") + _reason = evt.get("completion_reason") or "exited" + _source = evt.get("termination_source") or "" + _signal = "" + if _exit in {-15, 143, "-15", "143"}: + _signal = ", SIGTERM" + if _reason == "killed": + _status = f"terminated by {_source or 'Hermes'}" + elif _reason == "lost": + _status = "marked lost because the process backend disappeared" + elif _reason == "failed_start": + _status = "failed to start" + elif _exit == 0: + _status = "completed normally" + else: + _status = "exited" return ( - f"[IMPORTANT: Background process {_sid} completed " - f"(exit code {_exit}).\n" + f"[IMPORTANT: Background process {_sid} {_status} " + f"(exit code {_exit}{_signal}).\n" f"Command: {_cmd}\n" f"Output:\n{_out}]" )