From 10ee4a729ba22c00718ca12d5192a52e56fb5a09 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 28 May 2026 03:25:32 -0700 Subject: [PATCH] fix(gateway): drain on Windows `hermes gateway stop` so sessions survive restart (#33798) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sessions now survive `hermes gateway stop` / `restart` on native Windows. Previously the gateway died on schtasks `/End` + os.kill SIGTERM without ever running the drain loop, so the v0.13.0 session-resume feature (#21192) silently broke on Windows: `resume_pending=True` was never written, and the next boot started with a blank conversation history (issue #33778). Root cause is twofold and the reporter only identified half of it: 1. `hermes_cli/gateway_windows.py::stop()` did not write the `planned_stop_marker` before signalling. The reporter caught this. 2. The bigger reason: `asyncio.add_signal_handler` raises NotImplementedError for SIGTERM/SIGINT on Windows, so even if the marker had been written, the gateway's existing SIGTERM handler (which is what calls `runner.stop()` and the `mark_resume_pending` loop) was never invoked. Writing the marker would have been necessary-but-insufficient. The fix has two parts: * gateway/run.py: new `_run_planned_stop_watcher` daemon thread polls for the planned-stop marker file every 0.5s. When the marker appears it `loop.call_soon_threadsafe(shutdown_signal_handler, None)` — the same shutdown path a real SIGTERM would have driven, including the pre-drain `mark_resume_pending` writes (run.py:5977) and graceful drain wait. The existing signal handler already accepts `received_signal=None` and falls through to `consume_planned_stop_marker_for_self()`, so no handler changes needed. Runs on every platform as cheap belt-and-suspenders. * hermes_cli/gateway_windows.py: `stop()` now writes the marker for the running gateway PID and waits up to `agent.restart_drain_timeout` (default 30s) for the PID to exit cleanly. On clean drain, the kill sweep is non-forceful; on timeout, escalates to `kill_gateway_processes(force=True)` which routes to taskkill /T /F per `references/windows-native-support.md`. Validation: * 7 new tests in tests/gateway/test_planned_stop_watcher.py covering: marker→handler dispatch, no-marker idle, already-draining skip, not-yet-running skip, stop_event responsiveness, fire-once semantics, error tolerance. * 8 new tests in tests/hermes_cli/test_gateway_windows.py covering: marker-before-kill ordering, clean-drain skips force-kill, drain-timeout escalates to force=True, no-pid-skips-drain, invalid-pid handling, fast-exit success, timeout failure, marker-write-failure tolerance. * E2E (Linux, detached orphan): write_planned_stop_marker(pid) + `_drain_gateway_pid(pid, 5.0)` returns True in 0.5s after the victim sees the marker and exits. Tested with a double-forked subprocess so the test parent isn't holding it as a zombie. * Targeted: tests/gateway/{restart_drain,restart_resume_pending, signal,signal_format,status,shutdown_forensics,approve_deny_commands, planned_stop_watcher} + tests/hermes_cli/{gateway_windows, gateway_service} → 519/519. What was wrong with the reporter's claim (for future archaeology): they described the symptom as "no `resume_pending=True` written to `sessions.json`" — but Hermes uses `state.db` (SQLite), not `sessions.json`, and `mark_resume_pending` is called regardless of the marker (the marker only affects exit code 0 vs 1 for systemd revival semantics). The real session-loss path is the missing drain on Windows, not a missing marker. Both halves are fixed here. Closes #33778. --- gateway/run.py | 93 ++++++- hermes_cli/gateway_windows.py | 79 +++++- tests/gateway/test_planned_stop_watcher.py | 267 +++++++++++++++++++++ tests/hermes_cli/test_gateway_windows.py | 219 ++++++++++++++++- 4 files changed, 649 insertions(+), 9 deletions(-) create mode 100644 tests/gateway/test_planned_stop_watcher.py diff --git a/gateway/run.py b/gateway/run.py index 54dcebda3e6..057d15cab91 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -18193,6 +18193,72 @@ class GatewayRunner: return response +def _run_planned_stop_watcher( + stop_event: threading.Event, + runner, + loop: asyncio.AbstractEventLoop, + shutdown_handler, + *, + poll_interval: float = 0.5, +) -> None: + """Poll for the planned-stop marker and trigger graceful shutdown. + + On Windows, ``asyncio.add_signal_handler`` raises NotImplementedError + for SIGTERM/SIGINT, so the standard signal-driven shutdown path + never runs when ``hermes gateway stop`` signals the gateway. The + consequence is that the drain loop is skipped — in-flight agent + sessions are killed mid-turn and ``resume_pending`` is never set, + so the next gateway boot has no idea those sessions need to be + auto-resumed (issue #33778, v0.13.0 session-resume feature broken + on native Windows). + + This watcher runs on every platform (cheap, defensive) and bridges + the gap on Windows by translating a filesystem marker into the + same shutdown-handler invocation a real SIGTERM would have produced + on POSIX. The CLI's ``hermes_cli.gateway_windows.stop()`` writes + the marker via ``write_planned_stop_marker(pid)`` and then waits + for the gateway PID to exit; this watcher is what makes that + exit happen cleanly. + + On POSIX this is a no-op safety net — the signal handler always + races us to consuming the marker file because it fires synchronously + from the kernel's signal delivery. + + Args: + stop_event: cleared by start_gateway() during normal shutdown + to tell the watcher to exit. + runner: the GatewayRunner instance; we check ``_running`` and + ``_draining`` to avoid triggering shutdown if the gateway + is already in one of those states. + loop: the asyncio event loop the shutdown handler must run on. + shutdown_handler: same callable that's wired to SIGTERM — + tolerates a ``None`` signal argument (planned stop case) + and consumes the marker via + ``consume_planned_stop_marker_for_self()``. + poll_interval: seconds between marker checks. 0.5s gives a + responsive shutdown without burning CPU. + """ + from gateway.status import _get_planned_stop_marker_path + marker_path = _get_planned_stop_marker_path() + while not stop_event.is_set(): + try: + if ( + marker_path.exists() + and not getattr(runner, "_draining", False) + and getattr(runner, "_running", False) + ): + # Drive the same path as a real signal handler. + # Pass signal=None — the handler tolerates that and consumes + # the marker via consume_planned_stop_marker_for_self, + # which also validates target_pid + start_time match us. + loop.call_soon_threadsafe(shutdown_handler, None) + # Done — the handler will set _draining; we exit on next tick. + break + except Exception as _e: + logger.debug("Planned-stop watcher tick error: %s", _e) + stop_event.wait(poll_interval) + + def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, interval: int = 60): """ Background thread that ticks the cron scheduler at a regular interval. @@ -18597,7 +18663,28 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = pass else: logger.info("Skipping signal handlers (not running in main thread).") - + + # Windows fallback: asyncio.add_signal_handler raises NotImplementedError + # on Windows, so `hermes gateway stop`'s SIGTERM (which Python maps to + # TerminateProcess on Windows) never invokes shutdown_signal_handler. + # That means the drain loop never runs, mark_resume_pending never fires, + # and sessions are silently lost across restarts (issue #33778). + # + # The fix is a marker-polling thread: `hermes gateway stop` writes the + # planned-stop marker BEFORE killing, and this thread notices it and + # drives the same shutdown path the signal handler would have. Runs + # on every platform (cheap, defensive) so non-signal-bearing + # environments (Windows native, sandboxed CI runners that mask + # SIGTERM) still get a clean drain. + _planned_stop_watcher_stop = threading.Event() + _planned_stop_watcher_thread = threading.Thread( + target=_run_planned_stop_watcher, + args=(_planned_stop_watcher_stop, runner, loop, shutdown_signal_handler), + daemon=True, + name="planned-stop-watcher", + ) + _planned_stop_watcher_thread.start() + # Claim the PID file BEFORE bringing up any platform adapters. # This closes the --replace race window: two concurrent `gateway run # --replace` invocations both pass the termination-wait above, but @@ -18675,6 +18762,10 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool = cron_stop.set() cron_thread.join(timeout=5) + # Stop the planned-stop watcher (daemon=True so this is belt-and-suspenders). + _planned_stop_watcher_stop.set() + _planned_stop_watcher_thread.join(timeout=2) + # Close MCP server connections try: from tools.mcp_tool import shutdown_mcp_servers diff --git a/hermes_cli/gateway_windows.py b/hermes_cli/gateway_windows.py index e019bb3e638..a7f4b983dcb 100644 --- a/hermes_cli/gateway_windows.py +++ b/hermes_cli/gateway_windows.py @@ -1014,12 +1014,70 @@ def start() -> None: _report_gateway_start(f"direct spawn (PID {pid})") -def stop() -> None: - """Stop the gateway. Tries /End on the scheduled task, then kills any stragglers.""" - _assert_windows() - from hermes_cli.gateway import kill_gateway_processes +def _drain_gateway_pid(pid: int, drain_timeout: float) -> bool: + """Write the planned-stop marker and wait for the gateway PID to exit. - stopped_any = False + Windows cannot deliver POSIX signals to a Python asyncio loop + (``loop.add_signal_handler`` raises NotImplementedError), so writing + the marker is the ONLY way to ask a running gateway to drain + in-flight agents and persist ``resume_pending`` before exit. The + gateway's planned-stop watcher thread (gateway/run.py) polls for + the marker and drives the same shutdown path the SIGTERM handler + would have on POSIX. + + Returns True if the PID exited within the timeout, False if it + didn't (caller should escalate to schtasks /End + taskkill). + """ + if pid <= 0: + return False + try: + from gateway.status import write_planned_stop_marker, _pid_exists + except ImportError: + return False + + try: + write_planned_stop_marker(pid) + except Exception: + # Best-effort: if the marker can't be written, we have no choice + # but to fall through to a hard kill. Caller decides escalation. + pass + + deadline = time.monotonic() + max(drain_timeout, 1.0) + while time.monotonic() < deadline: + if not _pid_exists(pid): + return True + time.sleep(0.5) + return False + + +def stop() -> None: + """Stop the gateway. + + Writes the planned-stop marker first so the gateway can drain + in-flight agents and persist ``resume_pending`` before exit (the + gateway's marker-watcher thread picks this up — Windows asyncio + can't deliver SIGTERM to the loop, so the marker is our only IPC). + Then escalates: ``schtasks /End`` (kills the scheduled-task tree) + + ``kill_gateway_processes(force=True)`` for any strays. + """ + _assert_windows() + from hermes_cli.gateway import kill_gateway_processes, _get_restart_drain_timeout + from gateway.status import get_running_pid + + # Phase 1: ask the running gateway (if any) to drain itself by writing + # the planned-stop marker, then wait briefly for it to exit cleanly. + # On clean exit, sessions land with resume_pending=True and the next + # boot will auto-resume them. + pid = get_running_pid() + drained = False + if pid is not None: + try: + drain_timeout = float(_get_restart_drain_timeout() or 30.0) + except Exception: + drain_timeout = 30.0 + drained = _drain_gateway_pid(pid, drain_timeout) + + stopped_any = drained if is_task_registered(): code, _out, err = _exec_schtasks(["/End", "/TN", get_task_name()]) # schtasks returns nonzero when the task isn't currently running — don't treat that as an error. @@ -1028,12 +1086,19 @@ def stop() -> None: elif "not running" not in (err or "").lower(): print(f"⚠ schtasks /End returned code {code}: {err.strip()}") - killed = kill_gateway_processes(all_profiles=False) + # Phase 3: hard-kill any strays. When drain succeeded this is a no-op; + # when drain timed out this is the escalation that ensures the PID + # actually exits. Use force=True on Windows so taskkill /T /F walks + # the descendant tree (browser helpers, etc.). + killed = kill_gateway_processes(all_profiles=False, force=not drained) if killed: stopped_any = True print(f"✓ Killed {killed} gateway process(es)") if stopped_any: - print("✓ Gateway stopped") + if drained: + print("✓ Gateway stopped (drained cleanly)") + else: + print("✓ Gateway stopped") else: print("✗ No gateway was running") diff --git a/tests/gateway/test_planned_stop_watcher.py b/tests/gateway/test_planned_stop_watcher.py new file mode 100644 index 00000000000..8887d122a78 --- /dev/null +++ b/tests/gateway/test_planned_stop_watcher.py @@ -0,0 +1,267 @@ +"""Tests for the planned-stop marker watcher thread (gateway/run.py). + +The watcher is the Windows-fallback path for the v0.13.0 session-resume +feature — on Windows ``asyncio.add_signal_handler`` raises +NotImplementedError, so the SIGTERM signal handler never runs and the +shutdown drain (which writes ``resume_pending=True``) is skipped. The +watcher closes this gap by polling for the planned-stop marker file +and translating its existence into the same shutdown-handler call a +real SIGTERM would have produced. + +See issue #33778 for the original Windows session-loss bug report. +""" + +import asyncio +import threading +import time +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from gateway.run import _run_planned_stop_watcher + + +class _FakeRunner: + """Stand-in for GatewayRunner — only exposes the two flags the watcher reads.""" + + def __init__(self, *, running: bool = True, draining: bool = False): + self._running = running + self._draining = draining + + +def _make_loop_capturing_calls(): + """Build a fake asyncio loop whose call_soon_threadsafe records its args.""" + loop = MagicMock(spec=asyncio.AbstractEventLoop) + loop._captured = [] + + def fake_call_soon_threadsafe(fn, *args): + loop._captured.append((fn, args)) + + loop.call_soon_threadsafe = fake_call_soon_threadsafe + return loop + + +def test_watcher_fires_shutdown_when_marker_appears(tmp_path, monkeypatch): + """When the marker file exists, the watcher must call the shutdown handler.""" + marker = tmp_path / ".gateway-planned-stop.json" + + # Patch the marker-path resolver so the watcher polls our temp location. + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker) + + runner = _FakeRunner(running=True, draining=False) + loop = _make_loop_capturing_calls() + shutdown_handler = MagicMock(name="shutdown_signal_handler") + stop_event = threading.Event() + + # Drop the marker before the thread starts. + marker.write_text('{"target_pid": 1234}', encoding="utf-8") + + watcher = threading.Thread( + target=_run_planned_stop_watcher, + args=(stop_event, runner, loop, shutdown_handler), + kwargs={"poll_interval": 0.05}, + daemon=True, + ) + watcher.start() + watcher.join(timeout=2.0) + + assert not watcher.is_alive(), "Watcher should exit after firing" + assert len(loop._captured) == 1, ( + f"Expected exactly one shutdown invocation, got {loop._captured}" + ) + fn, args = loop._captured[0] + assert fn is shutdown_handler + # The handler must be called with signal=None (planned stop sentinel). + assert args == (None,) + + +def test_watcher_does_not_fire_when_marker_absent(tmp_path, monkeypatch): + """No marker = no shutdown call. Watcher just spins until stop_event.""" + marker = tmp_path / ".gateway-planned-stop.json" + # Deliberately do NOT create the marker. + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker) + + runner = _FakeRunner(running=True, draining=False) + loop = _make_loop_capturing_calls() + shutdown_handler = MagicMock() + stop_event = threading.Event() + + watcher = threading.Thread( + target=_run_planned_stop_watcher, + args=(stop_event, runner, loop, shutdown_handler), + kwargs={"poll_interval": 0.05}, + daemon=True, + ) + watcher.start() + time.sleep(0.3) # let it poll a few times + stop_event.set() + watcher.join(timeout=2.0) + + assert not watcher.is_alive() + assert loop._captured == [], ( + f"No marker present, but watcher fired shutdown: {loop._captured}" + ) + shutdown_handler.assert_not_called() + + +def test_watcher_skips_when_runner_already_draining(tmp_path, monkeypatch): + """If shutdown is already in progress, don't re-fire the handler. + + This prevents a race where the SIGTERM handler is mid-drain and the + watcher would double-tap the shutdown path. We check ``_draining`` + so the watcher backs off once any shutdown is in flight. + """ + marker = tmp_path / ".gateway-planned-stop.json" + marker.write_text('{"target_pid": 1234}', encoding="utf-8") + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker) + + # Already draining — watcher should be a no-op. + runner = _FakeRunner(running=False, draining=True) + loop = _make_loop_capturing_calls() + shutdown_handler = MagicMock() + stop_event = threading.Event() + + watcher = threading.Thread( + target=_run_planned_stop_watcher, + args=(stop_event, runner, loop, shutdown_handler), + kwargs={"poll_interval": 0.05}, + daemon=True, + ) + watcher.start() + time.sleep(0.2) + stop_event.set() + watcher.join(timeout=2.0) + + assert loop._captured == [], "Watcher fired while runner was already draining" + + +def test_watcher_skips_when_runner_not_started(tmp_path, monkeypatch): + """If the runner hasn't started, the marker is for a previous instance — + we shouldn't shutdown a not-yet-running gateway. + """ + marker = tmp_path / ".gateway-planned-stop.json" + marker.write_text('{"target_pid": 9999}', encoding="utf-8") + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker) + + runner = _FakeRunner(running=False, draining=False) + loop = _make_loop_capturing_calls() + shutdown_handler = MagicMock() + stop_event = threading.Event() + + watcher = threading.Thread( + target=_run_planned_stop_watcher, + args=(stop_event, runner, loop, shutdown_handler), + kwargs={"poll_interval": 0.05}, + daemon=True, + ) + watcher.start() + time.sleep(0.2) + stop_event.set() + watcher.join(timeout=2.0) + + assert loop._captured == [], "Watcher fired before runner was running" + + +def test_watcher_responds_to_stop_event_promptly(tmp_path, monkeypatch): + """Setting stop_event must exit the watcher within ~poll_interval seconds.""" + marker = tmp_path / ".gateway-planned-stop.json" + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker) + + runner = _FakeRunner(running=True, draining=False) + loop = _make_loop_capturing_calls() + stop_event = threading.Event() + + watcher = threading.Thread( + target=_run_planned_stop_watcher, + args=(stop_event, runner, loop, MagicMock()), + kwargs={"poll_interval": 0.1}, + daemon=True, + ) + watcher.start() + time.sleep(0.05) + started_stop = time.monotonic() + stop_event.set() + watcher.join(timeout=2.0) + elapsed = time.monotonic() - started_stop + + assert not watcher.is_alive() + assert elapsed < 0.5, f"Watcher took {elapsed:.2f}s to honour stop_event" + + +def test_watcher_fires_only_once_when_marker_persists(tmp_path, monkeypatch): + """Marker file existing for multiple polls must NOT spam the handler. + + The watcher fires once and exits its loop (the shutdown handler is + responsible for consuming the marker on its own thread). If we + re-fired on every tick, the handler would be invoked dozens of + times before the gateway actually shuts down. + """ + marker = tmp_path / ".gateway-planned-stop.json" + marker.write_text('{"target_pid": 1234}', encoding="utf-8") + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", lambda: marker) + + runner = _FakeRunner(running=True, draining=False) + loop = _make_loop_capturing_calls() + stop_event = threading.Event() + + watcher = threading.Thread( + target=_run_planned_stop_watcher, + args=(stop_event, runner, loop, MagicMock()), + kwargs={"poll_interval": 0.05}, + daemon=True, + ) + watcher.start() + # Let the watcher tick several times — but it should exit after the first fire. + watcher.join(timeout=1.0) + + assert not watcher.is_alive() + assert len(loop._captured) == 1, ( + f"Watcher fired {len(loop._captured)} times; should fire once " + f"and exit (events={loop._captured})" + ) + + +def test_watcher_tolerates_marker_path_resolution_errors(tmp_path, monkeypatch, caplog): + """If _get_planned_stop_marker_path() raises, the watcher logs and continues.""" + from gateway import status as status_mod + + call_count = [0] + def explode(): + call_count[0] += 1 + # First call (the one outside the loop, at thread start) is fine — + # but subsequent .exists() calls on a corrupt Path could explode. + if call_count[0] == 1: + return tmp_path / "nonexistent" + raise OSError("filesystem failed") + + monkeypatch.setattr(status_mod, "_get_planned_stop_marker_path", explode) + + runner = _FakeRunner(running=True, draining=False) + loop = _make_loop_capturing_calls() + stop_event = threading.Event() + + watcher = threading.Thread( + target=_run_planned_stop_watcher, + args=(stop_event, runner, loop, MagicMock()), + kwargs={"poll_interval": 0.05}, + daemon=True, + ) + watcher.start() + time.sleep(0.2) + stop_event.set() + watcher.join(timeout=2.0) + + assert not watcher.is_alive(), "Watcher should still honour stop_event after errors" + # No shutdown fired because the marker never reported existence. + assert loop._captured == [] diff --git a/tests/hermes_cli/test_gateway_windows.py b/tests/hermes_cli/test_gateway_windows.py index 1bf6186fe23..e6130219828 100644 --- a/tests/hermes_cli/test_gateway_windows.py +++ b/tests/hermes_cli/test_gateway_windows.py @@ -481,4 +481,221 @@ def test_uninstall_access_denied_declined_keeps_task_and_cleans_files(monkeypatc out = capsys.readouterr().out assert "Skipped elevation" in out assert "UAC is Windows' admin approval prompt" in out - assert "Scheduled Task still registered" in out \ No newline at end of file + assert "Scheduled Task still registered" in out + + +# --------------------------------------------------------------------------- +# stop() drain semantics — issue #33778 +# +# Background: on Windows, asyncio.add_signal_handler raises NotImplementedError, +# so the gateway's SIGTERM handler (which drains in-flight agents and writes +# resume_pending=True) never fires when `hermes gateway stop` kills the +# process. The fix: stop() writes the planned_stop_marker first, waits for +# the gateway's marker-watcher thread to drain + exit cleanly, then escalates +# to taskkill if drain times out. +# --------------------------------------------------------------------------- + + +def test_stop_writes_planned_stop_marker_before_killing(monkeypatch): + """stop() must write the planned-stop marker BEFORE any kill signal. + + Without this, the gateway's drain loop never runs on Windows and + sessions silently lose context across restarts. + """ + pid = 99999 + events = [] + + monkeypatch.setattr(gateway_windows, "_assert_windows", lambda: None) + monkeypatch.setattr(gateway_windows, "is_task_registered", lambda: False) + + # Stub the marker write so we can record the order of operations. + from gateway import status as status_mod + + def fake_write_marker(target_pid): + events.append(("write_marker", target_pid)) + return True + + def fake_pid_exists(check_pid): + # Drain succeeds: pid "exits" right after the marker write. + return ("write_marker", pid) not in events + + monkeypatch.setattr(status_mod, "write_planned_stop_marker", fake_write_marker) + monkeypatch.setattr(status_mod, "_pid_exists", fake_pid_exists) + monkeypatch.setattr(status_mod, "get_running_pid", lambda: pid) + + def fake_kill(**kwargs): + events.append(("kill", kwargs.get("force", False))) + return 0 + + monkeypatch.setattr("hermes_cli.gateway.kill_gateway_processes", fake_kill) + monkeypatch.setattr("hermes_cli.gateway._get_restart_drain_timeout", lambda: 5.0) + + gateway_windows.stop() + + # Marker MUST be written before any kill. + kinds = [e[0] for e in events] + assert "write_marker" in kinds, "stop() never wrote the planned-stop marker" + marker_idx = kinds.index("write_marker") + kill_idx = kinds.index("kill") if "kill" in kinds else len(kinds) + assert marker_idx < kill_idx, ( + f"stop() killed before writing the marker (events={events})" + ) + + +def test_stop_waits_for_graceful_drain_before_force_kill(monkeypatch): + """When drain succeeds, stop() should NOT force-kill the gateway. + + drained=True means the gateway exited cleanly after seeing the + marker — escalating to taskkill /F afterwards would be wasted + work and may emit confusing "killed N processes" output. + """ + pid = 88888 + events = [] + + monkeypatch.setattr(gateway_windows, "_assert_windows", lambda: None) + monkeypatch.setattr(gateway_windows, "is_task_registered", lambda: False) + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "write_planned_stop_marker", lambda p: True) + + # Simulate the gateway exiting cleanly after one poll tick. + poll_count = [0] + def fake_pid_exists(check_pid): + poll_count[0] += 1 + return poll_count[0] < 2 # alive on first poll, gone on second + monkeypatch.setattr(status_mod, "_pid_exists", fake_pid_exists) + monkeypatch.setattr(status_mod, "get_running_pid", lambda: pid) + + def fake_kill(**kwargs): + events.append(("kill", kwargs.get("force", False))) + return 0 + monkeypatch.setattr("hermes_cli.gateway.kill_gateway_processes", fake_kill) + monkeypatch.setattr("hermes_cli.gateway._get_restart_drain_timeout", lambda: 5.0) + + gateway_windows.stop() + + # kill_gateway_processes is still called as the no-op sweep, but + # NOT with force=True — drain succeeded, gateway is already gone. + assert events == [("kill", False)], ( + f"After clean drain, force kill should be disabled (events={events})" + ) + + +def test_stop_escalates_to_force_kill_when_drain_times_out(monkeypatch): + """When drain times out, stop() MUST escalate to force=True. + + Drain timeout = gateway is stuck or unresponsive. Without the + taskkill /T /F escalation, the gateway stays alive and the next + `hermes gateway start` fails with "another instance is running". + """ + pid = 77777 + events = [] + + monkeypatch.setattr(gateway_windows, "_assert_windows", lambda: None) + monkeypatch.setattr(gateway_windows, "is_task_registered", lambda: False) + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "write_planned_stop_marker", lambda p: True) + # PID never exits — drain times out. + monkeypatch.setattr(status_mod, "_pid_exists", lambda check_pid: True) + monkeypatch.setattr(status_mod, "get_running_pid", lambda: pid) + + def fake_kill(**kwargs): + events.append(("kill", kwargs.get("force", False))) + return 1 + monkeypatch.setattr("hermes_cli.gateway.kill_gateway_processes", fake_kill) + # Tiny drain timeout to keep the test fast. + monkeypatch.setattr("hermes_cli.gateway._get_restart_drain_timeout", lambda: 1.0) + + gateway_windows.stop() + + # When drain times out, kill is invoked with force=True so taskkill /T /F + # walks the process tree. + assert events == [("kill", True)], ( + f"After drain timeout, kill must use force=True (events={events})" + ) + + +def test_stop_no_running_gateway_skips_drain(monkeypatch): + """When no gateway is running, skip the drain wait entirely.""" + events = [] + + monkeypatch.setattr(gateway_windows, "_assert_windows", lambda: None) + monkeypatch.setattr(gateway_windows, "is_task_registered", lambda: False) + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "get_running_pid", lambda: None) + + def fake_write_marker(target_pid): + events.append(("write_marker", target_pid)) + return True + monkeypatch.setattr(status_mod, "write_planned_stop_marker", fake_write_marker) + monkeypatch.setattr(status_mod, "_pid_exists", lambda check_pid: False) + + def fake_kill(**kwargs): + events.append(("kill", kwargs.get("force", False))) + return 0 + monkeypatch.setattr("hermes_cli.gateway.kill_gateway_processes", fake_kill) + monkeypatch.setattr("hermes_cli.gateway._get_restart_drain_timeout", lambda: 5.0) + + gateway_windows.stop() + + # With no PID to drain, no marker is written. Kill sweep still runs + # (defensive — covers the case where a stray gateway is alive without + # a PID file). force=True because drained=False. + assert ("write_marker", None) not in events + assert all(e[0] != "write_marker" for e in events), ( + f"Should not write marker when no PID is running (events={events})" + ) + assert events == [("kill", True)] + + +def test_drain_helper_handles_invalid_pid(monkeypatch): + """_drain_gateway_pid returns False for invalid PIDs without crashing.""" + assert gateway_windows._drain_gateway_pid(0, 5.0) is False + assert gateway_windows._drain_gateway_pid(-1, 5.0) is False + + +def test_drain_helper_returns_true_when_pid_exits_quickly(monkeypatch): + """_drain_gateway_pid polls _pid_exists until it returns False.""" + pid = 66666 + poll_count = [0] + + def fake_pid_exists(check_pid): + poll_count[0] += 1 + return poll_count[0] < 3 # alive twice, then gone + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "write_planned_stop_marker", lambda p: True) + monkeypatch.setattr(status_mod, "_pid_exists", fake_pid_exists) + + assert gateway_windows._drain_gateway_pid(pid, drain_timeout=5.0) is True + + +def test_drain_helper_returns_false_on_timeout(monkeypatch): + """_drain_gateway_pid returns False when the PID never exits.""" + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "write_planned_stop_marker", lambda p: True) + monkeypatch.setattr(status_mod, "_pid_exists", lambda check_pid: True) + + assert gateway_windows._drain_gateway_pid(55555, drain_timeout=1.0) is False + + +def test_drain_helper_still_waits_if_marker_write_fails(monkeypatch): + """Marker-write failures are swallowed; drain still polls for PID exit. + + If the marker can't be written (disk full, permission error), the + gateway can't drain — but the wait still happens so a slow-shutdown + gateway from a different code path (e.g. SIGTERM working on this + platform after all) still gets observed cleanly. + """ + pid = 44444 + def fake_write(target_pid): + raise OSError("disk full") + + from gateway import status as status_mod + monkeypatch.setattr(status_mod, "write_planned_stop_marker", fake_write) + monkeypatch.setattr(status_mod, "_pid_exists", lambda check_pid: False) + + # Returns True because _pid_exists immediately says "gone". + assert gateway_windows._drain_gateway_pid(pid, drain_timeout=5.0) is True \ No newline at end of file