fix(gateway): recover stale pid and planned restart state

This commit is contained in:
helix4u 2026-04-22 17:17:16 -06:00 committed by Teknium
parent 284e084bcc
commit b52123eb15
7 changed files with 646 additions and 79 deletions

View file

@ -2687,8 +2687,9 @@ class GatewayRunner:
except Exception as _e: except Exception as _e:
logger.debug("SessionDB close error: %s", _e) logger.debug("SessionDB close error: %s", _e)
from gateway.status import remove_pid_file from gateway.status import remove_pid_file, release_gateway_runtime_lock
remove_pid_file() remove_pid_file()
release_gateway_runtime_lock()
# Write a clean-shutdown marker so the next startup knows this # Write a clean-shutdown marker so the next startup knows this
# wasn't a crash. suspend_recently_active() only needs to run # wasn't a crash. suspend_recently_active() only needs to run
@ -10845,7 +10846,13 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
# The PID file is scoped to HERMES_HOME, so future multi-profile # The PID file is scoped to HERMES_HOME, so future multi-profile
# setups (each profile using a distinct HERMES_HOME) will naturally # setups (each profile using a distinct HERMES_HOME) will naturally
# allow concurrent instances without tripping this guard. # allow concurrent instances without tripping this guard.
from gateway.status import get_running_pid, remove_pid_file, terminate_pid from gateway.status import (
acquire_gateway_runtime_lock,
get_running_pid,
release_gateway_runtime_lock,
remove_pid_file,
terminate_pid,
)
existing_pid = get_running_pid() existing_pid = get_running_pid()
if existing_pid is not None and existing_pid != os.getpid(): if existing_pid is not None and existing_pid != os.getpid():
if replace: if replace:
@ -11058,14 +11065,21 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
"Exiting to avoid double-running.", _current_pid "Exiting to avoid double-running.", _current_pid
) )
return False return False
if not acquire_gateway_runtime_lock():
logger.error(
"Gateway runtime lock is already held by another instance. Exiting."
)
return False
try: try:
write_pid_file() write_pid_file()
except FileExistsError: except FileExistsError:
release_gateway_runtime_lock()
logger.error( logger.error(
"PID file race lost to another gateway instance. Exiting." "PID file race lost to another gateway instance. Exiting."
) )
return False return False
atexit.register(remove_pid_file) atexit.register(remove_pid_file)
atexit.register(release_gateway_runtime_lock)
# Start the gateway # Start the gateway
success = await runner.start() success = await runner.start()

View file

@ -22,11 +22,18 @@ from pathlib import Path
from hermes_constants import get_hermes_home from hermes_constants import get_hermes_home
from typing import Any, Optional from typing import Any, Optional
if sys.platform == "win32":
import msvcrt
else:
import fcntl
_GATEWAY_KIND = "hermes-gateway" _GATEWAY_KIND = "hermes-gateway"
_RUNTIME_STATUS_FILE = "gateway_state.json" _RUNTIME_STATUS_FILE = "gateway_state.json"
_LOCKS_DIRNAME = "gateway-locks" _LOCKS_DIRNAME = "gateway-locks"
_IS_WINDOWS = sys.platform == "win32" _IS_WINDOWS = sys.platform == "win32"
_UNSET = object() _UNSET = object()
_GATEWAY_LOCK_FILENAME = "gateway.lock"
_gateway_lock_handle = None
def _get_pid_path() -> Path: def _get_pid_path() -> Path:
@ -35,6 +42,14 @@ def _get_pid_path() -> Path:
return home / "gateway.pid" return home / "gateway.pid"
def _get_gateway_lock_path(pid_path: Optional[Path] = None) -> Path:
    """Locate the gateway runtime lock file.

    Args:
        pid_path: Optional PID-file path. When given, the lock file is its
            sibling: same directory, named ``_GATEWAY_LOCK_FILENAME``.

    Returns:
        The lock path next to ``pid_path``, or the lock path directly under
        the directory returned by ``get_hermes_home()`` when ``pid_path`` is
        ``None``.
    """
    if pid_path is None:
        return get_hermes_home() / _GATEWAY_LOCK_FILENAME
    return pid_path.with_name(_GATEWAY_LOCK_FILENAME)
def _get_runtime_status_path() -> Path: def _get_runtime_status_path() -> Path:
"""Return the persisted runtime health/status file path.""" """Return the persisted runtime health/status file path."""
return _get_pid_path().with_name(_RUNTIME_STATUS_FILE) return _get_pid_path().with_name(_RUNTIME_STATUS_FILE)
@ -212,6 +227,19 @@ def _read_pid_record(pid_path: Optional[Path] = None) -> Optional[dict]:
return None return None
def _read_gateway_lock_record(lock_path: Optional[Path] = None) -> Optional[dict[str, Any]]:
    """Read the record stored in the gateway lock file.

    The lock file is parsed with the same reader as the PID file. When
    ``lock_path`` is falsy the default lock path is used.
    """
    target = lock_path or _get_gateway_lock_path()
    return _read_pid_record(target)
def _pid_from_record(record: Optional[dict[str, Any]]) -> Optional[int]:
if not record:
return None
try:
return int(record["pid"])
except (KeyError, TypeError, ValueError):
return None
def _cleanup_invalid_pid_path(pid_path: Path, *, cleanup_stale: bool) -> None: def _cleanup_invalid_pid_path(pid_path: Path, *, cleanup_stale: bool) -> None:
if not cleanup_stale: if not cleanup_stale:
return return
@ -224,6 +252,102 @@ def _cleanup_invalid_pid_path(pid_path: Path, *, cleanup_stale: bool) -> None:
pass pass
def _write_gateway_lock_record(handle) -> None:
    """Replace the lock file's contents with this process's PID record.

    Args:
        handle: Open, writable text handle for the lock file.
    """
    # Start from an empty file so a shorter record never leaves stale bytes.
    handle.seek(0)
    handle.truncate()

    record = _build_pid_record()
    json.dump(record, handle)
    handle.flush()

    # Syncing to disk is best-effort; a failure here is deliberately ignored.
    try:
        os.fsync(handle.fileno())
    except OSError:
        pass
