"""Tests for issue #26670 — concurrent hermes.exe detection and improved
quarantine retry / reboot-deferred fallback during `hermes update` on Windows.

These tests force ``_is_windows`` to return ``True`` via patching so the
Windows-specific code paths can be exercised on any host.
"""

from __future__ import annotations

import os
import sys
import types
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch

import pytest

from hermes_cli import main as cli_main

# Every test here either runs the REAL _detect_concurrent_hermes_instances
# helper (so the autouse stub in tests/hermes_cli/conftest.py has to be
# disabled) or installs its own explicit return value with patch.object.
# Marking the whole module lets the conftest fixture skip its default stub.
pytestmark = pytest.mark.real_concurrent_gate


# ---------------------------------------------------------------------------
# Shared scaffolding
# ---------------------------------------------------------------------------


def _make_proc(pid: int, exe: str, name: str = "hermes.exe"):
    """Return a MagicMock whose ``.info`` dict carries ``pid``/``exe``/``name``."""
    stand_in = MagicMock()
    stand_in.info = dict(pid=pid, exe=exe, name=name)
    return stand_in


def _psutil_stub(procs):
    """Return a namespace whose ``process_iter(attrs)`` iterates over *procs*.

    A fresh iterator is produced on every call, so the stub can be consumed
    more than once.
    """

    def process_iter(attrs):
        return iter(procs)

    return types.SimpleNamespace(process_iter=process_iter)


def _no_sleep(*_a, **_k):
    """Replacement for ``time.sleep`` that returns immediately."""
    return None


def _update_args(*, force: bool) -> SimpleNamespace:
    """Build the argument namespace handed to ``cmd_update`` in these tests."""
    return SimpleNamespace(
        check=False,
        gateway=False,
        yes=False,
        force=force,
        backup=False,
        no_backup=True,
    )


# ---------------------------------------------------------------------------
# _detect_concurrent_hermes_instances
# ---------------------------------------------------------------------------


@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_returns_empty_when_no_other_processes(_winp, tmp_path):
    """An empty process listing produces an empty result."""
    for shim_name in ("hermes.exe", "hermes-gateway.exe"):
        (tmp_path / shim_name).write_bytes(b"")

    with patch.dict(sys.modules, {"psutil": _psutil_stub([])}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)

    assert found == []


@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_excludes_self_pid(_winp, tmp_path):
    """A process entry carrying our own PID is not reported."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    own_entry = _make_proc(os.getpid(), str(shim_path), "hermes.exe")

    with patch.dict(sys.modules, {"psutil": _psutil_stub([own_entry])}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)

    assert found == []


@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_finds_other_hermes_process(_winp, tmp_path):
    """Only the entry running the shim is reported; the unrelated exe is not."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    this_pid = os.getpid()
    sibling_pid = this_pid + 1
    process_table = [
        _make_proc(sibling_pid, str(shim_path), "hermes.exe"),
        _make_proc(
            this_pid + 2,
            r"C:\\Windows\\System32\\notepad.exe",
            "notepad.exe",
        ),
    ]

    with patch.dict(sys.modules, {"psutil": _psutil_stub(process_table)}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)

    assert found == [(sibling_pid, "hermes.exe")]


@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_matches_case_insensitively(_winp, tmp_path):
    """An upper-cased executable name at the same path still matches."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"")
    # Simulate the desktop spawning hermes.EXE (uppercase ext) from same path.
    shouted_path = str(shim_path).replace("hermes.exe", "HERMES.EXE")
    process_table = [_make_proc(9999, shouted_path, "HERMES.EXE")]

    with patch.dict(sys.modules, {"psutil": _psutil_stub(process_table)}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)

    assert found == [(9999, "HERMES.EXE")]


@patch.object(cli_main, "_is_windows", return_value=True)
def test_detect_concurrent_no_psutil_returns_empty(_winp, tmp_path):
    """With psutil unimportable the helper returns an empty list."""
    (tmp_path / "hermes.exe").write_bytes(b"")

    # A ``None`` entry in sys.modules makes ``import psutil`` fail, which
    # simulates an environment where the package is not installed.
    with patch.dict(sys.modules, {"psutil": None}):
        found = cli_main._detect_concurrent_hermes_instances(tmp_path)

    assert found == []


@patch.object(cli_main, "_is_windows", return_value=False)
def test_detect_concurrent_is_noop_off_windows(_winp, tmp_path):
    """No process enumeration off-Windows; the file-lock issue is Windows-only."""
    found = cli_main._detect_concurrent_hermes_instances(tmp_path)
    assert found == []


# ---------------------------------------------------------------------------
# _format_concurrent_instances_message
# ---------------------------------------------------------------------------


def test_format_message_mentions_pids_and_remediation(tmp_path):
    """The message names each PID, the executable, and the ways out."""
    offenders = [(1234, "hermes.exe"), (5678, "hermes.exe")]

    text = cli_main._format_concurrent_instances_message(offenders, tmp_path)

    for expected in ("1234", "5678", "hermes.exe", "Hermes Desktop", "--force"):
        assert expected in text
    # Mentions the file that would have been overwritten.
    assert str(tmp_path / "hermes.exe") in text


# ---------------------------------------------------------------------------
# _quarantine_running_hermes_exe — retry + reboot-deferred fallback
# ---------------------------------------------------------------------------


@patch.object(cli_main, "_is_windows", return_value=True)
def test_quarantine_succeeds_first_attempt(_winp, tmp_path):
    """When the rename works immediately, a single rename pair is returned."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"old")

    renamed = cli_main._quarantine_running_hermes_exe(tmp_path)

    assert len(renamed) == 1
    source, parked = renamed[0]
    assert source == shim_path
    assert parked.name.startswith("hermes.exe.old.")
    assert parked.exists()
    assert not shim_path.exists()


@patch.object(cli_main, "_is_windows", return_value=True)
def test_quarantine_retries_then_succeeds(_winp, tmp_path, monkeypatch):
    """A transient OSError on the first attempt should not be fatal."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"old")

    real_rename = Path.rename
    attempts: list[Path] = []

    def flaky_rename(self, target):
        # Fail only the very first call, then defer to the real rename.
        attempts.append(self)
        if len(attempts) == 1:
            raise OSError(32, "share violation (simulated AV scan)")
        return real_rename(self, target)

    monkeypatch.setattr(cli_main, "_hermes_exe_shims", lambda d: [shim_path])
    # time.sleep is stubbed so no real sleeping slows the test down.
    with patch.object(Path, "rename", flaky_rename), patch(
        "time.sleep", _no_sleep
    ):
        renamed = cli_main._quarantine_running_hermes_exe(tmp_path)

    assert len(attempts) >= 2
    assert len(renamed) == 1
    assert not shim_path.exists()


@patch.object(cli_main, "_is_windows", return_value=True)
def test_quarantine_falls_back_to_reboot_schedule(_winp, tmp_path, capsys, monkeypatch):
    """When every retry fails, we schedule via MoveFileEx and warn helpfully."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"locked")

    def always_fails(self, target):
        raise OSError(32, "The process cannot access the file (simulated lock)")

    deferred: list[tuple[Path, Path]] = []

    def fake_schedule(s: Path, q: Path) -> bool:
        deferred.append((s, q))
        return True

    monkeypatch.setattr(cli_main, "_hermes_exe_shims", lambda d: [shim_path])
    with patch.object(Path, "rename", always_fails), patch.object(
        cli_main, "_schedule_replace_on_reboot", fake_schedule
    ), patch("time.sleep", _no_sleep):
        renamed = cli_main._quarantine_running_hermes_exe(tmp_path)

    printed = capsys.readouterr().out.lower()

    # The reboot-deferred path was used.
    assert deferred
    assert deferred[0][0] == shim_path
    # It is NOT added to the returned roll-back list (the issue calls this
    # out — don't undo a deferred operation).
    assert renamed == []
    # The user got a clear message, not raw [WinError 32].
    assert "scheduled" in printed
    assert "reboot" in printed


