From b6322901664c0af138035d6f753feedeb1d2e8b7 Mon Sep 17 00:00:00 2001 From: helix4u <4317663+helix4u@users.noreply.github.com> Date: Mon, 4 May 2026 13:35:23 -0600 Subject: [PATCH] fix(gateway): handle planned service stops ---
Review notes (patch commentary placed after the three-dash separator; not part of the commit message).

What the patch does, as shown by the hunks below:
* stop_profile_gateway(), systemd_stop() and launchd_stop() write a planned-stop marker
  (.gateway-planned-stop.json: target_pid, target_start_time, stopper_pid, written_at) before the
  gateway is signalled or the service manager is asked to stop it. Marker-write failures are swallowed,
  so the stop itself still proceeds.
* shutdown_signal_handler() now receives the signal number. SIGINT is classified as a planned stop
  without reading the marker; otherwise, when no takeover marker matched, the planned-stop marker is
  consumed. In both planned cases _signal_initiated_shutdown stays False, so the later
  "Exiting with code 1" branch is not taken.
* The body of consume_takeover_marker_for_self() moves into _consume_pid_marker_for_self(), shared by
  both marker kinds. Malformed, stale (age > ttl_s) and non-matching markers are all unlinked; only a
  PID + start-time match returns True.

Points to confirm before merging:
* NOTE(review): clear_planned_stop_marker() is added and unit-tested, but no non-test hunk in this
  patch calls it. If _run_systemctl(["stop", ...], check=True, ...) fails after the marker was written,
  or os.kill() raises ProcessLookupError, the marker appears to stay on disk until it is consumed or
  its 60 s TTL lapses. Confirm whether those failure paths should clear it.
* NOTE(review): _marker_is_stale() only rejects age > ttl_s, so a written_at timestamp in the future
  (negative age) is treated as fresh. Unparseable values, and values whose subtraction raises
  TypeError, are treated as stale.
* NOTE(review): the planned-stop marker is only read when planned_takeover is False. If both markers
  exist, the planned-stop marker is left for TTL expiry.
* NOTE(review): the broad "except Exception: pass" around the marker writes in hermes_cli/gateway.py
  keeps the stop best-effort, but gives no indication that the gateway will then exit non-zero.
* This file has lost its line breaks: each physical line holds many diff lines. The original newlines
  and indentation have to be restored before git am / git apply can consume it.