def _try_acquire_file_lock(handle) -> bool:
    """Attempt a non-blocking exclusive lock on ``handle``.

    Returns:
        ``True`` when the lock was obtained, ``False`` when the locking call
        (or the preparatory file I/O on Windows) raised ``OSError``.
    """
    try:
        if not _IS_WINDOWS:
            fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            return True

        # The Windows branch locks the first byte of the file, so make sure
        # the file has at least one byte before asking for the lock.
        handle.seek(0, os.SEEK_END)
        is_empty = handle.tell() == 0
        if is_empty:
            handle.write("\n")
            handle.flush()
        handle.seek(0)
        msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
    except OSError:
        # BlockingIOError is a subclass of OSError, so contention lands here.
        return False
    return True
def _release_file_lock(handle) -> None:
    """Drop the lock taken by ``_try_acquire_file_lock``; ``OSError`` is ignored."""
    try:
        if not _IS_WINDOWS:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
            return
        # Unlock the same one-byte region at offset 0 that was locked.
        handle.seek(0)
        msvcrt.locking(handle.fileno(), msvcrt.LK_UNLCK, 1)
    except OSError:
        pass
def acquire_gateway_runtime_lock() -> bool:
    """Claim the cross-process runtime lock for the gateway.

    Unlike the PID file, the lock is owned by the live process itself. If the
    process dies abruptly, the OS releases the lock automatically.

    Returns:
        ``True`` when this process holds the lock (including when it already
        held it before the call), ``False`` when the lock could not be taken.

    Raises:
        OSError: If the lock directory or file cannot be created/opened, or
            if writing the lock record fails. In the latter case the lock is
            released and the handle closed before the error propagates.
    """
    global _gateway_lock_handle
    if _gateway_lock_handle is not None:
        return True

    path = _get_gateway_lock_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    handle = open(path, "a+", encoding="utf-8")

    acquired = False
    try:
        acquired = _try_acquire_file_lock(handle)
        if acquired:
            _write_gateway_lock_record(handle)
    except BaseException:
        # Without this cleanup a failed record write would leave the lock held
        # on a handle nobody tracks, so release_gateway_runtime_lock() could
        # never drop it and a retry in this process would see it as taken.
        _release_file_lock(handle)
        try:
            handle.close()
        except OSError:
            pass
        raise

    if not acquired:
        handle.close()
        return False

    _gateway_lock_handle = handle
    return True
def release_gateway_runtime_lock() -> None:
    """Give up the gateway runtime lock if this process currently holds it.

    Does nothing when no lock handle is recorded. The module-level handle is
    cleared before unlocking, and an ``OSError`` while closing is ignored.
    """
    global _gateway_lock_handle
    held, _gateway_lock_handle = _gateway_lock_handle, None
    if held is None:
        return

    _release_file_lock(held)
    try:
        held.close()
    except OSError:
        pass
def is_gateway_runtime_lock_active(lock_path: Optional[Path] = None) -> bool:
    """Report whether the gateway runtime lock is currently held.

    Args:
        lock_path: Lock file to inspect; the default lock path when falsy.

    Returns:
        ``True`` when this process holds the default lock, or when a probe
        lock attempt on the file fails; ``False`` when the file does not exist
        or the probe lock could be taken (it is released again immediately).
    """
    target = lock_path or _get_gateway_lock_path()

    # This process already owns the default lock: answer without probing.
    if _gateway_lock_handle is not None:
        if target == _get_gateway_lock_path():
            return True

    if not target.exists():
        return False

    probe = open(target, "a+", encoding="utf-8")
    try:
        acquired = _try_acquire_file_lock(probe)
        if acquired:
            _release_file_lock(probe)
        return not acquired
    finally:
        try:
            probe.close()
        except OSError:
            pass
