feat: notify_on_complete for background processes (#5779)

* feat: notify_on_complete for background processes

When terminal(background=true, notify_on_complete=true), the system
auto-triggers a new agent turn when the process exits — no polling needed.

Changes:
- ProcessSession: add notify_on_complete field
- ProcessRegistry: add completion_queue, populate on _move_to_finished()
- Terminal tool: add notify_on_complete parameter to schema + handler
- CLI: drain completion_queue after agent turn AND during idle loop
- Gateway: enhanced _run_process_watcher injects synthetic MessageEvent
  on completion, triggering a full agent turn
- Checkpoint persistence includes notify_on_complete for crash recovery
- code_execution_tool: block notify_on_complete in sandbox scripts
- 15 new tests covering queue mechanics, checkpoint round-trip, schema

* docs: update terminal tool descriptions for notify_on_complete

- background: remove 'ONLY for servers' language, describe both patterns
  (long-lived processes AND long-running tasks with notify_on_complete)
- notify_on_complete: more prescriptive about when to use it
- TERMINAL_TOOL_DESCRIPTION: remove 'Do NOT use background for builds'
  guidance that contradicted the new feature
This commit is contained in:
Teknium 2026-04-07 02:40:16 -07:00 committed by GitHub
parent 1c425f219e
commit e120d2afac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 398 additions and 10 deletions

43
cli.py
View file

@ -8134,6 +8134,25 @@ class HermesCLI:
# Periodic config watcher — auto-reload MCP on mcp_servers change
if not self._agent_running:
self._check_config_mcp_changes()
# Check for background process completion notifications
# while the agent is idle (user hasn't typed anything yet).
try:
from tools.process_registry import process_registry
if not process_registry.completion_queue.empty():
completion = process_registry.completion_queue.get_nowait()
_exit = completion.get("exit_code", "?")
_cmd = completion.get("command", "unknown")
_sid = completion.get("session_id", "unknown")
_out = completion.get("output", "")
_synth = (
f"[SYSTEM: Background process {_sid} completed "
f"(exit code {_exit}).\n"
f"Command: {_cmd}\n"
f"Output:\n{_out}]"
)
self._pending_input.put(_synth)
except Exception:
pass
continue
if not user_input:
@ -8247,7 +8266,29 @@ class HermesCLI:
except Exception as e:
_cprint(f"{_DIM}Voice auto-restart failed: {e}{_RST}")
threading.Thread(target=_restart_recording, daemon=True).start()
# Drain process completion notifications — any background
# process that finished with notify_on_complete while the
# agent was running (or before) gets auto-injected as a
# new user message so the agent can react to it.
try:
from tools.process_registry import process_registry
while not process_registry.completion_queue.empty():
completion = process_registry.completion_queue.get_nowait()
_exit = completion.get("exit_code", "?")
_cmd = completion.get("command", "unknown")
_sid = completion.get("session_id", "unknown")
_out = completion.get("output", "")
_synth = (
f"[SYSTEM: Background process {_sid} completed "
f"(exit code {_exit}).\n"
f"Command: {_cmd}\n"
f"Output:\n{_out}]"
)
self._pending_input.put(_synth)
except Exception:
pass # Non-fatal — don't break the main loop
except Exception as e:
print(f"Error: {e}")

View file

@ -6048,12 +6048,13 @@ class GatewayRunner:
platform_name = watcher.get("platform", "")
chat_id = watcher.get("chat_id", "")
thread_id = watcher.get("thread_id", "")
agent_notify = watcher.get("notify_on_complete", False)
notify_mode = self._load_background_notifications_mode()
logger.debug("Process watcher started: %s (every %ss, notify=%s)",
session_id, interval, notify_mode)
logger.debug("Process watcher started: %s (every %ss, notify=%s, agent_notify=%s)",
session_id, interval, notify_mode, agent_notify)
if notify_mode == "off":
if notify_mode == "off" and not agent_notify:
# Still wait for the process to exit so we can log it, but don't
# push any messages to the user.
while True:
@ -6077,6 +6078,47 @@ class GatewayRunner:
last_output_len = current_output_len
if session.exited:
# --- Agent-triggered completion: inject synthetic message ---
if agent_notify:
from tools.ansi_strip import strip_ansi
_out = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
synth_text = (
f"[SYSTEM: Background process {session_id} completed "
f"(exit code {session.exit_code}).\n"
f"Command: {session.command}\n"
f"Output:\n{_out}]"
)
adapter = None
for p, a in self.adapters.items():
if p.value == platform_name:
adapter = a
break
if adapter and chat_id:
try:
from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource
from gateway.config import Platform
_platform_enum = Platform(platform_name)
_source = SessionSource(
platform=_platform_enum,
chat_id=chat_id,
thread_id=thread_id or None,
)
synth_event = MessageEvent(
text=synth_text,
message_type=MessageType.TEXT,
source=_source,
)
logger.info(
"Process %s finished — injecting agent notification for session %s",
session_id, session_key,
)
await adapter.handle_message(synth_event)
except Exception as e:
logger.error("Agent notify injection error: %s", e)
break
# --- Normal text-only notification ---
# Decide whether to notify based on mode
should_notify = (
notify_mode in ("all", "result")
@ -6101,8 +6143,9 @@ class GatewayRunner:
logger.error("Watcher delivery error: %s", e)
break
elif has_new_output and notify_mode == "all":
elif has_new_output and notify_mode == "all" and not agent_notify:
# New output available -- deliver status update (only in "all" mode)
# Skip periodic updates for agent_notify watchers (they only care about completion)
new_output = session.output_buffer[-500:] if session.output_buffer else ""
message_text = (
f"[Background process {session_id} is still running~ "

View file

@ -0,0 +1,247 @@
"""Tests for notify_on_complete background process feature.
Covers:
- ProcessSession.notify_on_complete field
- ProcessRegistry.completion_queue population on _move_to_finished()
- Checkpoint persistence of notify_on_complete
- Terminal tool schema includes notify_on_complete
- Terminal tool handler passes notify_on_complete through
"""
import json
import os
import queue
import time
import pytest
from pathlib import Path
from unittest.mock import MagicMock, patch
from tools.process_registry import (
ProcessRegistry,
ProcessSession,
)
@pytest.fixture()
def registry():
"""Create a fresh ProcessRegistry."""
return ProcessRegistry()
def _make_session(
sid="proc_test_notify",
command="echo hello",
task_id="t1",
exited=False,
exit_code=None,
output="",
notify_on_complete=False,
) -> ProcessSession:
s = ProcessSession(
id=sid,
command=command,
task_id=task_id,
started_at=time.time(),
exited=exited,
exit_code=exit_code,
output_buffer=output,
notify_on_complete=notify_on_complete,
)
return s
# =========================================================================
# ProcessSession field
# =========================================================================
class TestProcessSessionField:
def test_default_false(self):
s = ProcessSession(id="proc_1", command="echo hi")
assert s.notify_on_complete is False
def test_set_true(self):
s = ProcessSession(id="proc_1", command="echo hi", notify_on_complete=True)
assert s.notify_on_complete is True
# =========================================================================
# Completion queue
# =========================================================================
class TestCompletionQueue:
def test_queue_exists(self, registry):
assert hasattr(registry, "completion_queue")
assert registry.completion_queue.empty()
def test_move_to_finished_no_notify(self, registry):
"""Processes without notify_on_complete don't enqueue."""
s = _make_session(notify_on_complete=False, output="done")
s.exited = True
s.exit_code = 0
registry._running[s.id] = s
with patch.object(registry, "_write_checkpoint"):
registry._move_to_finished(s)
assert registry.completion_queue.empty()
def test_move_to_finished_with_notify(self, registry):
"""Processes with notify_on_complete push to queue."""
s = _make_session(
notify_on_complete=True,
output="build succeeded",
exit_code=0,
)
s.exited = True
s.exit_code = 0
registry._running[s.id] = s
with patch.object(registry, "_write_checkpoint"):
registry._move_to_finished(s)
assert not registry.completion_queue.empty()
completion = registry.completion_queue.get_nowait()
assert completion["session_id"] == s.id
assert completion["command"] == "echo hello"
assert completion["exit_code"] == 0
assert "build succeeded" in completion["output"]
def test_move_to_finished_nonzero_exit(self, registry):
"""Nonzero exit codes are captured correctly."""
s = _make_session(
notify_on_complete=True,
output="FAILED",
exit_code=1,
)
s.exited = True
s.exit_code = 1
registry._running[s.id] = s
with patch.object(registry, "_write_checkpoint"):
registry._move_to_finished(s)
completion = registry.completion_queue.get_nowait()
assert completion["exit_code"] == 1
assert "FAILED" in completion["output"]
def test_output_truncated_to_2000(self, registry):
"""Long output is truncated to last 2000 chars."""
long_output = "x" * 5000
s = _make_session(
notify_on_complete=True,
output=long_output,
)
s.exited = True
s.exit_code = 0
registry._running[s.id] = s
with patch.object(registry, "_write_checkpoint"):
registry._move_to_finished(s)
completion = registry.completion_queue.get_nowait()
assert len(completion["output"]) == 2000
def test_multiple_completions_queued(self, registry):
"""Multiple notify processes all push to the same queue."""
for i in range(3):
s = _make_session(
sid=f"proc_{i}",
notify_on_complete=True,
output=f"output_{i}",
)
s.exited = True
s.exit_code = 0
registry._running[s.id] = s
with patch.object(registry, "_write_checkpoint"):
registry._move_to_finished(s)
completions = []
while not registry.completion_queue.empty():
completions.append(registry.completion_queue.get_nowait())
assert len(completions) == 3
ids = {c["session_id"] for c in completions}
assert ids == {"proc_0", "proc_1", "proc_2"}
# =========================================================================
# Checkpoint persistence
# =========================================================================
class TestCheckpointNotify:
def test_checkpoint_includes_notify(self, registry, tmp_path):
with patch("tools.process_registry.CHECKPOINT_PATH", tmp_path / "procs.json"):
s = _make_session(notify_on_complete=True)
registry._running[s.id] = s
registry._write_checkpoint()
data = json.loads((tmp_path / "procs.json").read_text())
assert len(data) == 1
assert data[0]["notify_on_complete"] is True
def test_checkpoint_without_notify(self, registry, tmp_path):
with patch("tools.process_registry.CHECKPOINT_PATH", tmp_path / "procs.json"):
s = _make_session(notify_on_complete=False)
registry._running[s.id] = s
registry._write_checkpoint()
data = json.loads((tmp_path / "procs.json").read_text())
assert data[0]["notify_on_complete"] is False
def test_recover_preserves_notify(self, registry, tmp_path):
checkpoint = tmp_path / "procs.json"
checkpoint.write_text(json.dumps([{
"session_id": "proc_live",
"command": "sleep 999",
"pid": os.getpid(),
"task_id": "t1",
"notify_on_complete": True,
}]))
with patch("tools.process_registry.CHECKPOINT_PATH", checkpoint):
recovered = registry.recover_from_checkpoint()
assert recovered == 1
s = registry.get("proc_live")
assert s.notify_on_complete is True
def test_recover_defaults_false(self, registry, tmp_path):
"""Old checkpoint entries without the field default to False."""
checkpoint = tmp_path / "procs.json"
checkpoint.write_text(json.dumps([{
"session_id": "proc_live",
"command": "sleep 999",
"pid": os.getpid(),
"task_id": "t1",
}]))
with patch("tools.process_registry.CHECKPOINT_PATH", checkpoint):
recovered = registry.recover_from_checkpoint()
assert recovered == 1
s = registry.get("proc_live")
assert s.notify_on_complete is False
# =========================================================================
# Terminal tool schema
# =========================================================================
class TestTerminalSchema:
def test_schema_has_notify_on_complete(self):
from tools.terminal_tool import TERMINAL_SCHEMA
props = TERMINAL_SCHEMA["parameters"]["properties"]
assert "notify_on_complete" in props
assert props["notify_on_complete"]["type"] == "boolean"
assert props["notify_on_complete"]["default"] is False
def test_handler_passes_notify(self):
"""_handle_terminal passes notify_on_complete to terminal_tool."""
from tools.terminal_tool import _handle_terminal
with patch("tools.terminal_tool.terminal_tool", return_value='{"ok":true}') as mock_tt:
_handle_terminal(
{"command": "echo hi", "background": True, "notify_on_complete": True},
task_id="t1",
)
_, kwargs = mock_tt.call_args
assert kwargs["notify_on_complete"] is True
# =========================================================================
# Code execution blocked params
# =========================================================================
class TestCodeExecutionBlocked:
def test_notify_on_complete_blocked_in_sandbox(self):
from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS
assert "notify_on_complete" in _TERMINAL_BLOCKED_PARAMS

View file

@ -300,7 +300,7 @@ def _call(tool_name, args):
# ---------------------------------------------------------------------------
# Terminal parameters that must not be used from ephemeral sandbox scripts
_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty"}
_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty", "notify_on_complete"}
def _rpc_server_loop(

View file

@ -81,6 +81,7 @@ class ProcessSession:
watcher_chat_id: str = ""
watcher_thread_id: str = ""
watcher_interval: int = 0 # 0 = no watcher configured
notify_on_complete: bool = False # Queue agent notification on exit
_lock: threading.Lock = field(default_factory=threading.Lock)
_reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
_pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True)
@ -112,6 +113,12 @@ class ProcessRegistry:
# Side-channel for check_interval watchers (gateway reads after agent run)
self.pending_watchers: List[Dict[str, Any]] = []
# Completion notifications — processes with notify_on_complete push here
# on exit. CLI process_loop and gateway drain this after each agent turn
# to auto-trigger a new agent turn with the process results.
import queue as _queue_mod
self.completion_queue: _queue_mod.Queue = _queue_mod.Queue()
@staticmethod
def _clean_shell_noise(text: str) -> str:
"""Strip shell startup warnings from the beginning of output."""
@ -415,6 +422,18 @@ class ProcessRegistry:
self._finished[session.id] = session
self._write_checkpoint()
# If the caller requested agent notification, enqueue the completion
# so the CLI/gateway can auto-trigger a new agent turn.
if session.notify_on_complete:
from tools.ansi_strip import strip_ansi
output_tail = strip_ansi(session.output_buffer[-2000:]) if session.output_buffer else ""
self.completion_queue.put({
"session_id": session.id,
"command": session.command,
"exit_code": session.exit_code,
"output": output_tail,
})
# ----- Query Methods -----
def get(self, session_id: str) -> Optional[ProcessSession]:
@ -721,6 +740,7 @@ class ProcessRegistry:
"watcher_chat_id": s.watcher_chat_id,
"watcher_thread_id": s.watcher_thread_id,
"watcher_interval": s.watcher_interval,
"notify_on_complete": s.notify_on_complete,
})
# Atomic write to avoid corruption on crash
@ -771,6 +791,7 @@ class ProcessRegistry:
watcher_chat_id=entry.get("watcher_chat_id", ""),
watcher_thread_id=entry.get("watcher_thread_id", ""),
watcher_interval=entry.get("watcher_interval", 0),
notify_on_complete=entry.get("notify_on_complete", False),
)
with self._lock:
self._running[session.id] = session

