hermes-agent/tests/hermes_cli/test_update_concurrent_quarantine.py
Teknium 46f8948bad test+harden(cli): cover parent-chain walk in concurrent-instance detection
Follow-up to @Strontvod's fix.

Tests:
- Five new tests in test_update_concurrent_quarantine.py cover the parent-
  chain exclusion: the .exe launcher is excluded, an unrelated sibling
  hermes.exe is still reported, multi-level ancestry is fully excluded,
  PID cycles in the parent chain don't hang, and a partially-stubbed
  psutil (no Process attribute) degrades gracefully instead of crashing.
- New _fake_psutil_with_parent_chain helper builds a fuller stand-in
  (Process / NoSuchProcess / AccessDenied + process_iter) than the
  process_iter-only SimpleNamespace the older tests use.

Hardening:
- Broaden the except in the parent-walk to bare Exception. The original
  fix listed (NoSuchProcess, AccessDenied, ValueError), but those names
  are evaluated lazily during exception matching — if psutil is a partial
  stub without the attribute, the exception handler itself raises
  AttributeError that escapes. The function is documented as 'never raises'
  (the surrounding update flow depends on it), so the broader catch keeps
  the contract regardless of how the dependency is shaped.

AUTHOR_MAP:
- Map schepers.zander1@gmail.com -> Strontvod so the salvaged commit
  resolves to @Strontvod in the release notes.

All 18 detect_concurrent + quarantine tests pass.
2026-05-24 19:51:46 -07:00

504 lines
18 KiB
Python

