fix(gateway,windows): reliability — supervisor task, JOB breakaway, status --deep

Three coordinated fixes for the Windows gateway reliability story:

1. CREATE_BREAKAWAY_FROM_JOB on every detached spawn

   The 'hermes update' triggered from the Electron Desktop GUI ran inside
   Electron's job object. Without breakaway, the post-update gateway
   watcher spawned by update — already DETACHED_PROCESS — was still
   reaped when Electron's job tore down, so the gateway never came back
   after a GUI-initiated update. Adds CREATE_BREAKAWAY_FROM_JOB (0x01000000)
   to:
     - hermes_cli/_subprocess_compat.py::windows_detach_flags() — used by
       every helper that calls windows_detach_popen_kwargs(), including
       launch_detached_profile_gateway_restart()
     - The watcher subprocess's own respawn snippet in
       hermes_cli/gateway.py (inlined flags so the watcher's child
       respawn also breaks away)

   _spawn_detached() in gateway_windows.py already had the flag; this
   change brings the rest of the codebase to parity.

2. Per-minute supervisor Scheduled Task — Windows equivalent of
   systemd Restart=always

   Introduces hermes_cli/gateway_supervisor.py and registers it as a
   second Scheduled Task ('Hermes_Gateway_Supervisor', SC MINUTE /MO 1,
   LIMITED rights) alongside the existing ONLOGON task. Every minute,
   the supervisor uses the same gateway.status.get_running_pid() probe
   as 'hermes gateway status' and, if no gateway is alive, calls
   gateway_windows._spawn_detached() (which now includes BREAKAWAY) to
   bring one back.

   Covers every crash mode, not just 'machine rebooted': taskkill,
   OOM, GUI update SIGTERM, parent job teardown. Cheap — one pythonw
   startup per minute when down, one PID-existence check per minute
   when up.

   Wired into both the schtasks-success and Startup-folder-fallback
   install paths via _install_supervisor_best_effort(), and removed in
   uninstall(). Best-effort: a failing supervisor install logs a
   warning but doesn't roll back the primary install.

3. 'hermes gateway status --deep' shows per-probe PASS/FAIL

   Replaces the existing terse '--deep' output (which only printed
   paths) with an actual diagnostic table:
     [1] PID file present
     [2] Lock file held by a live process
     [3] get_running_pid() result
     [4] _pid_exists(pid) — OS-level liveness
     [5] gateway_state.json (state + age)
     [6] Last lifecycle event from gateway-exit-diag.log

   When the high-level summary disagrees with reality, the user can
   see exactly which signal is lying.

Test-leak fix
-------------

tests/hermes_cli/test_gateway_wsl.py::TestGatewayCommandWSLMessages
monkey-patched is_linux/is_wsl/supports_systemd_services to simulate
WSL but did NOT stub is_windows(). On a Windows host, the dispatcher
in _gateway_command_inner takes the is_windows() branch BEFORE the
WSL guidance branch, so the test invoked gateway_windows.install()
for real. install() writes to %APPDATA%\...\Startup\Hermes_Gateway.cmd
— the REAL user Startup folder, never sandboxed by tmp_path — pointing
at the test's pytest-of-<user>/pytest-<N>/.../gateway-service/ wrapper.
When pytest tore down the tmp_path, every subsequent Windows login
flashed a cmd.exe window that failed to find the missing target.

Stubs is_windows=False on all four affected tests:
  test_install_wsl_no_systemd
  test_start_wsl_no_systemd
  test_status_wsl_running_manual
  test_status_wsl_not_running

Defense-in-depth: _build_startup_launcher() now prefixes the launcher
with 'if not exist <target> exit /b 0', so any future stale Startup
entry silently no-ops instead of flashing a console window.

Status enhancements
-------------------

- status() now reports supervisor task presence alongside the existing
  schtasks/Startup info, and nudges the user to reinstall if the
  supervisor isn't registered.
- Deep mode dumps both the supervisor task name + script path.
This commit is contained in:
Teknium 2026-06-06 08:10:19 -07:00
parent 1c2189839d
commit c1e5fa433c
5 changed files with 594 additions and 8 deletions

View file