def write_pid_file() -> None: def write_pid_file() -> None:
"""Write the current process PID and metadata to the gateway PID file. """Write the current process PID and metadata to the gateway PID file.
@ -583,35 +707,42 @@ def get_running_pid(
Cleans up stale PID files automatically. Cleans up stale PID files automatically.
""" """
resolved_pid_path = pid_path or _get_pid_path() resolved_pid_path = pid_path or _get_pid_path()
record = _read_pid_record(resolved_pid_path) resolved_lock_path = _get_gateway_lock_path(resolved_pid_path)
if not record: lock_active = is_gateway_runtime_lock_active(resolved_lock_path)
if not lock_active:
_cleanup_invalid_pid_path(resolved_pid_path, cleanup_stale=cleanup_stale) _cleanup_invalid_pid_path(resolved_pid_path, cleanup_stale=cleanup_stale)
return None return None
try: primary_record = _read_pid_record(resolved_pid_path)
pid = int(record["pid"]) fallback_record = _read_gateway_lock_record(resolved_lock_path)
except (KeyError, TypeError, ValueError):
_cleanup_invalid_pid_path(resolved_pid_path, cleanup_stale=cleanup_stale)
return None
try: for record in (primary_record, fallback_record):
os.kill(pid, 0) # signal 0 = existence check, no actual signal sent pid = _pid_from_record(record)
except (ProcessLookupError, PermissionError): if pid is None:
_cleanup_invalid_pid_path(resolved_pid_path, cleanup_stale=cleanup_stale) continue
return None
recorded_start = record.get("start_time") try:
current_start = _get_process_start_time(pid) os.kill(pid, 0) # signal 0 = existence check, no actual signal sent
if recorded_start is not None and current_start is not None and current_start != recorded_start: except ProcessLookupError:
_cleanup_invalid_pid_path(resolved_pid_path, cleanup_stale=cleanup_stale) continue
return None except PermissionError:
# The process exists but belongs to another user/service scope.
# With the runtime lock still held, prefer keeping it visible
# rather than deleting the PID file as "stale".
if _record_looks_like_gateway(record):
return pid
continue
if not _looks_like_gateway_process(pid): recorded_start = record.get("start_time")
if not _record_looks_like_gateway(record): current_start = _get_process_start_time(pid)
_cleanup_invalid_pid_path(resolved_pid_path, cleanup_stale=cleanup_stale) if recorded_start is not None and current_start is not None and current_start != recorded_start:
return None continue
return pid if _looks_like_gateway_process(pid) or _record_looks_like_gateway(record):
return pid
_cleanup_invalid_pid_path(resolved_pid_path, cleanup_stale=cleanup_stale)
return None
def is_gateway_running( def is_gateway_running(

View file

@ -333,6 +333,147 @@ def _probe_systemd_service_running(system: bool = False) -> tuple[bool, bool]:
return selected_system, result.stdout.strip() == "active" return selected_system, result.stdout.strip() == "active"
def _read_systemd_unit_properties(
    system: bool = False,
    properties: tuple[str, ...] = (
        "ActiveState",
        "SubState",
        "Result",
        "ExecMainStatus",
    ),
) -> dict[str, str]:
    """Query ``systemctl show`` for selected properties of the gateway unit.

    Args:
        system: Requested scope; resolved through ``_select_systemd_scope``.
        properties: Property names to request, joined with commas.

    Returns:
        Mapping of property name to stripped value. Empty when the command
        raises ``RuntimeError``/``TimeoutExpired``/``OSError`` or exits
        non-zero.
    """
    scope = _select_systemd_scope(system)
    show_args = [
        "show",
        get_service_name(),
        "--no-pager",
        "--property",
        ",".join(properties),
    ]
    try:
        result = _run_systemctl(
            show_args,
            system=scope,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except (RuntimeError, subprocess.TimeoutExpired, OSError):
        return {}

    if result.returncode != 0:
        return {}

    parsed: dict[str, str] = {}
    for line in result.stdout.splitlines():
        # Output lines are KEY=VALUE; split on the first "=" only.
        key, sep, value = line.partition("=")
        if sep:
            parsed[key] = value.strip()
    return parsed
def _wait_for_systemd_service_restart(
    *,
    system: bool = False,
    previous_pid: int | None = None,
    timeout: float = 60.0,
) -> bool:
    """Wait for the gateway service to become active after a restart handoff.

    Args:
        system: Which systemd scope to query and to name in messages.
        previous_pid: PID of the gateway before the restart, if known. When
            set, success requires a running PID that differs from it.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        ``True`` once the unit is ``active`` and either a new PID is seen or
        no previous PID was supplied; ``False`` after ``timeout`` seconds, in
        which case troubleshooting hints are printed.
    """
    import time

    svc = get_service_name()
    scope_label = _service_scope_label(system).capitalize()
    # Use the monotonic clock for the deadline so a wall-clock adjustment
    # during the wait cannot cut the wait short or extend it.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        props = _read_systemd_unit_properties(system=system)
        active_state = props.get("ActiveState", "")
        sub_state = props.get("SubState", "")

        # PID lookup is best-effort; any failure counts as "no PID yet".
        try:
            from gateway.status import get_running_pid
            new_pid = get_running_pid()
        except Exception:
            new_pid = None

        if active_state == "active":
            if new_pid and (previous_pid is None or new_pid != previous_pid):
                print(f"{scope_label} service restarted (PID {new_pid})")
                return True
            if previous_pid is None:
                print(f"{scope_label} service restarted")
                return True

        # Poll faster while systemd reports a pending automatic restart.
        if active_state == "activating" and sub_state == "auto-restart":
            time.sleep(1)
            continue

        time.sleep(2)

    print(
        f"{scope_label} service did not become active within {int(timeout)}s.\n"
        f" Check status: {'sudo ' if system else ''}hermes gateway status\n"
        f" Check logs: journalctl {'--user ' if not system else ''}-u {svc} -l --since '2 min ago'"
    )
    return False
def _recover_pending_systemd_restart(system: bool = False, previous_pid: int | None = None) -> bool:
    """Try to complete a planned restart that systemd has not finished.

    Only acts when the persisted runtime status has ``restart_requested`` set.
    Two unit states are handled: a pending auto-restart (just wait), and a
    failed unit whose exit status is the restart exit code or whose result is
    ``exit-code`` (reset the failed state, start the unit, then wait).

    Returns:
        The result of the wait when one of those cases applied; ``False``
        when nothing was recoverable or the unit properties/runtime status
        could not be read.
    """
    props = _read_systemd_unit_properties(system=system)
    if not props:
        return False

    try:
        from gateway.status import read_runtime_status
    except Exception:
        return False

    runtime_state = read_runtime_status() or {}
    if not runtime_state.get("restart_requested"):
        return False

    active_state = props.get("ActiveState", "")
    sub_state = props.get("SubState", "")
    exec_main_status = props.get("ExecMainStatus", "")
    result = props.get("Result", "")

    if (active_state, sub_state) == ("activating", "auto-restart"):
        print("⏳ Service restart already pending — waiting for systemd relaunch...")
        return _wait_for_systemd_service_restart(
            system=system,
            previous_pid=previous_pid,
        )

    if active_state != "failed":
        return False

    exited_for_restart = exec_main_status == str(GATEWAY_SERVICE_RESTART_EXIT_CODE)
    if not (exited_for_restart or result == "exit-code"):
        return False

    svc = get_service_name()
    scope_label = _service_scope_label(system).capitalize()
    print(f"↻ Clearing failed state for pending {scope_label.lower()} service restart...")
    # Clear the failed state first, then start the unit again.
    for verb, limit in (("reset-failed", 30), ("start", 90)):
        _run_systemctl(
            [verb, svc],
            system=system,
            check=False,
            timeout=limit,
        )
    return _wait_for_systemd_service_restart(
        system=system,
        previous_pid=previous_pid,
    )
def _probe_launchd_service_running() -> bool: def _probe_launchd_service_running() -> bool:
if not get_launchd_plist_path().exists(): if not get_launchd_plist_path().exists():
return False return False
@ -470,7 +611,8 @@ def stop_profile_gateway() -> bool:
except (ProcessLookupError, PermissionError): except (ProcessLookupError, PermissionError):
break break
remove_pid_file() if get_running_pid() is None:
remove_pid_file()
return True return True
@ -1505,14 +1647,9 @@ def systemd_restart(system: bool = False):
pid = get_running_pid() pid = get_running_pid()
if pid is not None and _request_gateway_self_restart(pid): if pid is not None and _request_gateway_self_restart(pid):
# SIGUSR1 sent — the gateway will drain active agents, exit with
# code 75, and systemd will restart it after RestartSec (30s).
# Wait for the old process to die and the new one to become active
# so the CLI doesn't return while the service is still restarting.
import time import time
scope_label = _service_scope_label(system).capitalize() scope_label = _service_scope_label(system).capitalize()
svc = get_service_name() svc = get_service_name()
scope_cmd = _systemctl_cmd(system)
# Phase 1: wait for old process to exit (drain + shutdown) # Phase 1: wait for old process to exit (drain + shutdown)
print(f"{scope_label} service draining active work...") print(f"{scope_label} service draining active work...")
@ -1526,48 +1663,41 @@ def systemd_restart(system: bool = False):
else: else:
print(f"⚠ Old process (PID {pid}) still alive after 90s") print(f"⚠ Old process (PID {pid}) still alive after 90s")
# Phase 2: wait for systemd to start the new process # The gateway exits with code 75 for a planned service restart.
print(f"⏳ Waiting for {svc} to restart...") # systemd can sit in the RestartSec window or even wedge itself into a
deadline = time.time() + 60 # failed/rate-limited state if the operator asks for another restart in
while time.time() < deadline: # the middle of that handoff. Clear any stale failed state and kick the
try: # unit immediately so `hermes gateway restart` behaves idempotently.
result = subprocess.run( _run_systemctl(
scope_cmd + ["is-active", svc], ["reset-failed", svc],
capture_output=True, text=True, timeout=5, system=system,
) check=False,
if result.stdout.strip() == "active": timeout=30,
# Verify it's a NEW process, not the old one somehow
new_pid = get_running_pid()
if new_pid and new_pid != pid:
print(f"{scope_label} service restarted (PID {new_pid})")
return
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
time.sleep(2)
# Timed out — check final state
try:
result = subprocess.run(
scope_cmd + ["is-active", svc],
capture_output=True, text=True, timeout=5,
)
if result.stdout.strip() == "active":
print(f"{scope_label} service restarted")
return
except Exception:
pass
print(
f"{scope_label} service did not become active within 60s.\n"
f" Check status: {'sudo ' if system else ''}hermes gateway status\n"
f" Check logs: journalctl {'--user ' if not system else ''}-u {svc} --since '2 min ago'"
) )
_run_systemctl(
["start", svc],
system=system,
check=False,
timeout=90,
)
_wait_for_systemd_service_restart(system=system, previous_pid=pid)
return return
if _recover_pending_systemd_restart(system=system, previous_pid=pid):
return
_run_systemctl(
["reset-failed", get_service_name()],
system=system,
check=False,
timeout=30,
)
_run_systemctl(["reload-or-restart", get_service_name()], system=system, check=True, timeout=90) _run_systemctl(["reload-or-restart", get_service_name()], system=system, check=True, timeout=90)
print(f"{_service_scope_label(system).capitalize()} service restarted") print(f"{_service_scope_label(system).capitalize()} service restarted")
def systemd_status(deep: bool = False, system: bool = False): def systemd_status(deep: bool = False, system: bool = False, full: bool = False):
system = _select_systemd_scope(system) system = _select_systemd_scope(system)
unit_path = get_systemd_unit_path(system=system) unit_path = get_systemd_unit_path(system=system)
scope_flag = " --system" if system else "" scope_flag = " --system" if system else ""
@ -1590,8 +1720,12 @@ def systemd_status(deep: bool = False, system: bool = False):
print(f" Run: {'sudo ' if system else ''}hermes gateway restart{scope_flag} # auto-refreshes the unit") print(f" Run: {'sudo ' if system else ''}hermes gateway restart{scope_flag} # auto-refreshes the unit")
print() print()
status_cmd = ["status", get_service_name(), "--no-pager"]
if full:
status_cmd.append("-l")
_run_systemctl( _run_systemctl(
["status", get_service_name(), "--no-pager"], status_cmd,
system=system, system=system,
capture_output=False, capture_output=False,
timeout=10, timeout=10,
@ -1624,6 +1758,19 @@ def systemd_status(deep: bool = False, system: bool = False):
for line in runtime_lines: for line in runtime_lines:
print(f" {line}") print(f" {line}")
unit_props = _read_systemd_unit_properties(system=system)
active_state = unit_props.get("ActiveState", "")
sub_state = unit_props.get("SubState", "")
exec_main_status = unit_props.get("ExecMainStatus", "")
result_code = unit_props.get("Result", "")
if active_state == "activating" and sub_state == "auto-restart":
print(" ⏳ Restart pending: systemd is waiting to relaunch the gateway")
elif active_state == "failed" and exec_main_status == str(GATEWAY_SERVICE_RESTART_EXIT_CODE):
print(" ⚠ Planned restart is stuck in systemd failed state (exit 75)")
print(f" Run: systemctl {'--user ' if not system else ''}reset-failed {get_service_name()} && {'sudo ' if system else ''}hermes gateway start{scope_flag}")
elif active_state == "failed" and result_code:
print(f" ⚠ Systemd unit result: {result_code}")
if system: if system:
print("✓ System service starts at boot without requiring systemd linger") print("✓ System service starts at boot without requiring systemd linger")
elif deep: elif deep:
@ -1639,7 +1786,10 @@ def systemd_status(deep: bool = False, system: bool = False):
if deep: if deep:
print() print()
print("Recent logs:") print("Recent logs:")
subprocess.run(_journalctl_cmd(system) + ["-u", get_service_name(), "-n", "20", "--no-pager"], timeout=10) log_cmd = _journalctl_cmd(system) + ["-u", get_service_name(), "-n", "20", "--no-pager"]
if full:
log_cmd.append("-l")
subprocess.run(log_cmd, timeout=10)
# ============================================================================= # =============================================================================
@ -3762,12 +3912,13 @@ def gateway_command(args):
elif subcmd == "status": elif subcmd == "status":
deep = getattr(args, 'deep', False) deep = getattr(args, 'deep', False)
full = getattr(args, 'full', False)
system = getattr(args, 'system', False) system = getattr(args, 'system', False)
snapshot = get_gateway_runtime_snapshot(system=system) snapshot = get_gateway_runtime_snapshot(system=system)
# Check for service first # Check for service first
if supports_systemd_services() and (get_systemd_unit_path(system=False).exists() or get_systemd_unit_path(system=True).exists()): if supports_systemd_services() and (get_systemd_unit_path(system=False).exists() or get_systemd_unit_path(system=True).exists()):
systemd_status(deep, system=system) systemd_status(deep, system=system, full=full)
_print_gateway_process_mismatch(snapshot) _print_gateway_process_mismatch(snapshot)
elif is_macos() and get_launchd_plist_path().exists(): elif is_macos() and get_launchd_plist_path().exists():
launchd_status(deep) launchd_status(deep)

View file

@ -6888,6 +6888,12 @@ For more help on a command:
# gateway status # gateway status
gateway_status = gateway_subparsers.add_parser("status", help="Show gateway status") gateway_status = gateway_subparsers.add_parser("status", help="Show gateway status")
gateway_status.add_argument("--deep", action="store_true", help="Deep status check") gateway_status.add_argument("--deep", action="store_true", help="Deep status check")
gateway_status.add_argument(
"-l",
"--full",
action="store_true",
help="Show full, untruncated service/log output where supported",
)
gateway_status.add_argument( gateway_status.add_argument(
"--system", "--system",
action="store_true", action="store_true",

View file

@ -65,7 +65,11 @@ class TestGatewayPidState:
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123) monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None) monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
assert status.get_running_pid() == os.getpid() assert status.acquire_gateway_runtime_lock() is True
try:
assert status.get_running_pid() == os.getpid()
finally:
status.release_gateway_runtime_lock()
def test_get_running_pid_accepts_script_style_gateway_cmdline(self, tmp_path, monkeypatch): def test_get_running_pid_accepts_script_style_gateway_cmdline(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path)) monkeypatch.setenv("HERMES_HOME", str(tmp_path))
@ -85,7 +89,11 @@ class TestGatewayPidState:
lambda pid: "/venv/bin/python /repo/hermes_cli/main.py gateway run --replace", lambda pid: "/venv/bin/python /repo/hermes_cli/main.py gateway run --replace",
) )
assert status.get_running_pid() == os.getpid() assert status.acquire_gateway_runtime_lock() is True
try:
assert status.get_running_pid() == os.getpid()
finally:
status.release_gateway_runtime_lock()
def test_get_running_pid_accepts_explicit_pid_path_without_cleanup(self, tmp_path, monkeypatch): def test_get_running_pid_accepts_explicit_pid_path_without_cleanup(self, tmp_path, monkeypatch):
other_home = tmp_path / "profile-home" other_home = tmp_path / "profile-home"
@ -102,9 +110,82 @@ class TestGatewayPidState:
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123) monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None) monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
lock_path = other_home / "gateway.lock"
lock_path.write_text(json.dumps({
"pid": os.getpid(),
"kind": "hermes-gateway",
"argv": ["python", "-m", "hermes_cli.main", "gateway"],
"start_time": 123,
}))
monkeypatch.setattr(status, "is_gateway_runtime_lock_active", lambda lock_path=None: True)
assert status.get_running_pid(pid_path, cleanup_stale=False) == os.getpid() assert status.get_running_pid(pid_path, cleanup_stale=False) == os.getpid()
assert pid_path.exists() assert pid_path.exists()
def test_runtime_lock_claims_and_releases_liveness(self, tmp_path, monkeypatch):
    """The lock reads inactive, then active once acquired, then inactive again."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    assert status.is_gateway_runtime_lock_active() is False
    assert status.acquire_gateway_runtime_lock() is True
    try:
        assert status.is_gateway_runtime_lock_active() is True
    finally:
        # Always release: the lock handle is module-global state, so leaving
        # it held after a failed assertion would leak into later tests.
        status.release_gateway_runtime_lock()

    assert status.is_gateway_runtime_lock_active() is False
def test_get_running_pid_treats_pid_file_as_stale_without_runtime_lock(self, tmp_path, monkeypatch):
    """A plausible PID file is ignored and removed when no runtime lock is held."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    # The record points at this live process, but no runtime lock is acquired.
    record = {
        "pid": os.getpid(),
        "kind": "hermes-gateway",
        "argv": ["python", "-m", "hermes_cli.main", "gateway"],
        "start_time": 123,
    }
    pid_path = tmp_path / "gateway.pid"
    pid_path.write_text(json.dumps(record))

    monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
    monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
    monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)

    running = status.get_running_pid()

    assert running is None
    assert not pid_path.exists()
