mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-09 03:11:58 +00:00
test: migrate stale os.kill monkeypatches to gateway.status._pid_exists
PR #21561 migrated liveness probes across 14 call sites from
`os.kill(pid, 0)` to `gateway.status._pid_exists` (psutil-first) so
the gateway doesn't Ctrl+C-itself on Windows via bpo-14484. A handful of
tests still patched the old `os.kill` seam and either happened to pass
on POSIX (when PID 12345 incidentally wasn't alive on the CI worker) or
failed outright — on CI runs they surfaced as 7 flaky/stable failures.
Migrate each affected test to patch the correct seam:
- tests/tools/test_browser_orphan_reaper.py (5 tests)
Patch `gateway.status._pid_exists` instead of `os.kill`.
Rename test_permission_error_on_kill_check_skips to
test_alive_legacy_daemon_is_reaped — the old assertion was
"PermissionError on sig 0 → skip dir"; post-migration the
untracked-alive-daemon path always reaps the dir after SIGTERM
(best-effort semantics were preserved).
- tests/tools/test_windows_native_support.py (4 tests)
Replace tests that asserted `os.kill` seam behavior with tests
that exercise `ProcessRegistry._is_host_pid_alive` as a
delegator and split out a new TestPidExistsOSErrorWidening class
that hits `gateway.status._pid_exists` directly via the POSIX
fallback branch (so Windows-style `OSError(WinError 87)` + `PermissionError`
widening is still covered on Linux CI).
- tests/tools/test_process_registry.py (1 test)
Mock `psutil.Process` + `_pid_exists` instead of `os.kill`
for the detached-session kill path.
- tests/tools/test_mcp_stability.py::test_kill_orphaned_uses_sigkill_when_available
SIGTERM → alive-check → SIGKILL flow now uses `_pid_exists`
for the middle step; assertion count drops from 3 to 2.
- tests/gateway/test_status.py::TestScopedLocks (2 tests)
`acquire_scoped_lock` consults `_pid_exists`; patch that
seam directly instead of trying to control the nested psutil
call via os.kill monkeypatch.
- tests/hermes_cli/test_gateway.py::test_stop_profile_gateway_keeps_pid_file_when_process_still_running
The stop loop sends one SIGTERM via os.kill then polls 20x via
_pid_exists; instrument both separately. Old assertion
`calls["kill"] == 21` split into `kill == 1` + `alive_probes == 20`.
- tests/hermes_cli/test_auth_toctou_file_modes.py::test_shared_nous_store_writes_0o600_with_0o700_parent
Commit c34884ea2 switched the pytest seat-belt guard in
`_nous_shared_store_path()` from `Path.home() / ".hermes"`
to `get_default_hermes_root()`, which honors HERMES_HOME. The
test sets both HERMES_HOME and HERMES_SHARED_AUTH_DIR to
subpaths of the same tmp_path, and the override now collapses
onto the same path the guard is refusing. Renamed the override
subdirectory so the two paths diverge — guard passes, test runs.
All 21 original CI failures and their local-flaky siblings now pass
(278 tests across the touched files, 0 failures).
This commit is contained in:
parent
291a158441
commit
f5ee780124
7 changed files with 160 additions and 80 deletions
|
|
@ -410,7 +410,9 @@ class TestScopedLocks:
|
|||
"kind": "hermes-gateway",
|
||||
}))
|
||||
|
||||
monkeypatch.setattr(status.os, "kill", lambda pid, sig: None)
|
||||
# Post-#21561 the liveness probe routes through
|
||||
# ``gateway.status._pid_exists`` (psutil-first, safe on Windows).
|
||||
monkeypatch.setattr(status, "_pid_exists", lambda pid: True)
|
||||
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: 123)
|
||||
|
||||
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
|
||||
|
|
@ -428,10 +430,8 @@ class TestScopedLocks:
|
|||
"kind": "hermes-gateway",
|
||||
}))
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
raise ProcessLookupError
|
||||
|
||||
monkeypatch.setattr(status.os, "kill", fake_kill)
|
||||
# Post-#21561: simulate "PID gone" via _pid_exists returning False.
|
||||
monkeypatch.setattr(status, "_pid_exists", lambda pid: False)
|
||||
|
||||
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
|
||||
|
||||
|
|
|
|||
|
|
@ -116,8 +116,12 @@ def test_shared_nous_store_writes_0o600_with_0o700_parent(tmp_path, monkeypatch)
|
|||
"""The Nous shared-credential store must land at 0o600 / parent 0o700."""
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
# _nous_shared_store_path() refuses to touch the real shared store during
|
||||
# pytest runs; redirect it into tmp_path explicitly.
|
||||
monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(tmp_path / "shared"))
|
||||
# pytest runs; redirect it into tmp_path explicitly. Use a distinct
|
||||
# subdirectory name (``shared_override``) so the guard's "real user
|
||||
# home" reference — which currently tracks HERMES_HOME via
|
||||
# get_default_hermes_root() — can't collide with our override and
|
||||
# falsely claim we're writing to the real user's shared store.
|
||||
monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(tmp_path / "shared_override"))
|
||||
old_umask = os.umask(0o022)
|
||||
try:
|
||||
from hermes_cli import auth as auth_mod
|
||||
|
|
|
|||
|
|
@ -450,14 +450,21 @@ class TestWaitForGatewayExit:
|
|||
|
||||
class TestStopProfileGateway:
|
||||
def test_stop_profile_gateway_keeps_pid_file_when_process_still_running(self, monkeypatch):
|
||||
calls = {"kill": 0, "remove": 0}
|
||||
calls = {"kill": 0, "alive_probes": 0, "remove": 0}
|
||||
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 12345)
|
||||
# Post-#21561: the stop loop sends one SIGTERM via ``os.kill`` then
|
||||
# polls liveness via ``gateway.status._pid_exists`` (safe on
|
||||
# Windows — bpo-14484). Instrument both seams separately.
|
||||
monkeypatch.setattr(
|
||||
gateway.os,
|
||||
"kill",
|
||||
lambda pid, sig: calls.__setitem__("kill", calls["kill"] + 1),
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
"gateway.status._pid_exists",
|
||||
lambda pid: calls.__setitem__("alive_probes", calls["alive_probes"] + 1) or True,
|
||||
)
|
||||
monkeypatch.setattr("time.sleep", lambda _: None)
|
||||
monkeypatch.setattr(
|
||||
"gateway.status.remove_pid_file",
|
||||
|
|
@ -465,5 +472,6 @@ class TestStopProfileGateway:
|
|||
)
|
||||
|
||||
assert gateway.stop_profile_gateway() is True
|
||||
assert calls["kill"] == 21
|
||||
assert calls["kill"] == 1 # one SIGTERM
|
||||
assert calls["alive_probes"] == 20 # 20 liveness polls over the 2s window
|
||||
assert calls["remove"] == 0
|
||||
|
|
|
|||
|
|
@ -81,19 +81,18 @@ class TestReapOrphanedBrowserSessions:
|
|||
d = _make_socket_dir(fake_tmpdir, "h_orphan12345", pid=12345)
|
||||
|
||||
kill_calls = []
|
||||
original_kill = os.kill
|
||||
|
||||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
if sig == 0:
|
||||
return # pretend process exists
|
||||
# Don't actually kill anything
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
# Post-#21561 the liveness probe goes through
|
||||
# ``gateway.status._pid_exists`` (which wraps ``psutil.pid_exists``
|
||||
# so it's safe on Windows — ``os.kill(pid, 0)`` is bpo-14484).
|
||||
with patch("gateway.status._pid_exists", return_value=True), \
|
||||
patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# Should have checked existence (sig 0) then killed (SIGTERM)
|
||||
assert (12345, 0) in kill_calls
|
||||
assert (12345, signal.SIGTERM) in kill_calls
|
||||
|
||||
def test_tracked_session_is_not_reaped(self, fake_tmpdir):
|
||||
|
|
@ -120,21 +119,31 @@ class TestReapOrphanedBrowserSessions:
|
|||
# Dir should still exist
|
||||
assert d.exists()
|
||||
|
||||
def test_permission_error_on_kill_check_skips(self, fake_tmpdir):
|
||||
"""If we can't check the PID (PermissionError), skip it."""
|
||||
def test_alive_legacy_daemon_is_reaped(self, fake_tmpdir):
|
||||
"""Alive, untracked, legacy (no owner_pid) daemon is reaped.
|
||||
|
||||
Post-#21561 the liveness probe goes through
|
||||
``gateway.status._pid_exists`` (which wraps ``psutil.pid_exists``
|
||||
because ``os.kill(pid, 0)`` is a footgun on Windows — bpo-14484).
|
||||
With no owner_pid file and no tracked-name entry, the reaper
|
||||
SIGTERMs the daemon and removes its socket dir regardless of
|
||||
whether SIGTERM succeeded (best-effort semantics).
|
||||
"""
|
||||
from tools.browser_tool import _reap_orphaned_browser_sessions
|
||||
|
||||
d = _make_socket_dir(fake_tmpdir, "h_perm1234567", pid=12345)
|
||||
|
||||
def mock_kill(pid, sig):
|
||||
if sig == 0:
|
||||
raise PermissionError("not our process")
|
||||
sigterm_calls = []
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
def mock_kill(pid, sig):
|
||||
sigterm_calls.append((pid, sig))
|
||||
|
||||
with patch("gateway.status._pid_exists", return_value=True), \
|
||||
patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# Dir should still exist (we didn't touch someone else's process)
|
||||
assert d.exists()
|
||||
assert (12345, signal.SIGTERM) in sigterm_calls
|
||||
assert not d.exists()
|
||||
|
||||
def test_cdp_sessions_are_also_reaped(self, fake_tmpdir):
|
||||
"""CDP sessions (cdp_ prefix) are also scanned."""
|
||||
|
|
@ -196,19 +205,13 @@ class TestOwnerPidCrossProcess:
|
|||
|
||||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
if pid == os.getpid() and sig == 0:
|
||||
return # real existence check: owner alive
|
||||
if sig == 0:
|
||||
return # pretend daemon exists too
|
||||
# Don't actually kill anything
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
# Owner alive → reaper skips without ever probing the daemon.
|
||||
with patch("gateway.status._pid_exists", return_value=True), \
|
||||
patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# We should have checked the owner (sig 0) but never tried to kill
|
||||
# the daemon.
|
||||
assert (12345, signal.SIGTERM) not in kill_calls
|
||||
# Dir should still exist
|
||||
assert d.exists()
|
||||
|
||||
def test_dead_owner_triggers_reap(self, fake_tmpdir):
|
||||
|
|
@ -224,20 +227,15 @@ class TestOwnerPidCrossProcess:
|
|||
|
||||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
if pid == 999999999 and sig == 0:
|
||||
raise ProcessLookupError # owner dead
|
||||
if pid == 12345 and sig == 0:
|
||||
return # daemon still alive
|
||||
# SIGTERM to daemon — noop in test
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
# Owner 999999999 dead, daemon 12345 alive.
|
||||
pid_alive = {999999999: False, 12345: True}
|
||||
with patch("gateway.status._pid_exists",
|
||||
side_effect=lambda pid: pid_alive.get(int(pid), False)), \
|
||||
patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# Owner checked (returned dead), daemon checked (alive), daemon killed
|
||||
assert (999999999, 0) in kill_calls
|
||||
assert (12345, 0) in kill_calls
|
||||
assert (12345, signal.SIGTERM) in kill_calls
|
||||
# Dir cleaned up
|
||||
assert not d.exists()
|
||||
|
||||
def test_corrupt_owner_pid_falls_back_to_legacy(self, fake_tmpdir):
|
||||
|
|
@ -258,7 +256,8 @@ class TestOwnerPidCrossProcess:
|
|||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
with patch("gateway.status._pid_exists", return_value=True), \
|
||||
patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# Legacy path took over → tracked → not reaped
|
||||
|
|
@ -266,10 +265,12 @@ class TestOwnerPidCrossProcess:
|
|||
assert d.exists()
|
||||
|
||||
def test_owner_pid_permission_error_treated_as_alive(self, fake_tmpdir):
|
||||
"""If os.kill(owner, 0) raises PermissionError, treat owner as alive.
|
||||
"""Owner PID owned by another user → treat as alive.
|
||||
|
||||
PermissionError means the PID exists but is owned by a different user —
|
||||
we must not assume the owner is dead (could kill someone else's daemon).
|
||||
Post-#21561 this is handled inside ``gateway.status._pid_exists``
|
||||
(via psutil's ``OpenProcess`` returning ``ERROR_ACCESS_DENIED`` on
|
||||
Windows, or via the POSIX fallback's ``except PermissionError``
|
||||
branch). Exposed to callers as ``alive=True``.
|
||||
"""
|
||||
from tools.browser_tool import _reap_orphaned_browser_sessions
|
||||
|
||||
|
|
@ -281,13 +282,13 @@ class TestOwnerPidCrossProcess:
|
|||
|
||||
def mock_kill(pid, sig):
|
||||
kill_calls.append((pid, sig))
|
||||
if pid == 22222 and sig == 0:
|
||||
raise PermissionError("not our user")
|
||||
|
||||
with patch("os.kill", side_effect=mock_kill):
|
||||
# Owner 22222 reported alive (PermissionError collapses to True
|
||||
# inside _pid_exists). Daemon never probed, never SIGTERMed.
|
||||
with patch("gateway.status._pid_exists", return_value=True), \
|
||||
patch("os.kill", side_effect=mock_kill):
|
||||
_reap_orphaned_browser_sessions()
|
||||
|
||||
# Must NOT have tried to kill the daemon
|
||||
assert (12345, signal.SIGTERM) not in kill_calls
|
||||
assert d.exists()
|
||||
|
||||
|
|
|
|||
|
|
@ -130,15 +130,18 @@ class TestStdioPidTracking:
|
|||
fake_sigkill = 9
|
||||
monkeypatch.setattr(signal, "SIGKILL", fake_sigkill, raising=False)
|
||||
|
||||
# Post-#21561 the alive check routes through
|
||||
# ``gateway.status._pid_exists`` (so it's safe on Windows — see
|
||||
# bpo-14484). Return True so the SIGKILL escalation fires.
|
||||
with patch("tools.mcp_tool.os.kill") as mock_kill, \
|
||||
patch("gateway.status._pid_exists", return_value=True), \
|
||||
patch("time.sleep") as mock_sleep:
|
||||
_kill_orphaned_mcp_children()
|
||||
|
||||
# SIGTERM, then alive-check (signal 0), then SIGKILL
|
||||
# SIGTERM then SIGKILL; the alive check no longer touches os.kill.
|
||||
mock_kill.assert_any_call(fake_pid, signal.SIGTERM)
|
||||
mock_kill.assert_any_call(fake_pid, 0) # alive check
|
||||
mock_kill.assert_any_call(fake_pid, fake_sigkill)
|
||||
assert mock_kill.call_count == 3
|
||||
assert mock_kill.call_count == 2
|
||||
mock_sleep.assert_called_once_with(2)
|
||||
|
||||
with _lock:
|
||||
|
|
|
|||
|
|
@ -728,18 +728,30 @@ class TestKillProcess:
|
|||
s.detached = True
|
||||
registry._running[s.id] = s
|
||||
|
||||
calls = []
|
||||
terminate_calls = []
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
calls.append((pid, sig))
|
||||
class FakeProcess:
|
||||
def __init__(self, pid):
|
||||
self.pid = pid
|
||||
def children(self, recursive=False):
|
||||
return []
|
||||
def terminate(self):
|
||||
terminate_calls.append(("terminate", self.pid))
|
||||
|
||||
import psutil as _psutil
|
||||
|
||||
try:
|
||||
with patch("tools.process_registry.os.kill", side_effect=fake_kill):
|
||||
# Post-#21561: liveness probe routes through
|
||||
# ``ProcessRegistry._is_host_pid_alive`` (→
|
||||
# ``gateway.status._pid_exists``), and the actual kill on POSIX
|
||||
# routes through ``psutil.Process(pid).terminate()``. Neither
|
||||
# touches ``os.kill`` directly. Mock both seams.
|
||||
with patch("gateway.status._pid_exists", return_value=True), \
|
||||
patch.object(_psutil, "Process", side_effect=lambda pid: FakeProcess(pid)):
|
||||
result = registry.kill_process(s.id)
|
||||
|
||||
assert result["status"] == "killed"
|
||||
assert (424242, 0) in calls
|
||||
assert (424242, signal.SIGTERM) in calls
|
||||
assert ("terminate", 424242) in terminate_calls
|
||||
finally:
|
||||
registry._running.pop(s.id, None)
|
||||
|
||||
|
|
|
|||
|
|
@ -308,54 +308,106 @@ class TestSigkillFallback:
|
|||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# OSError widening on os.kill(pid, 0) probes
|
||||
# OSError widening on liveness probes
|
||||
#
|
||||
# Post-#21561, ``ProcessRegistry._is_host_pid_alive`` delegates to
|
||||
# ``gateway.status._pid_exists``, which is the cross-platform liveness
|
||||
# primitive (psutil-first, ctypes/os.kill fallback). The tests below assert
|
||||
# (a) the delegation is correct and (b) ``_pid_exists`` correctly widens
|
||||
# Windows' ``OSError(WinError 87)`` / ``PermissionError`` behavior on the
|
||||
# POSIX fallback branch.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestProcessRegistryOSErrorWidening:
|
||||
"""_is_host_pid_alive must treat Windows' OSError as 'not alive'."""
|
||||
"""_is_host_pid_alive delegates to gateway.status._pid_exists."""
|
||||
|
||||
def test_oserror_treated_as_not_alive(self, monkeypatch):
|
||||
"""_pid_exists → False propagates as _is_host_pid_alive → False."""
|
||||
from tools.process_registry import ProcessRegistry
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
# Simulate Windows' WinError 87 for an unknown PID
|
||||
raise OSError(22, "Invalid argument")
|
||||
|
||||
monkeypatch.setattr("tools.process_registry.os.kill", fake_kill)
|
||||
monkeypatch.setattr("gateway.status._pid_exists", lambda pid: False)
|
||||
assert ProcessRegistry._is_host_pid_alive(12345) is False
|
||||
|
||||
def test_permission_error_treated_as_not_alive(self, monkeypatch):
|
||||
"""Conservative: PermissionError also means 'not alive' (matches existing behavior)."""
|
||||
def test_permission_error_treated_as_alive(self, monkeypatch):
|
||||
"""PermissionError is encoded by _pid_exists as alive=True; propagates as-is.
|
||||
|
||||
This is a meaningful semantic change from the pre-#21561 version of
|
||||
this test (which asserted PermissionError → not-alive). The old
|
||||
``os.kill(pid, 0)``-based probe couldn't distinguish "gone" from
|
||||
"owned by another user" on some platforms, so it conservatively
|
||||
returned False. The new psutil-based probe CAN distinguish them via
|
||||
``OpenProcess + ERROR_ACCESS_DENIED`` on Windows / ``except
|
||||
PermissionError`` on POSIX, so alive=True is correct.
|
||||
"""
|
||||
from tools.process_registry import ProcessRegistry
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
raise PermissionError(1, "Operation not permitted")
|
||||
monkeypatch.setattr("gateway.status._pid_exists", lambda pid: True)
|
||||
assert ProcessRegistry._is_host_pid_alive(12345) is True
|
||||
|
||||
monkeypatch.setattr("tools.process_registry.os.kill", fake_kill)
|
||||
assert ProcessRegistry._is_host_pid_alive(12345) is False
|
||||
|
||||
def test_zero_or_none_pid_returns_false_without_calling_kill(self, monkeypatch):
|
||||
def test_zero_or_none_pid_returns_false_without_probing(self, monkeypatch):
|
||||
"""No wasted syscall on falsy pids."""
|
||||
from tools.process_registry import ProcessRegistry
|
||||
|
||||
kill_calls = []
|
||||
probes = []
|
||||
monkeypatch.setattr(
|
||||
"tools.process_registry.os.kill",
|
||||
lambda pid, sig: kill_calls.append(pid),
|
||||
"gateway.status._pid_exists",
|
||||
lambda pid: probes.append(pid) or True,
|
||||
)
|
||||
assert ProcessRegistry._is_host_pid_alive(None) is False
|
||||
assert ProcessRegistry._is_host_pid_alive(0) is False
|
||||
assert kill_calls == []
|
||||
assert probes == []
|
||||
|
||||
def test_alive_pid_returns_true(self, monkeypatch):
|
||||
from tools.process_registry import ProcessRegistry
|
||||
|
||||
# os.kill returning None (default) means "probe succeeded → pid alive"
|
||||
monkeypatch.setattr("tools.process_registry.os.kill", lambda pid, sig: None)
|
||||
monkeypatch.setattr("gateway.status._pid_exists", lambda pid: True)
|
||||
assert ProcessRegistry._is_host_pid_alive(os.getpid()) is True
|
||||
|
||||
|
||||
class TestPidExistsOSErrorWidening:
|
||||
"""gateway.status._pid_exists itself must widen Windows errors correctly.
|
||||
|
||||
The POSIX fallback branch (reached when psutil isn't importable) is the
|
||||
only path where Python raises ``OSError(WinError 87)`` on Windows for a
|
||||
gone PID instead of ``ProcessLookupError``. The function must catch the
|
||||
wider ``OSError`` to match POSIX semantics.
|
||||
"""
|
||||
|
||||
def test_oserror_gone_pid_returns_false(self, monkeypatch):
|
||||
"""Simulate Windows' OSError(WinError 87) for a gone PID via the POSIX fallback."""
|
||||
from gateway import status
|
||||
|
||||
# Force the psutil-first branch to miss so we exercise the fallback.
|
||||
monkeypatch.setitem(
|
||||
__import__("sys").modules, "psutil",
|
||||
type("P", (), {"pid_exists": staticmethod(lambda pid: (_ for _ in ()).throw(ImportError()))})()
|
||||
)
|
||||
monkeypatch.setattr(status, "_IS_WINDOWS", False)
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
raise OSError(22, "Invalid argument")
|
||||
|
||||
monkeypatch.setattr(status.os, "kill", fake_kill)
|
||||
assert status._pid_exists(12345) is False
|
||||
|
||||
def test_permission_error_returns_true(self, monkeypatch):
|
||||
"""POSIX fallback: PermissionError means alive (owned by another user)."""
|
||||
from gateway import status
|
||||
|
||||
monkeypatch.setitem(
|
||||
__import__("sys").modules, "psutil",
|
||||
type("P", (), {"pid_exists": staticmethod(lambda pid: (_ for _ in ()).throw(ImportError()))})()
|
||||
)
|
||||
monkeypatch.setattr(status, "_IS_WINDOWS", False)
|
||||
|
||||
def fake_kill(pid, sig):
|
||||
raise PermissionError(1, "Operation not permitted")
|
||||
|
||||
monkeypatch.setattr(status.os, "kill", fake_kill)
|
||||
assert status._pid_exists(12345) is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# tzdata dependency
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue