test(windows): align gateway restart CI coverage

This commit is contained in:
Gille 2026-06-25 00:36:53 -06:00 committed by Teknium
parent e7d2f0b93c
commit bf0513bca0
3 changed files with 53 additions and 46 deletions

View file

@ -749,7 +749,7 @@ def test_stop_writes_planned_stop_marker_before_killing(monkeypatch):
def test_stop_waits_for_graceful_drain_before_force_kill(monkeypatch):
"""When drain succeeds, stop() should NOT force-kill the gateway.
"""When drain succeeds, stop() should NOT force-terminate the gateway.
drained=True means the gateway exited cleanly after seeing the
marker escalating to taskkill /F afterwards would be wasted
@ -760,30 +760,36 @@ def test_stop_waits_for_graceful_drain_before_force_kill(monkeypatch):
monkeypatch.setattr(gateway_windows, "_assert_windows", lambda: None)
monkeypatch.setattr(gateway_windows, "is_task_registered", lambda: False)
monkeypatch.setattr(gateway_windows, "_gateway_pids", lambda: [])
from gateway import status as status_mod
monkeypatch.setattr(status_mod, "write_planned_stop_marker", lambda p: True)
def fake_write_marker(target_pid):
events.append(("write_marker", target_pid))
return True
monkeypatch.setattr(status_mod, "write_planned_stop_marker", fake_write_marker)
# Simulate the gateway exiting cleanly after one poll tick.
poll_count = [0]
def fake_pid_exists(check_pid):
poll_count[0] += 1
return poll_count[0] < 2 # alive on first poll, gone on second
monkeypatch.setattr(status_mod, "_pid_exists", fake_pid_exists)
monkeypatch.setattr(status_mod, "get_running_pid", lambda: pid)
def fake_kill(**kwargs):
events.append(("kill", kwargs.get("force", False)))
return 0
monkeypatch.setattr("hermes_cli.gateway.kill_gateway_processes", fake_kill)
def fake_terminate_pid(target_pid, force=False):
events.append(("terminate", target_pid, force))
monkeypatch.setattr(status_mod, "terminate_pid", fake_terminate_pid)
monkeypatch.setattr("hermes_cli.gateway._get_restart_drain_timeout", lambda: 5.0)
gateway_windows.stop()
# kill_gateway_processes is still called as the no-op sweep, but
# NOT with force=True — drain succeeded, gateway is already gone.
assert events == [("kill", False)], (
f"After clean drain, force kill should be disabled (events={events})"
assert events == [("write_marker", pid)], (
f"After clean drain, force termination should be skipped (events={events})"
)
@ -799,35 +805,34 @@ def test_stop_escalates_to_force_kill_when_drain_times_out(monkeypatch):
monkeypatch.setattr(gateway_windows, "_assert_windows", lambda: None)
monkeypatch.setattr(gateway_windows, "is_task_registered", lambda: False)
monkeypatch.setattr(gateway_windows, "_gateway_pids", lambda: [])
from gateway import status as status_mod
monkeypatch.setattr(status_mod, "write_planned_stop_marker", lambda p: True)
# PID never exits — drain times out.
monkeypatch.setattr(status_mod, "_pid_exists", lambda check_pid: True)
monkeypatch.setattr(status_mod, "get_running_pid", lambda: pid)
monkeypatch.setattr(gateway_windows, "_drain_gateway_pid", lambda *_args: False)
def fake_kill(**kwargs):
events.append(("kill", kwargs.get("force", False)))
return 1
monkeypatch.setattr("hermes_cli.gateway.kill_gateway_processes", fake_kill)
# Tiny drain timeout to keep the test fast.
monkeypatch.setattr("hermes_cli.gateway._get_restart_drain_timeout", lambda: 1.0)
def fake_terminate_pid(target_pid, force=False):
events.append(("terminate", target_pid, force))
monkeypatch.setattr(status_mod, "terminate_pid", fake_terminate_pid)
gateway_windows.stop()
# When drain times out, kill is invoked with force=True so taskkill /T /F
# walks the process tree.
assert events == [("kill", True)], (
f"After drain timeout, kill must use force=True (events={events})"
assert events == [("terminate", pid, True)], (
f"After drain timeout, known PID must be force terminated (events={events})"
)
def test_stop_no_running_gateway_skips_drain(monkeypatch):
"""When no gateway is running, skip the drain wait entirely."""
"""When no gateway PID file is running, skip drain but clear known strays."""
events = []
stray_pid = 42424
monkeypatch.setattr(gateway_windows, "_assert_windows", lambda: None)
monkeypatch.setattr(gateway_windows, "is_task_registered", lambda: False)
monkeypatch.setattr(gateway_windows, "_gateway_pids", lambda: [stray_pid])
from gateway import status as status_mod
monkeypatch.setattr(status_mod, "get_running_pid", lambda: None)
@ -836,24 +841,24 @@ def test_stop_no_running_gateway_skips_drain(monkeypatch):
events.append(("write_marker", target_pid))
return True
monkeypatch.setattr(status_mod, "write_planned_stop_marker", fake_write_marker)
monkeypatch.setattr(status_mod, "_pid_exists", lambda check_pid: False)
monkeypatch.setattr(status_mod, "_pid_exists", lambda check_pid: check_pid == stray_pid)
def fake_kill(**kwargs):
events.append(("kill", kwargs.get("force", False)))
return 0
monkeypatch.setattr("hermes_cli.gateway.kill_gateway_processes", fake_kill)
def fake_terminate_pid(target_pid, force=False):
events.append(("terminate", target_pid, force))
monkeypatch.setattr(status_mod, "terminate_pid", fake_terminate_pid)
monkeypatch.setattr("hermes_cli.gateway._get_restart_drain_timeout", lambda: 5.0)
gateway_windows.stop()
# With no PID to drain, no marker is written. Kill sweep still runs
# (defensive — covers the case where a stray gateway is alive without
# a PID file). force=True because drained=False.
# With no PID to drain, no marker is written. The bounded profile scan can
# still find and terminate a known stray without falling back to a broad
# process sweep.
assert ("write_marker", None) not in events
assert all(e[0] != "write_marker" for e in events), (
f"Should not write marker when no PID is running (events={events})"
)
assert events == [("kill", True)]
assert events == [("terminate", stray_pid, True)]
def test_drain_helper_handles_invalid_pid(monkeypatch):

View file

@ -1538,6 +1538,7 @@ class TestSigkillEscalation:
lambda: {"terminal": {"daemon_term_grace_seconds": -5}})
assert ProcessRegistry._daemon_term_grace_seconds() == 0.0
@pytest.mark.live_system_guard_bypass
def test_entire_tree_is_sigkilled_not_just_parent(self, monkeypatch):
"""A SIGTERM-ignoring parent + children are ALL force-killed.

View file

@ -940,9 +940,9 @@ class TestGatewayDetachedWatcherWindowsFlags:
breaks away from any job-object the watcher itself inherits.
Static check the watcher source is built at import time and embedded
verbatim in the module text. Parsing it for an exact AST node would be
brittle; the textual presence of the hex flag plus the symbolic name is
a sufficient regression guard.
verbatim in the module text. The literal Win32 bits live in
hermes_cli._subprocess_compat; the watcher must call that helper from
inside the inlined payload so runtime behavior keeps the breakaway bit.
The bit was added to the inlined payload by PR #40909. This test
ensures a future refactor of the dedent block doesn't silently drop it.
@ -955,14 +955,16 @@ class TestGatewayDetachedWatcherWindowsFlags:
end = text.find(").strip()", idx)
assert end != -1, "watcher block end not found"
block = text[idx:end]
assert "0x01000000" in block, (
"Inlined respawn watcher must set CREATE_BREAKAWAY_FROM_JOB "
"(0x01000000) on the respawned gateway — without it, the new "
"gateway is reaped when the parent job is torn down."
assert "from hermes_cli._subprocess_compat import" in block
assert "windows_detach_flags" in block
assert "windows_detach_flags()" in block, (
"Inlined respawn watcher must call windows_detach_flags() for the "
"respawned gateway; that helper carries CREATE_BREAKAWAY_FROM_JOB "
"so the new gateway is not reaped when the parent job tears down."
)
assert "_CREATE_BREAKAWAY_FROM_JOB" in block, (
"Inlined respawn watcher must name CREATE_BREAKAWAY_FROM_JOB "
"symbolically so the intent is greppable."
assert "See _subprocess_compat.windows_detach_flags()" in block, (
"Inlined respawn watcher should keep the breakaway intent greppable "
"near the helper call."
)
def test_launch_detached_profile_gateway_restart_outer_popen_has_access_denied_fallback(
@ -1000,10 +1002,9 @@ class TestGatewayDetachedWatcherWindowsFlags:
idx = text.find(marker)
end = text.find(").strip()", idx)
block = text[idx:end]
# The inlined script catches OSError on the respawn and retries
# with breakaway cleared via ``& ~_CREATE_BREAKAWAY_FROM_JOB``.
assert "~_CREATE_BREAKAWAY_FROM_JOB" in block, (
assert "except OSError" in block
assert "windows_detach_flags_without_breakaway()" in block, (
"Inlined respawn must catch OSError on the breakaway-denied "
"CreateProcess and retry without the breakaway bit, matching "
"gateway_windows._spawn_detached's fallback pattern."
"CreateProcess and retry with windows_detach_flags_without_breakaway(), "
"matching gateway_windows._spawn_detached's fallback pattern."
)