fix(process): label background completion causes (#46659)

Track why a background process finished and include that source in notify-on-complete messages so SIGTERM from process.kill, kill_all, backend loss, and ordinary exits are distinguishable.
This commit is contained in:
Teknium 2026-06-15 07:08:24 -07:00 committed by GitHub
parent 733472952a
commit be7c919bf9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 128 additions and 19 deletions

View file

@ -12318,7 +12318,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if session.exited:
# --- Agent-triggered completion: inject synthetic message ---
# Skip if the agent already consumed the result via wait/poll/log
from tools.process_registry import process_registry as _pr_check
from tools.process_registry import format_process_notification, process_registry as _pr_check
if agent_notify and not _pr_check.is_completion_consumed(session_id):
from tools.ansi_strip import strip_ansi
_raw = strip_ansi(session.output_buffer) if session.output_buffer else ""
@ -12334,12 +12334,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_out = f"[… output truncated — showing last {len(_tail)} chars]\n{_tail}"
else:
_out = _raw
synth_text = (
f"[IMPORTANT: Background process {session_id} completed "
f"(exit code {session.exit_code}).\n"
f"Command: {session.command}\n"
f"Output:\n{_out}]"
)
synth_text = format_process_notification({
"type": "completion",
"session_id": session_id,
"command": session.command,
"exit_code": session.exit_code,
"completion_reason": getattr(session, "completion_reason", "exited"),
"termination_source": getattr(session, "termination_source", ""),
"output": _out,
})
if not synth_text:
break
source = self._build_process_event_source({
"session_id": session_id,
"session_key": session_key,

View file

@ -6652,7 +6652,7 @@ def test_notification_poller_delivers_completion(monkeypatch):
# Should have triggered an agent turn
assert len(turns) == 1
assert "[IMPORTANT: Background process proc_poller_test completed" in turns[0]
assert "[IMPORTANT: Background process proc_poller_test completed normally" in turns[0]
finally:
server._sessions.pop("sid_poll", None)
while not process_registry.completion_queue.empty():

View file

@ -12,7 +12,7 @@ import json
import os
import time
import pytest
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from tools.process_registry import (
ProcessRegistry,
@ -99,6 +99,8 @@ class TestCompletionQueue:
assert completion["session_id"] == s.id
assert completion["command"] == "echo hello"
assert completion["exit_code"] == 0
assert completion["completion_reason"] == "exited"
assert completion["termination_source"] == ""
assert "build succeeded" in completion["output"]
def test_move_to_finished_nonzero_exit(self, registry):
@ -138,6 +140,35 @@ class TestCompletionQueue:
completion = registry.completion_queue.get_nowait()
assert completion["exit_code"] == -15 # from the first (kill) call
def test_kill_process_sets_completion_reason_and_source(self, registry):
s = _make_session(notify_on_complete=True, output="stopping")
s.process = MagicMock()
s.process.pid = 4242
registry._running[s.id] = s
class FakeProcess:
def __init__(self, pid):
self.pid = pid
def children(self, recursive=False):
return []
def terminate(self):
pass
import psutil as _psutil
with patch.object(_psutil, "Process", side_effect=lambda pid: FakeProcess(pid)), \
patch.object(registry, "_write_checkpoint"):
result = registry.kill_process(s.id)
assert result["status"] == "killed"
assert result["completion_reason"] == "killed"
assert result["termination_source"] == "process.kill"
completion = registry.completion_queue.get_nowait()
assert completion["completion_reason"] == "killed"
assert completion["termination_source"] == "process.kill"
def test_output_truncated_to_2000(self, registry):
"""Long output is truncated to last 2000 chars."""
long_output = "x" * 5000

View file

@ -1012,12 +1012,41 @@ def test_format_completion_event():
"output": "done",
}
result = format_process_notification(evt)
assert "[IMPORTANT: Background process proc_abc completed" in result
assert "[IMPORTANT: Background process proc_abc completed normally" in result
assert "exit code 0" in result
assert "Command: sleep 5" in result
assert "Output:\ndone]" in result
def test_format_killed_completion_event_names_source_and_signal():
evt = {
"type": "completion",
"session_id": "proc_killed",
"command": "sleep 5",
"exit_code": -15,
"completion_reason": "killed",
"termination_source": "process.kill",
"output": "",
}
result = format_process_notification(evt)
assert "proc_killed terminated by process.kill" in result
assert "exit code -15, SIGTERM" in result
def test_format_external_sigterm_does_not_claim_agent_kill():
evt = {
"type": "completion",
"session_id": "proc_external",
"command": "sleep 5",
"exit_code": 143,
"output": "",
}
result = format_process_notification(evt)
assert "proc_external exited" in result
assert "terminated by" not in result
assert "exit code 143, SIGTERM" in result
def test_format_watch_match_event():
evt = {
"type": "watch_match",
@ -1087,7 +1116,7 @@ def test_drain_notifications_returns_pending_events():
results = process_registry.drain_notifications()
assert len(results) == 2
assert results[0][0]["session_id"] == "proc_drain1"
assert "proc_drain1 completed" in results[0][1]
assert "proc_drain1 completed normally" in results[0][1]
assert results[1][0]["session_id"] == "proc_drain2"
assert "watch pattern" in results[1][1]
finally:

View file

@ -100,6 +100,8 @@ class ProcessSession:
started_at: float = 0.0 # time.time() of spawn
exited: bool = False # Whether the process has finished
exit_code: Optional[int] = None # Exit code (None if still running)
completion_reason: str = "exited" # exited|killed|lost|failed_start|already_exited
termination_source: str = "" # process.kill|kill_all|backend_lost|failed_start
output_buffer: str = "" # Rolling output (last MAX_OUTPUT_CHARS)
max_output_chars: int = MAX_OUTPUT_CHARS
detached: bool = False # True if recovered from crash (no pipe)
@ -720,10 +722,14 @@ class ProcessRegistry:
session.exit_code = int(result.get("returncode", -1))
if session.exit_code == 0:
session.exit_code = -1
session.completion_reason = "failed_start"
session.termination_source = "failed_start"
session.output_buffer = result.get("output", "").strip()
except Exception as e:
session.exited = True
session.exit_code = -1
session.completion_reason = "failed_start"
session.termination_source = "failed_start"
session.output_buffer = f"Failed to start: {e}"
if not session.exited:
@ -774,7 +780,9 @@ class ProcessRegistry:
except Exception as e:
logger.debug("Process wait timed out or failed: %s", e)
session.exited = True
session.exit_code = session.process.returncode
if session.completion_reason != "killed":
session.exit_code = session.process.returncode
session.completion_reason = "exited"
self._move_to_finished(session)
def _env_poller_loop(
@ -820,6 +828,8 @@ class ProcessRegistry:
except (ValueError, IndexError):
session.exit_code = -1
session.exited = True
if session.completion_reason != "killed":
session.completion_reason = "exited"
self._move_to_finished(session)
return
@ -827,6 +837,8 @@ class ProcessRegistry:
# Environment might be gone (sandbox reaped, etc.)
session.exited = True
session.exit_code = -1
session.completion_reason = "lost"
session.termination_source = "backend_lost"
self._move_to_finished(session)
return
@ -858,7 +870,9 @@ class ProcessRegistry:
except Exception as e:
logger.debug("PTY wait timed out or failed: %s", e)
session.exited = True
session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1
if session.completion_reason != "killed":
session.exit_code = pty.exitstatus if hasattr(pty, 'exitstatus') else -1
session.completion_reason = "exited"
self._move_to_finished(session)
def _move_to_finished(self, session: ProcessSession):
@ -886,6 +900,8 @@ class ProcessRegistry:
"session_key": session.session_key,
"command": session.command,
"exit_code": session.exit_code,
"completion_reason": session.completion_reason,
"termination_source": session.termination_source,
"output": output_tail,
})
@ -985,7 +1001,9 @@ class ProcessRegistry:
if len(session.output_buffer) > session.max_output_chars:
session.output_buffer = session.output_buffer[-session.max_output_chars:]
session.exited = True
session.exit_code = rc
if session.completion_reason != "killed":
session.exit_code = rc
session.completion_reason = "exited"
logger.info(
"Reconciled session %s: direct child exited with code %s but reader "
"was still blocked (orphaned pipe). Flipped to exited.",
@ -1018,6 +1036,8 @@ class ProcessRegistry:
}
if session.exited:
result["exit_code"] = session.exit_code
result["completion_reason"] = session.completion_reason
result["termination_source"] = session.termination_source
self._completion_consumed.add(session_id)
if session.detached:
result["detached"] = True
@ -1106,6 +1126,8 @@ class ProcessRegistry:
result = {
"status": "exited",
"exit_code": session.exit_code,
"completion_reason": session.completion_reason,
"termination_source": session.termination_source,
"output": strip_ansi(session.output_buffer[-2000:]),
}
if timeout_note:
@ -1137,7 +1159,7 @@ class ProcessRegistry:
result["timeout_note"] = f"Waited {effective_timeout}s, process still running"
return result
def kill_process(self, session_id: str) -> dict:
def kill_process(self, session_id: str, *, source: str = "process.kill") -> dict:
"""Kill a background process."""
session = self.get(session_id)
if session is None:
@ -1201,9 +1223,16 @@ class ProcessRegistry:
}
session.exited = True
session.exit_code = -15 # SIGTERM
session.completion_reason = "killed"
session.termination_source = source
self._move_to_finished(session)
self._write_checkpoint()
return {"status": "killed", "session_id": session.id}
return {
"status": "killed",
"session_id": session.id,
"completion_reason": session.completion_reason,
"termination_source": session.termination_source,
}
except Exception as e:
return {"status": "error", "error": str(e)}
@ -1347,7 +1376,7 @@ class ProcessRegistry:
killed = 0
for session in targets:
result = self.kill_process(session.id)
result = self.kill_process(session.id, source="kill_all")
if result.get("status") in {"killed", "already_exited"}:
killed += 1
return killed
@ -1532,9 +1561,24 @@ def format_process_notification(evt: dict) -> "str | None":
_exit = evt.get("exit_code", "?")
_out = evt.get("output", "")
_reason = evt.get("completion_reason") or "exited"
_source = evt.get("termination_source") or ""
_signal = ""
if _exit in {-15, 143, "-15", "143"}:
_signal = ", SIGTERM"
if _reason == "killed":
_status = f"terminated by {_source or 'Hermes'}"
elif _reason == "lost":
_status = "marked lost because the process backend disappeared"
elif _reason == "failed_start":
_status = "failed to start"
elif _exit == 0:
_status = "completed normally"
else:
_status = "exited"
return (
f"[IMPORTANT: Background process {_sid} completed "
f"(exit code {_exit}).\n"
f"[IMPORTANT: Background process {_sid} {_status} "
f"(exit code {_exit}{_signal}).\n"
f"Command: {_cmd}\n"
f"Output:\n{_out}]"
)