"""Tests for issue #26670 — concurrent hermes.exe detection and improved
quarantine retry / reboot-deferred fallback during `hermes update` on Windows.
These tests force ``_is_windows`` to return ``True`` via patching so the
Windows-specific code paths can be exercised on any host.
"""
from __future__ import annotations
import os
import sys
import types
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from hermes_cli import main as cli_main
# Every test in this module either exercises the REAL
# _detect_concurrent_hermes_instances helper (which requires the autouse stub
# in tests/hermes_cli/conftest.py to be disabled) or supplies its own explicit
# return value via patch.object. Marking the whole module makes the conftest
# fixture skip its default stub.
pytestmark = pytest.mark.real_concurrent_gate
# ---------------------------------------------------------------------------
# _detect_concurrent_hermes_instances
# ---------------------------------------------------------------------------
def _make_proc(pid: int, exe: str, name: str = "hermes.exe"):
"""Build a duck-typed psutil Process stand-in with the .info dict."""
proc = MagicMock()
proc.info = {"pid": pid, "exe": exe, "name": name}
return proc
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_returns_empty_when_no_other_processes(_winp, tmp_path):
    """An empty process table must produce an empty result."""
    for shim_name in ("hermes.exe", "hermes-gateway.exe"):
        (tmp_path / shim_name).write_bytes(b"")
    # psutil stand-in whose process_iter never yields a row.
    empty_psutil = types.SimpleNamespace(process_iter=lambda attrs: iter([]))
    with patch.dict(sys.modules, {"psutil": empty_psutil}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    assert found == []
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_excludes_self_pid(_winp, tmp_path):
    """The current process must not be reported as a concurrent instance."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    # The only row in the fake process table is this very process.
    own_row = _make_proc(os.getpid(), str(shim_path), "hermes.exe")
    stub = types.SimpleNamespace(process_iter=lambda attrs: iter([own_row]))
    with patch.dict(sys.modules, {"psutil": stub}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    assert found == []
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_finds_other_hermes_process(_winp, tmp_path):
    """A foreign process running the shim is reported; unrelated exes are not."""
    scripts_dir = tmp_path
    shim = scripts_dir / "hermes.exe"
    shim.write_bytes(b"")
    other_pid = os.getpid() + 1
    procs = [
        _make_proc(other_pid, str(shim), "hermes.exe"),
        # Decoy row. In a raw string a single backslash is already literal, so
        # the separators must NOT be doubled or the path is unrealistic.
        _make_proc(os.getpid() + 2, r"C:\Windows\System32\notepad.exe", "notepad.exe"),
    ]
    fake_psutil = types.SimpleNamespace(process_iter=lambda attrs: iter(procs))
    with patch.dict(sys.modules, {"psutil": fake_psutil}):
        result = cli_main._detect_concurrent_hermes_instances(scripts_dir)
    # Only the process whose image is the shim is expected back.
    assert result == [(other_pid, "hermes.exe")]
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_matches_case_insensitively(_winp, tmp_path):
    """A process image differing from the shim only by case is still reported."""
    scripts_dir = tmp_path
    shim = scripts_dir / "hermes.exe"
    shim.write_bytes(b"")
    # Derive the foreign PID from our own instead of using a fixed literal: a
    # hard-coded number could coincide with os.getpid(), in which case the row
    # would be filtered out as "self" and the test would fail spuriously.
    other_pid = os.getpid() + 1
    # Simulate the desktop spawning hermes.EXE (uppercase) from the same
    # directory. with_name() swaps only the final path component, so a temp
    # directory that happens to contain "hermes.exe" is left untouched.
    upper = str(shim.with_name("HERMES.EXE"))
    procs = [_make_proc(other_pid, upper, "HERMES.EXE")]
    fake_psutil = types.SimpleNamespace(process_iter=lambda attrs: iter(procs))
    with patch.dict(sys.modules, {"psutil": fake_psutil}):
        result = cli_main._detect_concurrent_hermes_instances(scripts_dir)
    assert result == [(other_pid, "HERMES.EXE")]
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_no_psutil_returns_empty(_winp, tmp_path):
    """Without an importable psutil the helper is expected to report nothing."""
    (tmp_path / "hermes.exe").write_bytes(b"")
    # A ``None`` entry in sys.modules makes ``import psutil`` fail, which
    # simulates an environment where the package is not installed.
    psutil_blocked = patch.dict(sys.modules, {"psutil": None})
    with psutil_blocked:
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    assert found == []
@patch.object(cli_main, "_is_windows", return_value=False)
def test_detect_concurrent_is_noop_off_windows(_winp, tmp_path):
    """Off Windows the helper returns an empty list (the lock issue is Windows-only)."""
    found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    assert found == []
# ---------------------------------------------------------------------------
# Parent-chain exclusion (issue #30768 follow-up). The setuptools .exe
# launcher on Windows is a separate native process that spawns python.exe,
# so excluding only ``os.getpid()`` flags the launcher as a concurrent
# instance.
# ---------------------------------------------------------------------------
def _fake_psutil_with_parent_chain(
parent_chain: list[int],
proc_iter_rows: list,
):
"""Build a psutil stand-in that has Process()/parent() AND process_iter().
``parent_chain`` is the list of PIDs returned by successive ``.parent()``
calls starting from the seed (``os.getpid()``); the last entry's
``.parent()`` returns ``None`` to terminate the walk.
"""
class _FakeProc:
def __init__(self, pid: int, chain: list[int]):
self.pid = pid
self._chain = chain
def parent(self):
if not self._chain:
return None
next_pid = self._chain[0]
return _FakeProc(next_pid, self._chain[1:])
class _NoSuchProcess(Exception):
pass
class _AccessDenied(Exception):
pass
def _process(pid):
return _FakeProc(pid, list(parent_chain))
return types.SimpleNamespace(
Process=_process,
NoSuchProcess=_NoSuchProcess,
AccessDenied=_AccessDenied,
process_iter=lambda attrs: iter(proc_iter_rows),
)
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_excludes_parent_chain(_winp, tmp_path):
    """The launcher that spawned this interpreter must NOT be reported.

    Models the Windows layout this suite targets: a hermes.exe launcher
    starts python.exe (``os.getpid()``) and both rows point at the same shim
    path. Excluding only our own PID would misreport the launcher as a
    concurrent instance.
    """
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    own_pid = os.getpid()
    launcher_pid = own_pid + 100  # stands in for the .exe launcher, our parent
    process_table = [
        _make_proc(pid, str(shim_path), image)
        for pid, image in ((own_pid, "python.exe"), (launcher_pid, "hermes.exe"))
    ]
    stub = _fake_psutil_with_parent_chain(
        parent_chain=[launcher_pid],
        proc_iter_rows=process_table,
    )
    with patch.dict(sys.modules, {"psutil": stub}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    # Neither this process nor its launcher may show up.
    assert found == []
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_still_finds_unrelated_other_hermes(_winp, tmp_path):
    """A hermes.exe that is not one of our ancestors must still be reported."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    exe = str(shim_path)
    own_pid = os.getpid()
    launcher_pid = own_pid + 100  # our parent: expected to be excluded
    sibling_pid = own_pid + 200  # unrelated instance: expected to be reported
    process_table = [
        _make_proc(own_pid, exe, "python.exe"),
        _make_proc(launcher_pid, exe, "hermes.exe"),
        _make_proc(sibling_pid, exe, "hermes.exe"),
    ]
    stub = _fake_psutil_with_parent_chain(
        parent_chain=[launcher_pid],
        proc_iter_rows=process_table,
    )
    with patch.dict(sys.modules, {"psutil": stub}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    assert found == [(sibling_pid, "hermes.exe")]
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_parent_chain_walks_deep(_winp, tmp_path):
    """Every ancestor in a multi-level chain must be excluded, not just the parent."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    exe = str(shim_path)
    own_pid = os.getpid()
    # Parent, grandparent and great-grandparent, in walk order.
    ancestors = [own_pid + 1, own_pid + 2, own_pid + 3]
    process_table = [_make_proc(own_pid, exe, "python.exe")]
    process_table.extend(_make_proc(pid, exe, "hermes.exe") for pid in ancestors)
    stub = _fake_psutil_with_parent_chain(
        parent_chain=ancestors,
        proc_iter_rows=process_table,
    )
    with patch.dict(sys.modules, {"psutil": stub}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    assert found == []
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_parent_walk_handles_cycle(_winp, tmp_path):
    """A parent chain that loops back on itself must not hang the helper."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    own_pid = os.getpid()
    looping_pid = own_pid + 1
    # The ancestry revisits ``own_pid``; the walk is expected to notice the
    # repeat and stop instead of spinning.
    stub = _fake_psutil_with_parent_chain(
        parent_chain=[looping_pid, own_pid, looping_pid],
        proc_iter_rows=[_make_proc(own_pid, str(shim_path), "python.exe")],
    )
    with patch.dict(sys.modules, {"psutil": stub}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    # Completing at all proves there was no hang; nothing else is running.
    assert found == []
@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_parent_walk_handles_stub_without_process(_winp, tmp_path):
    """A psutil lacking ``Process`` must not make the helper raise.

    The helper is described as "never raises", so a stand-in that models
    only ``process_iter`` has to yield a sensible result instead of leaking
    ``AttributeError`` to the caller.
    """
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    exe = str(shim_path)
    own_pid = os.getpid()
    foreign_pid = own_pid + 1
    process_table = [
        _make_proc(pid, exe, "hermes.exe") for pid in (own_pid, foreign_pid)
    ]
    # Deliberately minimal namespace: process_iter only, no Process and no
    # NoSuchProcess.
    bare_psutil = types.SimpleNamespace(process_iter=lambda attrs: iter(process_table))
    with patch.dict(sys.modules, {"psutil": bare_psutil}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    # The ancestor walk could not run, yet self is excluded and the foreign
    # process is still reported.
    assert found == [(foreign_pid, "hermes.exe")]
# ---------------------------------------------------------------------------
# _format_concurrent_instances_message
# ---------------------------------------------------------------------------
def test_format_message_mentions_pids_and_remediation(tmp_path):
    """The message names each PID, the process, the remediation and the target file."""
    running = [(1234, "hermes.exe"), (5678, "hermes.exe")]
    text = cli_main._format_concurrent_instances_message(running, tmp_path)
    for expected in ("1234", "5678", "hermes.exe", "Hermes Desktop", "--force"):
        assert expected in text
    # The file that would have been overwritten is spelled out in full.
    assert str(tmp_path / "hermes.exe") in text
# ---------------------------------------------------------------------------
# _quarantine_running_hermes_exe — retry + reboot-deferred fallback
# ---------------------------------------------------------------------------
@patch.object(cli_main, "_is_windows", return_value=True)
def test_quarantine_succeeds_first_attempt(_winp, tmp_path):
    """An unobstructed rename returns exactly one (original, quarantine) pair."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"old")
    renamed = cli_main._quarantine_running_hermes_exe(tmp_path)
    assert len(renamed) == 1
    source, parked = renamed[0]
    assert source == shim_path
    assert parked.name.startswith("hermes.exe.old.")
    # The file has moved: present under the new name, gone from the old one.
    assert parked.exists()
    assert not shim_path.exists()
@patch.object(cli_main, "_is_windows", return_value=True)
def test_quarantine_retries_then_succeeds(_winp, tmp_path, monkeypatch):
    """One transient OSError on the first rename must not abort the quarantine."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"old")
    real_rename = Path.rename
    attempts = []

    def flaky_rename(self, target):
        # Fail only the very first call, then defer to the real rename.
        attempts.append(target)
        if len(attempts) == 1:
            raise OSError(32, "share violation (simulated AV scan)")
        return real_rename(self, target)

    monkeypatch.setattr(cli_main, "_hermes_exe_shims", lambda d: [shim_path])
    rename_flaky = patch.object(Path, "rename", flaky_rename)
    # Neutralise time.sleep so the backoff schedule costs no wall-clock time.
    sleep_skipped = patch("time.sleep", lambda *_a, **_k: None)
    with rename_flaky, sleep_skipped:
        pairs = cli_main._quarantine_running_hermes_exe(tmp_path)
    assert len(attempts) >= 2
    assert len(pairs) == 1
    assert not shim_path.exists()
@patch.object(cli_main, "_is_windows", return_value=True)
def test_quarantine_falls_back_to_reboot_schedule(_winp, tmp_path, capsys, monkeypatch):
    """Once every rename fails, the reboot scheduler is used and the user is told."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"locked")
    deferred: list[tuple[Path, Path]] = []

    def always_fails(self, target):
        raise OSError(32, "The process cannot access the file (simulated lock)")

    def fake_schedule(s: Path, q: Path) -> bool:
        # Record the request and report success to the caller.
        deferred.append((s, q))
        return True

    monkeypatch.setattr(cli_main, "_hermes_exe_shims", lambda d: [shim_path])
    rename_blocked = patch.object(Path, "rename", always_fails)
    scheduler_stubbed = patch.object(
        cli_main, "_schedule_replace_on_reboot", fake_schedule
    )
    sleep_skipped = patch("time.sleep", lambda *_a, **_k: None)
    with rename_blocked, scheduler_stubbed, sleep_skipped:
        pairs = cli_main._quarantine_running_hermes_exe(tmp_path)
    printed = capsys.readouterr().out.lower()
    # The reboot-deferred route was taken for our shim.
    assert deferred and deferred[0][0] == shim_path
    # A deferred operation must not be listed for roll-back.
    assert pairs == []
    # The output explains what happened rather than dumping a raw error.
    assert "scheduled" in printed
    assert "reboot" in printed
@patch.object(cli_main, "_is_windows", return_value=True)
def test_quarantine_actionable_warning_when_everything_fails(
    _winp, tmp_path, capsys, monkeypatch
):
    """If rename and reboot scheduling both fail, remediation hints are printed."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"locked")

    def always_fails(self, target):
        raise OSError(32, "share violation")

    monkeypatch.setattr(cli_main, "_hermes_exe_shims", lambda d: [shim_path])
    rename_blocked = patch.object(Path, "rename", always_fails)
    scheduler_refuses = patch.object(
        cli_main, "_schedule_replace_on_reboot", lambda *_a, **_k: False
    )
    sleep_skipped = patch("time.sleep", lambda *_a, **_k: None)
    with rename_blocked, scheduler_refuses, sleep_skipped:
        pairs = cli_main._quarantine_running_hermes_exe(tmp_path)
    printed = capsys.readouterr().out
    assert pairs == []
    # The output must name the cause and a likely culprit instead of showing
    # a bare "[WinError 32]".
    assert "another process" in printed.lower()
    assert "Hermes Desktop" in printed or "gateway" in printed.lower()
# ---------------------------------------------------------------------------
# cmd_update integration — concurrent-instance gate
# ---------------------------------------------------------------------------
@patch.object(cli_main, "_is_windows", return_value=True)
def test_cmd_update_aborts_on_concurrent_instance(_winp, tmp_path, capsys):
    """A running hermes.exe makes the update exit with code 2 before any backup."""
    venv_scripts = tmp_path / "Scripts"
    venv_scripts.mkdir()
    args = SimpleNamespace(
        **{
            "check": False,
            "gateway": False,
            "yes": False,
            "force": False,
            "backup": False,
            "no_backup": True,
        }
    )
    scripts_patch = patch.object(
        cli_main, "_venv_scripts_dir", return_value=venv_scripts
    )
    detect_patch = patch.object(
        cli_main,
        "_detect_concurrent_hermes_instances",
        return_value=[(4242, "hermes.exe")],
    )
    backup_patch = patch.object(cli_main, "_run_pre_update_backup")
    hangup_patch = patch.object(
        cli_main, "_install_hangup_protection", return_value={}
    )
    finalize_patch = patch.object(cli_main, "_finalize_update_output")
    with scripts_patch, detect_patch, backup_patch as backup_spy, hangup_patch, finalize_patch:
        with pytest.raises(SystemExit) as exit_info:
            cli_main.cmd_update(args)
    assert exit_info.value.code == 2
    # The backup step belongs after the concurrent-instance check, so an
    # aborted update must never have reached it.
    backup_spy.assert_not_called()
    printed = capsys.readouterr().out
    assert "4242" in printed
    assert "--force" in printed
@patch.object(cli_main, "_is_windows", return_value=True)
def test_cmd_update_force_bypasses_concurrent_check(_winp, tmp_path):
    """With ``force=True`` the update proceeds without consulting the detector.

    Everything after the gate is mocked, so only the bypass is verified.
    """
    venv_scripts = tmp_path / "Scripts"
    venv_scripts.mkdir()
    args = SimpleNamespace(
        **{
            "check": False,
            "gateway": False,
            "yes": False,
            "force": True,  # the bypass under test
            "backup": False,
            "no_backup": True,
        }
    )
    detector_spy = MagicMock(return_value=[(9, "hermes.exe")])
    # Leave the update flow right after the gate by raising a sentinel from
    # _run_pre_update_backup, which this test treats as the first call made
    # once the gate has been passed.
    post_gate_marker = RuntimeError("reached post-gate body")
    scripts_patch = patch.object(
        cli_main, "_venv_scripts_dir", return_value=venv_scripts
    )
    detect_patch = patch.object(
        cli_main, "_detect_concurrent_hermes_instances", detector_spy
    )
    backup_patch = patch.object(
        cli_main, "_run_pre_update_backup", side_effect=post_gate_marker
    )
    hangup_patch = patch.object(
        cli_main, "_install_hangup_protection", return_value={}
    )
    finalize_patch = patch.object(cli_main, "_finalize_update_output")
    with scripts_patch, detect_patch, backup_patch, hangup_patch, finalize_patch:
        with pytest.raises(RuntimeError, match="reached post-gate body"):
            cli_main.cmd_update(args)
    # Under --force the detector (and therefore psutil) is never consulted.
    detector_spy.assert_not_called()