mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-19 10:02:16 +00:00
fix(gateway): block shell gateway run when a service supervises the profile
This commit is contained in:
parent
08d89e7aba
commit
1436793051
3 changed files with 353 additions and 3 deletions
|
|
@ -1095,11 +1095,30 @@ def get_gateway_runtime_snapshot(system: bool = False) -> GatewayRuntimeSnapshot
|
|||
# Other container runtimes (or containers built before Phase 2)
|
||||
# still get the original "docker (foreground)" label.
|
||||
try:
|
||||
from hermes_cli.service_manager import detect_service_manager
|
||||
from hermes_cli.service_manager import detect_service_manager, get_service_manager
|
||||
if detect_service_manager() == "s6":
|
||||
profile = _profile_suffix() or "default"
|
||||
service_name = f"gateway-{profile}"
|
||||
mgr = get_service_manager()
|
||||
service_installed = False
|
||||
service_running = False
|
||||
try:
|
||||
service_dir = getattr(mgr, "scandir", None)
|
||||
if service_dir is not None:
|
||||
service_installed = (service_dir / service_name).is_dir()
|
||||
except Exception:
|
||||
service_installed = False
|
||||
if service_installed:
|
||||
try:
|
||||
service_running = bool(mgr.is_running(service_name))
|
||||
except Exception:
|
||||
service_running = False
|
||||
return GatewayRuntimeSnapshot(
|
||||
manager="s6 (container supervisor)",
|
||||
service_installed=service_installed,
|
||||
service_running=service_running,
|
||||
gateway_pids=gateway_pids,
|
||||
service_scope="s6",
|
||||
)
|
||||
except Exception:
|
||||
pass # Fall through to the legacy label on any detection error.
|
||||
|
|
@ -3776,6 +3795,99 @@ def _is_official_docker_checkout() -> bool:
|
|||
)
|
||||
|
||||
|
||||
def _running_under_gateway_supervisor() -> bool:
|
||||
"""Return True when this process IS the gateway a service manager launched.
|
||||
|
||||
The conflict guard below must never fire on the service's own startup, or
|
||||
it would wedge the unit into a respawn/refuse loop. Each supervisor exports
|
||||
a reliable marker into the child's environment:
|
||||
|
||||
- systemd sets ``INVOCATION_ID`` for every unit it launches (the same
|
||||
marker ``gateway/run.py`` already uses to pick the restart path).
|
||||
- launchd sets ``XPC_SERVICE_NAME`` to the job label for jobs it spawns;
|
||||
interactive shells inherit the sentinel ``"0"`` instead.
|
||||
- the s6-overlay container longrun exports ``HERMES_S6_SUPERVISED_CHILD``.
|
||||
"""
|
||||
if os.environ.get("INVOCATION_ID"):
|
||||
return True
|
||||
if os.environ.get("HERMES_S6_SUPERVISED_CHILD"):
|
||||
return True
|
||||
xpc_service = os.environ.get("XPC_SERVICE_NAME", "")
|
||||
if xpc_service and xpc_service != "0":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _guard_supervised_gateway_conflict(force: bool = False) -> None:
|
||||
"""Refuse a foreground gateway when a service manager already supervises one.
|
||||
|
||||
Running ``hermes gateway run [--replace]`` (or the manual-restart fallback)
|
||||
from a shell on a systemd/launchd host spawns a second, long-lived
|
||||
dispatcher that escapes the service cgroup, survives
|
||||
``systemctl restart``, and becomes a silent concurrent writer on the shared
|
||||
kanban DB — the documented root cause of multi-writer SQLite WAL corruption
|
||||
(issue #35240). Pass ``--force`` to start anyway.
|
||||
"""
|
||||
if force or _running_under_gateway_supervisor():
|
||||
return
|
||||
try:
|
||||
snapshot = get_gateway_runtime_snapshot()
|
||||
except Exception:
|
||||
# Best-effort guard: a probe failure must never block a real startup.
|
||||
logger.debug("Supervised-gateway conflict probe failed", exc_info=True)
|
||||
return
|
||||
if not (snapshot.service_installed and snapshot.service_running):
|
||||
return
|
||||
|
||||
print_error(
|
||||
f"A gateway is already running under {snapshot.manager} for this profile."
|
||||
)
|
||||
print(
|
||||
" Starting another one from a shell leaves an orphan dispatcher that\n"
|
||||
" escapes the service, survives restarts, and writes to the same kanban\n"
|
||||
" DB concurrently — which can corrupt it. Restart the supervised gateway\n"
|
||||
" instead:"
|
||||
)
|
||||
print()
|
||||
print(" hermes gateway restart")
|
||||
print()
|
||||
print(
|
||||
" Pass --force to start a foreground gateway anyway (not recommended\n"
|
||||
" while the service is running)."
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def _guard_existing_gateway_process_conflict(replace: bool = False) -> None:
|
||||
"""Refuse duplicate foreground startup before importing gateway.run.
|
||||
|
||||
``gateway.run`` performs the authoritative PID/lock check, but importing it
|
||||
is expensive: it pulls in model_tools/plugin discovery first. On small
|
||||
instances, a supervisor or dashboard loop repeatedly running bare
|
||||
``hermes gateway run`` can burn memory/CPU just to fail with "already
|
||||
running" after plugin discovery. This cheap CLI-side preflight preserves the
|
||||
same user-facing contract while avoiding that startup work.
|
||||
"""
|
||||
if replace or _running_under_gateway_supervisor():
|
||||
return
|
||||
try:
|
||||
snapshot = get_gateway_runtime_snapshot()
|
||||
except Exception:
|
||||
logger.debug("Existing-gateway process probe failed", exc_info=True)
|
||||
return
|
||||
if not snapshot.gateway_pids:
|
||||
return
|
||||
|
||||
pid_text = _format_gateway_pids(snapshot.gateway_pids, limit=1)
|
||||
print_error(
|
||||
f"Another gateway instance is already running (PID {pid_text})."
|
||||
)
|
||||
print(" Use 'hermes gateway restart' to replace it,")
|
||||
print(" or 'hermes gateway stop' first.")
|
||||
print(" Or use 'hermes gateway run --replace' to auto-replace.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def _guard_official_docker_root_gateway() -> None:
|
||||
"""Refuse gateway startup when the official Docker privilege drop was bypassed."""
|
||||
if not hasattr(os, "geteuid") or os.geteuid() != 0:
|
||||
|
|
@ -3803,7 +3915,7 @@ def _guard_official_docker_root_gateway() -> None:
|
|||
sys.exit(1)
|
||||
|
||||
|
||||
def run_gateway(verbose: int = 0, quiet: bool = False, replace: bool = False):
|
||||
def run_gateway(verbose: int = 0, quiet: bool = False, replace: bool = False, force: bool = False):
|
||||
"""Run the gateway in foreground.
|
||||
|
||||
Args:
|
||||
|
|
@ -3812,8 +3924,12 @@ def run_gateway(verbose: int = 0, quiet: bool = False, replace: bool = False):
|
|||
replace: If True, kill any existing gateway instance before starting.
|
||||
This prevents systemd restart loops when the old process
|
||||
hasn't fully exited yet.
|
||||
force: Skip the supervised-gateway conflict guard and start even when a
|
||||
systemd/launchd service is already supervising this profile.
|
||||
"""
|
||||
_guard_official_docker_root_gateway()
|
||||
_guard_supervised_gateway_conflict(force=force)
|
||||
_guard_existing_gateway_process_conflict(replace=replace)
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
# Detached Windows gateway runs must ignore console-control broadcasts
|
||||
|
|
@ -6359,7 +6475,8 @@ def _gateway_command_inner(args):
|
|||
verbose = getattr(args, "verbose", 0)
|
||||
quiet = getattr(args, "quiet", False)
|
||||
replace = getattr(args, "replace", False)
|
||||
run_gateway(verbose, quiet=quiet, replace=replace)
|
||||
force = getattr(args, "force", False)
|
||||
run_gateway(verbose, quiet=quiet, replace=replace, force=force)
|
||||
return
|
||||
|
||||
if subcmd == "setup":
|
||||
|
|
|
|||
|
|
@ -60,6 +60,16 @@ def build_gateway_parser(subparsers, *, cmd_gateway: Callable, cmd_proxy: Callab
|
|||
action="store_true",
|
||||
help="Replace any existing gateway instance (useful for systemd)",
|
||||
)
|
||||
gateway_run.add_argument(
|
||||
"--force",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Start a foreground gateway even when a systemd/launchd/s6 service "
|
||||
"already supervises this profile. Without --force, the command "
|
||||
"refuses because a second dispatcher escapes the service and can "
|
||||
"corrupt shared gateway state."
|
||||
),
|
||||
)
|
||||
gateway_run.add_argument(
|
||||
"--no-supervise",
|
||||
action="store_true",
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
"""Tests for hermes_cli.gateway."""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from types import ModuleType, SimpleNamespace
|
||||
|
||||
|
|
@ -27,6 +28,15 @@ def _install_fake_gateway_run(monkeypatch, start_gateway):
|
|||
monkeypatch.setattr(
|
||||
gateway, "refresh_systemd_unit_if_needed", lambda system=False: False
|
||||
)
|
||||
# Neutralize the supervised-gateway conflict guard by default so these
|
||||
# end-to-end tests don't trip over a launchd/systemd gateway that happens
|
||||
# to be installed+running on the developer's machine. Conflict-guard tests
|
||||
# override this snapshot after calling the helper.
|
||||
monkeypatch.setattr(
|
||||
gateway,
|
||||
"get_gateway_runtime_snapshot",
|
||||
lambda *a, **k: gateway.GatewayRuntimeSnapshot(manager="manual process"),
|
||||
)
|
||||
|
||||
|
||||
def test_run_gateway_exits_cleanly_on_keyboard_interrupt(monkeypatch, capsys):
|
||||
|
|
@ -104,6 +114,219 @@ def test_run_gateway_root_guard_has_escape_hatch(monkeypatch):
|
|||
assert calls == [(True, 2)]
|
||||
|
||||
|
||||
def _clear_supervisor_markers(monkeypatch):
|
||||
"""Make ``_running_under_gateway_supervisor()`` report a plain shell."""
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
monkeypatch.delenv("HERMES_S6_SUPERVISED_CHILD", raising=False)
|
||||
# Interactive macOS shells inherit XPC_SERVICE_NAME="0"; launchd jobs get
|
||||
# the real label. Default to the shell sentinel so the guard can fire.
|
||||
monkeypatch.setenv("XPC_SERVICE_NAME", "0")
|
||||
|
||||
|
||||
def _running_snapshot(manager="systemd (user)"):
|
||||
return gateway.GatewayRuntimeSnapshot(
|
||||
manager=manager, service_installed=True, service_running=True
|
||||
)
|
||||
|
||||
|
||||
def _process_snapshot(*pids: int, manager="manual process"):
|
||||
return gateway.GatewayRuntimeSnapshot(
|
||||
manager=manager,
|
||||
gateway_pids=tuple(pids),
|
||||
)
|
||||
|
||||
|
||||
def test_run_gateway_refuses_when_service_supervising(monkeypatch, capsys):
|
||||
"""A shell `gateway run --replace` must not become a second writer."""
|
||||
calls = []
|
||||
|
||||
def fake_start_gateway(*, replace, verbosity):
|
||||
calls.append((replace, verbosity))
|
||||
return object()
|
||||
|
||||
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
||||
_clear_supervisor_markers(monkeypatch)
|
||||
monkeypatch.setattr(gateway, "get_gateway_runtime_snapshot", _running_snapshot)
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
gateway.run_gateway(replace=True)
|
||||
|
||||
assert exc_info.value.code == 1
|
||||
assert calls == [] # dispatcher never started
|
||||
out = capsys.readouterr().out
|
||||
assert "already running under systemd (user)" in out
|
||||
assert "hermes gateway restart" in out
|
||||
assert "--force" in out
|
||||
|
||||
|
||||
def test_run_gateway_force_overrides_supervised_conflict(monkeypatch):
|
||||
calls = []
|
||||
|
||||
def fake_start_gateway(*, replace, verbosity):
|
||||
calls.append((replace, verbosity))
|
||||
return object()
|
||||
|
||||
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
||||
_clear_supervisor_markers(monkeypatch)
|
||||
monkeypatch.setattr(gateway, "get_gateway_runtime_snapshot", _running_snapshot)
|
||||
monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True)
|
||||
|
||||
gateway.run_gateway(replace=True, force=True)
|
||||
|
||||
assert calls == [(True, 0)]
|
||||
|
||||
|
||||
def test_run_gateway_allows_service_managed_startup(monkeypatch):
|
||||
"""systemd's own ExecStart (INVOCATION_ID set) must not be blocked."""
|
||||
calls = []
|
||||
|
||||
def fake_start_gateway(*, replace, verbosity):
|
||||
calls.append((replace, verbosity))
|
||||
return object()
|
||||
|
||||
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
||||
_clear_supervisor_markers(monkeypatch)
|
||||
monkeypatch.setenv("INVOCATION_ID", "deadbeefcafe")
|
||||
# Even with a "running" snapshot, the supervisor marker means *we* are it.
|
||||
monkeypatch.setattr(gateway, "get_gateway_runtime_snapshot", _running_snapshot)
|
||||
monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True)
|
||||
|
||||
gateway.run_gateway(replace=True)
|
||||
|
||||
assert calls == [(True, 0)]
|
||||
|
||||
|
||||
def test_run_gateway_allows_when_service_not_running(monkeypatch):
|
||||
"""Installed-but-stopped service: a foreground run is not a conflict."""
|
||||
calls = []
|
||||
|
||||
def fake_start_gateway(*, replace, verbosity):
|
||||
calls.append((replace, verbosity))
|
||||
return object()
|
||||
|
||||
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
||||
_clear_supervisor_markers(monkeypatch)
|
||||
monkeypatch.setattr(
|
||||
gateway,
|
||||
"get_gateway_runtime_snapshot",
|
||||
lambda: gateway.GatewayRuntimeSnapshot(
|
||||
manager="systemd (user)", service_installed=True, service_running=False
|
||||
),
|
||||
)
|
||||
monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True)
|
||||
|
||||
gateway.run_gateway()
|
||||
|
||||
assert calls == [(False, 0)]
|
||||
|
||||
|
||||
def test_run_gateway_refuses_existing_process_before_importing_gateway_run(monkeypatch, capsys):
|
||||
"""Bare `gateway run` should fail cheaply when another gateway owns the profile."""
|
||||
calls = []
|
||||
|
||||
def fake_start_gateway(*, replace, verbosity):
|
||||
calls.append((replace, verbosity))
|
||||
return object()
|
||||
|
||||
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
||||
_clear_supervisor_markers(monkeypatch)
|
||||
monkeypatch.setattr(
|
||||
gateway,
|
||||
"get_gateway_runtime_snapshot",
|
||||
lambda: _process_snapshot(17907),
|
||||
)
|
||||
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
gateway.run_gateway()
|
||||
|
||||
assert exc_info.value.code == 1
|
||||
assert calls == []
|
||||
out = capsys.readouterr().out
|
||||
assert "Another gateway instance is already running (PID 17907)" in out
|
||||
assert "hermes gateway run --replace" in out
|
||||
|
||||
|
||||
def test_run_gateway_replace_skips_existing_process_preflight(monkeypatch):
|
||||
calls = []
|
||||
|
||||
def fake_start_gateway(*, replace, verbosity):
|
||||
calls.append((replace, verbosity))
|
||||
return object()
|
||||
|
||||
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
||||
_clear_supervisor_markers(monkeypatch)
|
||||
monkeypatch.setattr(
|
||||
gateway,
|
||||
"get_gateway_runtime_snapshot",
|
||||
lambda: _process_snapshot(17907),
|
||||
)
|
||||
monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True)
|
||||
|
||||
gateway.run_gateway(replace=True)
|
||||
|
||||
assert calls == [(True, 0)]
|
||||
|
||||
|
||||
def test_s6_runtime_snapshot_reports_supervised_service(monkeypatch, tmp_path):
|
||||
service_dir = tmp_path / "gateway-default"
|
||||
service_dir.mkdir()
|
||||
|
||||
class FakeS6Manager:
|
||||
scandir = tmp_path
|
||||
|
||||
def is_running(self, name):
|
||||
assert name == "gateway-default"
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(gateway, "is_linux", lambda: True)
|
||||
monkeypatch.setattr("hermes_constants.is_container", lambda: True)
|
||||
monkeypatch.setattr("hermes_cli.service_manager.detect_service_manager", lambda: "s6")
|
||||
monkeypatch.setattr("hermes_cli.service_manager.get_service_manager", lambda: FakeS6Manager())
|
||||
monkeypatch.setattr(gateway, "find_gateway_pids", lambda: [123])
|
||||
monkeypatch.setattr(gateway, "_profile_suffix", lambda: "")
|
||||
|
||||
snapshot = gateway.get_gateway_runtime_snapshot()
|
||||
|
||||
assert snapshot.manager == "s6 (container supervisor)"
|
||||
assert snapshot.service_installed is True
|
||||
assert snapshot.service_running is True
|
||||
assert snapshot.service_scope == "s6"
|
||||
assert snapshot.gateway_pids == (123,)
|
||||
|
||||
|
||||
def test_running_under_gateway_supervisor_markers(monkeypatch):
|
||||
_clear_supervisor_markers(monkeypatch)
|
||||
assert gateway._running_under_gateway_supervisor() is False
|
||||
|
||||
monkeypatch.setenv("XPC_SERVICE_NAME", "org.nousresearch.hermes.gateway")
|
||||
assert gateway._running_under_gateway_supervisor() is True
|
||||
|
||||
monkeypatch.setenv("XPC_SERVICE_NAME", "0")
|
||||
monkeypatch.setenv("INVOCATION_ID", "abc123")
|
||||
assert gateway._running_under_gateway_supervisor() is True
|
||||
|
||||
monkeypatch.delenv("INVOCATION_ID", raising=False)
|
||||
monkeypatch.setenv("HERMES_S6_SUPERVISED_CHILD", "1")
|
||||
assert gateway._running_under_gateway_supervisor() is True
|
||||
|
||||
|
||||
def test_gateway_run_force_flag_survives_parser_extraction():
|
||||
from hermes_cli.subcommands.gateway import build_gateway_parser
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
build_gateway_parser(
|
||||
subparsers,
|
||||
cmd_gateway=lambda _args: None,
|
||||
cmd_proxy=lambda _args: None,
|
||||
)
|
||||
|
||||
args = parser.parse_args(["gateway", "run", "--force"])
|
||||
|
||||
assert args.force is True
|
||||
|
||||
|
||||
def test_run_gateway_windows_foreground_keeps_ctrl_c_enabled(monkeypatch):
|
||||
calls = []
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue