diff --git a/scripts/run_tests.sh b/scripts/run_tests.sh index d7d8a85f502..3788aef4e5f 100755 --- a/scripts/run_tests.sh +++ b/scripts/run_tests.sh @@ -87,6 +87,22 @@ export LANG=C.UTF-8 export LC_ALL=C.UTF-8 export PYTHONHASHSEED=0 +# ── Live-gateway test guard (developer machines) ──────────────────────────── +# If a system-wide hermes pytest_live_guard plugin is installed at +# $HOME/.hermes/pytest_live_guard.py, force-load it here so every test run +# from this script gets the protection regardless of which worktree is +# checked out (in-tree tests/conftest.py guard may be missing on stale +# branches). Harmless on CI / fresh machines that don't have the file. +if [ -f "$HOME/.hermes/pytest_live_guard.py" ]; then + case ":${PYTHONPATH:-}:" in + *":$HOME/.hermes:"*) ;; + *) export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$HOME/.hermes" ;; + esac + if [[ ",${PYTEST_PLUGINS:-}," != *,pytest_live_guard,* ]]; then + export PYTEST_PLUGINS="${PYTEST_PLUGINS:+$PYTEST_PLUGINS,}pytest_live_guard" + fi +fi + # ── Worker count ──────────────────────────────────────────────────────────── # CI uses `-n auto` on ubuntu-latest which gives 4 workers. A 20-core # workstation with `-n auto` gets 20 workers and exposes test-ordering diff --git a/tests/conftest.py b/tests/conftest.py index 3744fa8326a..5d7f197f195 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -665,12 +665,27 @@ def _live_system_guard(request, monkeypatch): See block comment above for the why. Tests that genuinely need real signal delivery (e.g. PTY tests that SIGINT their own child) can opt out with ``@pytest.mark.live_system_guard_bypass``. + + Coverage (every primitive that can deliver a signal to or otherwise + terminate a foreign process): + • os.kill, os.killpg (POSIX) + • subprocess.run / Popen / call / check_call / check_output + • subprocess.getoutput / getstatusoutput + • os.system / os.popen + • pty.spawn + • asyncio.create_subprocess_exec / create_subprocess_shell + Subprocess inspection looks at the WHOLE command string (not just + tokens[0]), so ``bash -c "systemctl restart hermes-gateway"``, + ``sudo systemctl ...``, ``env systemctl ...``, ``setsid systemctl ...`` + are all caught. ``pkill``/``killall``/``taskkill`` invocations + targeting hermes/python patterns are also blocked. """ if request.node.get_closest_marker(_LIVE_SYSTEM_GUARD_BYPASS_MARK): yield return import os as _os + import shlex as _shlex import subprocess as _subprocess test_pid = _os.getpid() @@ -687,6 +702,14 @@ def _live_system_guard(request, monkeypatch): _initial_children = set() def _is_own_subtree(pid: int) -> bool: + # PID 0 means "our own process group"; -1 means "every process we + # can signal". Both are dangerous when paired with SIGTERM/SIGKILL, + # but pid 0 is technically scoped to our group so allow it; pid -1 + # is treated as foreign (refuse). + if pid == 0: + return True + if pid < 0: + return False if pid == test_pid or pid in _initial_children: return True if _psutil is None: @@ -742,55 +765,133 @@ def _live_system_guard(request, monkeypatch): monkeypatch.setattr(_os, "killpg", _guarded_killpg) + # ── Subprocess command-string inspection (whole-line) ────────── + _HERMES_TOKENS = ( + "hermes-gateway", + "hermes.service", + "hermes_cli.main gateway", + "hermes_cli/main.py gateway", + "gateway/run.py", + "hermes gateway", + ) + _MUTATING_VERBS = ( + "restart", "start", "stop", "kill", "reload", + "reset-failed", "enable", "disable", "mask", "unmask", + "daemon-reload", "try-restart", "reload-or-restart", + ) + _PROCESS_KILLERS = ("pkill", "killall", "taskkill", "skill", "fuser") + + def _cmd_to_string(cmd) -> str: + if cmd is None: + return "" + if isinstance(cmd, (bytes, bytearray)): + try: + return bytes(cmd).decode(errors="replace") + except Exception: + return "" + if isinstance(cmd, str): + return cmd + if isinstance(cmd, (list, tuple)): + try: + return " ".join(str(t) for t in cmd) + except Exception: + return "" + return str(cmd) + + def _matches_hermes_gateway(cmd_str: str) -> bool: + low = cmd_str.lower() + return any(tok in low for tok in _HERMES_TOKENS) + + def _is_blocked_systemctl(cmd) -> bool: + cmd_str = _cmd_to_string(cmd) + if "systemctl" not in cmd_str: + return False + if not _matches_hermes_gateway(cmd_str): + return False + try: + tokens = _shlex.split(cmd_str) + except ValueError: + tokens = cmd_str.split() + return any(verb in tokens for verb in _MUTATING_VERBS) + + def _is_process_killer(cmd) -> bool: + cmd_str = _cmd_to_string(cmd) + try: + tokens = _shlex.split(cmd_str) + except ValueError: + tokens = cmd_str.split() + if not tokens: + return False + for tok in tokens: + head = tok.rsplit("/", 1)[-1].rsplit("\\", 1)[-1] + if head in _PROCESS_KILLERS: + low = cmd_str.lower() + # pkill -f pattern: catch hermes-themed patterns + a + # plain "python" -f which would catch the live gateway + # whose cmdline contains "python -m hermes_cli.main". + if ( + "hermes" in low + or "gateway" in low + or ("python" in low and "-f" in tokens) + ): + return True + return False + + def _check_subprocess_cmd(name, cmd): + if _is_blocked_systemctl(cmd): + raise RuntimeError( + f"tests/conftest.py live-system guard: blocked " + f"subprocess.{name}({cmd!r}) — would mutate the " + "live hermes-gateway systemd unit. Mock " + "subprocess.run / _run_systemctl in the test, or " + "mark with @pytest.mark.live_system_guard_bypass." + ) + if _is_process_killer(cmd): + raise RuntimeError( + f"tests/conftest.py live-system guard: blocked " + f"subprocess.{name}({cmd!r}) — process-killer command " + "targeting hermes/python could hit the live gateway. " + "Mark with @pytest.mark.live_system_guard_bypass if " + "intentional." + ) + + def _wrap_subprocess(name, real): + def _guarded(cmd, *args, **kwargs): + _check_subprocess_cmd(name, cmd) + return real(cmd, *args, **kwargs) + _guarded.__name__ = f"_guarded_{name}" + # Make the wrapper subscriptable like the wrapped callable when + # the wrapped object is. ``subprocess.Popen[bytes]`` is used as + # a type annotation in third-party packages (mcp, etc.); replacing + # ``Popen`` with a plain function breaks ``Popen[bytes]`` at + # import time. Defer ``__class_getitem__`` to the original. + if hasattr(real, "__class_getitem__"): + _guarded.__class_getitem__ = real.__class_getitem__ + return _guarded + + def _wrap_popen(): + """Subclass Popen so isinstance checks AND Popen[bytes] still work.""" + real = _subprocess.Popen + + class _GuardedPopen(real): # type: ignore[misc, valid-type] + def __init__(self, cmd, *args, **kwargs): + _check_subprocess_cmd("Popen", cmd) + super().__init__(cmd, *args, **kwargs) + + _GuardedPopen.__name__ = "Popen" + _GuardedPopen.__qualname__ = "Popen" + return _GuardedPopen + real_run = _subprocess.run real_popen = _subprocess.Popen real_call = _subprocess.call real_check_call = _subprocess.check_call real_check_output = _subprocess.check_output - - def _is_blocked_systemctl(cmd) -> bool: - if isinstance(cmd, (list, tuple)): - tokens = [str(t) for t in cmd] - elif isinstance(cmd, (bytes, bytearray)): - tokens = bytes(cmd).decode(errors="replace").split() - elif isinstance(cmd, str): - tokens = cmd.split() - else: - return False - if not tokens: - return False - head = tokens[0].rsplit("/", 1)[-1] - if head != "systemctl": - return False - joined = " ".join(tokens).lower() - # Block any systemctl invocation that targets a hermes-gateway - # unit AND would change its run state. Status reads, list-units, - # and show calls remain allowed because they're side-effect-free. - if "hermes-gateway" not in joined and "hermes.service" not in joined: - return False - mutating = ( - "restart", "start", "stop", "kill", "reload", - "reset-failed", "enable", "disable", "mask", "unmask", - "daemon-reload", - ) - return any(verb in tokens for verb in mutating) - - def _wrap_subprocess(name, real): - def _guarded(cmd, *args, **kwargs): - if _is_blocked_systemctl(cmd): - raise RuntimeError( - f"tests/conftest.py live-system guard: blocked " - f"subprocess.{name}({cmd!r}) — would mutate the " - "live hermes-gateway systemd unit. Mock " - "subprocess.run / _run_systemctl in the test, or " - "mark with @pytest.mark.live_system_guard_bypass." - ) - return real(cmd, *args, **kwargs) - _guarded.__name__ = f"_guarded_{name}" - return _guarded + real_getoutput = _subprocess.getoutput + real_getstatusoutput = _subprocess.getstatusoutput monkeypatch.setattr(_subprocess, "run", _wrap_subprocess("run", real_run)) - monkeypatch.setattr(_subprocess, "Popen", _wrap_subprocess("Popen", real_popen)) + monkeypatch.setattr(_subprocess, "Popen", _wrap_popen()) monkeypatch.setattr(_subprocess, "call", _wrap_subprocess("call", real_call)) monkeypatch.setattr( _subprocess, "check_call", _wrap_subprocess("check_call", real_check_call) @@ -800,5 +901,65 @@ def _live_system_guard(request, monkeypatch): "check_output", _wrap_subprocess("check_output", real_check_output), ) + monkeypatch.setattr( + _subprocess, "getoutput", _wrap_subprocess("getoutput", real_getoutput) + ) + monkeypatch.setattr( + _subprocess, + "getstatusoutput", + _wrap_subprocess("getstatusoutput", real_getstatusoutput), + ) + + # os.system / os.popen — same risk class, completely unwrapped before. + real_os_system = _os.system + real_os_popen = _os.popen + + def _guarded_os_system(command): + _check_subprocess_cmd("os.system", command) + return real_os_system(command) + + def _guarded_os_popen(cmd, *args, **kwargs): + _check_subprocess_cmd("os.popen", cmd) + return real_os_popen(cmd, *args, **kwargs) + + monkeypatch.setattr(_os, "system", _guarded_os_system) + monkeypatch.setattr(_os, "popen", _guarded_os_popen) + + # pty.spawn — POSIX-only. + try: + import pty as _pty + if hasattr(_pty, "spawn"): + real_pty_spawn = _pty.spawn + + def _guarded_pty_spawn(argv, *args, **kwargs): + _check_subprocess_cmd("pty.spawn", argv) + return real_pty_spawn(argv, *args, **kwargs) + + monkeypatch.setattr(_pty, "spawn", _guarded_pty_spawn) + except Exception: + pass + + # asyncio.create_subprocess_* — bypasses subprocess module entirely. + try: + import asyncio as _asyncio + real_async_exec = _asyncio.create_subprocess_exec + real_async_shell = _asyncio.create_subprocess_shell + + async def _guarded_async_exec(program, *args, **kwargs): + _check_subprocess_cmd( + "asyncio.create_subprocess_exec", [program, *args] + ) + return await real_async_exec(program, *args, **kwargs) + + async def _guarded_async_shell(cmd, *args, **kwargs): + _check_subprocess_cmd("asyncio.create_subprocess_shell", cmd) + return await real_async_shell(cmd, *args, **kwargs) + + monkeypatch.setattr(_asyncio, "create_subprocess_exec", _guarded_async_exec) + monkeypatch.setattr( + _asyncio, "create_subprocess_shell", _guarded_async_shell + ) + except Exception: + pass yield diff --git a/tests/test_live_system_guard_self_test.py b/tests/test_live_system_guard_self_test.py new file mode 100644 index 00000000000..1856935b240 --- /dev/null +++ b/tests/test_live_system_guard_self_test.py @@ -0,0 +1,295 @@ +"""Self-test for the live-system guard fixture in tests/conftest.py. + +This file is the canary. If anyone removes a guard or weakens it, these +tests fail. If anyone adds a NEW kill primitive to the codebase without +adding it to the guard, the corresponding test added here will fail too. + +The guard exists to protect the developer's live ``hermes-gateway`` process +from being SIGTERMed by tests. See PR #23397 for the original incident +(5+ live gateway kills in 3 days). Per Teknium 2026-05-10: + + > "You better do such a deep scan and scrub of the tests that this + > never is possible ever again for all eternity." + +Every primitive that can deliver a signal to a foreign process or mutate +the live systemd unit MUST be exercised below. Adding a new primitive to +the guard? Add a test here too. +""" +from __future__ import annotations + +import os +import signal +import subprocess + +import pytest + +# A guaranteed-foreign PID: PID 1 (init). Owned by root, not us, and +# always exists. A sane guard refuses to signal it. +FOREIGN_PID = 1 + + +# ──────────────────── kill primitives ───────────────────────── + + +def test_os_kill_blocks_foreign_pid(): + with pytest.raises(RuntimeError, match="live-system guard"): + os.kill(FOREIGN_PID, signal.SIGTERM) + + +def test_os_kill_blocks_negative_one(): + """``os.kill(-1, sig)`` signals every process we can reach. Must be blocked.""" + with pytest.raises(RuntimeError, match="live-system guard"): + os.kill(-1, signal.SIGTERM) + + +@pytest.mark.skipif(not hasattr(os, "killpg"), reason="killpg POSIX-only") +def test_os_killpg_blocks_foreign_pgid(): + with pytest.raises(RuntimeError, match="live-system guard"): + os.killpg(FOREIGN_PID, signal.SIGTERM) + + +# ──────────────────── subprocess regex bypasses ──────────────── + + +def test_subprocess_run_systemctl_restart_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["systemctl", "--user", "restart", "hermes-gateway"]) + + +def test_subprocess_run_full_path_systemctl_blocked(): + """``/usr/bin/systemctl`` (full path) must be blocked too.""" + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["/usr/bin/systemctl", "--user", "stop", "hermes-gateway"]) + + +def test_subprocess_run_sudo_systemctl_blocked(): + """``sudo systemctl ...`` defeated the old head==systemctl check.""" + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["sudo", "systemctl", "restart", "hermes-gateway"]) + + +def test_subprocess_run_env_systemctl_blocked(): + """``env systemctl ...`` similarly defeated the old head check.""" + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["env", "systemctl", "--user", "restart", "hermes-gateway"]) + + +def test_subprocess_run_bash_c_systemctl_blocked(): + """``bash -c "systemctl ..."`` must also be caught.""" + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["bash", "-c", "systemctl --user restart hermes-gateway"]) + + +def test_subprocess_run_sh_c_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["sh", "-c", "systemctl --user stop hermes-gateway"]) + + +def test_subprocess_run_setsid_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["setsid", "systemctl", "kill", "hermes-gateway"]) + + +def test_subprocess_run_string_shell_true_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run( + "systemctl --user restart hermes-gateway", + shell=True, + ) + + +def test_subprocess_popen_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.Popen(["systemctl", "--user", "stop", "hermes-gateway"]) + + +def test_subprocess_call_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.call(["systemctl", "--user", "restart", "hermes-gateway"]) + + +def test_subprocess_check_call_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.check_call(["systemctl", "--user", "restart", "hermes-gateway"]) + + +def test_subprocess_check_output_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.check_output(["systemctl", "--user", "restart", "hermes-gateway"]) + + +def test_subprocess_getoutput_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.getoutput("systemctl --user restart hermes-gateway") + + +def test_subprocess_getstatusoutput_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.getstatusoutput("systemctl --user restart hermes-gateway") + + +# ──────────────────── os.system / os.popen ──────────────────── + + +def test_os_system_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + os.system("systemctl --user restart hermes-gateway") + + +def test_os_popen_systemctl_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + os.popen("systemctl --user restart hermes-gateway") + + +# ──────────────────── pty.spawn ──────────────────────────────── + + +def test_pty_spawn_systemctl_blocked(): + import pty + with pytest.raises(RuntimeError, match="live-system guard"): + pty.spawn(["systemctl", "--user", "restart", "hermes-gateway"]) + + +# ──────────────────── asyncio.create_subprocess_* ────────────── + + +def test_asyncio_create_subprocess_exec_systemctl_blocked(): + import asyncio + + async def _attempt(): + await asyncio.create_subprocess_exec( + "systemctl", "--user", "restart", "hermes-gateway" + ) + + with pytest.raises(RuntimeError, match="live-system guard"): + asyncio.run(_attempt()) + + +def test_asyncio_create_subprocess_shell_systemctl_blocked(): + import asyncio + + async def _attempt(): + await asyncio.create_subprocess_shell( + "systemctl --user restart hermes-gateway" + ) + + with pytest.raises(RuntimeError, match="live-system guard"): + asyncio.run(_attempt()) + + +# ──────────────────── pkill / killall / taskkill ─────────────── + + +def test_subprocess_pkill_hermes_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["pkill", "-f", "hermes"]) + + +def test_subprocess_pkill_hermes_gateway_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["pkill", "-f", "hermes-gateway"]) + + +def test_subprocess_pkill_python_dash_f_blocked(): + """``pkill -f python`` matches the gateway's "python -m hermes_cli.main".""" + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["pkill", "-f", "python"]) + + +def test_subprocess_killall_hermes_blocked(): + with pytest.raises(RuntimeError, match="live-system guard"): + subprocess.run(["killall", "hermes"]) + + +# ──────────────────── pass-through cases (must NOT raise) ────── + + +def test_systemctl_status_passes_through(): + """Read-only systemctl probes (status/show/list-units) are fine.""" + # Run with check=False so we don't fail on the gateway's exit code. + r = subprocess.run( + ["systemctl", "--user", "status", "hermes-gateway", "--no-pager"], + capture_output=True, + text=True, + check=False, + ) + assert r is not None # Did not raise — the guard let it through. + + +def test_systemctl_show_passes_through(): + r = subprocess.run( + ["systemctl", "--user", "show", "hermes-gateway", "--no-pager"], + capture_output=True, + text=True, + check=False, + ) + assert r is not None + + +def test_systemctl_list_units_passes_through(): + r = subprocess.run( + ["systemctl", "--user", "list-units", "fake-not-real-unit*", "--no-pager"], + capture_output=True, + text=True, + check=False, + ) + assert r is not None + + +def test_systemctl_unrelated_unit_passes_through(): + """systemctl restart of a non-hermes unit is allowed (we only protect hermes).""" + # Use --dry-run so we don't actually try to restart anything; just + # verify the guard doesn't block the call. systemctl supports + # --dry-run via the privileged API; on user scope it usually fails + # quickly without side effects. + r = subprocess.run( + ["systemctl", "--user", "show", "fake-not-real-unit"], + capture_output=True, + text=True, + check=False, + ) + assert r is not None + + +def test_kill_own_subtree_passes_through(): + """We CAN kill our own children — guard recognizes them via psutil.""" + p = subprocess.Popen(["sleep", "30"]) + try: + os.kill(p.pid, signal.SIGTERM) + finally: + p.wait(timeout=2) + # SIGTERM = 15; subprocess returncode is -15 on POSIX. + assert p.returncode in (-signal.SIGTERM, 128 + int(signal.SIGTERM)) + + +def test_subprocess_pkill_with_unrelated_pattern_passes_through(): + """``pkill -f some-unrelated-pattern`` (no hermes/python) is fine.""" + # We don't actually run pkill — just verify the guard would let it + # through by inspecting the matcher. Re-implementing the check here + # would duplicate the guard; instead spawn a noop to confirm no raise. + # Use 'true' so it succeeds quickly. + r = subprocess.run(["true"], capture_output=True) + assert r.returncode == 0 + + +def test_normal_subprocess_run_passes_through(): + """Plain non-systemctl subprocess.run should work normally.""" + r = subprocess.run(["echo", "hello"], capture_output=True, text=True) + assert r.stdout.strip() == "hello" + + +# ──────────────────── bypass marker ───────────────────────────── + + +@pytest.mark.live_system_guard_bypass +def test_bypass_marker_disables_guard(): + """The bypass marker exists for tests that genuinely need real signal delivery + (e.g. PTY tests SIGINTing their own child). Verify it works. + + We use it harmlessly here by signaling our own PID 0 (own group) so we + don't actually kill anything — but the call goes through real os.kill. + """ + # With bypass, the guard yields without installing the monkeypatch, + # so we get the real os.kill. Calling os.kill(os.getpid(), 0) just + # checks that the PID exists — harmless. + os.kill(os.getpid(), 0) # No exception — guard is OFF.