mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): drain-aware hermes update + faster still-working pings (#14736)
cmd_update no longer SIGKILLs in-flight agent runs, and users get 'still working' status every 3 min instead of 10. Two long-standing sources of '@user — agent gives up mid-task' reports on Telegram and other gateways. Drain-aware update: - New helper hermes_cli.gateway._graceful_restart_via_sigusr1(pid, drain_timeout) sends SIGUSR1 to the gateway and polls os.kill(pid, 0) until the process exits or the budget expires. - cmd_update's systemd loop now reads MainPID via 'systemctl show --property=MainPID --value' and tries the graceful path first. The gateway's existing SIGUSR1 handler -> request_restart(via_service= True) -> drain -> exit(75) is wired in gateway/run.py and is respawned by systemd's Restart=on-failure (and the explicit RestartForceExitStatus=75 on newer units). - Falls back to 'systemctl restart' when MainPID is unknown, the drain budget elapses, or the unit doesn't respawn after exit (older units missing Restart=on-failure). Old install behavior preserved. - Drain budget = max(restart_drain_timeout, 30s) + 15s margin so the drain loop in run_agent + final exit have room before fallback fires. Composes with #14728's tool-subprocess reaping. Notification interval: - agent.gateway_notify_interval default 600 -> 180. - HERMES_AGENT_NOTIFY_INTERVAL env-var fallback in gateway/run.py matched. - 9-minute weak-model spinning runs now ping at 3 min and 6 min instead of 27 seconds before completion, removing the 'is the bot dead?' reflex that drives gateway-restart cycles. Tests: - Two new tests in tests/hermes_cli/test_update_gateway_restart.py: one asserts SIGUSR1 is sent and 'systemctl restart' is NOT called when MainPID is known and the helper succeeds; one asserts the fallback fires when the helper returns False. - E2E: spawned detached bash processes confirm the helper returns True on SIGUSR1-handling exit (~0.5s) and False on SIGUSR1-ignoring processes (timeout). Verified non-existent PID and pid=0 edge cases. - 41/41 in test_update_gateway_restart.py (was 39, +2 new). - 154/154 in shutdown-related suites including #14728's new tests. Reported by @GeoffWellman and @ANT_1515 on X.
This commit is contained in:
parent
165b2e481a
commit
97b9b3d6a6
5 changed files with 339 additions and 49 deletions
|
|
@ -10373,9 +10373,9 @@ class GatewayRunner:
|
|||
# Periodic "still working" notifications for long-running tasks.
|
||||
# Fires every N seconds so the user knows the agent hasn't died.
|
||||
# Config: agent.gateway_notify_interval in config.yaml, or
|
||||
# HERMES_AGENT_NOTIFY_INTERVAL env var. Default 600s (10 min).
|
||||
# HERMES_AGENT_NOTIFY_INTERVAL env var. Default 180s (3 min).
|
||||
# 0 = disable notifications.
|
||||
_NOTIFY_INTERVAL_RAW = float(os.getenv("HERMES_AGENT_NOTIFY_INTERVAL", 600))
|
||||
_NOTIFY_INTERVAL_RAW = float(os.getenv("HERMES_AGENT_NOTIFY_INTERVAL", 180))
|
||||
_NOTIFY_INTERVAL = _NOTIFY_INTERVAL_RAW if _NOTIFY_INTERVAL_RAW > 0 else None
|
||||
_notify_start = time.time()
|
||||
|
||||
|
|
|
|||
|
|
@ -384,7 +384,11 @@ DEFAULT_CONFIG = {
|
|||
# Periodic "still working" notification interval (seconds).
|
||||
# Sends a status message every N seconds so the user knows the
|
||||
# agent hasn't died during long tasks. 0 = disable notifications.
|
||||
"gateway_notify_interval": 600,
|
||||
# Lower values mean faster feedback on slow tasks but more chat
|
||||
# noise; 180s is a compromise that catches spinning weak-model runs
|
||||
# (60+ tool iterations with tiny output) before users assume the
|
||||
# bot is dead and /restart.
|
||||
"gateway_notify_interval": 180,
|
||||
},
|
||||
|
||||
"terminal": {
|
||||
|
|
|
|||
|
|
@ -175,6 +175,60 @@ def _request_gateway_self_restart(pid: int) -> bool:
|
|||
return True
|
||||
|
||||
|
||||
def _graceful_restart_via_sigusr1(pid: int, drain_timeout: float) -> bool:
|
||||
"""Send SIGUSR1 to a gateway PID and wait for it to exit gracefully.
|
||||
|
||||
SIGUSR1 is wired in gateway/run.py to ``request_restart(via_service=True)``
|
||||
which drains in-flight agent runs (up to ``agent.restart_drain_timeout``
|
||||
seconds), then exits with code 75. Both systemd (``Restart=on-failure``
|
||||
+ ``RestartForceExitStatus=75``) and launchd (``KeepAlive.SuccessfulExit
|
||||
= false``) relaunch the process after the graceful exit.
|
||||
|
||||
This is the drain-aware alternative to ``systemctl restart`` / ``SIGTERM``,
|
||||
which SIGKILL in-flight agents after a short timeout.
|
||||
|
||||
Args:
|
||||
pid: Gateway process PID (systemd MainPID, launchd PID, or bare
|
||||
process PID).
|
||||
drain_timeout: Seconds to wait for the process to exit after sending
|
||||
SIGUSR1. Should be slightly larger than the gateway's
|
||||
``agent.restart_drain_timeout`` to allow the drain loop to
|
||||
finish cleanly.
|
||||
|
||||
Returns:
|
||||
True if the PID was signalled and exited within the timeout.
|
||||
False if SIGUSR1 couldn't be sent or the process didn't exit in
|
||||
time (caller should fall back to a harder restart path).
|
||||
"""
|
||||
if not hasattr(signal, "SIGUSR1"):
|
||||
return False
|
||||
if pid <= 0:
|
||||
return False
|
||||
try:
|
||||
os.kill(pid, signal.SIGUSR1)
|
||||
except ProcessLookupError:
|
||||
# Already gone — nothing to drain.
|
||||
return True
|
||||
except (PermissionError, OSError):
|
||||
return False
|
||||
|
||||
import time as _time
|
||||
|
||||
deadline = _time.monotonic() + max(drain_timeout, 1.0)
|
||||
while _time.monotonic() < deadline:
|
||||
try:
|
||||
os.kill(pid, 0) # signal 0 — probe liveness
|
||||
except ProcessLookupError:
|
||||
return True
|
||||
except PermissionError:
|
||||
# Process still exists but we can't signal it. Treat as alive
|
||||
# so the caller falls back.
|
||||
pass
|
||||
_time.sleep(0.5)
|
||||
# Drain didn't finish in time.
|
||||
return False
|
||||
|
||||
|
||||
def _append_unique_pid(pids: list[int], pid: int | None, exclude_pids: set[int]) -> None:
|
||||
if pid is None or pid <= 0:
|
||||
return
|
||||
|
|
|
|||
|
|
@ -5864,12 +5864,15 @@ def _cmd_update_impl(args, gateway_mode: bool):
|
|||
# Write exit code *before* the gateway restart attempt.
|
||||
# When running as ``hermes update --gateway`` (spawned by the gateway's
|
||||
# /update command), this process lives inside the gateway's systemd
|
||||
# cgroup. ``systemctl restart hermes-gateway`` kills everything in the
|
||||
# cgroup (KillMode=mixed → SIGKILL to remaining processes), including
|
||||
# us and the wrapping bash shell. The shell never reaches its
|
||||
# ``printf $status > .update_exit_code`` epilogue, so the exit-code
|
||||
# marker file is never created. The new gateway's update watcher then
|
||||
# polls for 30 minutes and sends a spurious timeout message.
|
||||
# cgroup. A graceful SIGUSR1 restart keeps the drain loop alive long
|
||||
# enough for the exit-code marker to be written below, but the
|
||||
# fallback ``systemctl restart`` path (see below) kills everything in
|
||||
# the cgroup (KillMode=mixed → SIGKILL to remaining processes),
|
||||
# including us and the wrapping bash shell. The shell never reaches
|
||||
# its ``printf $status > .update_exit_code`` epilogue, so the
|
||||
# exit-code marker file would never be created. The new gateway's
|
||||
# update watcher would then poll for 30 minutes and send a spurious
|
||||
# timeout message.
|
||||
#
|
||||
# Writing the marker here — after git pull + pip install succeed but
|
||||
# before we attempt the restart — ensures the new gateway sees it
|
||||
|
|
@ -5891,9 +5894,37 @@ def _cmd_update_impl(args, gateway_mode: bool):
|
|||
_ensure_user_systemd_env,
|
||||
find_gateway_pids,
|
||||
_get_service_pids,
|
||||
_graceful_restart_via_sigusr1,
|
||||
)
|
||||
import signal as _signal
|
||||
|
||||
# Drain budget for graceful SIGUSR1 restarts. The gateway drains
|
||||
# for up to ``agent.restart_drain_timeout`` (default 60s) before
|
||||
# exiting with code 75; we wait slightly longer so the drain
|
||||
# completes before we fall back to a hard restart. On older
|
||||
# systemd units without SIGUSR1 wiring this wait just times out
|
||||
# and we fall back to ``systemctl restart`` (the old behaviour).
|
||||
try:
|
||||
from hermes_constants import (
|
||||
DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT as _DEFAULT_DRAIN,
|
||||
)
|
||||
except Exception:
|
||||
_DEFAULT_DRAIN = 60.0
|
||||
_cfg_drain = None
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
_cfg_agent = (load_config().get("agent") or {})
|
||||
_cfg_drain = _cfg_agent.get("restart_drain_timeout")
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
_drain_budget = float(_cfg_drain) if _cfg_drain is not None else float(_DEFAULT_DRAIN)
|
||||
except (TypeError, ValueError):
|
||||
_drain_budget = float(_DEFAULT_DRAIN)
|
||||
# Add a 15s margin so the drain loop + final exit finish before
|
||||
# we escalate to ``systemctl restart`` / SIGTERM.
|
||||
_drain_budget = max(_drain_budget, 30.0) + 15.0
|
||||
|
||||
restarted_services = []
|
||||
killed_pids = set()
|
||||
|
||||
|
|
@ -5940,7 +5971,62 @@ def _cmd_update_impl(args, gateway_mode: bool):
|
|||
text=True,
|
||||
timeout=5,
|
||||
)
|
||||
if check.stdout.strip() == "active":
|
||||
if check.stdout.strip() != "active":
|
||||
continue
|
||||
|
||||
# Prefer a graceful SIGUSR1 restart so in-flight
|
||||
# agent runs drain instead of being SIGKILLed.
|
||||
# The gateway's SIGUSR1 handler calls
|
||||
# request_restart(via_service=True) → drain →
|
||||
# exit(75); systemd's Restart=on-failure (and
|
||||
# RestartForceExitStatus=75) respawns the unit.
|
||||
_main_pid = 0
|
||||
try:
|
||||
_show = subprocess.run(
|
||||
scope_cmd + [
|
||||
"show", svc_name,
|
||||
"--property=MainPID", "--value",
|
||||
],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
_main_pid = int((_show.stdout or "").strip() or 0)
|
||||
except (ValueError, subprocess.TimeoutExpired, FileNotFoundError):
|
||||
_main_pid = 0
|
||||
|
||||
_graceful_ok = False
|
||||
if _main_pid > 0:
|
||||
print(
|
||||
f" → {svc_name}: draining (up to {int(_drain_budget)}s)..."
|
||||
)
|
||||
_graceful_ok = _graceful_restart_via_sigusr1(
|
||||
_main_pid, drain_timeout=_drain_budget,
|
||||
)
|
||||
|
||||
if _graceful_ok:
|
||||
# Gateway exited 75; systemd should relaunch
|
||||
# via Restart=on-failure. Verify the new
|
||||
# process came up.
|
||||
_time.sleep(3)
|
||||
verify = subprocess.run(
|
||||
scope_cmd + ["is-active", svc_name],
|
||||
capture_output=True, text=True, timeout=5,
|
||||
)
|
||||
if verify.stdout.strip() == "active":
|
||||
restarted_services.append(svc_name)
|
||||
continue
|
||||
# Process exited but wasn't respawned (older
|
||||
# unit without Restart=on-failure or
|
||||
# RestartForceExitStatus=75). Fall through
|
||||
# to systemctl start/restart.
|
||||
print(
|
||||
f" ⚠ {svc_name} drained but didn't relaunch — forcing restart"
|
||||
)
|
||||
|
||||
# Fallback: blunt systemctl restart. This is
|
||||
# what the old code always did; we get here only
|
||||
# when the graceful path failed (unit missing
|
||||
# SIGUSR1 wiring, drain exceeded the budget,
|
||||
# restart-policy mismatch).
|
||||
restart = subprocess.run(
|
||||
scope_cmd + ["restart", svc_name],
|
||||
capture_output=True,
|
||||
|
|
|
|||
|
|
@ -422,6 +422,152 @@ class TestCmdUpdateLaunchdRestart:
|
|||
]
|
||||
assert len(restart_calls) == 1
|
||||
|
||||
@patch("shutil.which", return_value=None)
|
||||
@patch("subprocess.run")
|
||||
def test_update_prefers_sigusr1_over_systemctl_restart_when_mainpid_known(
|
||||
self, mock_run, _mock_which, mock_args, capsys, monkeypatch,
|
||||
):
|
||||
"""Drain-aware update: when systemctl show reports a MainPID, the
|
||||
update path sends SIGUSR1 and waits for graceful exit + respawn,
|
||||
instead of ``systemctl restart`` (which SIGKILLs in-flight agents).
|
||||
"""
|
||||
monkeypatch.setattr(gateway_cli, "is_macos", lambda: False)
|
||||
monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True)
|
||||
monkeypatch.setattr(gateway_cli, "is_termux", lambda: False)
|
||||
|
||||
# Track state: before kill → "active" (old PID),
|
||||
# after kill + exit → briefly inactive, then "active" again (new PID).
|
||||
state = {"killed": False}
|
||||
|
||||
def side_effect(cmd, **kwargs):
|
||||
joined = " ".join(str(c) for c in cmd)
|
||||
|
||||
if "rev-parse" in joined and "--abbrev-ref" in joined:
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="main\n", stderr="")
|
||||
if "rev-parse" in joined and "--verify" in joined:
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||||
if "rev-list" in joined:
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="3\n", stderr="")
|
||||
|
||||
# Only expose a user-scope service.
|
||||
if "systemctl" in joined and "list-units" in joined:
|
||||
if "--user" in joined:
|
||||
return subprocess.CompletedProcess(
|
||||
cmd, 0,
|
||||
stdout="hermes-gateway.service loaded active running\n",
|
||||
stderr="",
|
||||
)
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||||
|
||||
if "systemctl" in joined and "is-active" in joined:
|
||||
# Pre-kill: active. Post-kill: active again (respawned by
|
||||
# Restart=on-failure). The drain loop verifies liveness
|
||||
# separately via os.kill(pid, 0).
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="active\n", stderr="")
|
||||
|
||||
# The new code path.
|
||||
if "systemctl" in joined and "show" in joined and "MainPID" in joined:
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="4242\n", stderr="")
|
||||
|
||||
# If systemctl restart is called, this test fails its intent —
|
||||
# but still let it succeed so we can assert it was NOT called.
|
||||
if "systemctl" in joined and "restart" in joined:
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||||
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="", stderr="")
|
||||
|
||||
mock_run.side_effect = side_effect
|
||||
|
||||
# Track SIGUSR1 delivery and simulate the gateway draining + exiting.
|
||||
sigusr1_sent = {"value": False}
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
import signal as _s
|
||||
if pid == 4242 and sig == _s.SIGUSR1:
|
||||
sigusr1_sent["value"] = True
|
||||
state["killed"] = True
|
||||
return
|
||||
if pid == 4242 and sig == 0:
|
||||
# Liveness probe — report dead once SIGUSR1 has been sent.
|
||||
if state["killed"]:
|
||||
raise ProcessLookupError()
|
||||
return
|
||||
# For any other PID/sig combination, succeed silently.
|
||||
return
|
||||
|
||||
monkeypatch.setattr("os.kill", fake_kill)
|
||||
|
||||
with patch.object(gateway_cli, "find_gateway_pids", return_value=[]):
|
||||
cmd_update(mock_args)
|
||||
|
||||
# SIGUSR1 must have been delivered to the gateway MainPID.
|
||||
assert sigusr1_sent["value"], "Expected SIGUSR1 to be sent to MainPID"
|
||||
|
||||
# And `systemctl restart` must NOT have been used (that's the
|
||||
# non-draining kill-everything path we're moving away from).
|
||||
restart_calls = [
|
||||
c for c in mock_run.call_args_list
|
||||
if "systemctl" in " ".join(str(a) for a in c.args[0])
|
||||
and "restart" in " ".join(str(a) for a in c.args[0])
|
||||
]
|
||||
assert restart_calls == [], (
|
||||
"Graceful SIGUSR1 succeeded; `systemctl restart` should not "
|
||||
f"have been called. Got: {restart_calls}"
|
||||
)
|
||||
|
||||
captured = capsys.readouterr().out
|
||||
assert "draining" in captured.lower()
|
||||
assert "Restarted hermes-gateway" in captured
|
||||
|
||||
@patch("shutil.which", return_value=None)
|
||||
@patch("subprocess.run")
|
||||
def test_update_falls_back_to_systemctl_restart_when_sigusr1_times_out(
|
||||
self, mock_run, _mock_which, mock_args, capsys, monkeypatch,
|
||||
):
|
||||
"""If the gateway doesn't exit within the drain budget (e.g. old unit
|
||||
missing ``Restart=on-failure`` or an agent ignoring SIGUSR1), the
|
||||
update path falls back to ``systemctl restart``.
|
||||
"""
|
||||
monkeypatch.setattr(gateway_cli, "is_macos", lambda: False)
|
||||
monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True)
|
||||
monkeypatch.setattr(gateway_cli, "is_termux", lambda: False)
|
||||
|
||||
mock_run.side_effect = _make_run_side_effect(
|
||||
commit_count="3",
|
||||
systemd_active=True,
|
||||
)
|
||||
|
||||
# Patch systemctl show to report MainPID=4242 so cmd_update attempts
|
||||
# the graceful path.
|
||||
orig = mock_run.side_effect
|
||||
def wrapped(cmd, **kwargs):
|
||||
joined = " ".join(str(c) for c in cmd)
|
||||
if "systemctl" in joined and "show" in joined and "MainPID" in joined:
|
||||
return subprocess.CompletedProcess(cmd, 0, stdout="4242\n", stderr="")
|
||||
return orig(cmd, **kwargs)
|
||||
mock_run.side_effect = wrapped
|
||||
|
||||
# Simulate the drain helper failing to confirm a clean exit — either
|
||||
# because the gateway ignored SIGUSR1 or the drain budget was
|
||||
# exceeded. cmd_update() should detect this and escalate.
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.gateway._graceful_restart_via_sigusr1",
|
||||
lambda pid, drain_timeout: False,
|
||||
)
|
||||
|
||||
with patch.object(gateway_cli, "find_gateway_pids", return_value=[]):
|
||||
cmd_update(mock_args)
|
||||
|
||||
# Fallback kicked in → systemctl restart was called.
|
||||
restart_calls = [
|
||||
c for c in mock_run.call_args_list
|
||||
if "systemctl" in " ".join(str(a) for a in c.args[0])
|
||||
and "restart" in " ".join(str(a) for a in c.args[0])
|
||||
]
|
||||
assert len(restart_calls) >= 1, (
|
||||
"Drain path failed; expected fallback `systemctl restart`."
|
||||
)
|
||||
|
||||
@patch("shutil.which", return_value=None)
|
||||
@patch("subprocess.run")
|
||||
def test_update_no_gateway_running_skips_restart(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue