diff --git a/gateway/run.py b/gateway/run.py index 6d04ee81f2a..ba40995b859 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3206,6 +3206,28 @@ class GatewayRunner: except RuntimeError: self._gateway_loop = None logger.info("Session storage: %s", self.config.sessions_dir) + + # Sanity-check that systemd's TimeoutStopSec covers our drain + # window. When the user upgraded hermes-agent without re-running + # ``hermes setup``, their unit file may still encode the old + # default — in which case SIGKILL hits mid-drain and looks like + # a phantom kill in the journal. Best-effort, never raises. + try: + from gateway.shutdown_forensics import check_systemd_timing_alignment + _alignment = check_systemd_timing_alignment(self._restart_drain_timeout) + if _alignment is not None and _alignment.get("mismatch"): + logger.warning( + "Stale systemd unit detected: %s has TimeoutStopSec=%.0fs but " + "drain_timeout=%.0fs (expected >=%.0fs). systemd may SIGKILL the " + "gateway mid-drain. Run `hermes gateway service install --replace` " + "to regenerate the unit, or shorten agent.restart_drain_timeout.", + _alignment.get("unit", "(unknown)"), + _alignment["timeout_stop_sec"], + _alignment["drain_timeout"], + _alignment["expected_min"], + ) + except Exception as _e: + logger.debug("check_systemd_timing_alignment failed: %s", _e) # Log the resolved max_iterations budget so operators can verify the # config.yaml → env bridge did the right thing at a glance (instead # of silently running at a stale .env value for weeks). @@ -4498,15 +4520,34 @@ class GatewayRunner: "Stopping gateway%s...", " for restart" if self._restart_requested else "", ) + _stop_started_at = time.monotonic() + + def _phase_elapsed() -> float: + return time.monotonic() - _stop_started_at + self._running = False self._draining = True # Notify all chats with active agents BEFORE draining. # Adapters are still connected here, so messages can be sent. 
await self._notify_active_sessions_of_shutdown() + logger.info( + "Shutdown phase: notify_active_sessions done at +%.2fs", + _phase_elapsed(), + ) timeout = self._restart_drain_timeout + _drain_started_at = time.monotonic() active_agents, timed_out = await self._drain_active_agents(timeout) + logger.info( + "Shutdown phase: drain done at +%.2fs (drain took %.2fs, " + "timed_out=%s, active_at_start=%d, active_now=%d)", + _phase_elapsed(), + time.monotonic() - _drain_started_at, + timed_out, + len(active_agents), + self._running_agent_count(), + ) if timed_out: logger.warning( "Gateway drain timed out after %.1fs with %d active agent(s); interrupting remaining work.", @@ -4564,6 +4605,10 @@ class GatewayRunner: # killed by systemd instead of us (issue #8202). The final # catch-all cleanup below still runs for the graceful path. _kill_tool_subprocesses("post-interrupt") + logger.info( + "Shutdown phase: post-interrupt tool kill done at +%.2fs", + _phase_elapsed(), + ) if self._restart_requested and self._restart_detached: try: @@ -4591,15 +4636,29 @@ class GatewayRunner: self._cleanup_agent_resources(_agent) for platform, adapter in list(self.adapters.items()): + _adapter_started_at = time.monotonic() try: await adapter.cancel_background_tasks() except Exception as e: logger.debug("✗ %s background-task cancel error: %s", platform.value, e) try: await adapter.disconnect() - logger.info("✓ %s disconnected", platform.value) + logger.info( + "✓ %s disconnected (%.2fs)", + platform.value, + time.monotonic() - _adapter_started_at, + ) except Exception as e: - logger.error("✗ %s disconnect error: %s", platform.value, e) + logger.error( + "✗ %s disconnect error after %.2fs: %s", + platform.value, + time.monotonic() - _adapter_started_at, + e, + ) + logger.info( + "Shutdown phase: all adapters disconnected at +%.2fs", + _phase_elapsed(), + ) for _task in list(self._background_tasks): if _task is self._stop_task: @@ -4624,6 +4683,10 @@ class GatewayRunner: # that got respawned 
between the earlier call and adapter # disconnect (defense in depth; safe to call repeatedly). _kill_tool_subprocesses("final-cleanup") + logger.info( + "Shutdown phase: final-cleanup tool kill done at +%.2fs", + _phase_elapsed(), + ) # Reap the process-global auxiliary-client cache once at the very # end of teardown. Per-turn cleanup runs in _cleanup_agent_resources @@ -4651,6 +4714,10 @@ class GatewayRunner: _db.close() except Exception as _e: logger.debug("SessionDB close error: %s", _e) + logger.info( + "Shutdown phase: SessionDB close done at +%.2fs", + _phase_elapsed(), + ) from gateway.status import remove_pid_file, release_gateway_runtime_lock remove_pid_file() @@ -4690,7 +4757,7 @@ class GatewayRunner: self._draining = False self._update_runtime_status("stopped", self._exit_reason) - logger.info("Gateway stopped") + logger.info("Gateway stopped (total teardown %.2fs)", _phase_elapsed()) self._stop_task = asyncio.create_task(_stop_impl()) await self._stop_task @@ -15762,40 +15829,62 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = except Exception as e: logger.debug("Planned stop marker check failed: %s", e) + # Fast (<10ms) snapshot of who's asking us to shut down — runs + # synchronously inside the asyncio signal handler, so we keep it + # purely stdlib + /proc reads, no subprocesses. See PR #15826 + # (May 2026): the previous implementation called `ps aux` here + # synchronously, blocking the event loop for up to 3s while + # adapter teardown couldn't begin. 
+ try: + from gateway.shutdown_forensics import ( + format_context_for_log, + snapshot_shutdown_context, + spawn_async_diagnostic, + ) + _shutdown_ctx = snapshot_shutdown_context(received_signal) + except Exception as _e: + _shutdown_ctx = None + logger.debug("snapshot_shutdown_context failed: %s", _e) + if planned_takeover: logger.info( - "Received SIGTERM as a planned --replace takeover — exiting cleanly" + "Received %s as a planned --replace takeover — exiting cleanly", + _shutdown_ctx["signal"] if _shutdown_ctx else "SIGTERM", ) elif planned_stop: logger.info( - "Received SIGTERM/SIGINT as a planned gateway stop — exiting cleanly" + "Received %s as a planned gateway stop — exiting cleanly", + _shutdown_ctx["signal"] if _shutdown_ctx else "SIGTERM/SIGINT", ) else: _signal_initiated_shutdown = True - logger.info("Received SIGTERM/SIGINT — initiating shutdown") - # Diagnostic: log all hermes-related processes so we can identify - # what triggered the signal (hermes update, hermes gateway restart, - # a stale detached subprocess, etc.). - try: - import subprocess as _sp - _ps = _sp.run( - ["ps", "aux"], - capture_output=True, text=True, timeout=3, + logger.info( + "Received %s — initiating shutdown", + _shutdown_ctx["signal"] if _shutdown_ctx else "SIGTERM/SIGINT", ) - _hermes_procs = [ - line for line in _ps.stdout.splitlines() - if ("hermes" in line.lower() or "gateway" in line.lower()) - and str(os.getpid()) not in line.split()[1:2] # exclude self - ] - if _hermes_procs: + + # Always log who/what triggered the signal — most useful single + # line when diagnosing "the gateway keeps dying" tickets. Format + # is one line, key=value, parent_cmdline last (often long). 
+ if _shutdown_ctx is not None: + try: logger.warning( - "Shutdown diagnostic — other hermes processes running:\n %s", - "\n ".join(_hermes_procs), + "Shutdown context: %s", format_context_for_log(_shutdown_ctx) ) - else: - logger.info("Shutdown diagnostic — no other hermes processes found") - except Exception: - pass + except Exception as _e: + logger.debug("format_context_for_log failed: %s", _e) + + # Spawn the heavyweight diagnostic (ps auxf, pstree, dmesg) in + # a detached subprocess so it can finish writing to disk even + # if our cgroup is being torn down. Bounded by an internal + # timeout; never blocks the event loop here. + try: + _diag_log = _hermes_home / "logs" / "gateway-shutdown-diag.log" + spawn_async_diagnostic( + _diag_log, _shutdown_ctx["signal"], timeout_seconds=5.0 + ) + except Exception as _e: + logger.debug("spawn_async_diagnostic failed: %s", _e) asyncio.create_task(runner.stop()) def restart_signal_handler(): diff --git a/gateway/shutdown_forensics.py b/gateway/shutdown_forensics.py new file mode 100644 index 00000000000..9f102f24f80 --- /dev/null +++ b/gateway/shutdown_forensics.py @@ -0,0 +1,463 @@ +"""Shutdown forensics — capture context when the gateway receives SIGTERM/SIGINT. + +The gateway's ``shutdown_signal_handler`` runs synchronously inside the +asyncio event loop. We can't safely block it for long, but we DO want a +durable record of who/what triggered the shutdown so that "the gateway +keeps dying" incidents can be diagnosed after the fact. + +This module exposes :func:`snapshot_shutdown_context`, a fast (<10ms), +non-blocking probe that returns a structured dict the signal handler can +log immediately, plus :func:`spawn_async_diagnostic`, a fire-and-forget +``ps`` walk that runs as a detached subprocess so it can't block teardown +even if /proc is wedged. + +Anything that needs to wait (e.g. shelling out to ``ps aux``) belongs in +the async helper, never in the synchronous probe. 
+""" + +from __future__ import annotations + +import json +import os +import signal +import subprocess +import sys +import time +from pathlib import Path +from typing import Any, Dict, List, Optional + + +_SIGNAL_NAME_BY_NUM: Dict[int, str] = {} +for _name in ("SIGTERM", "SIGINT", "SIGHUP", "SIGQUIT", "SIGUSR1", "SIGUSR2"): + _val = getattr(signal, _name, None) + if _val is not None: + _SIGNAL_NAME_BY_NUM[int(_val)] = _name + + +def _signal_name(sig: Any) -> str: + """Return a human-readable signal name (or ``str(sig)`` as fallback).""" + if sig is None: + return "UNKNOWN" + try: + sig_int = int(sig) + except (TypeError, ValueError): + return str(sig) + return _SIGNAL_NAME_BY_NUM.get(sig_int, f"signal#{sig_int}") + + +def _read_proc_field(pid: int, key: str) -> Optional[str]: + """Read a single field from /proc//status. Linux only; None elsewhere.""" + try: + with open(f"/proc/{pid}/status", encoding="utf-8") as fh: + for line in fh: + if line.startswith(key + ":"): + return line.split(":", 1)[1].strip() + except (FileNotFoundError, PermissionError, OSError): + pass + return None + + +def _read_proc_cmdline(pid: int) -> Optional[str]: + """Read /proc//cmdline as a printable string. Linux only; None elsewhere.""" + try: + with open(f"/proc/{pid}/cmdline", "rb") as fh: + data = fh.read() + except (FileNotFoundError, PermissionError, OSError): + return None + if not data: + return None + # cmdline uses NUL separators + return data.replace(b"\x00", b" ").decode("utf-8", errors="replace").strip() + + +def _proc_summary(pid: int) -> Dict[str, Any]: + """Compact /proc/ snapshot: pid, ppid, state, uid, cmdline. + + Best-effort. Missing fields are simply omitted rather than raising. 
+ """ + summary: Dict[str, Any] = {"pid": pid} + if pid <= 0: + return summary + name = _read_proc_field(pid, "Name") + if name is not None: + summary["name"] = name + state = _read_proc_field(pid, "State") + if state is not None: + summary["state"] = state + ppid = _read_proc_field(pid, "PPid") + if ppid is not None: + try: + summary["ppid"] = int(ppid) + except ValueError: + pass + uid = _read_proc_field(pid, "Uid") + if uid is not None: + # "real effective saved fs" + summary["uid"] = uid.split()[0] if uid else uid + cmdline = _read_proc_cmdline(pid) + if cmdline: + # Truncate aggressively — these can be 4KB + summary["cmdline"] = cmdline[:300] + return summary + + +def snapshot_shutdown_context(received_signal: Any = None) -> Dict[str, Any]: + """Fast (<10ms) snapshot of who/what is asking us to shut down. + + Captures: + + * The signal number/name (so SIGINT vs SIGTERM is visible) + * Our own PID/ppid + parent process info from /proc (Linux) + * Whether systemd is our parent (``ppid==1`` or ``INVOCATION_ID`` set) + * Whether takeover/planned-stop markers exist (consumed lazily by the caller) + * /proc/self limits + load average (1-min) + * Wall-clock and monotonic timestamps for cross-correlating later phases + + Pure stdlib, never raises, never blocks on subprocesses. + """ + now = time.time() + monotonic = time.monotonic() + pid = os.getpid() + ppid = os.getppid() + + ctx: Dict[str, Any] = { + "ts": now, + "ts_monotonic": monotonic, + "signal": _signal_name(received_signal), + "signal_num": int(received_signal) if received_signal is not None else None, + "pid": pid, + "ppid": ppid, + "parent": _proc_summary(ppid), + "self": _proc_summary(pid), + } + + # systemd context. If we were started by a systemd unit, INVOCATION_ID + # is set in our env. ppid==1 (init) is also a strong signal that + # systemd reaped+forwarded the SIGTERM. 
def _marker_targets_pid(raw: str, pid: int) -> bool:
    """Return True when marker text *raw* names exactly *pid* as ``target_pid``.

    Accepts both JSON-style (``"target_pid": 12``) and Python-repr-style
    (``'target_pid': 12``) text. The trailing ``(?![0-9])`` stops pid 12 from
    matching a marker written for pid 123.
    """
    import re  # local import: keeps the module's top-level imports unchanged

    pattern = rf"""["']target_pid["']\s*:\s*{int(pid)}(?![0-9])"""
    return re.search(pattern, raw) is not None


def snapshot_shutdown_context(received_signal: Any = None) -> Dict[str, Any]:
    """Fast snapshot of who/what is asking us to shut down.

    Captures:

    * The signal number/name (so SIGINT vs SIGTERM is visible)
    * Our own PID/ppid + parent and self process info from /proc (Linux)
    * Whether systemd looks like our manager (``ppid==1`` or
      ``INVOCATION_ID`` set)
    * The raw text of takeover / planned-stop marker files under
      ``$HERMES_HOME`` when they exist (truncated to 300 characters)
    * 1-minute load average, and an attached tracer's pid if any
    * Wall-clock and monotonic timestamps for cross-correlating later phases

    Args:
        received_signal: Signal number (or ``signal.Signals`` member) that
            triggered the shutdown; ``None`` when unknown. Values that cannot
            be converted to an integer are tolerated: ``signal_num`` is then
            ``None`` and ``signal`` is whatever ``_signal_name`` renders.

    Returns:
        A dict of plain values. Optional keys are simply absent when the
        corresponding probe failed.

    Pure stdlib, never raises, never blocks on subprocesses.
    """
    now = time.time()
    monotonic = time.monotonic()
    pid = os.getpid()
    ppid = os.getppid()

    # int() on an arbitrary object can raise; the contract here is "never".
    try:
        signal_num = int(received_signal) if received_signal is not None else None
    except (TypeError, ValueError, OverflowError):
        signal_num = None

    ctx: Dict[str, Any] = {
        "ts": now,
        "ts_monotonic": monotonic,
        "signal": _signal_name(received_signal),
        "signal_num": signal_num,
        "pid": pid,
        "ppid": ppid,
        "parent": _proc_summary(ppid),
        "self": _proc_summary(pid),
    }

    # systemd context. If we were started by a systemd unit, INVOCATION_ID
    # is set in our env. ppid==1 (init) is treated as a second hint.
    invocation_id = os.environ.get("INVOCATION_ID")
    if invocation_id:
        ctx["systemd_invocation_id"] = invocation_id
    journal_stream = os.environ.get("JOURNAL_STREAM")
    if journal_stream:
        ctx["systemd_journal_stream"] = journal_stream
    ctx["under_systemd"] = bool(invocation_id) or ppid == 1

    # Load average — high load points the finger at "something else
    # crushing the box" rather than "external killer".
    try:
        ctx["loadavg_1m"] = os.getloadavg()[0]
    except (OSError, AttributeError):
        pass

    # /proc/self/status TracerPid: nonzero means a debugger / strace is
    # attached.
    try:
        tracer = _read_proc_field(pid, "TracerPid")
        if tracer is not None and tracer != "0":
            if tracer.isdigit():
                ctx["tracer_pid"] = int(tracer)
                ctx["tracer"] = _proc_summary(int(tracer))
            else:
                ctx["tracer_pid"] = tracer
                ctx["tracer"] = None
    except (TypeError, ValueError):
        pass

    # Race-detection hint: a takeover marker on disk that does NOT name us
    # suggests another --replace instance is the one killing us. Filenames
    # are string literals so the signal-handler path stays import-light.
    try:
        hermes_home_str = os.environ.get("HERMES_HOME")
        if hermes_home_str:
            takeover_path = Path(hermes_home_str) / ".gateway-takeover.json"
            if takeover_path.exists():
                try:
                    raw = takeover_path.read_text(encoding="utf-8")
                    ctx["takeover_marker"] = raw[:300]
                    ctx["takeover_marker_for_self"] = _marker_targets_pid(raw, pid)
                except (OSError, ValueError):
                    # ValueError covers UnicodeDecodeError; an unreadable
                    # takeover marker must not hide the planned-stop marker.
                    pass
            planned_stop_path = Path(hermes_home_str) / ".gateway-planned-stop.json"
            if planned_stop_path.exists():
                try:
                    raw = planned_stop_path.read_text(encoding="utf-8")
                    ctx["planned_stop_marker"] = raw[:300]
                except (OSError, ValueError):
                    pass
    except Exception:  # noqa: BLE001 — never raise from a signal handler
        pass

    return ctx
def spawn_async_diagnostic(
    log_path: Path,
    signal_name: str,
    *,
    timeout_seconds: float = 5.0,
) -> Optional[int]:
    """Fire-and-forget ``ps``-style snapshot appended to ``log_path``.

    Runs as a subprocess in its own session so it cannot block the asyncio
    event loop. The command is wrapped in ``timeout`` so a wedged ``ps``
    still self-cleans.

    Args:
        log_path: File the diagnostic output is appended to; its parent
            directory is created if missing.
        signal_name: Label written into the banner line of the report.
        timeout_seconds: Upper bound for the diagnostic. Rounded UP to whole
            seconds with a minimum of 1, because a duration of ``0`` tells
            ``timeout`` to disable the limit altogether.

    Returns:
        The subprocess PID on success, ``None`` on failure or on Windows.
        Never raises.

    We deliberately avoid ``subprocess.run(["ps", "aux"])`` from inside the
    signal handler: it blocks the caller until ``ps`` has finished.
    """
    import math
    import shlex

    try:
        log_path.parent.mkdir(parents=True, exist_ok=True)
    except OSError:
        return None

    # Inline shell so we don't have to ship a helper script; on Windows we
    # just skip the snapshot.
    if sys.platform == "win32":
        return None

    try:
        limit = max(1, math.ceil(timeout_seconds))
    except (TypeError, ValueError, OverflowError):
        limit = 5  # NaN / inf / non-numeric: fall back to the default bound

    # Quote the banner so a signal label can never terminate the shell
    # string early.
    banner = shlex.quote(f"=== shutdown diagnostic @ {signal_name} ===")
    script = (
        f"echo {banner}; "
        "echo '--- date ---'; date -u +%Y-%m-%dT%H:%M:%SZ; "
        "echo '--- ps auxf (top 60 by cpu) ---'; "
        "ps auxf --sort=-pcpu 2>/dev/null | head -60; "
        "echo '--- pstree of self ---'; "
        f"pstree -plau {os.getpid()} 2>/dev/null | head -40 || true; "
        "echo '--- /proc/loadavg ---'; "
        "cat /proc/loadavg 2>/dev/null || true; "
        "echo '--- recent dmesg (oom/killed) ---'; "
        "dmesg -T 2>/dev/null | tail -20 || journalctl --user -n 20 --no-pager 2>/dev/null | tail -20 || true; "
        "echo '=== end ==='"
    )

    try:
        # O_APPEND so concurrent diagnostics from rapid signals don't
        # trample each other.
        fd = os.open(str(log_path), os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
    except OSError:
        return None

    try:
        # start_new_session puts the child in its own session and process
        # group, so signals aimed at our process group do not reach it.
        # NOTE(review): this does not by itself move the child out of our
        # cgroup — confirm the systemd KillMode if survival matters.
        proc = subprocess.Popen(
            ["timeout", str(limit), "bash", "-c", script],
            stdout=fd,
            stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL,
            start_new_session=True,
            close_fds=True,
        )
    except (OSError, ValueError):
        return None
    finally:
        # The child (if any) holds its own copy of the fd. Close ours
        # exactly once, here, for both the success and failure paths.
        try:
            os.close(fd)
        except OSError:
            pass

    return proc.pid
def format_context_for_log(ctx: Dict[str, Any]) -> str:
    """Render a shutdown context dict as one space-separated ``key=value`` line.

    Missing values are shown as ``?`` (or ``'(unknown)'`` for the parent
    command line). Optional markers (takeover, planned stop, tracer) appear
    only when present, and ``parent_cmdline`` is always the last field.
    """
    parent_info = ctx.get("parent") or {}
    loadavg = ctx.get("loadavg_1m")
    loadavg_text = f"{loadavg:.2f}" if isinstance(loadavg, (int, float)) else "?"

    fields: List[str] = [
        f"signal={ctx.get('signal', '?')}",
        f"under_systemd={'yes' if ctx.get('under_systemd') else 'no'}",
        f"parent_pid={parent_info.get('pid') or '?'}",
        f"parent_name={parent_info.get('name') or '?'}",
        f"loadavg_1m={loadavg_text}",
    ]

    if ctx.get("takeover_marker") is not None:
        owner = "self" if ctx.get("takeover_marker_for_self") else "other"
        fields.append(f"takeover_marker_present={owner}")
    if ctx.get("planned_stop_marker") is not None:
        fields.append("planned_stop_marker_present=yes")
    if ctx.get("tracer_pid"):
        fields.append(f"tracer_pid={ctx['tracer_pid']}")

    # The parent command line goes last because it is often long.
    fields.append(f"parent_cmdline={parent_info.get('cmdline', '(unknown)')!r}")
    return " ".join(fields)
def context_as_json(ctx: Dict[str, Any]) -> str:
    """JSON-serialise a context dict for structured ingestion.

    Keys are sorted and values JSON cannot encode natively are passed
    through ``str()``.

    Returns:
        The JSON text, or ``"{}"`` when serialisation fails for any reason.
        Never raises.
    """
    try:
        return json.dumps(ctx, default=str, sort_keys=True)
    except Exception:  # noqa: BLE001 — documented as never raising
        # Broad on purpose: default=str runs arbitrary __str__ methods, and
        # deep nesting raises RecursionError; neither is a TypeError or
        # ValueError.
        return "{}"
def _unit_name_from_cgroup(cgroup_path: str = "/proc/self/cgroup") -> Optional[str]:
    """Return the innermost ``*.service`` path component of our cgroup, if any."""
    try:
        # Lines look like "0::/user.slice/.../hermes-gateway.service".
        with open(cgroup_path, encoding="utf-8", errors="replace") as fh:
            for line in fh:
                if ".service" not in line:
                    continue
                for part in reversed(line.strip().split("/")):
                    if part.endswith(".service"):
                        return part
    except OSError:
        pass
    return None


def _timeout_stop_us_from_show(stdout: str) -> Optional[int]:
    """Extract ``TimeoutStopUSec`` (microseconds) from ``systemctl show`` output.

    Returns ``None`` when the property is missing or unparseable, or when
    ``LoadState=not-found`` shows the queried manager does not have the unit.
    """
    props: Dict[str, str] = {}
    for line in stdout.splitlines():
        key, sep, value = line.partition("=")
        if sep:
            props.setdefault(key, value.strip())

    if props.get("LoadState") == "not-found":
        # `systemctl show` exits 0 and prints default property values for
        # units it has never heard of; those defaults say nothing about us.
        return None

    value = props.get("TimeoutStopUSec")
    if not value:
        return None
    # Output is either raw microseconds ("90000000") or a time span
    # ("1min 30s").
    if value.isdigit():
        return int(value)
    return _parse_systemd_duration_to_us(value)


def check_systemd_timing_alignment(drain_timeout: float) -> Optional[Dict[str, Any]]:
    """At startup, sanity-check that systemd's TimeoutStopSec covers the drain.

    When the gateway runs under a stale systemd unit file, ``TimeoutStopSec``
    can be smaller than the configured drain timeout plus teardown headroom,
    in which case systemd may SIGKILL the service mid-drain.

    Args:
        drain_timeout: Configured drain window in seconds.

    Returns:
        ``None`` when nothing can be determined (``INVOCATION_ID`` unset, no
        ``*.service`` component in ``/proc/self/cgroup``, ``systemctl``
        unavailable or failing, property unparseable). Otherwise a dict with
        ``unit``, ``timeout_stop_sec``, ``drain_timeout``, ``expected_min``
        and a ``mismatch`` bool that is True when
        ``timeout_stop_sec < drain_timeout + 30``.

    Best-effort. Never raises.
    """
    if not os.environ.get("INVOCATION_ID"):
        return None  # Not running under systemd (or at least not directly)

    unit_name = _unit_name_from_cgroup()
    if not unit_name:
        return None

    # Ask the user manager first, then the system manager. LoadState is
    # requested alongside the timeout so a manager that does not own the
    # unit is skipped instead of contributing its defaults.
    timeout_us: Optional[int] = None
    for manager_flag in (["--user"], []):
        try:
            result = subprocess.run(
                [
                    "systemctl",
                    *manager_flag,
                    "show",
                    unit_name,
                    "--property=LoadState,TimeoutStopUSec",
                ],
                capture_output=True,
                text=True,
                timeout=2.0,
            )
        except (subprocess.TimeoutExpired, OSError, ValueError):
            continue
        if result.returncode != 0:
            continue
        timeout_us = _timeout_stop_us_from_show(result.stdout)
        if timeout_us is not None:
            break

    if timeout_us is None:
        return None

    timeout_stop_sec = timeout_us / 1_000_000.0
    # Headroom for everything that runs after the drain (post-interrupt
    # kill, adapter disconnect, SessionDB close, file unlinks, ...).
    headroom = 30.0
    expected = drain_timeout + headroom
    return {
        "unit": unit_name,
        "timeout_stop_sec": timeout_stop_sec,
        "drain_timeout": drain_timeout,
        "expected_min": expected,
        "mismatch": timeout_stop_sec < expected,
    }
def _parse_systemd_duration_to_us(raw: str) -> Optional[int]:
    """Parse a systemd time span such as ``'1min 30s'`` into microseconds.

    Accepted grammar: one or more ``<number>[whitespace]<unit>`` terms,
    optionally separated by whitespace, summed together. A number with no
    unit counts as seconds. Units are matched case-insensitively and cover
    the abbreviations ``systemctl show`` prints (``us``, ``ms``, ``s``,
    ``min``, ``h``, ``d``, ``w``, ``month``, ``y``) plus ``sec`` and ``hr``.

    Returns:
        The total in microseconds, or ``None`` for empty input, non-string
        input, an unknown unit, any unparseable text (including
        ``"infinity"``), or a total of zero. Never raises.
    """
    import re  # local import: keeps the module's top-level imports unchanged

    if not isinstance(raw, str):
        return None
    text = raw.strip()
    if not text:
        return None

    units = {
        "us": 1,
        "ms": 1_000,
        "s": 1_000_000,
        "sec": 1_000_000,
        "min": 60_000_000,
        "h": 3_600_000_000,
        "hr": 3_600_000_000,
        "d": 86_400_000_000,
        "w": 604_800_000_000,
        # systemd defines a month as 30.44 days and a year as 365.25 days.
        "month": 2_629_800_000_000,
        "y": 31_557_600_000_000,
    }
    term = re.compile(r"\s*([0-9]+(?:\.[0-9]*)?|\.[0-9]+)\s*([A-Za-z]*)")

    total_us = 0
    pos = 0
    while pos < len(text):
        match = term.match(text, pos)
        if match is None:
            return None
        number, unit = match.groups()
        if unit:
            multiplier = units.get(unit.lower())
            if multiplier is None:
                return None
        else:
            # Bare number = seconds, but only when it is a whole term:
            # it must be followed by whitespace or the end of the text.
            number_end = match.end(1)
            if number_end < len(text) and not text[number_end].isspace():
                return None
            multiplier = 1_000_000
        try:
            total_us += int(float(number) * multiplier)
        except (ValueError, OverflowError):
            return None
        pos = match.end()

    return total_us if total_us > 0 else None
class TestSignalName:
    """Checks how ``sf._signal_name`` renders different kinds of input."""

    def test_known_signals_resolve_to_names(self):
        """SIGTERM and SIGINT come back as their symbolic names."""
        assert sf._signal_name(signal.SIGTERM) == "SIGTERM"
        assert sf._signal_name(signal.SIGINT) == "SIGINT"

    def test_unknown_int_returns_signal_num_token(self):
        """An integer that is not a known signal renders as ``signal#<n>``."""
        # Pick an integer extremely unlikely to ever be a real signal alias
        assert sf._signal_name(9999) == "signal#9999"

    def test_none_returns_unknown(self):
        """``None`` renders as the literal ``UNKNOWN``."""
        assert sf._signal_name(None) == "UNKNOWN"

    def test_non_integer_falls_back_to_str(self):
        """A value that cannot be converted to int is returned via ``str()``."""
        assert sf._signal_name("SIGTERM") == "SIGTERM"
class TestSnapshotShutdownContext:
    """Checks the keys and values produced by ``sf.snapshot_shutdown_context``."""

    def test_includes_self_pid_and_signal(self):
        """The snapshot records our pid plus the signal's name and number."""
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert ctx["pid"] == os.getpid()
        assert ctx["signal"] == "SIGTERM"
        assert ctx["signal_num"] == int(signal.SIGTERM)

    def test_handles_none_signal(self):
        """A missing signal yields ``UNKNOWN`` and a ``None`` number."""
        ctx = sf.snapshot_shutdown_context(None)
        assert ctx["signal"] == "UNKNOWN"
        assert ctx["signal_num"] is None

    def test_includes_timestamps(self):
        """``ts`` falls inside the call window; ``ts_monotonic`` is a float."""
        before = time.time()
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        after = time.time()
        assert before <= ctx["ts"] <= after
        assert isinstance(ctx["ts_monotonic"], float)

    @pytest.mark.skipif(sys.platform == "win32", reason="Linux /proc not present")
    def test_includes_parent_summary_on_linux(self):
        """The ``parent`` entry describes the process returned by getppid()."""
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert "parent" in ctx
        assert ctx["parent"]["pid"] == os.getppid()

    def test_under_systemd_flag_uses_invocation_id(self, monkeypatch):
        """Setting INVOCATION_ID marks the context as running under systemd."""
        monkeypatch.setenv("INVOCATION_ID", "abc123")
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert ctx["under_systemd"] is True
        assert ctx["systemd_invocation_id"] == "abc123"

    def test_under_systemd_false_without_invocation_id_and_normal_ppid(
        self, monkeypatch
    ):
        """Without INVOCATION_ID and with ppid != 1 the flag is False."""
        monkeypatch.delenv("INVOCATION_ID", raising=False)
        # We can't actually change ppid; skip if we happen to be reaped
        # by init (e.g. running under tini).
        if os.getppid() == 1:
            pytest.skip("test process is reaped by init")
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert ctx["under_systemd"] is False

    def test_completes_quickly(self):
        """Snapshot must NOT block — it runs inside the asyncio signal handler."""
        start = time.monotonic()
        sf.snapshot_shutdown_context(signal.SIGTERM)
        elapsed = time.monotonic() - start
        # Generous bound; the function should be sub-millisecond in practice.
        assert elapsed < 0.5, f"snapshot took {elapsed:.3f}s — too slow"

    def test_detects_takeover_marker_for_self(self, tmp_path, monkeypatch):
        """A takeover marker naming our own pid is flagged as being for us."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker = tmp_path / ".gateway-takeover.json"
        marker.write_text(
            f'{{"target_pid": {os.getpid()}, "replacer_pid": 99999}}',
            encoding="utf-8",
        )
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert "takeover_marker" in ctx
        assert ctx["takeover_marker_for_self"] is True

    def test_detects_takeover_marker_for_other(self, tmp_path, monkeypatch):
        """A takeover marker naming pid 1 is flagged as not being for us."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker = tmp_path / ".gateway-takeover.json"
        marker.write_text(
            '{"target_pid": 1, "replacer_pid": 99999}', encoding="utf-8"
        )
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert ctx["takeover_marker_for_self"] is False

    def test_detects_planned_stop_marker(self, tmp_path, monkeypatch):
        """An existing planned-stop marker file shows up in the context."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker = tmp_path / ".gateway-planned-stop.json"
        marker.write_text(
            f'{{"target_pid": {os.getpid()}}}', encoding="utf-8"
        )
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        assert "planned_stop_marker" in ctx
class TestFormatters:
    """Checks ``sf.format_context_for_log`` and ``sf.context_as_json``."""

    def test_format_context_for_log_includes_signal_and_parent(self):
        """The log line carries the signal and the parent pid/cmdline keys."""
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        line = sf.format_context_for_log(ctx)
        assert "signal=SIGTERM" in line
        assert "parent_pid=" in line
        assert "parent_cmdline=" in line

    def test_context_as_json_round_trips(self):
        """The JSON payload decodes back to the same pid and signal."""
        ctx = sf.snapshot_shutdown_context(signal.SIGTERM)
        payload = sf.context_as_json(ctx)
        decoded = json.loads(payload)
        assert decoded["pid"] == os.getpid()
        assert decoded["signal"] == "SIGTERM"

    def test_context_as_json_handles_unserialisable_values(self):
        """Values JSON cannot encode natively still yield valid JSON."""
        ctx = {"signal": "SIGTERM", "weird": object()}
        payload = sf.context_as_json(ctx)
        # default=str means such objects are passed through str(), so the
        # JSON stays valid
        decoded = json.loads(payload)
        assert decoded["signal"] == "SIGTERM"
        assert "weird" in decoded
class TestSpawnAsyncDiagnostic:
    """Checks ``sf.spawn_async_diagnostic``: spawning, failure paths, latency."""

    @pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only diagnostic")
    def test_spawns_subprocess_and_writes_output(self, tmp_path):
        """A spawned diagnostic returns a pid and writes its banner to the log."""
        log_path = tmp_path / "diag.log"
        pid = sf.spawn_async_diagnostic(log_path, "SIGTERM", timeout_seconds=3.0)
        assert pid is not None and pid > 0

        # Wait briefly for the subprocess to write — bounded by its own timeout.
        deadline = time.monotonic() + 5.0
        while time.monotonic() < deadline:
            if log_path.exists() and log_path.stat().st_size > 0:
                # Wait a touch longer for the script to finish writing
                time.sleep(0.5)
                break
            time.sleep(0.1)

        # Reap the subprocess so it doesn't show up as a zombie.
        try:
            os.waitpid(pid, 0)
        except (ChildProcessError, OSError):
            pass

        assert log_path.exists()
        contents = log_path.read_text(encoding="utf-8", errors="replace")
        assert "shutdown diagnostic" in contents
        assert "SIGTERM" in contents

    def test_returns_none_on_windows(self, tmp_path, monkeypatch):
        """With ``sys.platform`` faked to ``win32`` the function returns None."""
        # Replace the module's ``sys`` reference with a stub exposing only
        # ``platform``.
        monkeypatch.setattr(sf, "sys", type("M", (), {"platform": "win32"})())
        result = sf.spawn_async_diagnostic(
            tmp_path / "diag.log", "SIGTERM", timeout_seconds=1.0
        )
        assert result is None

    @pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only diagnostic")
    def test_handles_unwritable_log_path_gracefully(self, tmp_path):
        """A log path whose parent cannot be created yields None, not an error."""
        # Point at a nonexistent parent that we can't create
        log_path = Path("/proc/cant-write-here/diag.log")
        result = sf.spawn_async_diagnostic(log_path, "SIGTERM", timeout_seconds=1.0)
        assert result is None

    @pytest.mark.skipif(sys.platform == "win32", reason="POSIX-only diagnostic")
    def test_does_not_block_caller(self, tmp_path):
        """The spawn must return immediately even if ``ps`` takes seconds."""
        log_path = tmp_path / "diag.log"
        start = time.monotonic()
        sf.spawn_async_diagnostic(log_path, "SIGTERM", timeout_seconds=10.0)
        elapsed = time.monotonic() - start
        # Spawning bash in detached mode takes a few ms; anything under 1s
        # is plenty of headroom and proves we're not waiting on it.
        assert elapsed < 1.0, f"spawn blocked for {elapsed:.2f}s"
class TestParseSystemdDuration:
    """Checks ``sf._parse_systemd_duration_to_us`` on common time spans."""

    def test_seconds(self):
        """``90s`` is 90 seconds in microseconds."""
        assert sf._parse_systemd_duration_to_us("90s") == 90 * 1_000_000

    def test_minutes(self):
        """``3min`` is 180 seconds in microseconds."""
        assert sf._parse_systemd_duration_to_us("3min") == 180 * 1_000_000

    def test_combined_min_sec(self):
        """Space-separated terms are summed."""
        assert sf._parse_systemd_duration_to_us("1min 30s") == 90 * 1_000_000

    def test_hours(self):
        """``1h`` is 3600 seconds in microseconds."""
        assert sf._parse_systemd_duration_to_us("1h") == 3600 * 1_000_000

    def test_milliseconds(self):
        """``500ms`` is 500,000 microseconds."""
        assert sf._parse_systemd_duration_to_us("500ms") == 500_000

    def test_empty_returns_none(self):
        """Empty input is reported as ``None``."""
        assert sf._parse_systemd_duration_to_us("") is None

    def test_unknown_unit_returns_none(self):
        """An unrecognised unit makes the whole value unparseable."""
        assert sf._parse_systemd_duration_to_us("90weeks") is None


# ---------------------------------------------------------------------------
# check_systemd_timing_alignment
# ---------------------------------------------------------------------------

class TestCheckSystemdTimingAlignment:
    """Smoke tests for ``sf.check_systemd_timing_alignment``."""

    def test_returns_none_when_not_under_systemd(self, monkeypatch):
        """Without INVOCATION_ID the check reports nothing."""
        monkeypatch.delenv("INVOCATION_ID", raising=False)
        result = sf.check_systemd_timing_alignment(180.0)
        assert result is None

    def test_returns_none_when_unit_undeterminable(self, monkeypatch):
        """With INVOCATION_ID set the call must complete without raising."""
        monkeypatch.setenv("INVOCATION_ID", "abc")
        # /proc/self/cgroup likely doesn't end in .service for the test runner
        result = sf.check_systemd_timing_alignment(180.0)
        # Either None (we couldn't find a unit) or a dict with mismatch info
        # for whatever unit pytest IS in. Both are valid; we just ensure
        # the function doesn't raise.
        assert result is None or isinstance(result, dict)