@ -97,6 +97,16 @@ def resolve_node_command(name: str, argv: Sequence[str]) -> list[str]:
_CREATE_NEW_PROCESS_GROUP = 0x00000200 _CREATE_NEW_PROCESS_GROUP = 0x00000200
_DETACHED_PROCESS = 0x00000008 _DETACHED_PROCESS = 0x00000008
_CREATE_NO_WINDOW = 0x08000000 _CREATE_NO_WINDOW = 0x08000000
# Escape any Win32 job object the parent process belongs to. Without this,
# a detached child still inherits its parent's job object membership, and
# when that parent (Electron, Tauri, Windows Terminal, the Desktop GUI's
# bootstrap-installer) dies, the OS tears down the whole job — taking the
# "detached" child with it. Critical for the post-update gateway watcher:
# Electron spawns the Tauri updater inside its own job, the updater spawns
# the watcher subprocess; without BREAKAWAY the watcher dies the instant
# Electron exits, so the gateway never gets respawned after a `hermes
# update` triggered from the GUI. See fix/windows-gateway-reliability.
_CREATE_BREAKAWAY_FROM_JOB = 0x01000000
def windows_detach_flags() -> int: def windows_detach_flags() -> int:
@ -116,10 +126,30 @@ def windows_detach_flags() -> int:
- ``CREATE_NO_WINDOW`` suppress the brief cmd flash that would - ``CREATE_NO_WINDOW`` suppress the brief cmd flash that would
otherwise appear when launching a console app. Redundant with otherwise appear when launching a console app. Redundant with
DETACHED_PROCESS but explicit for clarity. DETACHED_PROCESS but explicit for clarity.
- ``CREATE_BREAKAWAY_FROM_JOB`` escape any job object the parent is
in. Electron (Desktop app) and Tauri (bootstrap installer) wrap
their children in job objects; without breakaway, those children
die when the parent process exits even if they were spawned with
DETACHED_PROCESS. This was the missing flag that made the
post-update gateway respawn watcher silently die alongside the
Tauri updater after the Electron Desktop's update flow finished.
If a process is in a job that disallows breakaway (rare
JOB_OBJECT_LIMIT_BREAKAWAY_OK isn't set), CreateProcess returns
ERROR_ACCESS_DENIED. Python surfaces that as ``PermissionError``
on the ``subprocess.Popen`` call. Callers in this codebase already
wrap detached spawns in ``try/except OSError`` and fall back to a
cmd.exe wrapper, so the breakaway-denied case degrades gracefully
rather than crashing.
""" """
if not IS_WINDOWS: if not IS_WINDOWS:
return 0 return 0
return _CREATE_NEW_PROCESS_GROUP | _DETACHED_PROCESS | _CREATE_NO_WINDOW return (
_CREATE_NEW_PROCESS_GROUP
| _DETACHED_PROCESS
| _CREATE_NO_WINDOW
| _CREATE_BREAKAWAY_FROM_JOB
)
def windows_hide_flags() -> int: def windows_hide_flags() -> int:

View file

@ -664,6 +664,10 @@ def launch_detached_profile_gateway_restart(profile: str, old_pid: int) -> bool:
# Platform-appropriate detach for the respawned gateway. On POSIX # Platform-appropriate detach for the respawned gateway. On POSIX
# start_new_session=True maps to os.setsid; on Windows we need # start_new_session=True maps to os.setsid; on Windows we need
# explicit creationflags because start_new_session is a no-op there. # explicit creationflags because start_new_session is a no-op there.
# CREATE_BREAKAWAY_FROM_JOB is critical: the watcher itself may have
# been spawned inside a job object (Electron/Tauri parent), and
# without breakaway the respawned gateway would die when that job
# tears down. See _subprocess_compat.windows_detach_flags().
_popen_kwargs = { _popen_kwargs = {
"stdout": subprocess.DEVNULL, "stdout": subprocess.DEVNULL,
"stderr": subprocess.DEVNULL, "stderr": subprocess.DEVNULL,
@ -672,8 +676,12 @@ def launch_detached_profile_gateway_restart(profile: str, old_pid: int) -> bool:
_CREATE_NEW_PROCESS_GROUP = 0x00000200 _CREATE_NEW_PROCESS_GROUP = 0x00000200
_DETACHED_PROCESS = 0x00000008 _DETACHED_PROCESS = 0x00000008
_CREATE_NO_WINDOW = 0x08000000 _CREATE_NO_WINDOW = 0x08000000
_CREATE_BREAKAWAY_FROM_JOB = 0x01000000
_popen_kwargs["creationflags"] = ( _popen_kwargs["creationflags"] = (
_CREATE_NEW_PROCESS_GROUP | _DETACHED_PROCESS | _CREATE_NO_WINDOW _CREATE_NEW_PROCESS_GROUP
| _DETACHED_PROCESS
| _CREATE_NO_WINDOW
| _CREATE_BREAKAWAY_FROM_JOB
) )
else: else:
_popen_kwargs["start_new_session"] = True _popen_kwargs["start_new_session"] = True

View file

@ -0,0 +1,170 @@
"""Windows gateway supervisor — poll-and-respawn for crash recovery.
Invoked once per minute by the ``Hermes_Gateway_Supervisor`` Scheduled Task
(see :func:`hermes_cli.gateway_windows._install_supervisor_task`). Checks
whether the per-profile gateway is alive and, if not, spawns a detached
replacement using the same ``_spawn_detached()`` helper that
``hermes gateway start`` uses.
This is the Windows analogue of systemd's ``Restart=always``. It runs as the
logged-in user (LIMITED rights), needs no admin context, and exits silently
when the gateway is already up.
CLI:
pythonw -m hermes_cli.gateway_supervisor [--profile NAME]
Exit codes:
0 always (the supervisor is best-effort; never crash the schtasks parent)
Logging:
All actions are appended to ``$HERMES_HOME/logs/gateway-supervisor.log``.
The file is truncated to the last 2000 lines on each invocation to keep
it bounded.
"""
from __future__ import annotations
import argparse
import datetime
import os
import sys
import traceback
from pathlib import Path
_LOG_MAX_LINES = 2000
def _resolve_log_path() -> Path:
"""Return the supervisor log path under the current HERMES_HOME.
Imported lazily so a missing/broken hermes_constants import doesn't
crash the supervisor we fall back to a sensible default.
"""
try:
from hermes_constants import get_hermes_home
home = get_hermes_home()
except Exception:
# Best-effort fallback. The supervisor must never crash on import.
home = Path(os.environ.get("HERMES_HOME") or Path.home() / ".hermes")
log_dir = Path(home) / "logs"
return log_dir / "gateway-supervisor.log"
def _log(msg: str) -> None:
"""Append a timestamped line to the supervisor log.
Best-effort: a logging failure must never crash the supervisor.
"""
try:
log_path = _resolve_log_path()
log_path.parent.mkdir(parents=True, exist_ok=True)
ts = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds")
with open(log_path, "a", encoding="utf-8") as fh:
fh.write(f"[{ts}] {msg}\n")
except Exception:
pass
def _truncate_log() -> None:
"""Keep the supervisor log bounded (last ``_LOG_MAX_LINES`` lines).
Cheap to do once per invocation file is small and reads sequentially.
"""
try:
log_path = _resolve_log_path()
if not log_path.exists():
return
with open(log_path, "r", encoding="utf-8", errors="replace") as fh:
lines = fh.readlines()
if len(lines) <= _LOG_MAX_LINES:
return
tail = lines[-_LOG_MAX_LINES:]
tmp = log_path.with_suffix(log_path.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as fh:
fh.writelines(tail)
tmp.replace(log_path)
except Exception:
pass
def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
prog="hermes_cli.gateway_supervisor",
description="Windows poll-and-respawn supervisor for the Hermes gateway.",
)
# ``--profile`` is accepted for parity with the main CLI but the
# supervisor relies on HERMES_HOME being correctly set in the
# scheduled-task environment; the .cmd wrapper sets it explicitly.
parser.add_argument("--profile", default=None)
return parser.parse_args(argv)
def _gateway_is_alive() -> tuple[bool, int | None]:
"""Probe whether a gateway is running for the current HERMES_HOME.
Returns ``(alive, pid)``. ``pid`` is the running gateway PID when
``alive`` is True, else None.
"""
try:
from gateway.status import get_running_pid
except Exception as exc:
_log(f"probe import failure: {exc!r}")
return (True, None) # Conservative: don't try to respawn if we can't probe
try:
pid = get_running_pid(cleanup_stale=False)
except Exception as exc:
_log(f"get_running_pid raised: {exc!r}")
return (True, None)
return (pid is not None, pid)
def _respawn() -> int | None:
"""Spawn a fresh detached gateway. Return the PID, or None on failure."""
try:
from hermes_cli import gateway_windows
except Exception as exc:
_log(f"gateway_windows import failure: {exc!r}")
return None
try:
return gateway_windows._spawn_detached()
except Exception as exc:
_log(f"_spawn_detached raised: {exc!r}\n{traceback.format_exc()}")
return None
def main(argv: list[str] | None = None) -> int:
"""Entrypoint for ``pythonw -m hermes_cli.gateway_supervisor``."""
try:
_parse_args(argv)
except SystemExit:
# argparse exits with 2 on --help / bad args. The schtasks parent
# should still see 0 so the task doesn't get marked "last run failed".
return 0
except Exception as exc:
_log(f"argparse raised: {exc!r}")
return 0
_truncate_log()
try:
alive, pid = _gateway_is_alive()
if alive:
# Stay quiet on healthy ticks to keep the log small.
return 0
_log("gateway is down; respawning")
new_pid = _respawn()
if new_pid is None:
_log("respawn failed; will retry on next tick")
else:
_log(f"respawned gateway pid={new_pid}")
except Exception as exc:
_log(f"supervisor uncaught: {exc!r}\n{traceback.format_exc()}")
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))

View file

@ -380,13 +380,26 @@ def _build_gateway_cmd_script(
def _build_startup_launcher(script_path: Path) -> str: def _build_startup_launcher(script_path: Path) -> str:
"""The tiny .cmd that goes in the Startup folder. Just minimizes and chains.""" """The tiny .cmd that goes in the Startup folder. Just minimizes and chains.
Defense-in-depth: bail out silently if the target script is gone. Test
fixtures historically wrote Startup entries pointing at pytest tmp_path
directories that vanish after the test session. Without the existence
guard, every subsequent Windows login flashes a cmd.exe window that
fails to find the target. The check + ``exit /b 0`` keeps that case
silent.
"""
quoted_target = _quote_cmd_script_arg(str(script_path))
lines = [ lines = [
"@echo off", "@echo off",
f"rem {_TASK_DESCRIPTION}", f"rem {_TASK_DESCRIPTION}",
# If the wrapper script is gone (typical for stale entries from
# uninstalled/migrated installs), silently no-op instead of
# flashing a cmd window with a "file not found" error.
f"if not exist {quoted_target} exit /b 0",
# ``start "" /min`` detaches with a minimized console window. # ``start "" /min`` detaches with a minimized console window.
# ``/d /c`` on cmd.exe skips AUTORUN and runs the target script once. # ``/d /c`` on cmd.exe skips AUTORUN and runs the target script once.
f'start "" /min cmd.exe /d /c {_quote_cmd_script_arg(str(script_path))}', f'start "" /min cmd.exe /d /c {quoted_target}',
] ]
return "\r\n".join(lines) + "\r\n" return "\r\n".join(lines) + "\r\n"
@ -478,6 +491,188 @@ def _install_scheduled_task(task_name: str, script_path: Path) -> tuple[bool, st
return (False, f"schtasks /Create failed (code {last_code}): {last_err.strip()}") return (False, f"schtasks /Create failed (code {last_code}): {last_err.strip()}")
# ---------------------------------------------------------------------------
# Supervisor task: poll-and-respawn for crash recovery
# ---------------------------------------------------------------------------
#
# The ONLOGON Scheduled Task above only fires when a user logs in. That covers
# the "reboot" case but NOT the "gateway crashed mid-session" or "hermes update
# SIGTERM'd the gateway and the watcher failed to respawn" cases. On Linux,
# systemd's ``Restart=always`` handles that. The Windows equivalent has to be
# built — Scheduled Tasks can include a ``<RestartOnFailure>`` element, but
# that only triggers on non-zero exit codes, not on external taskkill, and
# requires the XML schema (no /sc flag shortcut).
#
# Instead we install a SECOND Scheduled Task that runs every minute, checks
# whether the gateway is alive via the PID-file / lock-file probes, and
# spawns it again if not. Cheap (one pythonw startup per minute when down,
# one PID-existence check per minute when up), no extra resident process,
# survives every crash mode equally.
def get_supervisor_task_name() -> str:
"""Per-profile name for the poll-and-respawn supervisor task."""
_assert_windows()
from hermes_cli.gateway import _profile_suffix
suffix = _profile_suffix()
if not suffix:
return f"{_TASK_NAME_DEFAULT}_Supervisor"
return f"{_TASK_NAME_DEFAULT}_Supervisor_{suffix}"
def get_supervisor_script_path() -> Path:
"""Path to the supervisor .cmd that the per-minute task invokes."""
_assert_windows()
from hermes_cli.config import get_hermes_home
script_dir = Path(get_hermes_home()) / "gateway-service"
script_dir.mkdir(parents=True, exist_ok=True)
return script_dir / f"{_sanitize_filename(get_supervisor_task_name())}.cmd"
def _build_supervisor_cmd_script(
python_path: str,
working_dir: str,
hermes_home: str,
profile_arg: str,
) -> str:
"""Render the supervisor .cmd.
Invokes ``pythonw -m hermes_cli.gateway_supervisor`` which re-uses the
same get_running_pid() truth source ``hermes gateway status`` uses, and
only spawns a new gateway if the probe says no gateway is alive. Pieces:
- ``pythonw.exe`` no console flash every minute.
- Output redirected to NUL; the supervisor module writes its own log.
- The respawn calls ``_spawn_detached()`` which uses
CREATE_BREAKAWAY_FROM_JOB, so the respawned gateway is independent
of the schtasks-spawned probe and of any job object the probe is in.
"""
pythonw_path = _derive_venv_pythonw(python_path)
venv_dir = str(Path(python_path).resolve().parent.parent)
lines = ["@echo off", f"rem {_TASK_DESCRIPTION} (supervisor)"]
lines.append(f"cd /d {_quote_cmd_script_arg(working_dir)}")
lines.append(f'set "HERMES_HOME={hermes_home}"')
lines.append('set "PYTHONIOENCODING=utf-8"')
lines.append(f'set "VIRTUAL_ENV={venv_dir}"')
prog_args = [pythonw_path, "-m", "hermes_cli.gateway_supervisor"]
if profile_arg:
# Pass the same --profile X argv the rest of the install path uses,
# so the supervisor sees the right HERMES_HOME / config.
prog_args.extend(profile_arg.split())
lines.append(
" ".join(_quote_cmd_script_arg(a) for a in prog_args) + " >NUL 2>&1"
)
lines.append("exit /b 0")
return "\r\n".join(lines) + "\r\n"
def _write_supervisor_script() -> Path:
"""Generate and write the supervisor .cmd. Return its absolute path."""
_assert_windows()
from hermes_cli.config import get_hermes_home
from hermes_cli.gateway import (
PROJECT_ROOT,
_profile_arg,
get_python_path,
)
python_path = get_python_path()
working_dir = _stable_gateway_working_dir(PROJECT_ROOT)
hermes_home = str(Path(get_hermes_home()).resolve())
profile_arg = _profile_arg(hermes_home)
content = _build_supervisor_cmd_script(python_path, working_dir, hermes_home, profile_arg)
script_path = get_supervisor_script_path()
tmp = script_path.with_suffix(".tmp")
tmp.write_text(content, encoding="utf-8", newline="")
tmp.replace(script_path)
return script_path
def _install_supervisor_task(task_name: str, script_path: Path) -> tuple[bool, str]:
"""Create/replace the per-minute supervisor Scheduled Task.
Trigger: ``MINUTE /MO 1`` fires once a minute starting at install.
Run level: LIMITED (no admin context needed; probe + detached spawn
work as the logged-in user).
Returns (success, detail).
"""
quoted_script = _quote_schtasks_arg(str(script_path))
delete_code, delete_out, delete_err = _exec_schtasks(["/Delete", "/F", "/TN", task_name])
delete_detail = (delete_err or delete_out or "").strip()
if delete_code != 0 and delete_detail and "cannot find" not in delete_detail.lower():
if _is_access_denied(delete_detail):
return (False, f"schtasks /Delete failed (code {delete_code}): {delete_detail}")
base = [
"/Create",
"/F",
"/SC",
"MINUTE",
"/MO",
"1",
"/RL",
"LIMITED",
"/TN",
task_name,
"/TR",
quoted_script,
]
user = _resolve_task_user()
variants = []
if user:
variants.append([*base, "/RU", user, "/NP", "/IT"])
variants.append(base)
last_code = 1
last_err = ""
for argv in variants:
code, out, err = _exec_schtasks(argv)
if code == 0:
return (True, f"Created Scheduled Task {task_name!r} (every 1 min)")
last_code, last_err = code, (err or out or "")
return (False, f"schtasks /Create failed (code {last_code}): {last_err.strip()}")
def is_supervisor_task_registered() -> bool:
"""True iff the per-minute supervisor task exists."""
code, _out, _err = _exec_schtasks(["/Query", "/TN", get_supervisor_task_name()])
return code == 0
def _install_supervisor_best_effort() -> bool:
"""Write the supervisor script + register the per-minute Scheduled Task.
Best-effort: any failure is printed as a warning, never raised. The
primary install path completed before this is called, so the user
still has working autostart even if supervisor registration fails
(e.g. on locked-down task ACLs).
Returns True on success, False on any failure.
"""
try:
script_path = _write_supervisor_script()
except Exception as exc:
print(f"⚠ Could not write gateway supervisor script: {exc}")
print(" (Login autostart still installed. Reliability: every-minute respawn disabled.)")
return False
ok, detail = _install_supervisor_task(get_supervisor_task_name(), script_path)
if ok:
print(f"{detail} — gateway will auto-respawn within 60s of any crash.")
return True
print(f"⚠ Could not register gateway supervisor: {detail}")
print(" (Login autostart still installed. Reliability: every-minute respawn disabled.)")
return False
def _install_startup_entry(script_path: Path) -> Path: def _install_startup_entry(script_path: Path) -> Path:
"""Write the Startup-folder fallback launcher. Returns its path.""" """Write the Startup-folder fallback launcher. Returns its path."""
entry = get_startup_entry_path() entry = get_startup_entry_path()
@ -710,6 +905,7 @@ def _install_startup_fallback(script_path: Path, start_now: bool, detail: str) -
entry = _install_startup_entry(script_path) entry = _install_startup_entry(script_path)
print(f"✓ Installed Windows login item: {entry}") print(f"✓ Installed Windows login item: {entry}")
print(f" Task script: {script_path}") print(f" Task script: {script_path}")
_install_supervisor_best_effort()
# Re-running `hermes -p <profile> gateway install` must be safe. # Re-running `hermes -p <profile> gateway install` must be safe.
# Startup-folder fallback only installs login persistence. Starting is # Startup-folder fallback only installs login persistence. Starting is
@ -792,6 +988,7 @@ def install(
print(f"{detail}") print(f"{detail}")
print(f" Task script: {script_path}") print(f" Task script: {script_path}")
print(" Gateway auto-start installed for Windows login.") print(" Gateway auto-start installed for Windows login.")
_install_supervisor_best_effort()
if start_now: if start_now:
running_pids = _gateway_pids() running_pids = _gateway_pids()
if running_pids: if running_pids:
@ -832,6 +1029,7 @@ def install(
entry = _install_startup_entry(script_path) entry = _install_startup_entry(script_path)
print(f"✓ Installed Windows login item: {entry}") print(f"✓ Installed Windows login item: {entry}")
print(f" Task script: {script_path}") print(f" Task script: {script_path}")
_install_supervisor_best_effort()
# Re-running `hermes -p <profile> gateway install` must be safe. # Re-running `hermes -p <profile> gateway install` must be safe.
# Startup-folder fallback only installs login persistence. Starting is # Startup-folder fallback only installs login persistence. Starting is
@ -900,9 +1098,23 @@ def uninstall() -> None:
"""Remove both the Scheduled Task and the Startup-folder fallback, if present.""" """Remove both the Scheduled Task and the Startup-folder fallback, if present."""
_assert_windows() _assert_windows()
task_name = get_task_name() task_name = get_task_name()
supervisor_name = get_supervisor_task_name()
script_path = get_task_script_path() script_path = get_task_script_path()
supervisor_script = get_supervisor_script_path()
startup_entry = get_startup_entry_path() startup_entry = get_startup_entry_path()
# Remove the supervisor first so it can't try to respawn the gateway we're
# about to clean up. Best-effort: schtasks /Delete on a non-existent task
# returns nonzero, which we treat as "already gone".
if is_supervisor_task_registered():
code, _out, err = _exec_schtasks(["/Delete", "/F", "/TN", supervisor_name])
if code == 0:
print(f"✓ Removed supervisor task {supervisor_name!r}")
else:
detail = (err or "").strip()
if "cannot find" not in detail.lower():
print(f"⚠ schtasks /Delete supervisor returned code {code}: {detail}")
scheduled_task_removed = False scheduled_task_removed = False
if is_task_registered(): if is_task_registered():
code, _out, err = _exec_schtasks(["/Delete", "/F", "/TN", task_name]) code, _out, err = _exec_schtasks(["/Delete", "/F", "/TN", task_name])
@ -926,7 +1138,11 @@ def uninstall() -> None:
else: else:
print(f"⚠ schtasks /Delete returned code {code}: {detail}") print(f"⚠ schtasks /Delete returned code {code}: {detail}")
for path, label in [(startup_entry, "Windows login item"), (script_path, "Task script")]: for path, label in [
(startup_entry, "Windows login item"),
(script_path, "Task script"),
(supervisor_script, "Supervisor script"),
]:
try: try:
path.unlink() path.unlink()
print(f"✓ Removed {label}: {path}") print(f"✓ Removed {label}: {path}")
@ -984,12 +1200,146 @@ def _gateway_pids() -> list[int]:
return list(find_gateway_pids()) return list(find_gateway_pids())
def _print_deep_probes() -> None:
"""Print PASS/FAIL per individual probe of gateway liveness.
The default ``status`` output collapses several signals into one
/ line, which is great when they agree and confusing when they
don't. The deep-probe block shows each underlying check independently
so the user can see exactly which signal is wrong.
Probes:
[1] PID file present
[2] Lock file present and held by some process
[3] gateway.status.get_running_pid() returns a PID
[4] _pid_exists(pid) OS confirms the process is alive
[5] gateway_state.json exists and parses (and is fresh-ish)
[6] Last lifecycle event in gateway-exit-diag.log
"""
import json
from datetime import datetime, timezone
from hermes_cli.config import get_hermes_home
home = Path(get_hermes_home()).resolve()
pid_path = home / "gateway.pid"
lock_path = home / "gateway.lock"
state_path = home / "gateway_state.json"
diag_path = home / "logs" / "gateway-exit-diag.log"
print()
print("Deep probes:")
def _mark(ok: bool) -> str:
return "PASS" if ok else "FAIL"
# [1] PID file
pid_exists = pid_path.exists()
pid_value: int | None = None
if pid_exists:
try:
data = json.loads(pid_path.read_text(encoding="utf-8"))
pid_value = int(data.get("pid")) if data.get("pid") is not None else None
print(f" [1] {_mark(True):4s} PID file present: {pid_path} (pid={pid_value})")
except Exception as exc:
print(f" [1] {_mark(False):4s} PID file present but unreadable: {exc}")
else:
print(f" [1] {_mark(False):4s} PID file missing: {pid_path}")
# [2] Lock file present + held
lock_held = False
lock_present = lock_path.exists()
if lock_present:
try:
from gateway.status import is_gateway_runtime_lock_active
lock_held = is_gateway_runtime_lock_active(lock_path)
print(f" [2] {_mark(lock_held):4s} Lock file held by a live process: {lock_path}")
except Exception as exc:
print(f" [2] {_mark(False):4s} Could not probe lock: {exc}")
else:
print(f" [2] {_mark(False):4s} Lock file missing: {lock_path}")
# [3] get_running_pid()
running_pid: int | None = None
try:
from gateway.status import get_running_pid
running_pid = get_running_pid(cleanup_stale=False)
print(f" [3] {_mark(running_pid is not None):4s} get_running_pid() => {running_pid}")
except Exception as exc:
print(f" [3] {_mark(False):4s} get_running_pid() raised: {exc!r}")
# [4] _pid_exists() on the probed PID
candidate_pid = running_pid if running_pid is not None else pid_value
if candidate_pid is not None:
try:
from gateway.status import _pid_exists
alive = bool(_pid_exists(candidate_pid))
print(f" [4] {_mark(alive):4s} _pid_exists({candidate_pid}) => {alive}")
except Exception as exc:
print(f" [4] {_mark(False):4s} _pid_exists raised: {exc!r}")
else:
print(f" [4] {_mark(False):4s} No candidate PID to verify")
# [5] runtime status file
if state_path.exists():
try:
state_data = json.loads(state_path.read_text(encoding="utf-8"))
gateway_state = state_data.get("gateway_state")
updated_at = state_data.get("updated_at")
age_str = ""
if updated_at:
try:
updated_dt = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))
now = datetime.now(timezone.utc)
age_seconds = int((now - updated_dt).total_seconds())
age_str = f" (updated {age_seconds}s ago)"
except Exception:
pass
ok = gateway_state == "running"
print(f" [5] {_mark(ok):4s} gateway_state.json state={gateway_state!r}{age_str}")
except Exception as exc:
print(f" [5] {_mark(False):4s} gateway_state.json present but unreadable: {exc}")
else:
print(f" [5] {_mark(False):4s} gateway_state.json missing: {state_path}")
# [6] Last lifecycle event from the exit-diag log
if diag_path.exists():
try:
with open(diag_path, "rb") as fh:
# Read last ~4KB; one event is well under 500 bytes.
fh.seek(0, 2)
size = fh.tell()
fh.seek(max(0, size - 4096))
tail = fh.read().decode("utf-8", errors="replace").splitlines()
last_event = next((ln for ln in reversed(tail) if ln.strip()), "")
if last_event:
try:
event = json.loads(last_event)
tag = event.get("tag", "?")
pid = event.get("pid", "?")
ts = event.get("ts", "?")
healthy = tag in ("gateway.start",)
print(f" [6] {_mark(healthy):4s} Last lifecycle event: tag={tag} pid={pid} ts={ts}")
except Exception:
print(f" [6] {_mark(False):4s} Last lifecycle line not JSON: {last_event[:120]}")
else:
print(f" [6] {_mark(False):4s} exit-diag log empty: {diag_path}")
except Exception as exc:
print(f" [6] {_mark(False):4s} exit-diag log unreadable: {exc}")
else:
print(f" [6] {_mark(False):4s} exit-diag log missing: {diag_path}")
def status(deep: bool = False) -> None: def status(deep: bool = False) -> None:
"""Print a status report for the Windows gateway service.""" """Print a status report for the Windows gateway service."""
_assert_windows() _assert_windows()
task_name = get_task_name() task_name = get_task_name()
task_installed = is_task_registered() task_installed = is_task_registered()
startup_installed = is_startup_entry_installed() startup_installed = is_startup_entry_installed()
supervisor_installed = is_supervisor_task_registered()
pids = _gateway_pids() pids = _gateway_pids()
if task_installed: if task_installed:
@ -1004,6 +1354,12 @@ def status(deep: bool = False) -> None:
else: else:
print("✗ Gateway service not installed") print("✗ Gateway service not installed")
if supervisor_installed:
print(f"✓ Supervisor task registered: {get_supervisor_task_name()} (every 1 min)")
elif task_installed or startup_installed:
print(f" Supervisor not registered — reinstall to enable auto-respawn:")
print(f" hermes gateway install")
if pids: if pids:
print(f"✓ Gateway process running (PID: {', '.join(map(str, pids))})") print(f"✓ Gateway process running (PID: {', '.join(map(str, pids))})")
else: else:
@ -1011,9 +1367,14 @@ def status(deep: bool = False) -> None:
if deep: if deep:
print() print()
print(f" Task name: {task_name}") print(f" Task name: {task_name}")
print(f" Task script: {get_task_script_path()}") print(f" Task script: {get_task_script_path()}")
print(f" Startup entry: {get_startup_entry_path()}") print(f" Startup entry: {get_startup_entry_path()}")
print(f" Supervisor task: {get_supervisor_task_name()}")
print(f" Supervisor script:{get_supervisor_script_path()}")
# Surface the per-probe truth so the user can see *which* signal
# is lying when the high-level summary disagrees with reality.
_print_deep_probes()
if not task_installed and not startup_installed and not pids: if not task_installed and not startup_installed and not pids:
print() print()

View file

@ -167,6 +167,14 @@ class TestGatewayCommandWSLMessages:
monkeypatch.setattr(gateway, "supports_systemd_services", lambda: False) monkeypatch.setattr(gateway, "supports_systemd_services", lambda: False)
monkeypatch.setattr(gateway, "is_macos", lambda: False) monkeypatch.setattr(gateway, "is_macos", lambda: False)
monkeypatch.setattr(gateway, "is_managed", lambda: False) monkeypatch.setattr(gateway, "is_managed", lambda: False)
# CRITICAL: also stub is_windows. Without this, running this test on a
# real Windows host falls through to the is_windows() branch *before*
# the WSL guidance branch, invoking gateway_windows.install() which
# writes a Startup-folder .cmd into the real user's Startup folder
# (NOT tmp_path) pointing at a now-vanished pytest fixture path.
# The user then sees a broken Hermes_Gateway.cmd flash a cmd.exe
# window on every login. See fix/windows-gateway-reliability.
monkeypatch.setattr(gateway, "is_windows", lambda: False)
args = SimpleNamespace( args = SimpleNamespace(
gateway_command="install", force=False, system=False, gateway_command="install", force=False, system=False,
@ -189,6 +197,10 @@ class TestGatewayCommandWSLMessages:
monkeypatch.setattr(gateway, "is_wsl", lambda: True) monkeypatch.setattr(gateway, "is_wsl", lambda: True)
monkeypatch.setattr(gateway, "supports_systemd_services", lambda: False) monkeypatch.setattr(gateway, "supports_systemd_services", lambda: False)
monkeypatch.setattr(gateway, "is_macos", lambda: False) monkeypatch.setattr(gateway, "is_macos", lambda: False)
# See test_install_wsl_no_systemd: stub is_windows so a Windows host
# running this test does NOT actually spawn a detached gateway via
# gateway_windows.start().
monkeypatch.setattr(gateway, "is_windows", lambda: False)
args = SimpleNamespace(gateway_command="start", system=False) args = SimpleNamespace(gateway_command="start", system=False)
with pytest.raises(SystemExit) as exc_info: with pytest.raises(SystemExit) as exc_info:
@ -206,6 +218,9 @@ class TestGatewayCommandWSLMessages:
monkeypatch.setattr(gateway, "is_macos", lambda: False) monkeypatch.setattr(gateway, "is_macos", lambda: False)
monkeypatch.setattr(gateway, "is_termux", lambda: False) monkeypatch.setattr(gateway, "is_termux", lambda: False)
monkeypatch.setattr(gateway, "is_wsl", lambda: True) monkeypatch.setattr(gateway, "is_wsl", lambda: True)
# Stub is_windows so a Windows host running this test does NOT take
# the Windows status branch (which reads gateway_windows.is_installed()).
monkeypatch.setattr(gateway, "is_windows", lambda: False)
monkeypatch.setattr(gateway, "find_gateway_pids", lambda: [12345]) monkeypatch.setattr(gateway, "find_gateway_pids", lambda: [12345])
monkeypatch.setattr(gateway, "_runtime_health_lines", lambda: []) monkeypatch.setattr(gateway, "_runtime_health_lines", lambda: [])
# Stub out the systemd unit path check # Stub out the systemd unit path check
@ -231,6 +246,8 @@ class TestGatewayCommandWSLMessages:
monkeypatch.setattr(gateway, "is_macos", lambda: False) monkeypatch.setattr(gateway, "is_macos", lambda: False)
monkeypatch.setattr(gateway, "is_termux", lambda: False) monkeypatch.setattr(gateway, "is_termux", lambda: False)
monkeypatch.setattr(gateway, "is_wsl", lambda: True) monkeypatch.setattr(gateway, "is_wsl", lambda: True)
# See test_status_wsl_running_manual.
monkeypatch.setattr(gateway, "is_windows", lambda: False)
monkeypatch.setattr(gateway, "find_gateway_pids", lambda: []) monkeypatch.setattr(gateway, "find_gateway_pids", lambda: [])
monkeypatch.setattr(gateway, "_runtime_health_lines", lambda: []) monkeypatch.setattr(gateway, "_runtime_health_lines", lambda: [])
monkeypatch.setattr( monkeypatch.setattr(