def test_get_running_pid_falls_back_to_live_lock_record(self, tmp_path, monkeypatch):
    """With a dead PID in the PID file, the PID in the lock record is returned."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    dead_pid = 99999

    def gateway_record(pid):
        return {
            "pid": pid,
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway"],
            "start_time": 123,
        }

    # The PID file names a process that fake_kill below reports as gone.
    pid_path = tmp_path / "gateway.pid"
    pid_path.write_text(json.dumps(gateway_record(dead_pid)))

    monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
    monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
    # The lock record written on acquire names this process.
    monkeypatch.setattr(status, "_build_pid_record", lambda: gateway_record(os.getpid()))
    assert status.acquire_gateway_runtime_lock() is True

    def fake_kill(pid, sig):
        if pid == dead_pid:
            raise ProcessLookupError
        return None

    monkeypatch.setattr(status.os, "kill", fake_kill)

    try:
        assert status.get_running_pid() == os.getpid()
    finally:
        status.release_gateway_runtime_lock()
class TestGatewayRuntimeStatus: class TestGatewayRuntimeStatus:
def test_write_runtime_status_overwrites_stale_pid_on_restart(self, tmp_path, monkeypatch): def test_write_runtime_status_overwrites_stale_pid_on_restart(self, tmp_path, monkeypatch):

View file

@ -121,6 +121,12 @@ def test_systemd_status_warns_when_linger_disabled(monkeypatch, tmp_path, capsys
return SimpleNamespace(returncode=0, stdout="", stderr="") return SimpleNamespace(returncode=0, stdout="", stderr="")
if cmd[:3] == ["systemctl", "--user", "is-active"]: if cmd[:3] == ["systemctl", "--user", "is-active"]:
return SimpleNamespace(returncode=0, stdout="active\n", stderr="") return SimpleNamespace(returncode=0, stdout="active\n", stderr="")
if cmd[:3] == ["systemctl", "--user", "show"]:
return SimpleNamespace(
returncode=0,
stdout="ActiveState=active\nSubState=running\nResult=success\nExecMainStatus=0\n",
stderr="",
)
raise AssertionError(f"Unexpected command: {cmd}") raise AssertionError(f"Unexpected command: {cmd}")
monkeypatch.setattr(gateway.subprocess, "run", fake_run) monkeypatch.setattr(gateway.subprocess, "run", fake_run)
@ -352,3 +358,24 @@ class TestWaitForGatewayExit:
assert killed == 2 assert killed == 2
assert calls == [(11, True), (22, True)] assert calls == [(11, True), (22, True)]
class TestStopProfileGateway:
    """Behaviour of ``stop_profile_gateway`` when the process keeps running."""

    def test_stop_profile_gateway_keeps_pid_file_when_process_still_running(self, monkeypatch):
        """The PID file is left in place while ``get_running_pid`` still reports a PID."""
        calls = {"kill": 0, "remove": 0}

        def count_kill(pid, sig):
            calls["kill"] += 1

        def count_remove():
            calls["remove"] += 1

        # The gateway is reported as running (PID 12345) on every lookup.
        monkeypatch.setattr("gateway.status.get_running_pid", lambda: 12345)
        monkeypatch.setattr(gateway.os, "kill", count_kill)
        monkeypatch.setattr("time.sleep", lambda _: None)
        monkeypatch.setattr("gateway.status.remove_pid_file", count_remove)

        stopped = gateway.stop_profile_gateway()

        assert stopped is True
        assert calls["kill"] == 21
        assert calls["remove"] == 0

View file

@ -77,8 +77,10 @@ class TestSystemdServiceRefresh:
gateway_cli.systemd_restart() gateway_cli.systemd_restart()
assert unit_path.read_text(encoding="utf-8") == "new unit\n" assert unit_path.read_text(encoding="utf-8") == "new unit\n"
assert calls[:2] == [ assert calls[:4] == [
["systemctl", "--user", "daemon-reload"], ["systemctl", "--user", "daemon-reload"],
["systemctl", "--user", "show", gateway_cli.get_service_name(), "--no-pager", "--property", "ActiveState,SubState,Result,ExecMainStatus"],
["systemctl", "--user", "reset-failed", gateway_cli.get_service_name()],
["systemctl", "--user", "reload-or-restart", gateway_cli.get_service_name()], ["systemctl", "--user", "reload-or-restart", gateway_cli.get_service_name()],
] ]
@ -474,13 +476,21 @@ class TestGatewaySystemServiceRouting:
raise ProcessLookupError() raise ProcessLookupError()
monkeypatch.setattr(os, "kill", fake_kill) monkeypatch.setattr(os, "kill", fake_kill)
# Simulate systemctl is-active returning "active" with a new PID # Simulate systemctl reset-failed/start followed by an active unit
new_pid = [None] new_pid = [None]
def fake_subprocess_run(cmd, **kwargs): def fake_subprocess_run(cmd, **kwargs):
if "is-active" in cmd: if "reset-failed" in cmd:
result = SimpleNamespace(stdout="active\n", returncode=0) calls.append(("reset-failed", cmd))
new_pid[0] = 999 # new PID return SimpleNamespace(stdout="", returncode=0)
return result if "start" in cmd:
calls.append(("start", cmd))
return SimpleNamespace(stdout="", returncode=0)
if "show" in cmd:
new_pid[0] = 999
return SimpleNamespace(
stdout="ActiveState=active\nSubState=running\nResult=success\nExecMainStatus=0\n",
returncode=0,
)
raise AssertionError(f"Unexpected systemctl call: {cmd}") raise AssertionError(f"Unexpected systemctl call: {cmd}")
monkeypatch.setattr(gateway_cli.subprocess, "run", fake_subprocess_run) monkeypatch.setattr(gateway_cli.subprocess, "run", fake_subprocess_run)
@ -494,9 +504,131 @@ class TestGatewaySystemServiceRouting:
gateway_cli.systemd_restart() gateway_cli.systemd_restart()
assert ("self", 654) in calls assert ("self", 654) in calls
assert any(call[0] == "reset-failed" for call in calls)
assert any(call[0] == "start" for call in calls)
out = capsys.readouterr().out.lower() out = capsys.readouterr().out.lower()
assert "restarted" in out assert "restarted" in out
def test_systemd_restart_recovers_failed_planned_restart(self, monkeypatch, capsys):
    """A failed unit with a pending planned restart is reset, started and reported."""
    monkeypatch.setattr(gateway_cli, "_select_systemd_scope", lambda system=False: False)
    monkeypatch.setattr(gateway_cli, "refresh_systemd_unit_if_needed", lambda system=False: None)
    monkeypatch.setattr(
        "gateway.status.read_runtime_status",
        lambda: {"restart_requested": True, "gateway_state": "stopped"},
    )
    monkeypatch.setattr(gateway_cli, "_request_gateway_self_restart", lambda pid: False)

    calls = []
    started = {"value": False}

    failed_show = (
        "ActiveState=failed\n"
        "SubState=failed\n"
        "Result=exit-code\n"
        f"ExecMainStatus={GATEWAY_SERVICE_RESTART_EXIT_CODE}\n"
    )
    active_show = "ActiveState=active\nSubState=running\nResult=success\nExecMainStatus=0\n"

    def fake_subprocess_run(cmd, **kwargs):
        # The unit reports "failed" until a start command has been seen.
        if "show" in cmd:
            stdout = active_show if started["value"] else failed_show
            return SimpleNamespace(stdout=stdout, returncode=0)
        if "reset-failed" in cmd:
            calls.append(("reset-failed", cmd))
            return SimpleNamespace(stdout="", returncode=0)
        if "start" in cmd:
            started["value"] = True
            calls.append(("start", cmd))
            return SimpleNamespace(stdout="", returncode=0)
        raise AssertionError(f"Unexpected command: {cmd}")

    monkeypatch.setattr(gateway_cli.subprocess, "run", fake_subprocess_run)
    monkeypatch.setattr(
        "gateway.status.get_running_pid",
        lambda: 999 if started["value"] else None,
    )

    gateway_cli.systemd_restart()

    issued = [call[0] for call in calls]
    assert any(name == "reset-failed" for name in issued)
    assert any(name == "start" for name in issued)
    out = capsys.readouterr().out.lower()
    assert "restarted" in out
def test_systemd_status_surfaces_planned_restart_failure(self, monkeypatch, capsys):
    """``systemd_status`` reports a planned restart stuck in ``failed``.

    The unit properties are stubbed to a failed state whose
    ``ExecMainStatus`` equals ``GATEWAY_SERVICE_RESTART_EXIT_CODE``; the
    printed status must then contain the "stuck in systemd failed state"
    notice.
    """
    unit_path = SimpleNamespace(exists=lambda: True)
    failed_properties = {
        "ActiveState": "failed",
        "SubState": "failed",
        "Result": "exit-code",
        "ExecMainStatus": str(GATEWAY_SERVICE_RESTART_EXIT_CODE),
    }
    seen_args = []

    def fake_run_systemctl(args, **kwargs):
        seen_args.append(args)
        verb_and_unit = args[:2]
        if verb_and_unit == ["status", gateway_cli.get_service_name()]:
            return SimpleNamespace(returncode=0, stdout="", stderr="")
        if verb_and_unit == ["is-active", gateway_cli.get_service_name()]:
            return SimpleNamespace(returncode=3, stdout="failed\n", stderr="")
        raise AssertionError(f"Unexpected args: {args}")

    # Replace every collaborator of systemd_status with a canned answer.
    stubs = {
        "_select_systemd_scope": lambda system=False: False,
        "get_systemd_unit_path": lambda system=False: unit_path,
        "has_conflicting_systemd_units": lambda: False,
        "has_legacy_hermes_units": lambda: False,
        "systemd_unit_is_current": lambda system=False: True,
        "_runtime_health_lines": lambda: ["⚠ Last shutdown reason: Gateway restart requested"],
        "get_systemd_linger_status": lambda: (True, ""),
        "_read_systemd_unit_properties": lambda system=False: failed_properties,
        "_run_systemctl": fake_run_systemctl,
    }
    for attr_name, replacement in stubs.items():
        monkeypatch.setattr(gateway_cli, attr_name, replacement)

    gateway_cli.systemd_status()

    printed = capsys.readouterr().out
    assert "Planned restart is stuck in systemd failed state" in printed
def test_gateway_status_dispatches_full_flag(self, monkeypatch):
    """``gateway status`` with ``full=True`` forwards the flag to ``systemd_status``.

    The platform probes are stubbed to select the systemd path with only a
    user unit present, and ``systemd_status`` is replaced by a recorder so
    the forwarded ``(deep, system, full)`` triple can be checked.
    """
    present_unit = SimpleNamespace(exists=lambda: True)
    absent_unit = SimpleNamespace(exists=lambda: False)
    recorded = []

    def fake_unit_path(system=False):
        # Only the user-scope unit exists.
        if system:
            return absent_unit
        return present_unit

    def fake_snapshot(system=False):
        return gateway_cli.GatewayRuntimeSnapshot(
            manager="systemd (user)",
            service_installed=True,
            service_running=False,
            gateway_pids=(),
            service_scope="user",
        )

    def record_status(deep=False, system=False, full=False):
        recorded.append((deep, system, full))

    monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True)
    monkeypatch.setattr(gateway_cli, "is_termux", lambda: False)
    monkeypatch.setattr(gateway_cli, "is_macos", lambda: False)
    monkeypatch.setattr(gateway_cli, "get_systemd_unit_path", fake_unit_path)
    monkeypatch.setattr(gateway_cli, "get_gateway_runtime_snapshot", fake_snapshot)
    monkeypatch.setattr(gateway_cli, "systemd_status", record_status)

    cli_args = SimpleNamespace(gateway_command="status", deep=False, system=False, full=True)
    gateway_cli.gateway_command(cli_args)

    assert recorded == [(False, False, True)]
def test_gateway_install_passes_system_flags(self, monkeypatch): def test_gateway_install_passes_system_flags(self, monkeypatch):
monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True) monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True)
monkeypatch.setattr(gateway_cli, "is_termux", lambda: False) monkeypatch.setattr(gateway_cli, "is_termux", lambda: False)
@ -547,11 +679,15 @@ class TestGatewaySystemServiceRouting:
) )
calls = [] calls = []
monkeypatch.setattr(gateway_cli, "systemd_status", lambda deep=False, system=False: calls.append((deep, system))) monkeypatch.setattr(
gateway_cli,
"systemd_status",
lambda deep=False, system=False, full=False: calls.append((deep, system, full)),
)
gateway_cli.gateway_command(SimpleNamespace(gateway_command="status", deep=False, system=False)) gateway_cli.gateway_command(SimpleNamespace(gateway_command="status", deep=False, system=False))
assert calls == [(False, False)] assert calls == [(False, False, False)]
def test_gateway_status_reports_manual_process_when_service_is_stopped(self, monkeypatch, capsys): def test_gateway_status_reports_manual_process_when_service_is_stopped(self, monkeypatch, capsys):
user_unit = SimpleNamespace(exists=lambda: True) user_unit = SimpleNamespace(exists=lambda: True)
@ -565,7 +701,11 @@ class TestGatewaySystemServiceRouting:
"get_systemd_unit_path", "get_systemd_unit_path",
lambda system=False: system_unit if system else user_unit, lambda system=False: system_unit if system else user_unit,
) )
monkeypatch.setattr(gateway_cli, "systemd_status", lambda deep=False, system=False: print("service stopped")) monkeypatch.setattr(
gateway_cli,
"systemd_status",
lambda deep=False, system=False, full=False: print("service stopped"),
)
monkeypatch.setattr( monkeypatch.setattr(
gateway_cli, gateway_cli,
"get_gateway_runtime_snapshot", "get_gateway_runtime_snapshot",
@ -1570,6 +1710,23 @@ class TestMigrateLegacyCommand:
assert called == {"interactive": False, "dry_run": False} assert called == {"interactive": False, "dry_run": False}
class TestGatewayStatusParser:
    """End-to-end checks of the ``gateway status`` argument parser."""

    def test_gateway_status_subparser_accepts_full_flag(self):
        """``gateway status`` must define the ``-l`` flag.

        The CLI is run in a subprocess with ``-l --help`` so that no real
        status command executes.  Exit code and stderr are checked, and the
        help text is additionally required to list ``-l``.
        """
        import re
        import subprocess
        import sys

        result = subprocess.run(
            [sys.executable, "-m", "hermes_cli.main", "gateway", "status", "-l", "--help"],
            cwd=str(gateway_cli.PROJECT_ROOT),
            capture_output=True,
            text=True,
            timeout=15,
        )
        assert result.returncode == 0
        assert "unrecognized arguments" not in result.stderr
        # argparse stashes an unknown "-l" as an extra argument and then
        # runs the help action, which exits 0 before any "unrecognized
        # arguments" error can be raised.  The two checks above therefore
        # pass even when the flag does not exist; requiring "-l" as a
        # standalone option in the help output makes the test able to fail.
        assert re.search(r"(?<![\w-])-l\b", result.stdout), result.stdout
def test_gateway_command_migrate_legacy_dry_run_passes_through( def test_gateway_command_migrate_legacy_dry_run_passes_through(
self, monkeypatch self, monkeypatch
): ):