gateway/run.py | 41 ++++-- gateway/status.py | 158 +++++++++++++++-------- hermes_cli/gateway.py | 20 +++ tests/gateway/test_status.py | 85 ++++++++++++ tests/hermes_cli/test_gateway_service.py | 54 ++++++-- 5 files changed, 286 insertions(+), 72 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 2b085d9915..ebfd2731fe 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -15003,15 +15003,14 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = runner = GatewayRunner(config) - # Track whether a signal initiated the shutdown (vs. internal request). - # When an unexpected SIGTERM kills the gateway, we exit non-zero so - # systemd's Restart=on-failure revives the process. systemctl stop - # is safe: systemd tracks stop-requested state independently of exit - # code, so Restart= never fires for a deliberate stop. + # Track whether an unexpected signal initiated the shutdown. When an + # unexpected SIGTERM kills the gateway, we exit non-zero so service + # managers can revive the process. Planned stop paths write a marker + # before signalling us so they can exit cleanly instead. 
_signal_initiated_shutdown = False # Set up signal handlers - def shutdown_signal_handler(): + def shutdown_signal_handler(received_signal=None): nonlocal _signal_initiated_shutdown # Planned --replace takeover check: when a sibling gateway is # taking over via --replace, it wrote a marker naming this PID @@ -15027,10 +15026,28 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = except Exception as e: logger.debug("Takeover marker check failed: %s", e) + # Planned stop check: service managers and `hermes gateway stop` + # also send SIGTERM, which is indistinguishable from an unexpected + # external kill unless the CLI marks it first. SIGINT comes from an + # interactive Ctrl+C and is likewise an intentional foreground stop. + planned_stop = False + if received_signal == signal.SIGINT: + planned_stop = True + elif not planned_takeover: + try: + from gateway.status import consume_planned_stop_marker_for_self + planned_stop = consume_planned_stop_marker_for_self() + except Exception as e: + logger.debug("Planned stop marker check failed: %s", e) + if planned_takeover: logger.info( "Received SIGTERM as a planned --replace takeover — exiting cleanly" ) + elif planned_stop: + logger.info( + "Received SIGTERM/SIGINT as a planned gateway stop — exiting cleanly" + ) else: _signal_initiated_shutdown = True logger.info("Received SIGTERM/SIGINT — initiating shutdown") @@ -15066,7 +15083,7 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = if threading.current_thread() is threading.main_thread(): for sig in (signal.SIGINT, signal.SIGTERM): try: - loop.add_signal_handler(sig, shutdown_signal_handler) + loop.add_signal_handler(sig, shutdown_signal_handler, sig) except NotImplementedError: pass if hasattr(signal, "SIGUSR1"): @@ -15164,14 +15181,14 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = if runner.exit_code is not None: raise SystemExit(runner.exit_code) - # When a signal 
(SIGTERM/SIGINT) caused the shutdown and it wasn't a - # planned restart (/restart, /update, SIGUSR1), exit non-zero so - # systemd's Restart=on-failure revives the process. This covers: + # When an unexpected SIGTERM caused the shutdown and it wasn't a planned + # restart (/restart, /update, SIGUSR1), exit non-zero so systemd's + # Restart=on-failure revives the process. This covers: # - hermes update killing the gateway mid-work # - External kill commands # - WSL2/container runtime sending unexpected signals - # systemctl stop is safe: systemd tracks "stop requested" state - # independently of exit code, so Restart= never fires for it. + # `hermes gateway stop` and interactive Ctrl+C are handled above as + # planned stops and should not trigger service-manager revival. if _signal_initiated_shutdown and not runner._restart_requested: logger.info( "Exiting with code 1 (signal-initiated shutdown without restart " diff --git a/gateway/status.py b/gateway/status.py index f329b25f08..bdff9aa988 100644 --- a/gateway/status.py +++ b/gateway/status.py @@ -637,6 +637,8 @@ def release_all_scoped_locks( _TAKEOVER_MARKER_FILENAME = ".gateway-takeover.json" _TAKEOVER_MARKER_TTL_S = 60 # Marker older than this is treated as stale +_PLANNED_STOP_MARKER_FILENAME = ".gateway-planned-stop.json" +_PLANNED_STOP_MARKER_TTL_S = 60 def _get_takeover_marker_path() -> Path: @@ -645,6 +647,67 @@ def _get_takeover_marker_path() -> Path: return home / _TAKEOVER_MARKER_FILENAME +def _get_planned_stop_marker_path() -> Path: + """Return the path to the intentional gateway stop marker file.""" + home = get_hermes_home() + return home / _PLANNED_STOP_MARKER_FILENAME + + +def _marker_is_stale(written_at: str, ttl_s: int) -> bool: + try: + written_dt = datetime.fromisoformat(written_at) + age = (datetime.now(timezone.utc) - written_dt).total_seconds() + return age > ttl_s + except (TypeError, ValueError): + return True + + +def _consume_pid_marker_for_self( + path: Path, + *, + pid_field: str, + 
start_time_field: str, + ttl_s: int, +) -> bool: + record = _read_json_file(path) + if not record: + return False + + try: + target_pid = int(record[pid_field]) + target_start_time = record.get(start_time_field) + written_at = record.get("written_at") or "" + except (KeyError, TypeError, ValueError): + try: + path.unlink(missing_ok=True) + except OSError: + pass + return False + + if _marker_is_stale(written_at, ttl_s): + try: + path.unlink(missing_ok=True) + except OSError: + pass + return False + + our_pid = os.getpid() + our_start_time = _get_process_start_time(our_pid) + matches = ( + target_pid == our_pid + and target_start_time is not None + and our_start_time is not None + and target_start_time == our_start_time + ) + + try: + path.unlink(missing_ok=True) + except OSError: + pass + + return matches + + def write_takeover_marker(target_pid: int) -> bool: """Record that ``target_pid`` is being replaced by the current process. @@ -681,59 +744,13 @@ def consume_takeover_marker_for_self() -> bool: Always unlinks the marker on match (and on detected staleness) so subsequent unrelated signals don't re-trigger. """ - path = _get_takeover_marker_path() - record = _read_json_file(path) - if not record: - return False - - # Any malformed or stale marker → drop it and return False - try: - target_pid = int(record["target_pid"]) - target_start_time = record.get("target_start_time") - written_at = record.get("written_at") or "" - except (KeyError, TypeError, ValueError): - try: - path.unlink(missing_ok=True) - except OSError: - pass - return False - - # TTL guard: a stale marker older than _TAKEOVER_MARKER_TTL_S is ignored. 
- stale = False - try: - written_dt = datetime.fromisoformat(written_at) - age = (datetime.now(timezone.utc) - written_dt).total_seconds() - if age > _TAKEOVER_MARKER_TTL_S: - stale = True - except (TypeError, ValueError): - stale = True # Unparseable timestamp — treat as stale - - if stale: - try: - path.unlink(missing_ok=True) - except OSError: - pass - return False - - # Does the marker name THIS process? - our_pid = os.getpid() - our_start_time = _get_process_start_time(our_pid) - matches = ( - target_pid == our_pid - and target_start_time is not None - and our_start_time is not None - and target_start_time == our_start_time + return _consume_pid_marker_for_self( + _get_takeover_marker_path(), + pid_field="target_pid", + start_time_field="target_start_time", + ttl_s=_TAKEOVER_MARKER_TTL_S, ) - # Consume the marker whether it matched or not — a marker that doesn't - # match our identity is stale-for-us anyway. - try: - path.unlink(missing_ok=True) - except OSError: - pass - - return matches - def clear_takeover_marker() -> None: """Remove the takeover marker unconditionally. Safe to call repeatedly.""" @@ -743,6 +760,45 @@ def clear_takeover_marker() -> None: pass +def write_planned_stop_marker(target_pid: int) -> bool: + """Record that ``target_pid`` is being stopped intentionally. + + The gateway exits non-zero for unexpected SIGTERM so service managers can + revive it. Service stop commands send the same SIGTERM, so the CLI writes + this short-lived marker first to let the target process exit cleanly. 
+ """ + try: + target_start_time = _get_process_start_time(target_pid) + record = { + "target_pid": target_pid, + "target_start_time": target_start_time, + "stopper_pid": os.getpid(), + "written_at": _utc_now_iso(), + } + _write_json_file(_get_planned_stop_marker_path(), record) + return True + except (OSError, PermissionError): + return False + + +def consume_planned_stop_marker_for_self() -> bool: + """Return True when the current process is being intentionally stopped.""" + return _consume_pid_marker_for_self( + _get_planned_stop_marker_path(), + pid_field="target_pid", + start_time_field="target_start_time", + ttl_s=_PLANNED_STOP_MARKER_TTL_S, + ) + + +def clear_planned_stop_marker() -> None: + """Remove the planned-stop marker unconditionally.""" + try: + _get_planned_stop_marker_path().unlink(missing_ok=True) + except OSError: + pass + + def get_running_pid( pid_path: Optional[Path] = None, *, diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index c1804f9c7f..846736a2cc 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -785,6 +785,12 @@ def stop_profile_gateway() -> bool: if pid is None: return False + try: + from gateway.status import write_planned_stop_marker + write_planned_stop_marker(pid) + except Exception: + pass + try: os.kill(pid, signal.SIGTERM) except ProcessLookupError: @@ -2043,6 +2049,13 @@ def systemd_stop(system: bool = False): if system: _require_root_for_system_service("stop") _require_service_installed("stop", system=system) + try: + from gateway.status import get_running_pid, write_planned_stop_marker + pid = get_running_pid(cleanup_stale=False) + if pid is not None: + write_planned_stop_marker(pid) + except Exception: + pass _run_systemctl(["stop", get_service_name()], system=system, check=True, timeout=90) print(f"✓ {_service_scope_label(system).capitalize()} service stopped") @@ -2404,6 +2417,13 @@ def launchd_start(): def launchd_stop(): label = get_launchd_label() target = f"{_launchd_domain()}/{label}" + 
try: + from gateway.status import get_running_pid, write_planned_stop_marker + pid = get_running_pid(cleanup_stale=False) + if pid is not None: + write_planned_stop_marker(pid) + except Exception: + pass # bootout unloads the service definition so KeepAlive doesn't respawn # the process. A plain `kill SIGTERM` only signals the process — launchd # immediately restarts it because KeepAlive.SuccessfulExit = false. diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index 7138b6514e..e7cd0dc060 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -702,3 +702,88 @@ class TestTakeoverMarker: # We are not the target — must NOT consume as planned assert result is False + + +class TestPlannedStopMarker: + """Tests for intentional service/manual gateway stop markers.""" + + def test_write_marker_records_target_identity(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 42) + + ok = status.write_planned_stop_marker(target_pid=12345) + + assert ok is True + marker = tmp_path / ".gateway-planned-stop.json" + assert marker.exists() + payload = json.loads(marker.read_text()) + assert payload["target_pid"] == 12345 + assert payload["target_start_time"] == 42 + assert payload["stopper_pid"] == os.getpid() + assert "written_at" in payload + + def test_consume_returns_true_when_marker_names_self(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100) + ok = status.write_planned_stop_marker(target_pid=os.getpid()) + assert ok is True + + result = status.consume_planned_stop_marker_for_self() + + assert result is True + assert not (tmp_path / ".gateway-planned-stop.json").exists() + + def test_consume_returns_false_for_different_pid(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + monkeypatch.setattr(status, 
"_get_process_start_time", lambda pid: 100) + ok = status.write_planned_stop_marker(target_pid=os.getpid() + 9999) + assert ok is True + + result = status.consume_planned_stop_marker_for_self() + + assert result is False + assert not (tmp_path / ".gateway-planned-stop.json").exists() + + def test_consume_returns_false_for_stale_marker(self, tmp_path, monkeypatch): + from datetime import datetime, timezone, timedelta + + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + marker_path = tmp_path / ".gateway-planned-stop.json" + stale_time = (datetime.now(timezone.utc) - timedelta(minutes=2)).isoformat() + marker_path.write_text(json.dumps({ + "target_pid": os.getpid(), + "target_start_time": 123, + "stopper_pid": 99999, + "written_at": stale_time, + })) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123) + + result = status.consume_planned_stop_marker_for_self() + + assert result is False + assert not marker_path.exists() + + def test_clear_planned_stop_marker_is_idempotent(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 100) + + status.clear_planned_stop_marker() + status.write_planned_stop_marker(target_pid=12345) + assert (tmp_path / ".gateway-planned-stop.json").exists() + + status.clear_planned_stop_marker() + + assert not (tmp_path / ".gateway-planned-stop.json").exists() + status.clear_planned_stop_marker() + + def test_write_marker_returns_false_on_write_failure(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + def raise_oserror(*args, **kwargs): + raise OSError("simulated write failure") + + monkeypatch.setattr(status, "_write_json_file", raise_oserror) + + ok = status.write_planned_stop_marker(target_pid=12345) + + assert ok is False diff --git a/tests/hermes_cli/test_gateway_service.py b/tests/hermes_cli/test_gateway_service.py index 3e9a4d3720..994e8d0284 100644 --- 
a/tests/hermes_cli/test_gateway_service.py +++ b/tests/hermes_cli/test_gateway_service.py @@ -8,6 +8,7 @@ from types import SimpleNamespace import pytest import hermes_cli.gateway as gateway_cli +from gateway import status from gateway.restart import ( DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT, GATEWAY_SERVICE_RESTART_EXIT_CODE, @@ -106,6 +107,30 @@ class TestSystemdServiceRefresh: ["systemctl", "--user", "reload-or-restart", gateway_cli.get_service_name()], ] + def test_systemd_stop_marks_running_gateway_as_planned_stop(self, monkeypatch): + calls = [] + markers = [] + + monkeypatch.setattr(gateway_cli, "_select_systemd_scope", lambda system=False: False) + monkeypatch.setattr(gateway_cli, "_require_service_installed", lambda action, system=False: None) + monkeypatch.setattr(status, "get_running_pid", lambda cleanup_stale=True: 321) + monkeypatch.setattr( + status, + "write_planned_stop_marker", + lambda pid: markers.append(pid) or True, + ) + + def fake_run_systemctl(args, **kwargs): + calls.append(args) + return SimpleNamespace(returncode=0, stdout="", stderr="") + + monkeypatch.setattr(gateway_cli, "_run_systemctl", fake_run_systemctl) + + gateway_cli.systemd_stop() + + assert markers == [321] + assert calls == [["stop", gateway_cli.get_service_name()]] + def test_run_gateway_refreshes_outdated_unit_on_boot(self, tmp_path, monkeypatch): """run_gateway() should refresh the systemd unit on boot so that @@ -127,11 +152,8 @@ class TestSystemdServiceRefresh: monkeypatch.setattr(gateway_cli.subprocess, "run", fake_run) # Prevent run_gateway from actually starting the gateway - def fake_start_gateway(**kwargs): - import asyncio - f = asyncio.Future() - f.set_result(True) - return f + async def fake_start_gateway(**kwargs): + return True monkeypatch.setattr("gateway.run.start_gateway", fake_start_gateway) @@ -163,7 +185,16 @@ class TestRequireServiceInstalled: class TestGeneratedSystemdUnits: - def 
test_user_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self): + def _expected_timeout_stop_sec(self) -> str: + timeout = int(max(60, DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT) + 30) + return f"TimeoutStopSec={timeout}" + + def test_user_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self, monkeypatch): + monkeypatch.setattr( + gateway_cli, + "_get_restart_drain_timeout", + lambda: DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT, + ) unit = gateway_cli.generate_systemd_unit(system=False) assert "ExecStart=" in unit @@ -173,7 +204,7 @@ class TestGeneratedSystemdUnits: # TimeoutStopSec must exceed the default drain_timeout (60s) so # systemd doesn't SIGKILL the cgroup before post-interrupt cleanup # (tool subprocess kill, adapter disconnect) runs — issue #8202. - assert "TimeoutStopSec=90" in unit + assert self._expected_timeout_stop_sec() in unit def test_user_unit_includes_resolved_node_directory_in_path(self, monkeypatch): monkeypatch.setattr(gateway_cli.shutil, "which", lambda cmd: "/home/test/.nvm/versions/node/v24.14.0/bin/node" if cmd == "node" else None) @@ -219,7 +250,12 @@ class TestGeneratedSystemdUnits: assert "/mnt/c/WINDOWS/system32" in unit - def test_system_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self): + def test_system_unit_avoids_recursive_execstop_and_uses_extended_stop_timeout(self, monkeypatch): + monkeypatch.setattr( + gateway_cli, + "_get_restart_drain_timeout", + lambda: DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT, + ) unit = gateway_cli.generate_systemd_unit(system=True) assert "ExecStart=" in unit @@ -229,7 +265,7 @@ class TestGeneratedSystemdUnits: # TimeoutStopSec must exceed the default drain_timeout (60s) so # systemd doesn't SIGKILL the cgroup before post-interrupt cleanup # (tool subprocess kill, adapter disconnect) runs — issue #8202. 
- assert "TimeoutStopSec=90" in unit + assert self._expected_timeout_stop_sec() in unit assert "WantedBy=multi-user.target" in unit