View file

@ -421,9 +421,11 @@ Do NOT use sed/awk to edit files — use patch instead.
Do NOT use echo/cat heredoc to create files use write_file instead.
Reserve terminal for: builds, installs, git, processes, scripts, network, package managers, and anything that needs a shell.
Foreground (default): Commands return INSTANTLY when done, even if the timeout is high. Set timeout=300 for long builds/scripts you'll still get the result in seconds if it's fast. Prefer foreground for everything that finishes.
Background: ONLY for long-running servers, watchers, or processes that never exit. Set background=true to get a session_id, then use process(action="wait") to block until done it returns instantly on completion, same as foreground. Use process(action="poll") only when you need a progress check without blocking.
Do NOT use background for scripts, builds, or installs foreground with a generous timeout is always better (fewer tool calls, instant results).
Foreground (default): Commands return INSTANTLY when done, even if the timeout is high. Set timeout=300 for long builds/scripts you'll still get the result in seconds if it's fast. Prefer foreground for short commands.
Background: Set background=true to get a session_id. Two patterns:
(1) Long-lived processes that never exit (servers, watchers).
(2) Long-running tasks with notify_on_complete=true you can keep working on other things and the system auto-notifies you when the task finishes. Great for test suites, builds, deployments, or anything that takes more than a minute.
Use process(action="poll") for progress checks, process(action="wait") to block until done.
Working directory: Use 'workdir' for per-command cwd.
PTY mode: Set pty=true for interactive CLI tools (Codex, Claude Code, Python REPL).
@ -1009,6 +1011,7 @@ def terminal_tool(
workdir: Optional[str] = None,
check_interval: Optional[int] = None,
pty: bool = False,
notify_on_complete: bool = False,
) -> str:
"""
Execute a command in the configured terminal environment.
@ -1022,6 +1025,7 @@ def terminal_tool(
workdir: Working directory for this command (optional, uses session cwd if not set)
check_interval: Seconds between auto-checks for background processes (gateway only, min 30)
pty: If True, use pseudo-terminal for interactive CLI tools (local backend only)
notify_on_complete: If True and background=True, auto-notify the agent when the process exits
Returns:
str: JSON string with output, exit_code, and error fields
@ -1254,6 +1258,32 @@ def terminal_tool(
f"configured limit of {max_timeout}s"
)
# Mark for agent notification on completion
if notify_on_complete and background:
proc_session.notify_on_complete = True
result_data["notify_on_complete"] = True
# In gateway mode, auto-register a fast watcher so the
# gateway can detect completion and trigger a new agent
# turn. CLI mode uses the completion_queue directly.
_gw_platform = os.getenv("HERMES_SESSION_PLATFORM", "")
if _gw_platform and not check_interval:
_gw_chat_id = os.getenv("HERMES_SESSION_CHAT_ID", "")
_gw_thread_id = os.getenv("HERMES_SESSION_THREAD_ID", "")
proc_session.watcher_platform = _gw_platform
proc_session.watcher_chat_id = _gw_chat_id
proc_session.watcher_thread_id = _gw_thread_id
proc_session.watcher_interval = 5
process_registry.pending_watchers.append({
"session_id": proc_session.id,
"check_interval": 5,
"session_key": session_key,
"platform": _gw_platform,
"chat_id": _gw_chat_id,
"thread_id": _gw_thread_id,
"notify_on_complete": True,
})
# Register check_interval watcher (gateway picks this up after agent run)
if check_interval and background:
effective_interval = max(30, check_interval)
@ -1550,7 +1580,7 @@ TERMINAL_SCHEMA = {
},
"background": {
"type": "boolean",
"description": "ONLY for servers/watchers that never exit. For scripts, builds, installs — use foreground with timeout instead (it returns instantly when done).",
"description": "Run the command in the background. Two patterns: (1) Long-lived processes that never exit (servers, watchers). (2) Long-running tasks paired with notify_on_complete=true — you can keep working and get notified when the task finishes. For short commands, prefer foreground with a generous timeout instead.",
"default": False
},
"timeout": {
@ -1571,6 +1601,11 @@ TERMINAL_SCHEMA = {
"type": "boolean",
"description": "Run in pseudo-terminal (PTY) mode for interactive CLI tools like Codex, Claude Code, or Python REPL. Only works with local and SSH backends. Default: false.",
"default": False
},
"notify_on_complete": {
"type": "boolean",
"description": "When true (and background=true), you'll be automatically notified when the process finishes — no polling needed. Use this for tasks that take a while (tests, builds, deployments) so you can keep working on other things in the meantime.",
"default": False
}
},
"required": ["command"]
@ -1587,6 +1622,7 @@ def _handle_terminal(args, **kw):
workdir=args.get("workdir"),
check_interval=args.get("check_interval"),
pty=args.get("pty", False),
notify_on_complete=args.get("notify_on_complete", False),
)