mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
feat(gateway): shutdown forensics — non-blocking diag, per-phase timing, stale-unit warning (#23285)
When the gateway received SIGTERM, the shutdown_signal_handler ran a
synchronous 'ps aux' (3s timeout) inside the asyncio event loop, then
asyncio.create_task(runner.stop()). On a busy host that ate 1-3s of
the teardown budget before draining could even start, and the resulting
log line was a multi-line ps dump that didn't tell us who sent the
signal. The shutdown path itself logged 'Stopping gateway...' and then
nothing until 'Gateway stopped' — when systemd SIGKILLed mid-drain,
there was no way to see which phase wedged.
Changes:
- New gateway/shutdown_forensics.py:
* snapshot_shutdown_context(sig) — sub-millisecond /proc-only capture
of signal name, parent pid+name+cmdline, INVOCATION_ID (systemd
marker), loadavg_1m, TracerPid, takeover/planned-stop marker
presence + whether-it-names-self. Pure stdlib, never raises.
* spawn_async_diagnostic(log_path, sig) — detached subprocess with
its own 'timeout 5s', start_new_session=True, writes ps auxf +
pstree + dmesg to ~/.hermes/logs/gateway-shutdown-diag.log.
Returns immediately, can't block the event loop or the cgroup
teardown.
* check_systemd_timing_alignment(drain_timeout) — reads
/proc/self/cgroup for our unit, asks systemctl show for
TimeoutStopUSec, returns mismatch info when the unit's stop
timeout is smaller than restart_drain_timeout + 30s headroom
(the case where systemd SIGKILLs mid-drain).
* _parse_systemd_duration_to_us — covers '90s', '1min 30s',
'500ms', '1h' style values from systemctl show.
* format_context_for_log — single scannable key=value line, parent
cmdline last.
- gateway/run.py shutdown_signal_handler:
* Replaces synchronous ps aux + ad-hoc 'hermes-related lines' filter
with snapshot + detached spawn.
* Always logs 'Shutdown context: signal=... parent_pid=...
parent_cmdline=...' regardless of planned/unexpected so we can
correlate signal source even on planned restarts.
- gateway/run.py _stop_impl:
* Per-phase '+X.XXs' timing for notify_active_sessions, drain
(with drain_seconds, active_at_start, active_now, timed_out),
post-interrupt tool kill, each adapter disconnect (Xs),
all adapters disconnected, final-cleanup tool kill, SessionDB
close, total teardown.
- gateway/run.py start():
* Stale-unit warning at startup when the running systemd unit's
TimeoutStopSec is smaller than the configured drain timeout.
Points the user at 'hermes gateway service install --replace'
to regenerate, or at shortening agent.restart_drain_timeout.
Tests: 30 new in tests/gateway/test_shutdown_forensics.py — snapshot
speed bound, signal name resolution, marker detection self-vs-other,
async diag spawn doesn't block caller, systemd duration parser, and
alignment check returns None outside systemd. Wider tests/gateway/
suite: 5258 passing, 3 pre-existing TTS-routing failures unchanged
on main.
This commit is contained in:
parent
1f5983c4c8
commit
cede612987
3 changed files with 828 additions and 26 deletions
141
gateway/run.py
141
gateway/run.py
|
|
@ -3206,6 +3206,28 @@ class GatewayRunner:
|
|||
except RuntimeError:
|
||||
self._gateway_loop = None
|
||||
logger.info("Session storage: %s", self.config.sessions_dir)
|
||||
|
||||
# Sanity-check that systemd's TimeoutStopSec covers our drain
|
||||
# window. When the user upgraded hermes-agent without re-running
|
||||
# ``hermes setup``, their unit file may still encode the old
|
||||
# default — in which case SIGKILL hits mid-drain and looks like
|
||||
# a phantom kill in the journal. Best-effort, never raises.
|
||||
try:
|
||||
from gateway.shutdown_forensics import check_systemd_timing_alignment
|
||||
_alignment = check_systemd_timing_alignment(self._restart_drain_timeout)
|
||||
if _alignment is not None and _alignment.get("mismatch"):
|
||||
logger.warning(
|
||||
"Stale systemd unit detected: %s has TimeoutStopSec=%.0fs but "
|
||||
"drain_timeout=%.0fs (expected >=%.0fs). systemd may SIGKILL the "
|
||||
"gateway mid-drain. Run `hermes gateway service install --replace` "
|
||||
"to regenerate the unit, or shorten agent.restart_drain_timeout.",
|
||||
_alignment.get("unit", "(unknown)"),
|
||||
_alignment["timeout_stop_sec"],
|
||||
_alignment["drain_timeout"],
|
||||
_alignment["expected_min"],
|
||||
)
|
||||
except Exception as _e:
|
||||
logger.debug("check_systemd_timing_alignment failed: %s", _e)
|
||||
# Log the resolved max_iterations budget so operators can verify the
|
||||
# config.yaml → env bridge did the right thing at a glance (instead
|
||||
# of silently running at a stale .env value for weeks).
|
||||
|
|
@ -4498,15 +4520,34 @@ class GatewayRunner:
|
|||
"Stopping gateway%s...",
|
||||
" for restart" if self._restart_requested else "",
|
||||
)
|
||||
_stop_started_at = time.monotonic()
|
||||
|
||||
def _phase_elapsed() -> float:
|
||||
return time.monotonic() - _stop_started_at
|
||||
|
||||
self._running = False
|
||||
self._draining = True
|
||||
|
||||
# Notify all chats with active agents BEFORE draining.
|
||||
# Adapters are still connected here, so messages can be sent.
|
||||
await self._notify_active_sessions_of_shutdown()
|
||||
logger.info(
|
||||
"Shutdown phase: notify_active_sessions done at +%.2fs",
|
||||
_phase_elapsed(),
|
||||
)
|
||||
|
||||
timeout = self._restart_drain_timeout
|
||||
_drain_started_at = time.monotonic()
|
||||
active_agents, timed_out = await self._drain_active_agents(timeout)
|
||||
logger.info(
|
||||
"Shutdown phase: drain done at +%.2fs (drain took %.2fs, "
|
||||
"timed_out=%s, active_at_start=%d, active_now=%d)",
|
||||
_phase_elapsed(),
|
||||
time.monotonic() - _drain_started_at,
|
||||
timed_out,
|
||||
len(active_agents),
|
||||
self._running_agent_count(),
|
||||
)
|
||||
if timed_out:
|
||||
logger.warning(
|
||||
"Gateway drain timed out after %.1fs with %d active agent(s); interrupting remaining work.",
|
||||
|
|
@ -4564,6 +4605,10 @@ class GatewayRunner:
|
|||
# killed by systemd instead of us (issue #8202). The final
|
||||
# catch-all cleanup below still runs for the graceful path.
|
||||
_kill_tool_subprocesses("post-interrupt")
|
||||
logger.info(
|
||||
"Shutdown phase: post-interrupt tool kill done at +%.2fs",
|
||||
_phase_elapsed(),
|
||||
)
|
||||
|
||||
if self._restart_requested and self._restart_detached:
|
||||
try:
|
||||
|
|
@ -4591,15 +4636,29 @@ class GatewayRunner:
|
|||
self._cleanup_agent_resources(_agent)
|
||||
|
||||
for platform, adapter in list(self.adapters.items()):
|
||||
_adapter_started_at = time.monotonic()
|
||||
try:
|
||||
await adapter.cancel_background_tasks()
|
||||
except Exception as e:
|
||||
logger.debug("✗ %s background-task cancel error: %s", platform.value, e)
|
||||
try:
|
||||
await adapter.disconnect()
|
||||
logger.info("✓ %s disconnected", platform.value)
|
||||
logger.info(
|
||||
"✓ %s disconnected (%.2fs)",
|
||||
platform.value,
|
||||
time.monotonic() - _adapter_started_at,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("✗ %s disconnect error: %s", platform.value, e)
|
||||
logger.error(
|
||||
"✗ %s disconnect error after %.2fs: %s",
|
||||
platform.value,
|
||||
time.monotonic() - _adapter_started_at,
|
||||
e,
|
||||
)
|
||||
logger.info(
|
||||
"Shutdown phase: all adapters disconnected at +%.2fs",
|
||||
_phase_elapsed(),
|
||||
)
|
||||
|
||||
for _task in list(self._background_tasks):
|
||||
if _task is self._stop_task:
|
||||
|
|
@ -4624,6 +4683,10 @@ class GatewayRunner:
|
|||
# that got respawned between the earlier call and adapter
|
||||
# disconnect (defense in depth; safe to call repeatedly).
|
||||
_kill_tool_subprocesses("final-cleanup")
|
||||
logger.info(
|
||||
"Shutdown phase: final-cleanup tool kill done at +%.2fs",
|
||||
_phase_elapsed(),
|
||||
)
|
||||
|
||||
# Reap the process-global auxiliary-client cache once at the very
|
||||
# end of teardown. Per-turn cleanup runs in _cleanup_agent_resources
|
||||
|
|
@ -4651,6 +4714,10 @@ class GatewayRunner:
|
|||
_db.close()
|
||||
except Exception as _e:
|
||||
logger.debug("SessionDB close error: %s", _e)
|
||||
logger.info(
|
||||
"Shutdown phase: SessionDB close done at +%.2fs",
|
||||
_phase_elapsed(),
|
||||
)
|
||||
|
||||
from gateway.status import remove_pid_file, release_gateway_runtime_lock
|
||||
remove_pid_file()
|
||||
|
|
@ -4690,7 +4757,7 @@ class GatewayRunner:
|
|||
|
||||
self._draining = False
|
||||
self._update_runtime_status("stopped", self._exit_reason)
|
||||
logger.info("Gateway stopped")
|
||||
logger.info("Gateway stopped (total teardown %.2fs)", _phase_elapsed())
|
||||
|
||||
self._stop_task = asyncio.create_task(_stop_impl())
|
||||
await self._stop_task
|
||||
|
|
@ -15762,40 +15829,62 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
|
|||
except Exception as e:
|
||||
logger.debug("Planned stop marker check failed: %s", e)
|
||||
|
||||
# Fast (<10ms) snapshot of who's asking us to shut down — runs
|
||||
# synchronously inside the asyncio signal handler, so we keep it
|
||||
# purely stdlib + /proc reads, no subprocesses. See PR #15826
|
||||
# (May 2026): the previous implementation called `ps aux` here
|
||||
# synchronously, blocking the event loop for up to 3s while
|
||||
# adapter teardown couldn't begin.
|
||||
try:
|
||||
from gateway.shutdown_forensics import (
|
||||
format_context_for_log,
|
||||
snapshot_shutdown_context,
|
||||
spawn_async_diagnostic,
|
||||
)
|
||||
_shutdown_ctx = snapshot_shutdown_context(received_signal)
|
||||
except Exception as _e:
|
||||
_shutdown_ctx = None
|
||||
logger.debug("snapshot_shutdown_context failed: %s", _e)
|
||||
|
||||
if planned_takeover:
|
||||
logger.info(
|
||||
"Received SIGTERM as a planned --replace takeover — exiting cleanly"
|
||||
"Received %s as a planned --replace takeover — exiting cleanly",
|
||||
_shutdown_ctx["signal"] if _shutdown_ctx else "SIGTERM",
|
||||
)
|
||||
elif planned_stop:
|
||||
logger.info(
|
||||
"Received SIGTERM/SIGINT as a planned gateway stop — exiting cleanly"
|
||||
"Received %s as a planned gateway stop — exiting cleanly",
|
||||
_shutdown_ctx["signal"] if _shutdown_ctx else "SIGTERM/SIGINT",
|
||||
)
|
||||
else:
|
||||
_signal_initiated_shutdown = True
|
||||
logger.info("Received SIGTERM/SIGINT — initiating shutdown")
|
||||
# Diagnostic: log all hermes-related processes so we can identify
|
||||
# what triggered the signal (hermes update, hermes gateway restart,
|
||||
# a stale detached subprocess, etc.).
|
||||
try:
|
||||
import subprocess as _sp
|
||||
_ps = _sp.run(
|
||||
["ps", "aux"],
|
||||
capture_output=True, text=True, timeout=3,
|
||||
logger.info(
|
||||
"Received %s — initiating shutdown",
|
||||
_shutdown_ctx["signal"] if _shutdown_ctx else "SIGTERM/SIGINT",
|
||||
)
|
||||
_hermes_procs = [
|
||||
line for line in _ps.stdout.splitlines()
|
||||
if ("hermes" in line.lower() or "gateway" in line.lower())
|
||||
and str(os.getpid()) not in line.split()[1:2] # exclude self
|
||||
]
|
||||
if _hermes_procs:
|
||||
|
||||
# Always log who/what triggered the signal — most useful single
|
||||
# line when diagnosing "the gateway keeps dying" tickets. Format
|
||||
# is one line, key=value, parent_cmdline last (often long).
|
||||
if _shutdown_ctx is not None:
|
||||
try:
|
||||
logger.warning(
|
||||
"Shutdown diagnostic — other hermes processes running:\n %s",
|
||||
"\n ".join(_hermes_procs),
|
||||
"Shutdown context: %s", format_context_for_log(_shutdown_ctx)
|
||||
)
|
||||
else:
|
||||
logger.info("Shutdown diagnostic — no other hermes processes found")
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as _e:
|
||||
logger.debug("format_context_for_log failed: %s", _e)
|
||||
|
||||
# Spawn the heavyweight diagnostic (ps auxf, pstree, dmesg) in
|
||||
# a detached subprocess so it can finish writing to disk even
|
||||
# if our cgroup is being torn down. Bounded by an internal
|
||||
# timeout; never blocks the event loop here.
|
||||
try:
|
||||
_diag_log = _hermes_home / "logs" / "gateway-shutdown-diag.log"
|
||||
spawn_async_diagnostic(
|
||||
_diag_log, _shutdown_ctx["signal"], timeout_seconds=5.0
|
||||
)
|
||||
except Exception as _e:
|
||||
logger.debug("spawn_async_diagnostic failed: %s", _e)
|
||||
asyncio.create_task(runner.stop())
|
||||
|
||||
def restart_signal_handler():
|
||||
|
|
|
|||
463
gateway/shutdown_forensics.py
Normal file
463
gateway/shutdown_forensics.py
Normal file
|
|
@ -0,0 +1,463 @@
|
|||
"""Shutdown forensics — capture context when the gateway receives SIGTERM/SIGINT.
|
||||
|
||||
The gateway's ``shutdown_signal_handler`` runs synchronously inside the
|
||||
asyncio event loop. We can't safely block it for long, but we DO want a
|
||||
durable record of who/what triggered the shutdown so that "the gateway
|
||||
keeps dying" incidents can be diagnosed after the fact.
|
||||
|
||||
This module exposes :func:`snapshot_shutdown_context`, a fast (<10ms),
|
||||
non-blocking probe that returns a structured dict the signal handler can
|
||||
log immediately, plus :func:`spawn_async_diagnostic`, a fire-and-forget
|
||||
``ps`` walk that runs as a detached subprocess so it can't block teardown
|
||||
even if /proc is wedged.
|
||||
|
||||
Anything that needs to wait (e.g. shelling out to ``ps aux``) belongs in
|
||||
the async helper, never in the synchronous probe.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
_SIGNAL_NAME_BY_NUM: Dict[int, str] = {}
|
||||
for _name in ("SIGTERM", "SIGINT", "SIGHUP", "SIGQUIT", "SIGUSR1", "SIGUSR2"):
|
||||
_val = getattr(signal, _name, None)
|
||||
if _val is not None:
|
||||
_SIGNAL_NAME_BY_NUM[int(_val)] = _name
|
||||
|
||||
|
||||
def _signal_name(sig: Any) -> str:
|
||||
"""Return a human-readable signal name (or ``str(sig)`` as fallback)."""
|
||||
if sig is None:
|
||||
return "UNKNOWN"
|
||||
try:
|
||||
sig_int = int(sig)
|
||||
except (TypeError, ValueError):
|
||||
return str(sig)
|
||||
return _SIGNAL_NAME_BY_NUM.get(sig_int, f"signal#{sig_int}")
|
||||
|
||||
|
||||
def _read_proc_field(pid: int, key: str) -> Optional[str]:
|
||||
"""Read a single field from /proc/<pid>/status. Linux only; None elsewhere."""
|
||||
try:
|
||||
with open(f"/proc/{pid}/status", encoding="utf-8") as fh:
|
||||
for line in fh:
|
||||
if line.startswith(key + ":"):
|
||||
return line.split(":", 1)[1].strip()
|
||||
except (FileNotFoundError, PermissionError, OSError):
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _read_proc_cmdline(pid: int) -> Optional[str]:
|
||||
"""Read /proc/<pid>/cmdline as a printable string. Linux only; None elsewhere."""
|
||||
try:
|
||||
with open(f"/proc/{pid}/cmdline", "rb") as fh:
|
||||
data = fh.read()
|
||||
except (FileNotFoundError, PermissionError, OSError):
|
||||
return None
|
||||
if not data:
|
||||
return None
|
||||
# cmdline uses NUL separators
|
||||
return data.replace(b"\x00", b" ").decode("utf-8", errors="replace").strip()
|
||||
|
||||
|
||||
def _proc_summary(pid: int) -> Dict[str, Any]:
|
||||
"""Compact /proc/<pid> snapshot: pid, ppid, state, uid, cmdline.
|
||||
|
||||
Best-effort. Missing fields are simply omitted rather than raising.
|
||||
"""
|
||||
summary: Dict[str, Any] = {"pid": pid}
|
||||
if pid <= 0:
|
||||
return summary
|
||||
name = _read_proc_field(pid, "Name")
|
||||
if name is not None:
|
||||
summary["name"] = name
|
||||
state = _read_proc_field(pid, "State")
|
||||
if state is not None:
|
||||
summary["state"] = state
|
||||
ppid = _read_proc_field(pid, "PPid")
|
||||
if ppid is not None:
|
||||
try:
|
||||
summary["ppid"] = int(ppid)
|
||||
except ValueError:
|
||||
pass
|
||||
uid = _read_proc_field(pid, "Uid")
|
||||
if uid is not None:
|
||||
# "real effective saved fs"
|
||||
summary["uid"] = uid.split()[0] if uid else uid
|
||||
cmdline = _read_proc_cmdline(pid)
|
||||
if cmdline:
|
||||
# Truncate aggressively — these can be 4KB
|
||||
summary["cmdline"] = cmdline[:300]
|
||||
return summary
|
||||
|
||||
|
||||
def _marker_names_pid(raw: str, pid: int) -> bool:
    """Return True when a takeover marker's ``target_pid`` is exactly ``pid``."""
    try:
        payload = json.loads(raw)
    except ValueError:
        payload = None
    if isinstance(payload, dict):
        try:
            return int(payload.get("target_pid")) == pid
        except (TypeError, ValueError, OverflowError):
            return False
    # Not a JSON object (e.g. a Python-repr style dump): fall back to a
    # textual match, but reject hits where the digits continue, so pid 123
    # does not match a marker written for pid 1234.
    for needle in (f'"target_pid": {pid}', f"'target_pid': {pid}"):
        start = raw.find(needle)
        while start != -1:
            stop = start + len(needle)
            if stop >= len(raw) or not raw[stop].isdigit():
                return True
            start = raw.find(needle, stop)
    return False


def snapshot_shutdown_context(received_signal: Any = None) -> Dict[str, Any]:
    """Fast snapshot of who/what is asking us to shut down.

    Captures:

    * ``ts`` / ``ts_monotonic`` — wall-clock and monotonic timestamps
    * ``signal`` / ``signal_num`` — readable name and integer value of the
      signal (``signal_num`` is ``None`` when the value is missing or not
      integer-like)
    * ``pid`` / ``ppid`` plus ``self`` / ``parent`` summaries from /proc
    * ``under_systemd`` — ``INVOCATION_ID`` is set or ``ppid == 1``; also
      ``systemd_invocation_id`` / ``systemd_journal_stream`` when present
    * ``loadavg_1m`` — 1-minute load average, when the platform provides it
    * ``tracer_pid`` / ``tracer`` — only when /proc reports a non-zero
      ``TracerPid``
    * ``takeover_marker`` / ``takeover_marker_for_self`` /
      ``planned_stop_marker`` — only when ``HERMES_HOME`` is set in the
      environment and the marker files exist under it

    Pure stdlib, no subprocesses. Intended never to raise, because it is
    called from the signal handler.
    """
    now = time.time()
    monotonic = time.monotonic()
    pid = os.getpid()
    ppid = os.getppid()

    # _signal_name() tolerates non-integer values (it falls back to str());
    # the numeric field has to tolerate them too instead of raising.
    try:
        signal_num = int(received_signal) if received_signal is not None else None
    except (TypeError, ValueError, OverflowError):
        signal_num = None

    ctx: Dict[str, Any] = {
        "ts": now,
        "ts_monotonic": monotonic,
        "signal": _signal_name(received_signal),
        "signal_num": signal_num,
        "pid": pid,
        "ppid": ppid,
        "parent": _proc_summary(ppid),
        "self": _proc_summary(pid),
    }

    # systemd context. INVOCATION_ID in the environment is the primary
    # marker; ppid == 1 is treated as a secondary hint.
    # NOTE(review): ppid == 1 only means "parent is PID 1" — that is not
    # necessarily systemd (e.g. container init); confirm this heuristic.
    invocation_id = os.environ.get("INVOCATION_ID")
    if invocation_id:
        ctx["systemd_invocation_id"] = invocation_id
    journal_stream = os.environ.get("JOURNAL_STREAM")
    if journal_stream:
        ctx["systemd_journal_stream"] = journal_stream
    ctx["under_systemd"] = bool(invocation_id) or ppid == 1

    # Load average — high load points the finger at "something else
    # crushing the box" rather than "external killer".
    try:
        ctx["loadavg_1m"] = os.getloadavg()[0]
    except (OSError, AttributeError):
        pass

    # /proc/self/status TracerPid: nonzero means a debugger / strace is
    # attached.
    try:
        tracer = _read_proc_field(pid, "TracerPid")
        if tracer is not None and tracer != "0":
            ctx["tracer_pid"] = int(tracer) if tracer.isdigit() else tracer
            ctx["tracer"] = _proc_summary(int(tracer)) if tracer.isdigit() else None
    except (TypeError, ValueError):
        pass

    # Race-detection hint: a takeover marker on disk that does NOT name us
    # suggests another --replace instance is the one killing us. Marker
    # filenames are string literals here so this path stays import-light.
    # Only consulted when HERMES_HOME is set in the environment.
    try:
        hermes_home_str = os.environ.get("HERMES_HOME")
        if hermes_home_str:
            takeover_path = Path(hermes_home_str) / ".gateway-takeover.json"
            if takeover_path.exists():
                try:
                    raw = takeover_path.read_text(encoding="utf-8", errors="replace")
                    ctx["takeover_marker"] = raw[:300]
                    ctx["takeover_marker_for_self"] = _marker_names_pid(raw, pid)
                except OSError:
                    pass
            planned_stop_path = Path(hermes_home_str) / ".gateway-planned-stop.json"
            if planned_stop_path.exists():
                try:
                    raw = planned_stop_path.read_text(encoding="utf-8", errors="replace")
                    ctx["planned_stop_marker"] = raw[:300]
                except OSError:
                    pass
    except Exception:  # noqa: BLE001 — never raise from a signal handler
        pass

    return ctx
|
||||
|
||||
|
||||
def spawn_async_diagnostic(
    log_path: Path,
    signal_name: str,
    *,
    timeout_seconds: float = 5.0,
) -> Optional[int]:
    """Fire-and-forget process snapshot appended to ``log_path``.

    Launches ``timeout <n> bash -c <script>`` without waiting for it, so the
    caller (the asyncio signal handler) is not blocked while ``ps`` /
    ``pstree`` / ``dmesg`` run. The external ``timeout`` bounds the child's
    lifetime.

    Args:
        log_path: File to append to; its parent directory is created.
        signal_name: Label written into the header line of the report.
        timeout_seconds: Upper bound for the child, rounded to whole seconds
            (minimum 1).

    Returns:
        The child's PID, or ``None`` when the directory/log file cannot be
        created, the platform is Windows, or the process cannot be spawned.
        Never raises.
    """
    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError:
        return None

    # The snapshot relies on bash/ps; skipped entirely on Windows.
    if sys.platform == "win32":
        return None

    # `timeout` treats a duration of 0 as "no timeout", so never let a small
    # fractional value round down to 0.
    try:
        timeout_arg = str(max(1, round(timeout_seconds)))
    except (TypeError, ValueError, OverflowError):
        timeout_arg = "5"

    # The signal label is passed as "$1" rather than spliced into the script
    # text, so a label containing quotes cannot break or alter the script.
    # Fallbacks are grouped *before* the pipe: `a | tail || b` would test
    # tail's exit status and never run b.
    script = (
        'echo "=== shutdown diagnostic @ $1 ==="; '
        "echo '--- date ---'; date -u +%Y-%m-%dT%H:%M:%SZ; "
        "echo '--- ps auxf (top 60 by cpu) ---'; "
        "ps auxf --sort=-pcpu 2>/dev/null | head -60; "
        "echo '--- pstree of self ---'; "
        f"pstree -plau {os.getpid()} 2>/dev/null | head -40; "
        "echo '--- /proc/loadavg ---'; "
        "cat /proc/loadavg 2>/dev/null || true; "
        "echo '--- recent dmesg (oom/killed) ---'; "
        "(dmesg -T 2>/dev/null || journalctl --user -n 20 --no-pager 2>/dev/null) | tail -20; "
        "echo '=== end ==='"
    )

    try:
        # O_APPEND so diagnostics from rapid successive signals are appended
        # rather than overwriting each other.
        fd = os.open(str(log_path), os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
    except OSError:
        return None

    try:
        # start_new_session puts the child in its own session/process group,
        # so signals aimed at our process group do not reach it.
        # NOTE(review): this does not move the child out of our cgroup; a
        # systemd KillMode=control-group kill would still reach it — confirm
        # whether that matters for the intended use.
        proc = subprocess.Popen(
            ["timeout", timeout_arg, "bash", "-c", script,
             "gateway-shutdown-diag", str(signal_name)],
            stdout=fd,
            stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL,
            start_new_session=True,
            close_fds=True,
        )
    except (OSError, ValueError):
        # FileNotFoundError (no `timeout`/`bash`) is an OSError subclass.
        return None
    finally:
        # Close our copy exactly once, on every path; the child (if any)
        # holds its own duplicate. Closing twice risks closing an unrelated
        # descriptor that reused the number in between.
        try:
            os.close(fd)
        except OSError:
            pass

    return proc.pid
|
||||
|
||||
|
||||
def format_context_for_log(ctx: Dict[str, Any]) -> str:
    """Flatten a shutdown context dict into one space-separated key=value line.

    Field order is fixed: signal, under_systemd, parent_pid, parent_name,
    loadavg_1m, then any optional markers (takeover marker, planned-stop
    marker, tracer pid), and finally the parent's command line (``repr``-quoted).
    Missing values render as ``?`` (or ``'(unknown)'`` for the command line).
    """
    parent_info = ctx.get("parent") or {}

    load_value = ctx.get("loadavg_1m")
    if isinstance(load_value, (int, float)):
        load_text = f"{load_value:.2f}"
    else:
        load_text = "?"

    fields = [
        f"signal={ctx.get('signal', '?')}",
        f"under_systemd={'yes' if ctx.get('under_systemd') else 'no'}",
        f"parent_pid={parent_info.get('pid') or '?'}",
        f"parent_name={parent_info.get('name') or '?'}",
        f"loadavg_1m={load_text}",
    ]

    if ctx.get("takeover_marker") is not None:
        owner = "self" if ctx.get("takeover_marker_for_self") else "other"
        fields.append(f"takeover_marker_present={owner}")
    if ctx.get("planned_stop_marker") is not None:
        fields.append("planned_stop_marker_present=yes")
    if ctx.get("tracer_pid"):
        fields.append(f"tracer_pid={ctx['tracer_pid']}")

    # Command line goes last because it is usually the longest field.
    fields.append(f"parent_cmdline={parent_info.get('cmdline', '(unknown)')!r}")
    return " ".join(fields)
|
||||
|
||||
|
||||
def context_as_json(ctx: Dict[str, Any]) -> str:
    """Serialise a context dict to JSON text with sorted keys.

    Values json cannot encode natively are passed through ``str``. If
    encoding still fails with ``TypeError`` or ``ValueError``, the literal
    ``"{}"`` is returned instead.
    """
    fallback = "{}"
    try:
        encoded = json.dumps(ctx, sort_keys=True, default=str)
    except (TypeError, ValueError):
        return fallback
    return encoded
|
||||
|
||||
|
||||
def check_systemd_timing_alignment(drain_timeout: float) -> Optional[Dict[str, Any]]:
    """Compare the running unit's stop timeout against the drain timeout.

    When the unit's ``TimeoutStopUSec`` is smaller than ``drain_timeout`` plus
    30s of headroom, systemd can SIGKILL the service while it is still
    draining.

    Args:
        drain_timeout: Configured drain window, in seconds.

    Returns:
        ``None`` when nothing can be determined: ``INVOCATION_ID`` is not
        set, no ``*.service`` component is found in ``/proc/self/cgroup``,
        ``systemctl`` is unavailable/fails/times out, or the timeout value
        cannot be parsed (e.g. ``infinity``). Otherwise a dict with ``unit``,
        ``timeout_stop_sec``, ``drain_timeout``, ``expected_min`` and a
        ``mismatch`` bool (True when the unit's timeout is below
        ``expected_min``).

    Best-effort. NOTE(review): each ``systemctl`` call is synchronous with a
    2s timeout, so the worst case here is roughly 4s of blocking.
    """
    if not os.environ.get("INVOCATION_ID"):
        return None  # Not started by systemd (or at least not directly).

    # The unit name is the last "*.service" path component of a cgroup line,
    # e.g. "0::/user.slice/.../hermes-gateway.service".
    unit_name: Optional[str] = None
    try:
        with open("/proc/self/cgroup", encoding="utf-8") as fh:
            for line in fh:
                for segment in reversed(line.strip().split("/")):
                    if segment.endswith(".service"):
                        unit_name = segment
                        break
                if unit_name:
                    break
    except OSError:
        pass
    if not unit_name:
        return None

    # Ask the user manager first, then the system manager.
    timeout_us: Optional[int] = None
    for scope_flag in (["--user"], []):
        try:
            result = subprocess.run(
                [
                    "systemctl", *scope_flag, "show", unit_name,
                    "--property=TimeoutStopUSec",
                    "--property=LoadState",
                ],
                capture_output=True, text=True, timeout=2.0,
            )
        except (subprocess.SubprocessError, OSError, ValueError):
            continue
        if result.returncode != 0:
            continue

        props: Dict[str, str] = {}
        for out_line in result.stdout.splitlines():
            prop_key, sep, prop_value = out_line.partition("=")
            if sep:
                props[prop_key] = prop_value.strip()

        # `systemctl show` succeeds and prints *default* values for a unit
        # this manager has never loaded. Without this check, the user
        # manager's defaults would mask the real system-scope unit.
        if props.get("LoadState") == "not-found":
            continue

        # Value looks like "1min 30s" or, with some versions, raw microseconds.
        value = props.get("TimeoutStopUSec", "")
        if not value:
            continue
        if value.isdigit():
            timeout_us = int(value)
        else:
            timeout_us = _parse_systemd_duration_to_us(value)
        if timeout_us is not None:
            break

    if timeout_us is None:
        return None

    timeout_stop_sec = timeout_us / 1_000_000.0
    # Headroom for the teardown work that follows the drain (tool kill,
    # adapter disconnect, SessionDB close, ...).
    # NOTE(review): 30s is said to mirror a constant in hermes_cli/gateway.py
    # — keep the two in sync.
    headroom = 30.0
    expected = drain_timeout + headroom
    return {
        "unit": unit_name,
        "timeout_stop_sec": timeout_stop_sec,
        "drain_timeout": drain_timeout,
        "expected_min": expected,
        "mismatch": timeout_stop_sec < expected,
    }
|
||||
|
||||
|
||||
def _parse_systemd_duration_to_us(raw: str) -> Optional[int]:
|
||||
"""Parse 'TimeoutStopUSec=1min 30s' / '90s' style values to microseconds.
|
||||
|
||||
systemd accepts a wide grammar; we cover the common cases (s, ms, min,
|
||||
h) and return None on anything unexpected. Never raises.
|
||||
"""
|
||||
if not raw:
|
||||
return None
|
||||
units = {
|
||||
"us": 1,
|
||||
"ms": 1_000,
|
||||
"s": 1_000_000,
|
||||
"sec": 1_000_000,
|
||||
"min": 60_000_000,
|
||||
"h": 3_600_000_000,
|
||||
"hr": 3_600_000_000,
|
||||
}
|
||||
total_us = 0
|
||||
token = ""
|
||||
digits = ""
|
||||
for ch in raw + " ":
|
||||
if ch.isdigit() or ch == ".":
|
||||
if token:
|
||||
# End previous unit, start new number
|
||||
multiplier = units.get(token.lower())
|
||||
if multiplier is None or not digits:
|
||||
return None
|
||||
try:
|
||||
total_us += int(float(digits) * multiplier)
|
||||
except ValueError:
|
||||
return None
|
||||
digits = ""
|
||||
token = ""
|
||||
digits += ch
|
||||
elif ch.isalpha():
|
||||
token += ch
|
||||
else:
|
||||
if digits and token:
|
||||
multiplier = units.get(token.lower())
|
||||
if multiplier is None:
|
||||
return None
|
||||
try:
|
||||
total_us += int(float(digits) * multiplier)
|
||||
except ValueError:
|
||||
return None
|
||||
digits = ""
|
||||
token = ""
|
||||
elif digits and not token:
|
||||
# Bare number = seconds (rare but valid)
|
||||
try:
|
||||
total_us += int(float(digits) * 1_000_000)
|
||||
except ValueError:
|
||||
return None
|
||||
digits = ""
|
||||
return total_us if total_us > 0 else None
|
||||
250
tests/gateway/test_shutdown_forensics.py
Normal file
250
tests/gateway/test_shutdown_forensics.py
Normal file
|
|
@ -0,0 +1,250 @@
|
|||
"""Tests for gateway.shutdown_forensics — fast snapshot + async diag spawn."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway import shutdown_forensics as sf
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _signal_name
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSignalName:
    """Checks for ``sf._signal_name``: known, unknown, missing and non-numeric inputs."""

    def test_known_signals_resolve_to_names(self):
        for member, expected in (
            (signal.SIGTERM, "SIGTERM"),
            (signal.SIGINT, "SIGINT"),
        ):
            assert sf._signal_name(member) == expected

    def test_unknown_int_returns_signal_num_token(self):
        # 9999 is far outside any real signal numbering.
        label = sf._signal_name(9999)
        assert label == "signal#9999"

    def test_none_returns_unknown(self):
        label = sf._signal_name(None)
        assert label == "UNKNOWN"

    def test_non_integer_falls_back_to_str(self):
        label = sf._signal_name("SIGTERM")
        assert label == "SIGTERM"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# snapshot_shutdown_context
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSnapshotShutdownContext:
    """Checks on the dict returned by ``sf.snapshot_shutdown_context``."""

    def test_includes_self_pid_and_signal(self):
        """The snapshot carries our pid plus the signal's name and number."""
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        expected_fields = {
            "pid": os.getpid(),
            "signal": "SIGTERM",
            "signal_num": int(signal.SIGTERM),
        }
        for key, expected in expected_fields.items():
            assert snapshot[key] == expected

    def test_handles_none_signal(self):
        """A missing signal is recorded as UNKNOWN with no number."""
        snapshot = sf.snapshot_shutdown_context(None)
        assert snapshot["signal"] == "UNKNOWN"
        assert snapshot["signal_num"] is None

    def test_includes_timestamps(self):
        """``ts`` falls inside the call window and ``ts_monotonic`` is a float."""
        lower_bound = time.time()
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        upper_bound = time.time()

        wall_clock = snapshot["ts"]
        assert lower_bound <= wall_clock
        assert wall_clock <= upper_bound
        assert isinstance(snapshot["ts_monotonic"], float)

    @pytest.mark.skipif(sys.platform == "win32", reason="Linux /proc not present")
    def test_includes_parent_summary_on_linux(self):
        """The snapshot has a ``parent`` entry whose pid is our real parent."""
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert "parent" in snapshot
        parent_info = snapshot["parent"]
        assert parent_info["pid"] == os.getppid()

    def test_under_systemd_flag_uses_invocation_id(self, monkeypatch):
        """Setting INVOCATION_ID flips ``under_systemd`` and is echoed back."""
        invocation_id = "abc123"
        monkeypatch.setenv("INVOCATION_ID", invocation_id)
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert snapshot["under_systemd"] is True
        assert snapshot["systemd_invocation_id"] == invocation_id

    def test_under_systemd_false_without_invocation_id_and_normal_ppid(
        self, monkeypatch
    ):
        """Without INVOCATION_ID and with a non-init parent, the flag is False."""
        monkeypatch.delenv("INVOCATION_ID", raising=False)
        # The parent pid cannot be faked from here, so bail out when this
        # process happens to be parented by pid 1 (e.g. running under tini).
        parented_by_init = os.getppid() == 1
        if parented_by_init:
            pytest.skip("test process is reaped by init")
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert snapshot["under_systemd"] is False

    def test_completes_quickly(self):
        """Snapshot must NOT block — it runs inside the asyncio signal handler."""
        began = time.monotonic()
        sf.snapshot_shutdown_context(signal.SIGTERM)
        took = time.monotonic() - began
        # Deliberately loose ceiling so slow CI hosts do not flake.
        assert took < 0.5, f"snapshot took {took:.3f}s — too slow"

    def test_detects_takeover_marker_for_self(self, tmp_path, monkeypatch):
        """A takeover marker naming our pid is flagged as targeting us."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_body = json.dumps({"target_pid": os.getpid(), "replacer_pid": 99999})
        (tmp_path / ".gateway-takeover.json").write_text(
            marker_body, encoding="utf-8"
        )
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert "takeover_marker" in snapshot
        assert snapshot["takeover_marker_for_self"] is True

    def test_detects_takeover_marker_for_other(self, tmp_path, monkeypatch):
        """A takeover marker naming another pid is not attributed to us."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_body = json.dumps({"target_pid": 1, "replacer_pid": 99999})
        (tmp_path / ".gateway-takeover.json").write_text(
            marker_body, encoding="utf-8"
        )
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert snapshot["takeover_marker_for_self"] is False

    def test_detects_planned_stop_marker(self, tmp_path, monkeypatch):
        """A planned-stop marker file shows up in the snapshot."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_body = json.dumps({"target_pid": os.getpid()})
        (tmp_path / ".gateway-planned-stop.json").write_text(
            marker_body, encoding="utf-8"
        )
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert "planned_stop_marker" in snapshot
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# format_context_for_log / context_as_json
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFormatters:
    """Cover the log-line and JSON renderings of a shutdown context."""

    def test_format_context_for_log_includes_signal_and_parent(self):
        """The log line mentions the signal and both parent fields."""
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        rendered = sf.format_context_for_log(snapshot)
        for fragment in ("signal=SIGTERM", "parent_pid=", "parent_cmdline="):
            assert fragment in rendered

    def test_context_as_json_round_trips(self):
        """The JSON payload parses back with the same pid and signal name."""
        snapshot = sf.snapshot_shutdown_context(signal.SIGTERM)
        parsed = json.loads(sf.context_as_json(snapshot))
        assert parsed["pid"] == os.getpid()
        assert parsed["signal"] == "SIGTERM"

    def test_context_as_json_handles_unserialisable_values(self):
        """A value json cannot encode natively must not break the payload."""
        awkward_ctx = {"signal": "SIGTERM", "weird": object()}
        encoded = sf.context_as_json(awkward_ctx)
        # The output still has to be valid JSON that keeps both keys.
        parsed = json.loads(encoded)
        assert parsed["signal"] == "SIGTERM"
        assert "weird" in parsed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# spawn_async_diagnostic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSpawnAsyncDiagnostic:
    """Tests for ``sf.spawn_async_diagnostic``, the detached diagnostic spawner."""

    @staticmethod
    def _kill_and_reap(pid):
        """Best-effort: stop a spawned diagnostic child and collect its status.

        Used by tests that do not need the diagnostic to run to completion, so
        no stray child (or zombie) outlives the test. Does nothing for a falsy
        *pid*. Never raises: every failure mode here just means the child is
        already gone or was never ours to reap.
        """
        if not pid:
            return
        try:
            # NOTE(review): the diagnostic is described as being started in
            # its own session, in which case its pid is also its process-group
            # id and this takes any helpers it launched down with it.
            os.killpg(pid, signal.SIGKILL)
        except OSError:
            # No such group (or not permitted): fall back to the single pid.
            try:
                os.kill(pid, signal.SIGKILL)
            except OSError:
                pass
        try:
            os.waitpid(pid, 0)
        except OSError:
            # ChildProcessError included: already reaped elsewhere.
            pass

    @pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only diagnostic")
    def test_spawns_subprocess_and_writes_output(self, tmp_path):
        """A spawned diagnostic produces a log naming the signal."""
        log_path = tmp_path / "diag.log"
        pid = sf.spawn_async_diagnostic(log_path, "SIGTERM", timeout_seconds=3.0)
        assert pid is not None and pid > 0

        # Poll for output; the child is bounded by its own timeout.
        deadline = time.monotonic() + 5.0
        while time.monotonic() < deadline:
            if log_path.exists() and log_path.stat().st_size > 0:
                # Give the script a moment to finish writing.
                time.sleep(0.5)
                break
            time.sleep(0.1)

        # Reap the subprocess so it doesn't linger as a zombie. Best-effort:
        # it may already have been collected.
        try:
            os.waitpid(pid, 0)
        except (ChildProcessError, OSError):
            pass

        assert log_path.exists()
        contents = log_path.read_text(encoding="utf-8", errors="replace")
        assert "shutdown diagnostic" in contents
        assert "SIGTERM" in contents

    def test_returns_none_on_windows(self, tmp_path, monkeypatch):
        """With the module's ``sys.platform`` faked to win32, nothing is spawned."""
        monkeypatch.setattr(sf, "sys", type("M", (), {"platform": "win32"})())
        result = sf.spawn_async_diagnostic(
            tmp_path / "diag.log", "SIGTERM", timeout_seconds=1.0
        )
        assert result is None

    @pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only diagnostic")
    def test_handles_unwritable_log_path_gracefully(self, tmp_path):
        """An uncreatable log location yields ``None`` rather than an error."""
        # Point at a nonexistent parent that we can't create.
        log_path = Path("/proc/cant-write-here/diag.log")
        result = sf.spawn_async_diagnostic(log_path, "SIGTERM", timeout_seconds=1.0)
        assert result is None

    @pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only diagnostic")
    def test_does_not_block_caller(self, tmp_path):
        """The spawn must return immediately even if ``ps`` takes seconds."""
        log_path = tmp_path / "diag.log"
        pid = None
        try:
            start = time.monotonic()
            pid = sf.spawn_async_diagnostic(
                log_path, "SIGTERM", timeout_seconds=10.0
            )
            elapsed = time.monotonic() - start
        finally:
            # Only the spawn latency matters here; don't leave a child with a
            # 10s budget running (and unreaped) after the test finishes.
            self._kill_and_reap(pid)
        # Spawning a detached child takes a few ms; anything under 1s is
        # plenty of headroom and proves we're not waiting on it.
        assert elapsed < 1.0, f"spawn blocked for {elapsed:.2f}s"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _parse_systemd_duration_to_us
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParseSystemdDuration:
    """Check ``sf._parse_systemd_duration_to_us`` against sample duration strings."""

    # Conversion factors to microseconds, used to spell out expected values.
    _US_PER_MS = 1_000
    _US_PER_SECOND = 1_000_000

    def test_seconds(self):
        parsed = sf._parse_systemd_duration_to_us("90s")
        assert parsed == 90 * self._US_PER_SECOND

    def test_minutes(self):
        parsed = sf._parse_systemd_duration_to_us("3min")
        assert parsed == 3 * 60 * self._US_PER_SECOND

    def test_combined_min_sec(self):
        parsed = sf._parse_systemd_duration_to_us("1min 30s")
        assert parsed == (60 + 30) * self._US_PER_SECOND

    def test_hours(self):
        parsed = sf._parse_systemd_duration_to_us("1h")
        assert parsed == 60 * 60 * self._US_PER_SECOND

    def test_milliseconds(self):
        parsed = sf._parse_systemd_duration_to_us("500ms")
        assert parsed == 500 * self._US_PER_MS

    def test_empty_returns_none(self):
        parsed = sf._parse_systemd_duration_to_us("")
        assert parsed is None

    def test_unknown_unit_returns_none(self):
        parsed = sf._parse_systemd_duration_to_us("90weeks")
        assert parsed is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# check_systemd_timing_alignment
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCheckSystemdTimingAlignment:
    """Smoke tests for ``sf.check_systemd_timing_alignment``."""

    def test_returns_none_when_not_under_systemd(self, monkeypatch):
        """With INVOCATION_ID absent the check reports nothing."""
        monkeypatch.delenv("INVOCATION_ID", raising=False)
        outcome = sf.check_systemd_timing_alignment(180.0)
        assert outcome is None

    def test_returns_none_when_unit_undeterminable(self, monkeypatch):
        """With INVOCATION_ID set the check must return cleanly, not raise."""
        monkeypatch.setenv("INVOCATION_ID", "abc")
        # The test runner's /proc/self/cgroup probably doesn't end in
        # ".service", so the expected outcome is None. If pytest does happen
        # to run inside a unit, a dict of mismatch info is equally acceptable.
        outcome = sf.check_systemd_timing_alignment(180.0)
        assert isinstance(outcome, (dict, type(None)))
|
||||
Loading…
Add table
Add a link
Reference in a new issue