fix(gateway): handle planned service stops

This commit is contained in:
helix4u 2026-05-04 13:35:23 -06:00 committed by Teknium
parent 20428f5e60
commit b632290166
5 changed files with 286 additions and 72 deletions

View file

@ -15003,15 +15003,14 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
runner = GatewayRunner(config) runner = GatewayRunner(config)
# Track whether a signal initiated the shutdown (vs. internal request). # Track whether an unexpected signal initiated the shutdown. When an
# When an unexpected SIGTERM kills the gateway, we exit non-zero so # unexpected SIGTERM kills the gateway, we exit non-zero so service
# systemd's Restart=on-failure revives the process. systemctl stop # managers can revive the process. Planned stop paths write a marker
# is safe: systemd tracks stop-requested state independently of exit # before signalling us so they can exit cleanly instead.
# code, so Restart= never fires for a deliberate stop.
_signal_initiated_shutdown = False _signal_initiated_shutdown = False
# Set up signal handlers # Set up signal handlers
def shutdown_signal_handler(): def shutdown_signal_handler(received_signal=None):
nonlocal _signal_initiated_shutdown nonlocal _signal_initiated_shutdown
# Planned --replace takeover check: when a sibling gateway is # Planned --replace takeover check: when a sibling gateway is
# taking over via --replace, it wrote a marker naming this PID # taking over via --replace, it wrote a marker naming this PID
@ -15027,10 +15026,28 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
except Exception as e: except Exception as e:
logger.debug("Takeover marker check failed: %s", e) logger.debug("Takeover marker check failed: %s", e)
# Planned stop check: service managers and `hermes gateway stop`
# also send SIGTERM, which is indistinguishable from an unexpected
# external kill unless the CLI marks it first. SIGINT comes from an
# interactive Ctrl+C and is likewise an intentional foreground stop.
planned_stop = False
if received_signal == signal.SIGINT:
planned_stop = True
elif not planned_takeover:
try:
from gateway.status import consume_planned_stop_marker_for_self
planned_stop = consume_planned_stop_marker_for_self()
except Exception as e:
logger.debug("Planned stop marker check failed: %s", e)
if planned_takeover: if planned_takeover:
logger.info( logger.info(
"Received SIGTERM as a planned --replace takeover — exiting cleanly" "Received SIGTERM as a planned --replace takeover — exiting cleanly"
) )
elif planned_stop:
logger.info(
"Received SIGTERM/SIGINT as a planned gateway stop — exiting cleanly"
)
else: else:
_signal_initiated_shutdown = True _signal_initiated_shutdown = True
logger.info("Received SIGTERM/SIGINT — initiating shutdown") logger.info("Received SIGTERM/SIGINT — initiating shutdown")
@ -15066,7 +15083,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
if threading.current_thread() is threading.main_thread(): if threading.current_thread() is threading.main_thread():
for sig in (signal.SIGINT, signal.SIGTERM): for sig in (signal.SIGINT, signal.SIGTERM):
try: try:
loop.add_signal_handler(sig, shutdown_signal_handler) loop.add_signal_handler(sig, shutdown_signal_handler, sig)
except NotImplementedError: except NotImplementedError:
pass pass
if hasattr(signal, "SIGUSR1"): if hasattr(signal, "SIGUSR1"):
@ -15164,14 +15181,14 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
if runner.exit_code is not None: if runner.exit_code is not None:
raise SystemExit(runner.exit_code) raise SystemExit(runner.exit_code)
# When a signal (SIGTERM/SIGINT) caused the shutdown and it wasn't a # When an unexpected SIGTERM caused the shutdown and it wasn't a planned
# planned restart (/restart, /update, SIGUSR1), exit non-zero so # restart (/restart, /update, SIGUSR1), exit non-zero so systemd's
# systemd's Restart=on-failure revives the process. This covers: # Restart=on-failure revives the process. This covers:
# - hermes update killing the gateway mid-work # - hermes update killing the gateway mid-work
# - External kill commands # - External kill commands
# - WSL2/container runtime sending unexpected signals # - WSL2/container runtime sending unexpected signals
# systemctl stop is safe: systemd tracks "stop requested" state # `hermes gateway stop` and interactive Ctrl+C are handled above as
# independently of exit code, so Restart= never fires for it. # planned stops and should not trigger service-manager revival.
if _signal_initiated_shutdown and not runner._restart_requested: if _signal_initiated_shutdown and not runner._restart_requested:
logger.info( logger.info(
"Exiting with code 1 (signal-initiated shutdown without restart " "Exiting with code 1 (signal-initiated shutdown without restart "

View file

@ -637,6 +637,8 @@ def release_all_scoped_locks(
_TAKEOVER_MARKER_FILENAME = ".gateway-takeover.json" _TAKEOVER_MARKER_FILENAME = ".gateway-takeover.json"
_TAKEOVER_MARKER_TTL_S = 60 # Marker older than this is treated as stale _TAKEOVER_MARKER_TTL_S = 60 # Marker older than this is treated as stale
_PLANNED_STOP_MARKER_FILENAME = ".gateway-planned-stop.json"
_PLANNED_STOP_MARKER_TTL_S = 60
def _get_takeover_marker_path() -> Path: def _get_takeover_marker_path() -> Path:
@ -645,6 +647,67 @@ def _get_takeover_marker_path() -> Path:
return home / _TAKEOVER_MARKER_FILENAME return home / _TAKEOVER_MARKER_FILENAME
def _get_planned_stop_marker_path() -> Path:
    """Build the planned-stop marker location inside the Hermes home directory.

    Returns:
        ``get_hermes_home()`` joined with ``_PLANNED_STOP_MARKER_FILENAME``.
    """
    return get_hermes_home() / _PLANNED_STOP_MARKER_FILENAME
def _marker_is_stale(written_at: str, ttl_s: int) -> bool:
try:
written_dt = datetime.fromisoformat(written_at)
age = (datetime.now(timezone.utc) - written_dt).total_seconds()
return age > ttl_s
except (TypeError, ValueError):
return True
def _consume_pid_marker_for_self(
    path: Path,
    *,
    pid_field: str,
    start_time_field: str,
    ttl_s: int,
) -> bool:
    """Check whether the marker at ``path`` names this process, then drop it.

    Args:
        path: Location of the JSON marker file.
        pid_field: Key in the marker record holding the target PID.
        start_time_field: Key holding the target process start time.
        ttl_s: Age limit in seconds, forwarded to ``_marker_is_stale``.

    Returns:
        ``True`` only when the record's PID equals ``os.getpid()`` and both
        the recorded and the current start time are present and equal.
        ``False`` when the record is missing/empty, malformed, stale, or
        names another process.

    The marker file is unlinked in every case except when no record could
    be read at all.
    """

    def _discard_marker() -> None:
        # Best-effort removal; a failed unlink must not affect the result.
        try:
            path.unlink(missing_ok=True)
        except OSError:
            pass

    payload = _read_json_file(path)
    if not payload:
        return False

    try:
        marker_pid = int(payload[pid_field])
        marker_start_time = payload.get(start_time_field)
        written_at = payload.get("written_at") or ""
    except (KeyError, TypeError, ValueError):
        # Malformed record: remove it so it cannot be picked up later.
        _discard_marker()
        return False

    if _marker_is_stale(written_at, ttl_s):
        _discard_marker()
        return False

    own_pid = os.getpid()
    own_start_time = _get_process_start_time(own_pid)

    # The PID alone is not enough; both start times must be known and equal.
    names_this_process = False
    if (
        marker_pid == own_pid
        and marker_start_time is not None
        and own_start_time is not None
    ):
        names_this_process = marker_start_time == own_start_time

    # Consume the marker whether or not it matched.
    _discard_marker()
    return names_this_process
def write_takeover_marker(target_pid: int) -> bool: def write_takeover_marker(target_pid: int) -> bool:
"""Record that ``target_pid`` is being replaced by the current process. """Record that ``target_pid`` is being replaced by the current process.
@ -681,59 +744,13 @@ def consume_takeover_marker_for_self() -> bool:
Always unlinks the marker on match (and on detected staleness) so Always unlinks the marker on match (and on detected staleness) so
subsequent unrelated signals don't re-trigger. subsequent unrelated signals don't re-trigger.
""" """
path = _get_takeover_marker_path() return _consume_pid_marker_for_self(
record = _read_json_file(path) _get_takeover_marker_path(),
if not record: pid_field="target_pid",
return False start_time_field="target_start_time",
ttl_s=_TAKEOVER_MARKER_TTL_S,
# Any malformed or stale marker → drop it and return False
try:
target_pid = int(record["target_pid"])
target_start_time = record.get("target_start_time")
written_at = record.get("written_at") or ""
except (KeyError, TypeError, ValueError):
try:
path.unlink(missing_ok=True)
except OSError:
pass
return False
# TTL guard: a stale marker older than _TAKEOVER_MARKER_TTL_S is ignored.
stale = False
try:
written_dt = datetime.fromisoformat(written_at)
age = (datetime.now(timezone.utc) - written_dt).total_seconds()
if age > _TAKEOVER_MARKER_TTL_S:
stale = True
except (TypeError, ValueError):
stale = True # Unparseable timestamp — treat as stale
if stale:
try:
path.unlink(missing_ok=True)
except OSError:
pass
return False
# Does the marker name THIS process?
our_pid = os.getpid()
our_start_time = _get_process_start_time(our_pid)
matches = (
target_pid == our_pid
and target_start_time is not None
and our_start_time is not None
and target_start_time == our_start_time
) )
# Consume the marker whether it matched or not — a marker that doesn't
# match our identity is stale-for-us anyway.
try:
path.unlink(missing_ok=True)
except OSError:
pass
return matches
def clear_takeover_marker() -> None: def clear_takeover_marker() -> None:
"""Remove the takeover marker unconditionally. Safe to call repeatedly.""" """Remove the takeover marker unconditionally. Safe to call repeatedly."""
@ -743,6 +760,45 @@ def clear_takeover_marker() -> None:
pass pass
def write_planned_stop_marker(target_pid: int) -> bool:
    """Record that ``target_pid`` is being stopped intentionally.

    The gateway exits non-zero for unexpected SIGTERM so service managers can
    revive it. Service stop commands send the same SIGTERM, so the CLI writes
    this short-lived marker first to let the target process exit cleanly.

    Args:
        target_pid: PID of the gateway process about to be stopped.

    Returns:
        ``True`` once the marker has been written, ``False`` if an
        ``OSError`` was raised while building or writing it.
    """
    try:
        marker = {
            "target_pid": target_pid,
            "target_start_time": _get_process_start_time(target_pid),
            "stopper_pid": os.getpid(),
            "written_at": _utc_now_iso(),
        }
        _write_json_file(_get_planned_stop_marker_path(), marker)
    except OSError:
        # PermissionError is a subclass of OSError, so this covers it too.
        return False
    return True
def consume_planned_stop_marker_for_self() -> bool:
    """Return True when the current process is being intentionally stopped.

    Delegates to ``_consume_pid_marker_for_self`` with the planned-stop
    marker path, the ``target_pid`` / ``target_start_time`` keys, and
    ``_PLANNED_STOP_MARKER_TTL_S`` as the age limit.
    """
    marker_path = _get_planned_stop_marker_path()
    return _consume_pid_marker_for_self(
        marker_path,
        pid_field="target_pid",
        start_time_field="target_start_time",
        ttl_s=_PLANNED_STOP_MARKER_TTL_S,
    )
def clear_planned_stop_marker() -> None:
    """Remove the planned-stop marker unconditionally.

    A missing file is not an error (``missing_ok=True``), and any ``OSError``
    raised while resolving or unlinking the path is swallowed.
    """
    try:
        marker_path = _get_planned_stop_marker_path()
        marker_path.unlink(missing_ok=True)
    except OSError:
        # Best-effort cleanup: nothing useful to do on failure.
        return
def get_running_pid( def get_running_pid(
pid_path: Optional[Path] = None, pid_path: Optional[Path] = None,
*, *,

View file

@ -785,6 +785,12 @@ def stop_profile_gateway() -> bool:
if pid is None: if pid is None:
return False return False
try:
from gateway.status import write_planned_stop_marker
write_planned_stop_marker(pid)
except Exception:
pass
try: try:
os.kill(pid, signal.SIGTERM) os.kill(pid, signal.SIGTERM)
except ProcessLookupError: except ProcessLookupError:
@ -2043,6 +2049,13 @@ def systemd_stop(system: bool = False):
if system: if system:
_require_root_for_system_service("stop") _require_root_for_system_service("stop")
_require_service_installed("stop", system=system) _require_service_installed("stop", system=system)
try:
from gateway.status import get_running_pid, write_planned_stop_marker
pid = get_running_pid(cleanup_stale=False)
if pid is not None:
write_planned_stop_marker(pid)
except Exception:
pass
_run_systemctl(["stop", get_service_name()], system=system, check=True, timeout=90) _run_systemctl(["stop", get_service_name()], system=system, check=True, timeout=90)
print(f"{_service_scope_label(system).capitalize()} service stopped") print(f"{_service_scope_label(system).capitalize()} service stopped")
@ -2404,6 +2417,13 @@ def launchd_start():
def launchd_stop(): def launchd_stop():
label = get_launchd_label() label = get_launchd_label()
target = f"{_launchd_domain()}/{label}" target = f"{_launchd_domain()}/{label}"
try:
from gateway.status import get_running_pid, write_planned_stop_marker
pid = get_running_pid(cleanup_stale=False)
if pid is not None:
write_planned_stop_marker(pid)
except Exception:
pass
# bootout unloads the service definition so KeepAlive doesn't respawn # bootout unloads the service definition so KeepAlive doesn't respawn
# the process. A plain `kill SIGTERM` only signals the process — launchd # the process. A plain `kill SIGTERM` only signals the process — launchd
# immediately restarts it because KeepAlive.SuccessfulExit = false. # immediately restarts it because KeepAlive.SuccessfulExit = false.

View file

@ -702,3 +702,88 @@ class TestTakeoverMarker:
# We are not the target — must NOT consume as planned # We are not the target — must NOT consume as planned
assert result is False assert result is False
class TestPlannedStopMarker:
    """Exercise the planned-stop marker helpers exposed by ``status``."""

    # File name the tests expect the marker to be written under.
    MARKER_NAME = ".gateway-planned-stop.json"

    def test_write_marker_records_target_identity(self, tmp_path, monkeypatch):
        """The written record carries target PID, start time, and stopper PID."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 42)

        assert status.write_planned_stop_marker(target_pid=12345) is True

        marker_file = tmp_path / self.MARKER_NAME
        assert marker_file.exists()
        record = json.loads(marker_file.read_text())
        assert record["target_pid"] == 12345
        assert record["target_start_time"] == 42
        assert record["stopper_pid"] == os.getpid()
        assert "written_at" in record

    def test_consume_returns_true_when_marker_names_self(self, tmp_path, monkeypatch):
        """A marker naming this PID and start time is accepted and removed."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        assert status.write_planned_stop_marker(target_pid=os.getpid()) is True

        consumed = status.consume_planned_stop_marker_for_self()

        assert consumed is True
        assert not (tmp_path / self.MARKER_NAME).exists()

    def test_consume_returns_false_for_different_pid(self, tmp_path, monkeypatch):
        """A marker naming another PID is rejected but still removed."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        other_pid = os.getpid() + 9999
        assert status.write_planned_stop_marker(target_pid=other_pid) is True

        consumed = status.consume_planned_stop_marker_for_self()

        assert consumed is False
        assert not (tmp_path / self.MARKER_NAME).exists()

    def test_consume_returns_false_for_stale_marker(self, tmp_path, monkeypatch):
        """A two-minute-old marker is rejected and removed."""
        from datetime import datetime, timezone, timedelta

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        marker_file = tmp_path / self.MARKER_NAME
        two_minutes_ago = datetime.now(timezone.utc) - timedelta(minutes=2)
        record = {
            "target_pid": os.getpid(),
            "target_start_time": 123,
            "stopper_pid": 99999,
            "written_at": two_minutes_ago.isoformat(),
        }
        marker_file.write_text(json.dumps(record))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)

        consumed = status.consume_planned_stop_marker_for_self()

        assert consumed is False
        assert not marker_file.exists()

    def test_clear_planned_stop_marker_is_idempotent(self, tmp_path, monkeypatch):
        """Clearing works before a write, after a write, and when repeated."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100)
        marker_file = tmp_path / self.MARKER_NAME

        # Nothing to remove yet; must not raise.
        status.clear_planned_stop_marker()

        status.write_planned_stop_marker(target_pid=12345)
        assert marker_file.exists()

        status.clear_planned_stop_marker()
        assert not marker_file.exists()

        # Second clear on an already-missing marker must not raise either.
        status.clear_planned_stop_marker()

    def test_write_marker_returns_false_on_write_failure(self, tmp_path, monkeypatch):
        """An ``OSError`` from the JSON writer is reported as ``False``."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))

        def failing_writer(*args, **kwargs):
            raise OSError("simulated write failure")

        monkeypatch.setattr(status, "_write_json_file", failing_writer)

        assert status.write_planned_stop_marker(target_pid=12345) is False

View file

@ -8,6 +8,7 @@ from types import SimpleNamespace
import pytest import pytest
import hermes_cli.gateway as gateway_cli import hermes_cli.gateway as gateway_cli
from gateway import status
from gateway.restart import ( from gateway.restart import (
DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT, DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT,
GATEWAY_SERVICE_RESTART_EXIT_CODE, GATEWAY_SERVICE_RESTART_EXIT_CODE,
@ -106,6 +107,30 @@ class TestSystemdServiceRefresh:
["systemctl", "--user", "reload-or-restart", gateway_cli.get_service_name()], ["systemctl", "--user", "reload-or-restart", gateway_cli.get_service_name()],
] ]
def test_systemd_stop_marks_running_gateway_as_planned_stop(self, monkeypatch):
    """``systemd_stop`` marks the running PID, then issues ``systemctl stop``."""
    marked_pids = []
    systemctl_calls = []

    def record_marker(pid):
        marked_pids.append(pid)
        return True

    def fake_run_systemctl(args, **kwargs):
        systemctl_calls.append(args)
        return SimpleNamespace(returncode=0, stdout="", stderr="")

    monkeypatch.setattr(gateway_cli, "_select_systemd_scope", lambda system=False: False)
    monkeypatch.setattr(gateway_cli, "_require_service_installed", lambda action, system=False: None)
    # Pretend a gateway with PID 321 is running.
    monkeypatch.setattr(status, "get_running_pid", lambda cleanup_stale=True: 321)
    monkeypatch.setattr(status, "write_planned_stop_marker", record_marker)
    monkeypatch.setattr(gateway_cli, "_run_systemctl", fake_run_systemctl)

    gateway_cli.systemd_stop()

    assert marked_pids == [321]
    assert systemctl_calls == [["stop", gateway_cli.get_service_name()]]
def test_run_gateway_refreshes_outdated_unit_on_boot(self, tmp_path, monkeypatch): def test_run_gateway_refreshes_outdated_unit_on_boot(self, tmp_path, monkeypatch):
"""run_gateway() should refresh the systemd unit on boot so that """run_gateway() should refresh the systemd unit on boot so that
@ -127,11 +152,8 @@ class TestSystemdServiceRefresh:
monkeypatch.setattr(gateway_cli.subprocess, "run", fake_run) monkeypatch.setattr(gateway_cli.subprocess, "run", fake_run)
# Prevent run_gateway from actually starting the gateway # Prevent run_gateway from actually starting the gateway
def fake_start_gateway(**kwargs): async def fake_start_gateway(**kwargs):
import asyncio return True
f = asyncio.Future()
f.set_result(True)
return f
monkeypatch.setattr("gateway.run.start_gateway", fake_start_gateway) monkeypatch.setattr("gateway.run.start_gateway", fake_start_gateway)
@ -163,7 +185,16 @@ class TestRequireServiceInstalled:
class TestGeneratedSystemdUnits: class TestGeneratedSystemdUnits:
def test_user_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self): def _expected_timeout_stop_sec(self) -> str:
timeout = int(max(60, DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT) + 30)
return f"TimeoutStopSec={timeout}"
def test_user_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self, monkeypatch):
monkeypatch.setattr(
gateway_cli,
"_get_restart_drain_timeout",
lambda: DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT,
)
unit = gateway_cli.generate_systemd_unit(system=False) unit = gateway_cli.generate_systemd_unit(system=False)
assert "ExecStart=" in unit assert "ExecStart=" in unit
@ -173,7 +204,7 @@ class TestGeneratedSystemdUnits:
# TimeoutStopSec must exceed the default drain_timeout (60s) so # TimeoutStopSec must exceed the default drain_timeout (60s) so
# systemd doesn't SIGKILL the cgroup before post-interrupt cleanup # systemd doesn't SIGKILL the cgroup before post-interrupt cleanup
# (tool subprocess kill, adapter disconnect) runs — issue #8202. # (tool subprocess kill, adapter disconnect) runs — issue #8202.
assert "TimeoutStopSec=90" in unit assert self._expected_timeout_stop_sec() in unit
def test_user_unit_includes_resolved_node_directory_in_path(self, monkeypatch): def test_user_unit_includes_resolved_node_directory_in_path(self, monkeypatch):
monkeypatch.setattr(gateway_cli.shutil, "which", lambda cmd: "/home/test/.nvm/versions/node/v24.14.0/bin/node" if cmd == "node" else None) monkeypatch.setattr(gateway_cli.shutil, "which", lambda cmd: "/home/test/.nvm/versions/node/v24.14.0/bin/node" if cmd == "node" else None)
@ -219,7 +250,12 @@ class TestGeneratedSystemdUnits:
assert "/mnt/c/WINDOWS/system32" in unit assert "/mnt/c/WINDOWS/system32" in unit
def test_system_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self): def test_system_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self, monkeypatch):
monkeypatch.setattr(
gateway_cli,
"_get_restart_drain_timeout",
lambda: DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT,
)
unit = gateway_cli.generate_systemd_unit(system=True) unit = gateway_cli.generate_systemd_unit(system=True)
assert "ExecStart=" in unit assert "ExecStart=" in unit
@ -229,7 +265,7 @@ class TestGeneratedSystemdUnits:
# TimeoutStopSec must exceed the default drain_timeout (60s) so # TimeoutStopSec must exceed the default drain_timeout (60s) so
# systemd doesn't SIGKILL the cgroup before post-interrupt cleanup # systemd doesn't SIGKILL the cgroup before post-interrupt cleanup
# (tool subprocess kill, adapter disconnect) runs — issue #8202. # (tool subprocess kill, adapter disconnect) runs — issue #8202.
assert "TimeoutStopSec=90" in unit assert self._expected_timeout_stop_sec() in unit
assert "WantedBy=multi-user.target" in unit assert "WantedBy=multi-user.target" in unit