mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-09 03:11:58 +00:00
PR #21561 migrated liveness probes across 14 call sites from
`os.kill(pid, 0)` to `gateway.status._pid_exists` (psutil-first) so
the gateway doesn't Ctrl+C-itself on Windows via bpo-14484. A handful of
tests still patched the old `os.kill` seam and either happened to pass
on POSIX (when PID 12345 incidentally wasn't alive on the CI worker) or
failed outright — on CI runs they surfaced as 7 flaky/stable failures.
Migrate each affected test to patch the correct seam:
- tests/tools/test_browser_orphan_reaper.py (5 tests)
Patch `gateway.status._pid_exists` instead of `os.kill`.
Rename test_permission_error_on_kill_check_skips to
test_alive_legacy_daemon_is_reaped — the old assertion was
"PermissionError on sig 0 → skip dir"; post-migration the
untracked-alive-daemon path always reaps the dir after SIGTERM
(best-effort semantics were preserved).
- tests/tools/test_windows_native_support.py (4 tests)
Replace tests that asserted `os.kill` seam behavior with tests
that exercise `ProcessRegistry._is_host_pid_alive` as a
delegator and split out a new TestPidExistsOSErrorWidening class
that hits `gateway.status._pid_exists` directly via the POSIX
fallback branch (so Windows-style `OSError(WinError 87)` + `PermissionError`
widening is still covered on Linux CI).
- tests/tools/test_process_registry.py (1 test)
Mock `psutil.Process` + `_pid_exists` instead of `os.kill`
for the detached-session kill path.
- tests/tools/test_mcp_stability.py::test_kill_orphaned_uses_sigkill_when_available
SIGTERM → alive-check → SIGKILL flow now uses `_pid_exists`
for the middle step; assertion count drops from 3 to 2.
- tests/gateway/test_status.py::TestScopedLocks (2 tests)
`acquire_scoped_lock` consults `_pid_exists`; patch that
seam directly instead of trying to control the nested psutil
call via os.kill monkeypatch.
- tests/hermes_cli/test_gateway.py::test_stop_profile_gateway_keeps_pid_file_when_process_still_running
The stop loop sends one SIGTERM via os.kill then polls 20x via
_pid_exists; instrument both separately. Old assertion
`calls["kill"] == 21` split into `kill == 1` + `alive_probes == 20`.
- tests/hermes_cli/test_auth_toctou_file_modes.py::test_shared_nous_store_writes_0o600_with_0o700_parent
Commit c34884ea2 switched the pytest seat-belt guard in
`_nous_shared_store_path()` from `Path.home() / ".hermes"`
to `get_default_hermes_root()`, which honors HERMES_HOME. The
test sets both HERMES_HOME and HERMES_SHARED_AUTH_DIR to
subpaths of the same tmp_path, and the override now collapses
onto the same path the guard is refusing. Renamed the override
subdirectory so the two paths diverge — guard passes, test runs.
All 21 original CI failures and their local-flaky siblings now pass
(278 tests across the touched files, 0 failures).
477 lines
18 KiB
Python
477 lines
18 KiB
Python
"""Tests for hermes_cli.gateway."""
|
|
|
|
import sys
|
|
from types import ModuleType, SimpleNamespace
|
|
from unittest.mock import patch, call
|
|
|
|
import pytest
|
|
|
|
import hermes_cli.gateway as gateway
|
|
|
|
|
|
def _install_fake_gateway_run(monkeypatch, start_gateway):
|
|
module = ModuleType("gateway.run")
|
|
module.start_gateway = start_gateway
|
|
monkeypatch.setitem(sys.modules, "gateway.run", module)
|
|
|
|
|
|
def test_run_gateway_exits_cleanly_on_keyboard_interrupt(monkeypatch, capsys):
|
|
calls = []
|
|
|
|
def fake_start_gateway(*, replace, verbosity):
|
|
calls.append((replace, verbosity))
|
|
return object()
|
|
|
|
def fake_asyncio_run(coro):
|
|
raise KeyboardInterrupt
|
|
|
|
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
|
monkeypatch.setattr(gateway.asyncio, "run", fake_asyncio_run)
|
|
|
|
gateway.run_gateway()
|
|
|
|
out = capsys.readouterr().out
|
|
assert calls == [(False, 0)]
|
|
assert "Press Ctrl+C to stop" in out
|
|
assert "Gateway stopped." in out
|
|
|
|
|
|
def test_run_gateway_exits_nonzero_when_start_gateway_reports_failure(monkeypatch):
|
|
calls = []
|
|
|
|
def fake_start_gateway(*, replace, verbosity):
|
|
calls.append((replace, verbosity))
|
|
return object()
|
|
|
|
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
|
monkeypatch.setattr(gateway.asyncio, "run", lambda coro: False)
|
|
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
gateway.run_gateway(verbose=1, quiet=True, replace=True)
|
|
|
|
assert exc_info.value.code == 1
|
|
assert calls == [(True, None)]
|
|
|
|
|
|
def test_run_gateway_refuses_root_in_official_docker(monkeypatch, tmp_path, capsys):
|
|
project_root = tmp_path / "opt" / "hermes"
|
|
(project_root / "docker").mkdir(parents=True)
|
|
(project_root / "docker" / "entrypoint.sh").write_text("#!/bin/sh\n")
|
|
|
|
monkeypatch.setattr(gateway, "PROJECT_ROOT", project_root)
|
|
monkeypatch.setattr(gateway.os, "geteuid", lambda: 0)
|
|
monkeypatch.delenv("HERMES_ALLOW_ROOT_GATEWAY", raising=False)
|
|
monkeypatch.setattr(gateway, "_is_official_docker_checkout", lambda: True)
|
|
|
|
with pytest.raises(SystemExit) as exc_info:
|
|
gateway.run_gateway()
|
|
|
|
assert exc_info.value.code == 1
|
|
out = capsys.readouterr().out
|
|
assert "Refusing to run the Hermes gateway as root" in out
|
|
assert "/opt/hermes/docker/entrypoint.sh" in out
|
|
|
|
|
|
def test_run_gateway_root_guard_has_escape_hatch(monkeypatch):
|
|
calls = []
|
|
|
|
def fake_start_gateway(*, replace, verbosity):
|
|
calls.append((replace, verbosity))
|
|
return object()
|
|
|
|
_install_fake_gateway_run(monkeypatch, fake_start_gateway)
|
|
monkeypatch.setattr(gateway.asyncio, "run", lambda coro: True)
|
|
monkeypatch.setattr(gateway.os, "geteuid", lambda: 0)
|
|
monkeypatch.setattr(gateway, "_is_official_docker_checkout", lambda: True)
|
|
monkeypatch.setenv("HERMES_ALLOW_ROOT_GATEWAY", "1")
|
|
|
|
gateway.run_gateway(verbose=2, replace=True)
|
|
|
|
assert calls == [(True, 2)]
|
|
|
|
|
|
class TestSystemdLingerStatus:
|
|
def test_reports_enabled(self, monkeypatch):
|
|
monkeypatch.setattr(gateway, "is_linux", lambda: True)
|
|
monkeypatch.setattr(gateway, "is_termux", lambda: False)
|
|
monkeypatch.setenv("USER", "alice")
|
|
monkeypatch.setattr(
|
|
gateway.subprocess,
|
|
"run",
|
|
lambda *args, **kwargs: SimpleNamespace(returncode=0, stdout="yes\n", stderr=""),
|
|
)
|
|
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/loginctl")
|
|
|
|
assert gateway.get_systemd_linger_status() == (True, "")
|
|
|
|
def test_reports_disabled(self, monkeypatch):
|
|
monkeypatch.setattr(gateway, "is_linux", lambda: True)
|
|
monkeypatch.setattr(gateway, "is_termux", lambda: False)
|
|
monkeypatch.setenv("USER", "alice")
|
|
monkeypatch.setattr(
|
|
gateway.subprocess,
|
|
"run",
|
|
lambda *args, **kwargs: SimpleNamespace(returncode=0, stdout="no\n", stderr=""),
|
|
)
|
|
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/loginctl")
|
|
|
|
assert gateway.get_systemd_linger_status() == (False, "")
|
|
|
|
def test_reports_termux_as_not_supported(self, monkeypatch):
|
|
monkeypatch.setattr(gateway, "is_termux", lambda: True)
|
|
|
|
assert gateway.get_systemd_linger_status() == (None, "not supported in Termux")
|
|
|
|
|
|
class TestContainerSystemdSupport:
|
|
def test_supports_systemd_services_in_container_with_user_manager(self, monkeypatch):
|
|
monkeypatch.setattr(gateway, "is_linux", lambda: True)
|
|
monkeypatch.setattr(gateway, "is_termux", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_wsl", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_container", lambda: True)
|
|
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/systemctl")
|
|
monkeypatch.setattr(gateway, "_systemd_operational", lambda system=False: not system)
|
|
|
|
assert gateway.supports_systemd_services() is True
|
|
|
|
def test_supports_systemd_services_in_container_with_system_manager(self, monkeypatch):
|
|
monkeypatch.setattr(gateway, "is_linux", lambda: True)
|
|
monkeypatch.setattr(gateway, "is_termux", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_wsl", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_container", lambda: True)
|
|
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/systemctl")
|
|
monkeypatch.setattr(gateway, "_systemd_operational", lambda system=False: system)
|
|
|
|
assert gateway.supports_systemd_services() is True
|
|
|
|
def test_supports_systemd_services_in_container_without_systemd(self, monkeypatch):
|
|
monkeypatch.setattr(gateway, "is_linux", lambda: True)
|
|
monkeypatch.setattr(gateway, "is_termux", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_wsl", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_container", lambda: True)
|
|
monkeypatch.setattr("shutil.which", lambda name: "/usr/bin/systemctl")
|
|
monkeypatch.setattr(gateway, "_systemd_operational", lambda system=False: False)
|
|
|
|
assert gateway.supports_systemd_services() is False
|
|
|
|
|
|
def test_gateway_install_in_container_with_operational_systemd_uses_systemd(monkeypatch):
|
|
monkeypatch.setattr(gateway, "supports_systemd_services", lambda: True)
|
|
monkeypatch.setattr(gateway, "is_wsl", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_macos", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_managed", lambda: False)
|
|
|
|
calls = []
|
|
monkeypatch.setattr(
|
|
gateway,
|
|
"systemd_install",
|
|
lambda force=False, system=False, run_as_user=None: calls.append((force, system, run_as_user)),
|
|
)
|
|
|
|
args = SimpleNamespace(
|
|
gateway_command="install",
|
|
force=False,
|
|
system=False,
|
|
run_as_user=None,
|
|
)
|
|
gateway.gateway_command(args)
|
|
|
|
assert calls == [(False, False, None)]
|
|
|
|
|
|
def test_gateway_start_in_container_with_operational_systemd_uses_systemd(monkeypatch):
|
|
monkeypatch.setattr(gateway, "supports_systemd_services", lambda: True)
|
|
monkeypatch.setattr(gateway, "is_wsl", lambda: False)
|
|
monkeypatch.setattr(gateway, "is_macos", lambda: False)
|
|
|
|
calls = []
|
|
monkeypatch.setattr(gateway, "systemd_start", lambda system=False: calls.append(system))
|
|
|
|
args = SimpleNamespace(gateway_command="start", system=False, all=False)
|
|
gateway.gateway_command(args)
|
|
|
|
assert calls == [False]
|
|
|
|
|
|
def test_systemd_status_warns_when_linger_disabled(monkeypatch, tmp_path, capsys):
|
|
unit_path = tmp_path / "hermes-gateway.service"
|
|
unit_path.write_text("[Unit]\n")
|
|
|
|
monkeypatch.setattr(gateway, "get_systemd_unit_path", lambda system=False: unit_path)
|
|
monkeypatch.setattr(gateway, "get_systemd_linger_status", lambda: (False, ""))
|
|
|
|
def fake_run(cmd, capture_output=False, text=False, check=False, **kwargs):
|
|
if cmd[:4] == ["systemctl", "--user", "status", gateway.get_service_name()]:
|
|
return SimpleNamespace(returncode=0, stdout="", stderr="")
|
|
if cmd[:3] == ["systemctl", "--user", "is-active"]:
|
|
return SimpleNamespace(returncode=0, stdout="active\n", stderr="")
|
|
if cmd[:3] == ["systemctl", "--user", "show"]:
|
|
return SimpleNamespace(
|
|
returncode=0,
|
|
stdout="ActiveState=active\nSubState=running\nResult=success\nExecMainStatus=0\n",
|
|
stderr="",
|
|
)
|
|
raise AssertionError(f"Unexpected command: {cmd}")
|
|
|
|
monkeypatch.setattr(gateway.subprocess, "run", fake_run)
|
|
|
|
gateway.systemd_status(deep=False)
|
|
|
|
out = capsys.readouterr().out
|
|
assert "gateway service is running" in out
|
|
assert "Systemd linger is disabled" in out
|
|
assert "loginctl enable-linger" in out
|
|
|
|
|
|
def test_systemd_install_checks_linger_status(monkeypatch, tmp_path, capsys):
|
|
unit_path = tmp_path / "systemd" / "user" / "hermes-gateway.service"
|
|
|
|
monkeypatch.setattr(gateway, "get_systemd_unit_path", lambda system=False: unit_path)
|
|
|
|
calls = []
|
|
helper_calls = []
|
|
|
|
def fake_run(cmd, check=False, **kwargs):
|
|
calls.append((cmd, check))
|
|
return SimpleNamespace(returncode=0, stdout="", stderr="")
|
|
|
|
monkeypatch.setattr(gateway.subprocess, "run", fake_run)
|
|
monkeypatch.setattr(gateway, "_ensure_linger_enabled", lambda: helper_calls.append(True))
|
|
|
|
gateway.systemd_install(force=False)
|
|
|
|
out = capsys.readouterr().out
|
|
assert unit_path.exists()
|
|
assert [cmd for cmd, _ in calls] == [
|
|
["systemctl", "--user", "daemon-reload"],
|
|
["systemctl", "--user", "enable", gateway.get_service_name()],
|
|
]
|
|
assert helper_calls == [True]
|
|
assert "User service installed and enabled" in out
|
|
|
|
|
|
def test_systemd_install_system_scope_skips_linger_and_uses_systemctl(monkeypatch, tmp_path, capsys):
|
|
unit_path = tmp_path / "etc" / "systemd" / "system" / "hermes-gateway.service"
|
|
|
|
monkeypatch.setattr(gateway, "get_systemd_unit_path", lambda system=False: unit_path)
|
|
monkeypatch.setattr(
|
|
gateway,
|
|
"generate_systemd_unit",
|
|
lambda system=False, run_as_user=None: f"scope={system} user={run_as_user}\n",
|
|
)
|
|
monkeypatch.setattr(gateway, "_require_root_for_system_service", lambda action: None)
|
|
|
|
calls = []
|
|
helper_calls = []
|
|
|
|
def fake_run(cmd, check=False, **kwargs):
|
|
calls.append((cmd, check))
|
|
return SimpleNamespace(returncode=0, stdout="", stderr="")
|
|
|
|
monkeypatch.setattr(gateway.subprocess, "run", fake_run)
|
|
monkeypatch.setattr(gateway, "_ensure_linger_enabled", lambda: helper_calls.append(True))
|
|
|
|
gateway.systemd_install(force=False, system=True, run_as_user="alice")
|
|
|
|
out = capsys.readouterr().out
|
|
assert unit_path.exists()
|
|
assert unit_path.read_text(encoding="utf-8") == "scope=True user=alice\n"
|
|
assert [cmd for cmd, _ in calls] == [
|
|
["systemctl", "daemon-reload"],
|
|
["systemctl", "enable", gateway.get_service_name()],
|
|
]
|
|
assert helper_calls == []
|
|
assert "Configured to run as: alice" not in out # generated test unit has no User= line
|
|
assert "System service installed and enabled" in out
|
|
|
|
|
|
def test_conflicting_systemd_units_warning(monkeypatch, tmp_path, capsys):
|
|
user_unit = tmp_path / "user" / "hermes-gateway.service"
|
|
system_unit = tmp_path / "system" / "hermes-gateway.service"
|
|
user_unit.parent.mkdir(parents=True)
|
|
system_unit.parent.mkdir(parents=True)
|
|
user_unit.write_text("[Unit]\n", encoding="utf-8")
|
|
system_unit.write_text("[Unit]\n", encoding="utf-8")
|
|
|
|
monkeypatch.setattr(
|
|
gateway,
|
|
"get_systemd_unit_path",
|
|
lambda system=False: system_unit if system else user_unit,
|
|
)
|
|
|
|
gateway.print_systemd_scope_conflict_warning()
|
|
|
|
out = capsys.readouterr().out
|
|
assert "Both user and system gateway services are installed" in out
|
|
assert "hermes gateway uninstall" in out
|
|
assert "--system" in out
|
|
|
|
|
|
def test_install_linux_gateway_from_setup_system_choice_without_root_prints_followup(monkeypatch, capsys):
|
|
monkeypatch.setattr(gateway, "prompt_linux_gateway_install_scope", lambda: "system")
|
|
monkeypatch.setattr(gateway.os, "geteuid", lambda: 1000)
|
|
monkeypatch.setattr(gateway, "_default_system_service_user", lambda: "alice")
|
|
monkeypatch.setattr(gateway, "systemd_install", lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not install")))
|
|
|
|
scope, did_install = gateway.install_linux_gateway_from_setup(force=False)
|
|
|
|
out = capsys.readouterr().out
|
|
assert (scope, did_install) == ("system", False)
|
|
assert "sudo hermes gateway install --system --run-as-user alice" in out
|
|
assert "sudo hermes gateway start --system" in out
|
|
|
|
|
|
def test_install_linux_gateway_from_setup_system_choice_as_root_installs(monkeypatch):
|
|
monkeypatch.setattr(gateway, "prompt_linux_gateway_install_scope", lambda: "system")
|
|
monkeypatch.setattr(gateway.os, "geteuid", lambda: 0)
|
|
monkeypatch.setattr(gateway, "_default_system_service_user", lambda: "alice")
|
|
|
|
calls = []
|
|
monkeypatch.setattr(
|
|
gateway,
|
|
"systemd_install",
|
|
lambda force=False, system=False, run_as_user=None: calls.append((force, system, run_as_user)),
|
|
)
|
|
|
|
scope, did_install = gateway.install_linux_gateway_from_setup(force=True)
|
|
|
|
assert (scope, did_install) == ("system", True)
|
|
assert calls == [(True, True, "alice")]
|
|
|
|
|
|
def test_find_gateway_pids_falls_back_to_pid_file_when_process_scan_fails(monkeypatch):
|
|
monkeypatch.setattr(gateway, "_get_service_pids", lambda: set())
|
|
monkeypatch.setattr(gateway, "is_windows", lambda: False)
|
|
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 321)
|
|
|
|
def fake_run(cmd, **kwargs):
|
|
if cmd[:4] == ["ps", "-A", "eww", "-o"]:
|
|
return SimpleNamespace(returncode=1, stdout="", stderr="ps failed")
|
|
if cmd[:3] == ["ps", "-o", "ppid="]:
|
|
# _get_ancestor_pids() walks up the tree; return "no parent" so
|
|
# the loop terminates cleanly.
|
|
return SimpleNamespace(returncode=1, stdout="", stderr="")
|
|
raise AssertionError(f"Unexpected command: {cmd}")
|
|
|
|
monkeypatch.setattr(gateway.subprocess, "run", fake_run)
|
|
|
|
assert gateway.find_gateway_pids() == [321]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _wait_for_gateway_exit
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestWaitForGatewayExit:
|
|
"""PID-based wait with force-kill on timeout."""
|
|
|
|
def test_returns_immediately_when_no_pid(self, monkeypatch):
|
|
"""If get_running_pid returns None, exit instantly."""
|
|
monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
|
|
# Should return without sleeping at all.
|
|
gateway._wait_for_gateway_exit(timeout=1.0, force_after=0.5)
|
|
|
|
def test_returns_when_process_exits_gracefully(self, monkeypatch):
|
|
"""Process exits after a couple of polls — no SIGKILL needed."""
|
|
poll_count = 0
|
|
|
|
def mock_get_running_pid():
|
|
nonlocal poll_count
|
|
poll_count += 1
|
|
return 12345 if poll_count <= 2 else None
|
|
|
|
monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
|
|
monkeypatch.setattr("time.sleep", lambda _: None)
|
|
|
|
gateway._wait_for_gateway_exit(timeout=10.0, force_after=999.0)
|
|
# Should have polled until None was returned.
|
|
assert poll_count == 3
|
|
|
|
def test_force_kills_after_grace_period(self, monkeypatch):
|
|
"""When the process doesn't exit, force-kill the saved PID."""
|
|
|
|
# Simulate monotonic time advancing past force_after
|
|
call_num = 0
|
|
def fake_monotonic():
|
|
nonlocal call_num
|
|
call_num += 1
|
|
# First two calls: initial deadline + force_deadline setup (time 0)
|
|
# Then each loop iteration advances time
|
|
return call_num * 2.0 # 2, 4, 6, 8, ...
|
|
|
|
kills = []
|
|
def mock_terminate(pid, force=False):
|
|
kills.append((pid, force))
|
|
|
|
# get_running_pid returns the PID until kill is sent, then None
|
|
def mock_get_running_pid():
|
|
return None if kills else 42
|
|
|
|
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
|
monkeypatch.setattr("time.sleep", lambda _: None)
|
|
monkeypatch.setattr("gateway.status.get_running_pid", mock_get_running_pid)
|
|
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
|
|
|
|
gateway._wait_for_gateway_exit(timeout=10.0, force_after=5.0)
|
|
assert (42, True) in kills
|
|
|
|
def test_handles_process_already_gone_on_kill(self, monkeypatch):
|
|
"""ProcessLookupError during force-kill is not fatal."""
|
|
|
|
call_num = 0
|
|
def fake_monotonic():
|
|
nonlocal call_num
|
|
call_num += 1
|
|
return call_num * 3.0 # Jump past force_after quickly
|
|
|
|
def mock_terminate(pid, force=False):
|
|
raise ProcessLookupError
|
|
|
|
monkeypatch.setattr("time.monotonic", fake_monotonic)
|
|
monkeypatch.setattr("time.sleep", lambda _: None)
|
|
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 99)
|
|
monkeypatch.setattr(gateway, "terminate_pid", mock_terminate)
|
|
|
|
# Should not raise — ProcessLookupError means it's already gone.
|
|
gateway._wait_for_gateway_exit(timeout=10.0, force_after=2.0)
|
|
|
|
def test_kill_gateway_processes_force_uses_helper(self, monkeypatch):
|
|
calls = []
|
|
|
|
monkeypatch.setattr(gateway, "find_gateway_pids", lambda exclude_pids=None, all_profiles=False: [11, 22])
|
|
monkeypatch.setattr(gateway, "terminate_pid", lambda pid, force=False: calls.append((pid, force)))
|
|
|
|
killed = gateway.kill_gateway_processes(force=True)
|
|
|
|
assert killed == 2
|
|
assert calls == [(11, True), (22, True)]
|
|
|
|
|
|
class TestStopProfileGateway:
|
|
def test_stop_profile_gateway_keeps_pid_file_when_process_still_running(self, monkeypatch):
|
|
calls = {"kill": 0, "alive_probes": 0, "remove": 0}
|
|
|
|
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 12345)
|
|
# Post-#21561: the stop loop sends one SIGTERM via ``os.kill`` then
|
|
# polls liveness via ``gateway.status._pid_exists`` (safe on
|
|
# Windows — bpo-14484). Instrument both seams separately.
|
|
monkeypatch.setattr(
|
|
gateway.os,
|
|
"kill",
|
|
lambda pid, sig: calls.__setitem__("kill", calls["kill"] + 1),
|
|
)
|
|
monkeypatch.setattr(
|
|
"gateway.status._pid_exists",
|
|
lambda pid: calls.__setitem__("alive_probes", calls["alive_probes"] + 1) or True,
|
|
)
|
|
monkeypatch.setattr("time.sleep", lambda _: None)
|
|
monkeypatch.setattr(
|
|
"gateway.status.remove_pid_file",
|
|
lambda: calls.__setitem__("remove", calls["remove"] + 1),
|
|
)
|
|
|
|
assert gateway.stop_profile_gateway() is True
|
|
assert calls["kill"] == 1 # one SIGTERM
|
|
assert calls["alive_probes"] == 20 # 20 liveness polls over the 2s window
|
|
assert calls["remove"] == 0
|