From 14367930518eb67a31cb5f39f98e0da5a87d6f07 Mon Sep 17 00:00:00 2001 From: konsisumer Date: Mon, 8 Jun 2026 12:29:15 +0200 Subject: [PATCH] fix(gateway): block shell gateway run when a service supervises the profile --- hermes_cli/gateway.py | 123 +++++++++++++++- hermes_cli/subcommands/gateway.py | 10 ++ tests/hermes_cli/test_gateway.py | 223 ++++++++++++++++++++++++++++++ 3 files changed, 353 insertions(+), 3 deletions(-) diff --git a/hermes_cli/gateway.py b/hermes_cli/gateway.py index 69bf4ca6b25..7c56431fef8 100644 --- a/hermes_cli/gateway.py +++ b/hermes_cli/gateway.py @@ -1095,11 +1095,30 @@ def get_gateway_runtime_snapshot(system: bool = False) -> GatewayRuntimeSnapshot # Other container runtimes (or containers built before Phase 2) # still get the original "docker (foreground)" label. try: - from hermes_cli.service_manager import detect_service_manager + from hermes_cli.service_manager import detect_service_manager, get_service_manager if detect_service_manager() == "s6": + profile = _profile_suffix() or "default" + service_name = f"gateway-{profile}" + mgr = get_service_manager() + service_installed = False + service_running = False + try: + service_dir = getattr(mgr, "scandir", None) + if service_dir is not None: + service_installed = (service_dir / service_name).is_dir() + except Exception: + service_installed = False + if service_installed: + try: + service_running = bool(mgr.is_running(service_name)) + except Exception: + service_running = False return GatewayRuntimeSnapshot( manager="s6 (container supervisor)", + service_installed=service_installed, + service_running=service_running, gateway_pids=gateway_pids, + service_scope="s6", ) except Exception: pass # Fall through to the legacy label on any detection error. @@ -3776,6 +3795,99 @@ def _is_official_docker_checkout() -> bool: ) +def _running_under_gateway_supervisor() -> bool: + """Return True when this process IS the gateway a service manager launched. + + The conflict guard below must never fire on the service's own startup, or + it would wedge the unit into a respawn/refuse loop. Each supervisor exports + a reliable marker into the child's environment: + + - systemd sets ``INVOCATION_ID`` for every unit it launches (the same + marker ``gateway/run.py`` already uses to pick the restart path). + - launchd sets ``XPC_SERVICE_NAME`` to the job label for jobs it spawns; + interactive shells inherit the sentinel ``"0"`` instead. + - the s6-overlay container longrun exports ``HERMES_S6_SUPERVISED_CHILD``. + """ + if os.environ.get("INVOCATION_ID"): + return True + if os.environ.get("HERMES_S6_SUPERVISED_CHILD"): + return True + xpc_service = os.environ.get("XPC_SERVICE_NAME", "") + if xpc_service and xpc_service != "0": + return True + return False + + +def _guard_supervised_gateway_conflict(force: bool = False) -> None: + """Refuse a foreground gateway when a service manager already supervises one. + + Running ``hermes gateway run [--replace]`` (or the manual-restart fallback) + from a shell on a systemd/launchd host spawns a second, long-lived + dispatcher that escapes the service cgroup, survives + ``systemctl restart``, and becomes a silent concurrent writer on the shared + kanban DB — the documented root cause of multi-writer SQLite WAL corruption + (issue #35240). Pass ``--force`` to start anyway. + """ + if force or _running_under_gateway_supervisor(): + return + try: + snapshot = get_gateway_runtime_snapshot() + except Exception: + # Best-effort guard: a probe failure must never block a real startup. + logger.debug("Supervised-gateway conflict probe failed", exc_info=True) + return + if not (snapshot.service_installed and snapshot.service_running): + return + + print_error( + f"A gateway is already running under {snapshot.manager} for this profile." + ) + print( + " Starting another one from a shell leaves an orphan dispatcher that\n" + " escapes the service, survives restarts, and writes to the same kanban\n" + " DB concurrently — which can corrupt it. Restart the supervised gateway\n" + " instead:" + ) + print() + print(" hermes gateway restart") + print() + print( + " Pass --force to start a foreground gateway anyway (not recommended\n" + " while the service is running)." + ) + sys.exit(1) + + +def _guard_existing_gateway_process_conflict(replace: bool = False) -> None: + """Refuse duplicate foreground startup before importing gateway.run. + + ``gateway.run`` performs the authoritative PID/lock check, but importing it + is expensive: it pulls in model_tools/plugin discovery first. On small + instances, a supervisor or dashboard loop repeatedly running bare + ``hermes gateway run`` can burn memory/CPU just to fail with "already + running" after plugin discovery. This cheap CLI-side preflight preserves the + same user-facing contract while avoiding that startup work. + """ + if replace or _running_under_gateway_supervisor(): + return + try: + snapshot = get_gateway_runtime_snapshot() + except Exception: + logger.debug("Existing-gateway process probe failed", exc_info=True) + return + if not snapshot.gateway_pids: + return + + pid_text = _format_gateway_pids(snapshot.gateway_pids, limit=1) + print_error( + f"Another gateway instance is already running (PID {pid_text})." + ) + print(" Use 'hermes gateway restart' to replace it,") + print(" or 'hermes gateway stop' first.") + print(" Or use 'hermes gateway run --replace' to auto-replace.") + sys.exit(1) + + def _guard_official_docker_root_gateway() -> None: """Refuse gateway startup when the official Docker privilege drop was bypassed.""" if not hasattr(os, "geteuid") or os.geteuid() != 0: @@ -3803,7 +3915,7 @@ def _guard_official_docker_root_gateway() -> None: sys.exit(1) -def run_gateway(verbose: int = 0, quiet: bool = False, replace: bool = False): +def run_gateway(verbose: int = 0, quiet: bool = False, replace: bool = False, force: bool = False): """Run the gateway in foreground. Args: @@ -3812,8 +3924,12 @@ def run_gateway(verbose: int = 0, quiet: bool = False, replace: bool = False): replace: If True, kill any existing gateway instance before starting. This prevents systemd restart loops when the old process hasn't fully exited yet. + force: Skip the supervised-gateway conflict guard and start even when a + systemd/launchd service is already supervising this profile. """ _guard_official_docker_root_gateway() + _guard_supervised_gateway_conflict(force=force) + _guard_existing_gateway_process_conflict(replace=replace) sys.path.insert(0, str(PROJECT_ROOT)) # Detached Windows gateway runs must ignore console-control broadcasts @@ -6359,7 +6475,8 @@ def _gateway_command_inner(args): verbose = getattr(args, "verbose", 0) quiet = getattr(args, "quiet", False) replace = getattr(args, "replace", False) - run_gateway(verbose, quiet=quiet, replace=replace) + force = getattr(args, "force", False) + run_gateway(verbose, quiet=quiet, replace=replace, force=force) return if subcmd == "setup": diff --git a/hermes_cli/subcommands/gateway.py b/hermes_cli/subcommands/gateway.py index 8f20ad8e9a0..ed199fac560 100644 --- a/hermes_cli/subcommands/gateway.py +++ b/hermes_cli/subcommands/gateway.py @@ -60,6 +60,16 @@ def build_gateway_parser(subparsers, *, cmd_gateway: Callable, cmd_proxy: Callab action="store_true", help="Replace any existing gateway instance (useful for systemd)", ) + gateway_run.add_argument( + "--force", + action="store_true", + help=( + "Start a foreground gateway even when a systemd/launchd/s6 service " + "already supervises this profile. Without --force, the command " + "refuses because a second dispatcher escapes the service and can " + "corrupt shared gateway state." + ), + ) gateway_run.add_argument( "--no-supervise", action="store_true", diff --git a/tests/hermes_cli/test_gateway.py b/tests/hermes_cli/test_gateway.py index e127ee2053d..4fe01da7b8d 100644 --- a/tests/hermes_cli/test_gateway.py +++ b/tests/hermes_cli/test_gateway.py @@ -1,5 +1,6 @@ """Tests for hermes_cli.gateway.""" +import argparse import sys from types import ModuleType, SimpleNamespace @@ -27,6 +28,15 @@ def _install_fake_gateway_run(monkeypatch, start_gateway): monkeypatch.setattr( gateway, "refresh_systemd_unit_if_needed", lambda system=False: False ) + # Neutralize the supervised-gateway conflict guard by default so these + # end-to-end tests don't trip over a launchd/systemd gateway that happens + # to be installed+running on the developer's machine. Conflict-guard tests + # override this snapshot after calling the helper. + monkeypatch.setattr( + gateway, + "get_gateway_runtime_snapshot", + lambda *a, **k: gateway.GatewayRuntimeSnapshot(manager="manual process"), + ) def test_run_gateway_exits_cleanly_on_keyboard_interrupt(monkeypatch, capsys): @@ -104,6 +114,219 @@ def test_run_gateway_root_guard_has_escape_hatch(monkeypatch): assert calls == [(True, 2)] +def _clear_supervisor_markers(monkeypatch): + """Make ``_running_under_gateway_supervisor()`` report a plain shell.""" + monkeypatch.delenv("INVOCATION_ID", raising=False) + monkeypatch.delenv("HERMES_S6_SUPERVISED_CHILD", raising=False) + # Interactive macOS shells inherit XPC_SERVICE_NAME="0"; launchd jobs get + # the real label. Default to the shell sentinel so the guard can fire. + monkeypatch.setenv("XPC_SERVICE_NAME", "0") + + +def _running_snapshot(manager="systemd (user)"): + return gateway.GatewayRuntimeSnapshot( + manager=manager, service_installed=True, service_running=True + ) + + +def _process_snapshot(*pids: int, manager="manual process"): + return gateway.GatewayRuntimeSnapshot( + manager=manager, + gateway_pids=tuple(pids), + ) + + +def test_run_gateway_refuses_when_service_supervising(monkeypatch, capsys): + """A shell `gateway run --replace` must not become a second writer.""" + calls = [] + + def fake_start_gateway(*, replace, verbosity): + calls.append((replace, verbosity)) + return object() + + _install_fake_gateway_run(monkeypatch, fake_start_gateway) + _clear_supervisor_markers(monkeypatch) + monkeypatch.setattr(gateway, "get_gateway_runtime_snapshot", _running_snapshot) + + with pytest.raises(SystemExit) as exc_info: + gateway.run_gateway(replace=True) + + assert exc_info.value.code == 1 + assert calls == [] # dispatcher never started + out = capsys.readouterr().out + assert "already running under systemd (user)" in out + assert "hermes gateway restart" in out + assert "--force" in out + + +def test_run_gateway_force_overrides_supervised_conflict(monkeypatch): + calls = [] + + def fake_start_gateway(*, replace, verbosity): + calls.append((replace, verbosity)) + return object() + + _install_fake_gateway_run(monkeypatch, fake_start_gateway) + _clear_supervisor_markers(monkeypatch) + monkeypatch.setattr(gateway, "get_gateway_runtime_snapshot", _running_snapshot) + monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True) + + gateway.run_gateway(replace=True, force=True) + + assert calls == [(True, 0)] + + +def test_run_gateway_allows_service_managed_startup(monkeypatch): + """systemd's own ExecStart (INVOCATION_ID set) must not be blocked.""" + calls = [] + + def fake_start_gateway(*, replace, verbosity): + calls.append((replace, verbosity)) + return object() + + _install_fake_gateway_run(monkeypatch, fake_start_gateway) + _clear_supervisor_markers(monkeypatch) + monkeypatch.setenv("INVOCATION_ID", "deadbeefcafe") + # Even with a "running" snapshot, the supervisor marker means *we* are it. + monkeypatch.setattr(gateway, "get_gateway_runtime_snapshot", _running_snapshot) + monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True) + + gateway.run_gateway(replace=True) + + assert calls == [(True, 0)] + + +def test_run_gateway_allows_when_service_not_running(monkeypatch): + """Installed-but-stopped service: a foreground run is not a conflict.""" + calls = [] + + def fake_start_gateway(*, replace, verbosity): + calls.append((replace, verbosity)) + return object() + + _install_fake_gateway_run(monkeypatch, fake_start_gateway) + _clear_supervisor_markers(monkeypatch) + monkeypatch.setattr( + gateway, + "get_gateway_runtime_snapshot", + lambda: gateway.GatewayRuntimeSnapshot( + manager="systemd (user)", service_installed=True, service_running=False + ), + ) + monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True) + + gateway.run_gateway() + + assert calls == [(False, 0)] + + +def test_run_gateway_refuses_existing_process_before_importing_gateway_run(monkeypatch, capsys): + """Bare `gateway run` should fail cheaply when another gateway owns the profile.""" + calls = [] + + def fake_start_gateway(*, replace, verbosity): + calls.append((replace, verbosity)) + return object() + + _install_fake_gateway_run(monkeypatch, fake_start_gateway) + _clear_supervisor_markers(monkeypatch) + monkeypatch.setattr( + gateway, + "get_gateway_runtime_snapshot", + lambda: _process_snapshot(17907), + ) + + with pytest.raises(SystemExit) as exc_info: + gateway.run_gateway() + + assert exc_info.value.code == 1 + assert calls == [] + out = capsys.readouterr().out + assert "Another gateway instance is already running (PID 17907)" in out + assert "hermes gateway run --replace" in out + + +def test_run_gateway_replace_skips_existing_process_preflight(monkeypatch): + calls = [] + + def fake_start_gateway(*, replace, verbosity): + calls.append((replace, verbosity)) + return object() + + _install_fake_gateway_run(monkeypatch, fake_start_gateway) + _clear_supervisor_markers(monkeypatch) + monkeypatch.setattr( + gateway, + "get_gateway_runtime_snapshot", + lambda: _process_snapshot(17907), + ) + monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True) + + gateway.run_gateway(replace=True) + + assert calls == [(True, 0)] + + +def test_s6_runtime_snapshot_reports_supervised_service(monkeypatch, tmp_path): + service_dir = tmp_path / "gateway-default" + service_dir.mkdir() + + class FakeS6Manager: + scandir = tmp_path + + def is_running(self, name): + assert name == "gateway-default" + return True + + monkeypatch.setattr(gateway, "is_linux", lambda: True) + monkeypatch.setattr("hermes_constants.is_container", lambda: True) + monkeypatch.setattr("hermes_cli.service_manager.detect_service_manager", lambda: "s6") + monkeypatch.setattr("hermes_cli.service_manager.get_service_manager", lambda: FakeS6Manager()) + monkeypatch.setattr(gateway, "find_gateway_pids", lambda: [123]) + monkeypatch.setattr(gateway, "_profile_suffix", lambda: "") + + snapshot = gateway.get_gateway_runtime_snapshot() + + assert snapshot.manager == "s6 (container supervisor)" + assert snapshot.service_installed is True + assert snapshot.service_running is True + assert snapshot.service_scope == "s6" + assert snapshot.gateway_pids == (123,) + + +def test_running_under_gateway_supervisor_markers(monkeypatch): + _clear_supervisor_markers(monkeypatch) + assert gateway._running_under_gateway_supervisor() is False + + monkeypatch.setenv("XPC_SERVICE_NAME", "org.nousresearch.hermes.gateway") + assert gateway._running_under_gateway_supervisor() is True + + monkeypatch.setenv("XPC_SERVICE_NAME", "0") + monkeypatch.setenv("INVOCATION_ID", "abc123") + assert gateway._running_under_gateway_supervisor() is True + + monkeypatch.delenv("INVOCATION_ID", raising=False) + monkeypatch.setenv("HERMES_S6_SUPERVISED_CHILD", "1") + assert gateway._running_under_gateway_supervisor() is True + + +def test_gateway_run_force_flag_survives_parser_extraction(): + from hermes_cli.subcommands.gateway import build_gateway_parser + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(dest="command") + + build_gateway_parser( + subparsers, + cmd_gateway=lambda _args: None, + cmd_proxy=lambda _args: None, + ) + + args = parser.parse_args(["gateway", "run", "--force"]) + + assert args.force is True + + def test_run_gateway_windows_foreground_keeps_ctrl_c_enabled(monkeypatch): calls = []