""" Gateway subcommand for hermes CLI. Handles: hermes gateway [run|start|stop|restart|status|install|uninstall|setup] """ import asyncio import logging import os import shlex import shutil import signal import subprocess import sys import textwrap import time from dataclasses import dataclass from pathlib import Path PROJECT_ROOT = Path(__file__).parent.parent.resolve() from gateway.status import terminate_pid from gateway.restart import ( DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT, GATEWAY_SERVICE_RESTART_EXIT_CODE, parse_restart_drain_timeout, ) from hermes_cli.config import ( get_env_value, get_hermes_home, is_managed, managed_error, read_raw_config, save_env_value, write_platform_config_field, ) # display_hermes_home is imported lazily at call sites to avoid ImportError # when hermes_constants is cached from a pre-update version during `hermes update`. from hermes_cli.setup import ( print_header, print_info, print_success, print_warning, print_error, prompt, prompt_choice, prompt_yes_no, ) from hermes_cli.colors import Colors, color logger = logging.getLogger(__name__) # ============================================================================= # Process Management (for manual gateway runs) # ============================================================================= @dataclass(frozen=True) class GatewayRuntimeSnapshot: manager: str service_installed: bool = False service_running: bool = False gateway_pids: tuple[int, ...] = () service_scope: str | None = None @property def running(self) -> bool: return self.service_running or bool(self.gateway_pids) @property def has_process_service_mismatch(self) -> bool: return self.service_installed and self.running and not self.service_running @dataclass(frozen=True) class ProfileGatewayProcess: profile: str path: Path pid: int def _get_service_pids() -> set: """Return PIDs currently managed by systemd or launchd gateway services. Used to avoid killing freshly-restarted service processes when sweeping for stale manual gateway processes after a service restart. Relies on the service manager having committed the new PID before the restart command returns (true for both systemd and launchd in practice). """ pids: set = set() # --- systemd (Linux): user and system scopes --- if supports_systemd_services(): for scope_args in [["systemctl", "--user"], ["systemctl"]]: try: result = subprocess.run( scope_args + [ "list-units", "hermes-gateway*", "--plain", "--no-legend", "--no-pager", ], capture_output=True, text=True, timeout=5, ) for line in result.stdout.strip().splitlines(): parts = line.split() if not parts or not parts[0].endswith(".service"): continue svc = parts[0] try: show = subprocess.run( scope_args + ["show", svc, "--property=MainPID", "--value"], capture_output=True, text=True, timeout=5, ) pid = int(show.stdout.strip()) if pid > 0: pids.add(pid) except (ValueError, subprocess.TimeoutExpired): pass except (FileNotFoundError, subprocess.TimeoutExpired): pass # --- launchd (macOS) --- if is_macos(): try: label = get_launchd_label() result = subprocess.run( ["launchctl", "list", label], capture_output=True, text=True, timeout=5, ) if result.returncode == 0: # Try plist format first (macOS 26+): "PID" = ; pid = _parse_launchd_pid_from_list_output(result.stdout) if pid is not None and pid > 0: pids.add(pid) else: # Fall back to legacy tab-separated format: # "PID\tStatus\tLabel" for line in result.stdout.strip().splitlines(): parts = line.split() if len(parts) >= 3 and parts[2] == label: try: pid = int(parts[0]) if pid > 0: pids.add(pid) except ValueError: pass except (FileNotFoundError, subprocess.TimeoutExpired): pass return pids def _get_parent_pid(pid: int) -> int | None: """Return the parent PID for ``pid``, or ``None`` when unavailable. Uses psutil (core dependency) which works on every platform. The older implementation shelled out to ``ps -o ppid= -p ``, which silently fails on Windows (no ``ps``) so the ancestor walk terminated at self — the caller's dedup / exclude logic then couldn't distinguish "hermes CLI that invoked this scan" from "real gateway process". """ if pid <= 1: return None try: import psutil # type: ignore return psutil.Process(pid).ppid() or None except ImportError: pass except Exception: return None # Fallback: shell out to ps (POSIX only — bare ``ps`` doesn't exist on Windows). if not shutil.which("ps"): return None try: result = subprocess.run( ["ps", "-o", "ppid=", "-p", str(pid)], capture_output=True, text=True, timeout=5, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None if result.returncode != 0: return None raw = result.stdout.strip() if not raw: return None try: parent_pid = int(raw.splitlines()[-1].strip()) except ValueError: return None return parent_pid if parent_pid > 0 else None def _is_pid_ancestor_of_current_process(target_pid: int) -> bool: """Return True when ``target_pid`` is this process or one of its ancestors.""" if target_pid <= 0: return False pid = os.getpid() seen: set[int] = set() while pid and pid not in seen: if pid == target_pid: return True seen.add(pid) pid = _get_parent_pid(pid) or 0 return False def _request_gateway_self_restart(pid: int) -> bool: """Ask a running gateway ancestor to restart itself asynchronously.""" if not hasattr(signal, "SIGUSR1"): return False if not _is_pid_ancestor_of_current_process(pid): return False try: os.kill(pid, signal.SIGUSR1) # windows-footgun: ok — POSIX signal, guarded by hasattr(signal, 'SIGUSR1') above except (ProcessLookupError, PermissionError, OSError): return False return True def _graceful_restart_via_sigusr1(pid: int, drain_timeout: float) -> bool: """Send SIGUSR1 to a gateway PID and wait for it to exit gracefully. SIGUSR1 is wired in gateway/run.py to ``request_restart(via_service=True)`` which drains in-flight agent runs (up to ``agent.restart_drain_timeout`` seconds), then exits. Both systemd (``Restart=always``) and launchd (unconditional ``KeepAlive``) restart on any exit. This is the drain-aware alternative to ``systemctl restart`` / ``SIGTERM``, which SIGKILL in-flight agents after a short timeout. Args: pid: Gateway process PID (systemd MainPID, launchd PID, or bare process PID). drain_timeout: Seconds to wait for the process to exit after sending SIGUSR1. Should be slightly larger than the gateway's ``agent.restart_drain_timeout`` to allow the drain loop to finish cleanly. Returns: True if the PID was signalled and exited within the timeout. False if SIGUSR1 couldn't be sent or the process didn't exit in time (caller should fall back to a harder restart path). """ if not hasattr(signal, "SIGUSR1"): return False if pid <= 0: return False try: os.kill(pid, signal.SIGUSR1) # windows-footgun: ok — POSIX signal, guarded by hasattr(signal, 'SIGUSR1') above except ProcessLookupError: # Already gone — nothing to drain. return True except (PermissionError, OSError): return False import time as _time deadline = _time.monotonic() + max(drain_timeout, 1.0) # IMPORTANT Windows note: ``os.kill(pid, 0)`` is NOT a no-op on # Windows — Python's implementation calls ``TerminateProcess(handle, 0)`` # for sig=0, hard-killing the target. Use the cross-platform # ``_pid_exists`` helper in gateway.status which does OpenProcess + # WaitForSingleObject on Windows. from gateway.status import _pid_exists while _time.monotonic() < deadline: if not _pid_exists(pid): return True _time.sleep(0.5) # Drain didn't finish in time. return False def _get_ancestor_pids() -> set[int]: """Return the set of PIDs in the current process's ancestor chain. Walks from the current PID up to PID 1 (init) so that process-table scans never match the calling CLI process or any of its parents. This prevents ``hermes gateway status`` from falsely counting the ``hermes`` CLI that invoked it as a running gateway instance (see #13242). """ ancestors: set[int] = set() pid = os.getpid() # Cap iterations to avoid infinite loops on exotic platforms. for _ in range(64): ancestors.add(pid) parent = _get_parent_pid(pid) if parent is None or parent <= 0 or parent in ancestors: break pid = parent return ancestors def _append_unique_pid( pids: list[int], pid: int | None, exclude_pids: set[int] ) -> None: if pid is None or pid <= 0: return if pid == os.getpid() or pid in exclude_pids or pid in pids: return pids.append(pid) def _scan_gateway_pids( exclude_pids: set[int], all_profiles: bool = False, include_restart_managers: bool = False, ) -> list[int]: """Best-effort process-table scan for gateway PIDs. This supplements the profile-scoped PID file so status views can still spot a live gateway when the PID file is stale/missing, and ``--all`` sweeps can discover gateways outside the current profile. """ # Exclude the entire ancestor chain so the CLI process that invoked this # scan (e.g. ``hermes gateway status``) is never mistaken for a running # gateway. See #13242. exclude_pids = exclude_pids | _get_ancestor_pids() pids: list[int] = [] # Strict command-line matcher shared with gateway.status: requires the # actual ``gateway run`` subcommand (or the dedicated entrypoints), so this # scan no longer false-matches ``gateway status``/``dashboard`` siblings or # unrelated processes like ``python -m tui_gateway``. Lazy import mirrors the # circular-import avoidance used elsewhere in this module. from gateway.status import ( looks_like_gateway_command_line, looks_like_gateway_runtime_command_line, ) current_home = str(get_hermes_home().resolve()) current_home_lc = current_home.lower() current_profile_arg = _profile_arg(current_home) current_profile_name = ( current_profile_arg.split()[-1] if current_profile_arg else "" ) current_profile_name_lc = current_profile_name.lower() def _matches_current_profile(command: str) -> bool: command_lc = command.lower() if current_profile_name: return ( f"--profile {current_profile_name_lc}" in command_lc or f"-p {current_profile_name_lc}" in command_lc or f"hermes_home={current_home_lc}" in command_lc ) # Default-profile case: no profile flag in argv. Accept as long as # the command doesn't advertise *some other* profile. HERMES_HOME # may be passed via env (not visible in wmic/CIM command line) so # its absence is NOT disqualifying — only a non-matching explicit # HERMES_HOME= in argv is. if "--profile " in command_lc or " -p " in command_lc: return False if ( "hermes_home=" in command_lc and f"hermes_home={current_home_lc}" not in command_lc ): return False return True def _matches_gateway_runtime(command: str) -> bool: if looks_like_gateway_command_line(command): return True return include_restart_managers and looks_like_gateway_runtime_command_line(command) try: if is_windows(): # Prefer wmic when present (fast, stable output format). On # modern Windows 11 / Win 10 late builds, wmic has been # removed as part of the WMIC deprecation — fall back to # PowerShell's Get-CimInstance. Any OSError here (FileNotFoundError # on missing wmic) trips the fallback. wmic_path = shutil.which("wmic") used_fallback = False result = None if wmic_path is not None: try: result = subprocess.run( [ wmic_path, "process", "get", "ProcessId,CommandLine", "/FORMAT:LIST", ], capture_output=True, text=True, encoding="utf-8", errors="ignore", timeout=10, ) except (OSError, subprocess.TimeoutExpired): result = None if result is None or result.returncode != 0 or not (result.stdout or ""): # Fallback: PowerShell Get-CimInstance, emit LIST-style output # so the downstream parser below doesn't need to branch. powershell = shutil.which("powershell") or shutil.which("pwsh") if powershell is None: return [] ps_cmd = ( "Get-CimInstance Win32_Process | " "ForEach-Object { " " 'CommandLine=' + ($_.CommandLine -replace \"`r`n\",' ' -replace \"`n\",' '); " " 'ProcessId=' + $_.ProcessId; " " '' " "}" ) try: result = subprocess.run( [powershell, "-NoProfile", "-Command", ps_cmd], capture_output=True, text=True, encoding="utf-8", errors="ignore", timeout=15, ) except (OSError, subprocess.TimeoutExpired): return [] used_fallback = True if result.returncode != 0 or result.stdout is None: return [] current_cmd = "" for line in result.stdout.split("\n"): line = line.strip() if line.startswith("CommandLine="): current_cmd = line[len("CommandLine=") :] elif line.startswith("ProcessId="): pid_str = line[len("ProcessId=") :] if _matches_gateway_runtime(current_cmd) and ( all_profiles or _matches_current_profile(current_cmd) ): try: _append_unique_pid(pids, int(pid_str), exclude_pids) except ValueError: pass current_cmd = "" else: # Try /proc first (works in Docker without procps installed), # fall back to ps -A eww. _found_via_proc = False if os.path.isdir("/proc"): try: my_pid = os.getpid() for entry in os.listdir("/proc"): if not entry.isdigit(): continue pid = int(entry) if pid == my_pid or pid in exclude_pids: continue try: with open(f"/proc/{pid}/cmdline", "rb") as _f: cmdline = _f.read().decode("utf-8", errors="replace") cmdline = cmdline.replace("\x00", " ") if _matches_gateway_runtime(cmdline) and ( all_profiles or _matches_current_profile(cmdline) ): _append_unique_pid(pids, pid, exclude_pids) except (OSError, PermissionError): continue _found_via_proc = True except Exception: pass if not _found_via_proc: result = subprocess.run( ["ps", "-A", "eww", "-o", "pid=,command="], capture_output=True, text=True, timeout=10, ) if result.returncode != 0: return [] for line in result.stdout.split("\n"): stripped = line.strip() if not stripped or "grep" in stripped: continue pid = None command = "" parts = stripped.split(None, 1) if len(parts) == 2: try: pid = int(parts[0]) command = parts[1] except ValueError: pid = None if pid is None: aux_parts = stripped.split() if len(aux_parts) > 10 and aux_parts[1].isdigit(): pid = int(aux_parts[1]) command = " ".join(aux_parts[10:]) if pid is None: continue if _matches_gateway_runtime(command) and ( all_profiles or _matches_current_profile(command) ): _append_unique_pid(pids, pid, exclude_pids) except (OSError, subprocess.TimeoutExpired): return [] # Windows-specific: collapse venv launcher stubs. A venv-built # ``pythonw.exe`` in ``/Scripts/`` is a ~100 KB launcher exe # that spawns the base Python (e.g. ``C:\Program Files\Python311\ # pythonw.exe``) with the same command line, preserving the venv's # ``pyvenv.cfg`` context. This is standard Windows CPython venv # behaviour — BUT it means every gateway run produces two pythonw # PIDs with identical command lines (one launcher stub, one actual # interpreter) which is confusing in ``gateway status`` output. # Filter the stub: if a PID in our result is the PARENT of another # PID in our result, and both are pythonw.exe, the parent is the # launcher stub — drop it, keep the child. if is_windows() and len(pids) > 1: pids = _filter_venv_launcher_stubs(pids) return pids def _filter_venv_launcher_stubs(pids: list[int]) -> list[int]: """Drop venv-launcher ``pythonw.exe`` stubs that are parents of the real interpreter process. See comment at the tail of ``_scan_gateway_pids``. Uses ``psutil`` (core dependency). Safe on any platform; only invoked on Windows by the caller because the stub pattern is Windows-specific. """ try: import psutil # type: ignore except ImportError: return pids pid_set = set(pids) # Collect each PID's parent so we can flag "child of another matched PID". parent_of: dict[int, int | None] = {} for pid in pids: try: parent_of[pid] = psutil.Process(pid).ppid() except (psutil.NoSuchProcess, psutil.AccessDenied): parent_of[pid] = None # For each child whose parent is also in our set, drop the parent. drop: set[int] = set() for pid, ppid in parent_of.items(): if ppid is not None and ppid in pid_set: drop.add(ppid) return [p for p in pids if p not in drop] def find_gateway_pids( exclude_pids: set | None = None, all_profiles: bool = False ) -> list: """Find PIDs of running gateway processes. Args: exclude_pids: PIDs to exclude from the result (e.g. service-managed PIDs that should not be killed during a stale-process sweep). all_profiles: When ``True``, return gateway PIDs across **all** profiles (the pre-7923 global behaviour). ``hermes update`` needs this because a code update affects every profile. When ``False`` (default), only PIDs belonging to the current Hermes profile are returned. """ _exclude = set(exclude_pids or set()) pids: list[int] = [] if not all_profiles: try: from gateway.status import get_running_pid _append_unique_pid(pids, get_running_pid(), _exclude) except Exception: pass for pid in _get_service_pids(): _append_unique_pid(pids, pid, _exclude) try: include_restart_managers = not supports_systemd_services() except Exception: include_restart_managers = False for pid in _scan_gateway_pids( _exclude, all_profiles=all_profiles, include_restart_managers=include_restart_managers, ): _append_unique_pid(pids, pid, _exclude) return pids def find_profile_gateway_processes( exclude_pids: set | None = None, ) -> list[ProfileGatewayProcess]: """Return running gateway PIDs mapped to Hermes profiles via PID files.""" _exclude = set(exclude_pids or set()) processes: list[ProfileGatewayProcess] = [] try: from gateway.status import get_running_pid from hermes_cli.profiles import list_profiles except Exception: return processes seen: set[int] = set() for profile in list_profiles(): try: pid = get_running_pid(profile.path / "gateway.pid", cleanup_stale=False) except Exception: continue if pid is None or pid <= 0 or pid in _exclude or pid in seen: continue seen.add(pid) processes.append( ProfileGatewayProcess(profile=profile.name, path=profile.path, pid=pid) ) return processes def _gateway_run_args_for_profile(profile: str) -> list[str]: args = [get_python_path(), "-m", "hermes_cli.main"] if profile != "default": args.extend(["--profile", profile]) args.extend(["gateway", "run", "--replace"]) return args def _capture_gateway_argv(pid: int) -> list[str] | None: """Return the live argv of a running gateway process, or ``None``. Used to respawn gateways that have no profile→PID-file mapping (e.g. a Windows Scheduled Task running ``pythonw.exe -m hermes_cli.main gateway run``). ``_pause_windows_gateways_for_update`` force-kills such gateways before mutating the venv; without their original command line we cannot bring them back, so we snapshot it here before the kill. Best-effort: returns ``None`` if psutil is unavailable, the process is gone, access is denied, or the argv doesn't look like a gateway command. """ if pid <= 1: return None try: import psutil # type: ignore except ImportError: return None try: argv = list(psutil.Process(pid).cmdline() or []) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): return None except Exception: return None if not argv: return None # Guard against snapshotting an unrelated process whose PID happened to be # reported by the scan: only respawn things that actually look like a # gateway run command line. try: from gateway.status import looks_like_gateway_command_line if not looks_like_gateway_command_line(" ".join(argv)): return None except Exception: pass return argv def launch_detached_gateway_restart_by_cmdline( old_pid: int, run_argv: list[str] ) -> bool: """Relaunch a gateway by replaying its captured command line after exit. Companion to ``launch_detached_profile_gateway_restart`` for gateways that have no profile→PID-file mapping (Scheduled-Task / manually-launched ``gateway run`` whose HERMES_HOME or argv doesn't match a known profile). Uses the identical detached-watcher mechanism; only the respawn argv differs (the process's own argv instead of a profile-derived one). """ if old_pid <= 0 or not run_argv: return False return _spawn_gateway_restart_watcher(old_pid, list(run_argv)) def launch_detached_profile_gateway_restart(profile: str, old_pid: int) -> bool: """Relaunch a manually-run profile gateway after its current PID exits.""" if old_pid <= 0: return False return _spawn_gateway_restart_watcher(old_pid, _gateway_run_args_for_profile(profile)) def _spawn_gateway_restart_watcher(old_pid: int, run_argv: list[str]) -> bool: """Spawn the detached watcher that respawns ``run_argv`` once ``old_pid`` exits.""" if old_pid <= 0 or not run_argv: return False # The watcher is a tiny Python subprocess that polls the old PID and # respawns the gateway once it's gone. Both legs of the chain need # platform-appropriate detach semantics: # # POSIX — ``start_new_session=True`` (os.setsid in the child) detaches # from the parent's process group so Ctrl+C in the CLI doesn't # propagate and the watcher/gateway survive the CLI exiting. # # Windows — ``start_new_session`` is silently accepted but does NOT # detach. The watcher stays attached to the CLI's console and dies # when the user closes the terminal, leaving ``hermes update`` users # with no running gateway until they re-invoke ``hermes gateway`` # manually. The Win32 equivalent is the ``CREATE_NEW_PROCESS_GROUP | # DETACHED_PROCESS | CREATE_NO_WINDOW`` creationflags bundle. # # ``windows_detach_popen_kwargs()`` returns the right kwargs for the # host platform and is a no-op on POSIX (just ``start_new_session=True``). from hermes_cli._subprocess_compat import ( windows_detach_flags_without_breakaway, windows_detach_popen_kwargs, ) watcher = textwrap.dedent( """ import os import subprocess import sys import time from hermes_cli._subprocess_compat import ( windows_detach_flags, windows_detach_flags_without_breakaway, ) pid = int(sys.argv[1]) cmd = sys.argv[2:] deadline = time.monotonic() + 120 while time.monotonic() < deadline: # ``os.kill(pid, 0)`` is not a no-op on Windows — use the # cross-platform existence check. from gateway.status import _pid_exists if not _pid_exists(pid): break time.sleep(0.2) # Platform-appropriate detach for the respawned gateway. On POSIX # start_new_session=True maps to os.setsid; on Windows we need # explicit creationflags because start_new_session is a no-op there. # CREATE_BREAKAWAY_FROM_JOB is critical: the watcher itself may have # been spawned inside a job object (Electron/Tauri parent), and # without breakaway the respawned gateway would die when that job # tears down. See _subprocess_compat.windows_detach_flags(). _popen_kwargs = { "stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL, } if sys.platform == "win32": try: _popen_kwargs["creationflags"] = windows_detach_flags() subprocess.Popen(cmd, **_popen_kwargs) except OSError: # CREATE_BREAKAWAY_FROM_JOB can be rejected with # ERROR_ACCESS_DENIED when the parent's job object refuses # breakaway. Retry without it — DETACHED_PROCESS et al. # alone are enough in most setups. Mirrors the canonical # fallback in gateway_windows._spawn_detached. _popen_kwargs["creationflags"] = windows_detach_flags_without_breakaway() subprocess.Popen(cmd, **_popen_kwargs) else: _popen_kwargs["start_new_session"] = True subprocess.Popen(cmd, **_popen_kwargs) """ ).strip() watcher_argv = [ sys.executable, "-c", watcher, str(old_pid), *run_argv, ] # Same platform-aware detach for the watcher process itself — so # closing the user's terminal doesn't kill the watcher. try: subprocess.Popen( watcher_argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **windows_detach_popen_kwargs(), ) except OSError: # CREATE_BREAKAWAY_FROM_JOB rejected by the parent job object # (Electron, Windows Terminal with restrictive job settings, …). # Retry without it. POSIX never reaches this branch — there # ``start_new_session=True`` cannot raise OSError — so the # fallback is only meaningful on Windows. try: fallback_kwargs: dict = ( {"creationflags": windows_detach_flags_without_breakaway()} if sys.platform == "win32" else {"start_new_session": True} ) subprocess.Popen( watcher_argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, **fallback_kwargs, ) except OSError: return False return True def _probe_systemd_service_running(system: bool = False) -> tuple[bool, bool]: selected_system = _select_systemd_scope(system) unit_exists = get_systemd_unit_path(system=selected_system).exists() if not unit_exists: return selected_system, False try: result = _run_systemctl( ["is-active", get_service_name()], system=selected_system, capture_output=True, text=True, timeout=10, ) except (RuntimeError, subprocess.TimeoutExpired): return selected_system, False return selected_system, result.stdout.strip() == "active" def _read_systemd_unit_environment(system: bool = False) -> dict[str, str]: """Parse the gateway unit's ``Environment=`` directives. ``systemctl show -p Environment`` returns a single line of space-separated ``KEY=VALUE`` pairs; values are not quoted in the output even when the unit file quoted them. We split on whitespace and ``=``. """ selected_system = _select_systemd_scope(system) try: result = _run_systemctl( [ "show", get_service_name(), "--no-pager", "--property", "Environment", ], system=selected_system, capture_output=True, text=True, timeout=10, ) except (RuntimeError, subprocess.TimeoutExpired, OSError): return {} if result.returncode != 0: return {} parsed: dict[str, str] = {} for line in result.stdout.splitlines(): if not line.startswith("Environment="): continue body = line[len("Environment=") :].strip() for token in body.split(): if "=" not in token: continue key, value = token.split("=", 1) parsed[key] = value return parsed def _sync_hermes_home_from_systemd_unit(system: bool) -> None: """When acting on a system-scope unit, adopt its ``HERMES_HOME``. Under ``sudo``, ``HERMES_HOME`` is stripped and ``HOME=/root``, so :func:`get_hermes_home` falls back to ``/root/.hermes`` — the wrong profile. The unit file pins ``HERMES_HOME`` for the actual gateway process, so we mirror that into our own environment to make ``read_runtime_status`` / ``get_running_pid`` read the correct files. """ if not system: return env = _read_systemd_unit_environment(system=True) unit_home = env.get("HERMES_HOME", "").strip() if not unit_home: return current = os.environ.get("HERMES_HOME", "").strip() if current == unit_home: return os.environ["HERMES_HOME"] = unit_home def _read_systemd_unit_properties( system: bool = False, properties: tuple[str, ...] = ( "ActiveState", "SubState", "Result", "ExecMainStatus", "MainPID", ), ) -> dict[str, str]: """Return selected ``systemctl show`` properties for the gateway unit.""" selected_system = _select_systemd_scope(system) try: result = _run_systemctl( [ "show", get_service_name(), "--no-pager", "--property", ",".join(properties), ], system=selected_system, capture_output=True, text=True, timeout=10, ) except (RuntimeError, subprocess.TimeoutExpired, OSError): return {} if result.returncode != 0: return {} parsed: dict[str, str] = {} for line in result.stdout.splitlines(): if "=" not in line: continue key, value = line.split("=", 1) parsed[key] = value.strip() return parsed def _systemd_main_pid_from_props(props: dict[str, str]) -> int | None: try: pid = int(props.get("MainPID", "0") or "0") except (TypeError, ValueError): return None return pid if pid > 0 else None def _systemd_main_pid(system: bool = False) -> int | None: return _systemd_main_pid_from_props(_read_systemd_unit_properties(system=system)) def _read_gateway_runtime_status() -> dict | None: try: from gateway.status import read_runtime_status state = read_runtime_status() except Exception: return None return state if isinstance(state, dict) else None def _gateway_runtime_status_for_pid(pid: int | None) -> dict | None: if not pid: return None state = _read_gateway_runtime_status() if not state: return None try: state_pid = int(state.get("pid", 0) or 0) except (TypeError, ValueError): return None return state if state_pid == pid else None def _wait_for_systemd_service_restart( *, system: bool = False, previous_pid: int | None = None, timeout: float = 60.0, ) -> bool: """Wait for the gateway service to become active after a restart handoff.""" import time svc = get_service_name() scope_label = _service_scope_label(system).capitalize() deadline = time.monotonic() + timeout printed_runtime_wait = False while time.monotonic() < deadline: props = _read_systemd_unit_properties(system=system) active_state = props.get("ActiveState", "") sub_state = props.get("SubState", "") new_pid = None try: from gateway.status import get_running_pid new_pid = get_running_pid() except Exception: new_pid = None if not new_pid: new_pid = _systemd_main_pid_from_props(props) if active_state == "active": if new_pid and (previous_pid is None or new_pid != previous_pid): runtime_state = _gateway_runtime_status_for_pid(new_pid) gateway_state = (runtime_state or {}).get("gateway_state") if gateway_state == "running": print(f"✓ {scope_label} service restarted (PID {new_pid})") return True if gateway_state == "startup_failed": reason = (runtime_state or {}).get( "exit_reason" ) or "startup failed" print( f"⚠ {scope_label} service process restarted (PID {new_pid}), but gateway startup failed: {reason}" ) return False if not printed_runtime_wait: print( f"⏳ {scope_label} service process started (PID {new_pid}); waiting for gateway runtime..." ) printed_runtime_wait = True if active_state == "activating" and sub_state == "auto-restart": time.sleep(1) continue if _systemd_unit_is_start_limited(props): _print_systemd_start_limit_wait(system=system) return False time.sleep(2) print( f"⚠ {scope_label} service did not become active within {int(timeout)}s.\n" f" Check status: {'sudo ' if system else ''}hermes gateway status\n" f" Check logs: journalctl {'--user ' if not system else ''}-u {svc} -l --since '2 min ago'" ) return False def _systemd_unit_is_start_limited(props: dict[str, str]) -> bool: result = props.get("Result", "").lower() sub_state = props.get("SubState", "").lower() return result == "start-limit-hit" or sub_state == "start-limit-hit" def _systemd_error_indicates_start_limit(exc: subprocess.CalledProcessError) -> bool: parts: list[str] = [] for attr in ("stderr", "stdout", "output"): value = getattr(exc, attr, None) if not value: continue if isinstance(value, bytes): value = value.decode(errors="replace") parts.append(str(value)) text = "\n".join(parts).lower() return ( "start-limit-hit" in text or "start request repeated too quickly" in text or "start-limit" in text ) def _systemd_service_is_start_limited(system: bool = False) -> bool: return _systemd_unit_is_start_limited(_read_systemd_unit_properties(system=system)) def _print_systemd_start_limit_wait(system: bool = False) -> None: svc = get_service_name() scope_label = _service_scope_label(system).capitalize() scope_flag = " --system" if system else "" systemctl_prefix = "systemctl " if system else "systemctl --user " journal_prefix = "journalctl " if system else "journalctl --user " print(f"⏳ {scope_label} service is temporarily rate-limited by systemd.") print(" systemd is refusing another immediate start after repeated exits.") print( f" Wait for the start-limit window to expire, then run: {'sudo ' if system else ''}hermes gateway restart{scope_flag}" ) print(f" Or clear the failed state manually: {systemctl_prefix}reset-failed {svc}") print(f" Check logs: {journal_prefix}-u {svc} -l --since '5 min ago'") def _recover_pending_systemd_restart( system: bool = False, previous_pid: int | None = None ) -> bool: """Recover a planned service restart that is stuck in systemd state.""" props = _read_systemd_unit_properties(system=system) if not props: return False try: from gateway.status import read_runtime_status except Exception: return False runtime_state = read_runtime_status() or {} if not runtime_state.get("restart_requested"): return False active_state = props.get("ActiveState", "") sub_state = props.get("SubState", "") exec_main_status = props.get("ExecMainStatus", "") result = props.get("Result", "") if active_state == "activating" and sub_state == "auto-restart": print("⏳ Service restart already pending — waiting for systemd relaunch...") return _wait_for_systemd_service_restart( system=system, previous_pid=previous_pid, ) if active_state == "failed" and ( exec_main_status == str(GATEWAY_SERVICE_RESTART_EXIT_CODE) or result == "exit-code" ): svc = get_service_name() scope_label = _service_scope_label(system).capitalize() print( f"↻ Clearing failed state for pending {scope_label.lower()} service restart..." ) _run_systemctl( ["reset-failed", svc], system=system, check=False, timeout=30, ) _run_systemctl( ["start", svc], system=system, check=False, timeout=90, ) return _wait_for_systemd_service_restart( system=system, previous_pid=previous_pid, ) return False def _parse_launchd_pid_from_list_output(output: str) -> int | None: """Extract the PID from ``launchctl list