@patch.object(cli_main, "_is_windows", return_value=True)
def test_quarantine_actionable_warning_when_everything_fails(
    _winp, tmp_path, capsys, monkeypatch
):
    """When even MoveFileEx fails we should print remediation hints, not a bare error."""
    shim_path = tmp_path / "hermes.exe"
    shim_path.write_bytes(b"locked")

    def always_fails(self, target):
        raise OSError(32, "share violation")

    monkeypatch.setattr(cli_main, "_hermes_exe_shims", lambda d: [shim_path])
    with patch.object(Path, "rename", always_fails), patch.object(
        cli_main, "_schedule_replace_on_reboot", lambda *_a, **_k: False
    ), patch("time.sleep", _no_sleep):
        renamed = cli_main._quarantine_running_hermes_exe(tmp_path)

    printed = capsys.readouterr().out
    lowered = printed.lower()

    assert renamed == []
    # New message format: no raw "[WinError 32]" dump; instead names the cause
    # and tells the user what to do.
    assert "another process" in lowered
    assert "Hermes Desktop" in printed or "gateway" in lowered


# ---------------------------------------------------------------------------
# cmd_update integration — concurrent-instance gate
# ---------------------------------------------------------------------------


@patch.object(cli_main, "_is_windows", return_value=True)
def test_cmd_update_aborts_on_concurrent_instance(_winp, tmp_path, capsys):
    """If another hermes.exe is running, the update bails out before touching
    the working tree (exit code 2)."""
    scripts_dir = tmp_path / "Scripts"
    scripts_dir.mkdir()
    args = _update_args(force=False)

    with patch.object(
        cli_main, "_venv_scripts_dir", return_value=scripts_dir
    ), patch.object(
        cli_main,
        "_detect_concurrent_hermes_instances",
        return_value=[(4242, "hermes.exe")],
    ), patch.object(
        cli_main, "_run_pre_update_backup"
    ) as mock_backup, patch.object(
        cli_main, "_install_hangup_protection", return_value={}
    ), patch.object(
        cli_main, "_finalize_update_output"
    ), pytest.raises(SystemExit) as excinfo:
        cli_main.cmd_update(args)

    assert excinfo.value.code == 2
    # The pre-update backup is expected to sit after the concurrent check, so
    # an abort at the gate must leave it uncalled.
    mock_backup.assert_not_called()

    printed = capsys.readouterr().out
    assert "4242" in printed
    assert "--force" in printed


@patch.object(cli_main, "_is_windows", return_value=True)
def test_cmd_update_force_bypasses_concurrent_check(_winp, tmp_path):
    """--force lets the update proceed past the concurrent-instance gate
    (subsequent steps are mocked so we only verify the gate is skipped)."""
    scripts_dir = tmp_path / "Scripts"
    scripts_dir.mkdir()
    args = _update_args(force=True)  # force=True is the bypass under test

    detect = MagicMock(return_value=[(9, "hermes.exe")])
    # Leave cmd_update via a sentinel exception raised from
    # _run_pre_update_backup, which this test treats as the first step past
    # the gate.
    sentinel = RuntimeError("reached post-gate body")

    with patch.object(
        cli_main, "_venv_scripts_dir", return_value=scripts_dir
    ), patch.object(
        cli_main, "_detect_concurrent_hermes_instances", detect
    ), patch.object(
        cli_main, "_run_pre_update_backup", side_effect=sentinel
    ), patch.object(
        cli_main, "_install_hangup_protection", return_value={}
    ), patch.object(
        cli_main, "_finalize_update_output"
    ), pytest.raises(RuntimeError, match="reached post-gate body"):
        cli_main.cmd_update(args)

    # When --force is set, the detection helper must never be consulted.
    detect.assert_not_called()