diff --git a/tests/tools/test_local_env_windows_msys.py b/tests/tools/test_local_env_windows_msys.py new file mode 100644 index 00000000000..6987c965af6 --- /dev/null +++ b/tests/tools/test_local_env_windows_msys.py @@ -0,0 +1,200 @@ +"""Tests for the Windows / Git Bash MSYS-path normalization in +``LocalEnvironment``. + +Background +---------- +On Windows, ``pwd -P`` inside Git Bash emits paths like +``/c/Users/NVIDIA``. ``subprocess.Popen(..., cwd=...)`` only accepts +native Windows paths (``C:\\Users\\NVIDIA``), and the validation done +by ``_resolve_safe_cwd`` was also checking the MSYS form against +``os.path.isdir``, which returns ``False`` on Windows. The combined +effect was a warning logged on every single terminal call: + + LocalEnvironment cwd '/c/Users/NVIDIA' is missing on disk; + falling back to '/' so terminal commands keep working. + +These tests fake the Windows env on Linux CI by patching ``_IS_WINDOWS`` +and ``os.path.isdir`` so the MSYS path tests as "missing" exactly like +on the real OS. +""" + +import os +from unittest.mock import patch + +import pytest + +from tools.environments import local as local_mod +from tools.environments.local import ( + LocalEnvironment, + _msys_to_windows_path, + _resolve_safe_cwd, +) + + +# --------------------------------------------------------------------------- +# _msys_to_windows_path — pure-function unit tests +# --------------------------------------------------------------------------- + +class TestMsysToWindowsPath: + def test_noop_on_non_windows(self, monkeypatch): + monkeypatch.setattr(local_mod, "_IS_WINDOWS", False) + # On a non-Windows host the function must never rewrite the path + # — POSIX-style paths are real paths there. + assert _msys_to_windows_path("/c/Users/NVIDIA") == "/c/Users/NVIDIA" + assert _msys_to_windows_path("/home/teknium") == "/home/teknium" + + def test_translates_drive_path(self, monkeypatch): + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + assert _msys_to_windows_path("/c/Users/NVIDIA") == r"C:\Users\NVIDIA" + assert _msys_to_windows_path("/d/Projects/foo bar") == r"D:\Projects\foo bar" + + def test_translates_bare_drive_root(self, monkeypatch): + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + # Bare "/c" alone should resolve to the drive root. + assert _msys_to_windows_path("/c") == "C:\\" + # Trailing slash on the drive letter is also a root. + assert _msys_to_windows_path("/c/") == "C:\\" + + def test_idempotent_on_already_windows_path(self, monkeypatch): + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + assert _msys_to_windows_path(r"C:\Users\NVIDIA") == r"C:\Users\NVIDIA" + + def test_does_not_translate_multi_char_first_segment(self, monkeypatch): + """``/tmp/foo`` and ``/home/x`` must NOT be misread as drive paths + just because they start with ``/`` and a single letter — the regex + only matches when the first segment is exactly one character.""" + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + assert _msys_to_windows_path("/tmp/foo") == "/tmp/foo" + assert _msys_to_windows_path("/home/x") == "/home/x" + + def test_empty_string(self, monkeypatch): + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + assert _msys_to_windows_path("") == "" + + +# --------------------------------------------------------------------------- +# _resolve_safe_cwd — Windows fast path +# --------------------------------------------------------------------------- + +class TestResolveSafeCwdWindows: + def test_msys_path_resolves_to_native_when_native_exists( + self, monkeypatch, tmp_path, + ): + """The whole point of this fix: a Git Bash ``/c/Users/x`` value + should resolve to its native equivalent if that native dir exists, + WITHOUT falling back to the temp dir.""" + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + + # tmp_path is a real native dir on the test host. Build a fake + # MSYS form pointing at it and prove the resolver finds it. + native = str(tmp_path) + # Construct a synthetic MSYS form for whatever tmp_path is. + # On Linux CI tmp_path is /tmp/... ; the resolver shouldn't even + # try to translate that (regex won't match), so emulate the + # mapping by pointing the translator at the real native dir. + with patch.object( + local_mod, "_msys_to_windows_path", return_value=native + ): + assert _resolve_safe_cwd("/c/whatever") == native + + +# --------------------------------------------------------------------------- +# End-to-end: _update_cwd via marker file (Windows simulation) +# --------------------------------------------------------------------------- + +class TestUpdateCwdWindowsMsys: + def test_marker_file_msys_path_stored_in_native_form( + self, monkeypatch, tmp_path, + ): + """When Git Bash writes ``/c/Users/x`` to the cwd marker file on + Windows, ``_update_cwd`` must translate to native form before + validating and storing — otherwise ``os.path.isdir`` rejects a + perfectly real directory.""" + original = tmp_path / "starting" + original.mkdir() + + # Fake Windows for the test + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + + with patch.object( + LocalEnvironment, "init_session", autospec=True, return_value=None + ): + env = LocalEnvironment(cwd=str(original), timeout=10) + + # Pretend Git Bash wrote an MSYS path that maps to tmp_path/"next" + new_dir = tmp_path / "next" + new_dir.mkdir() + + with open(env._cwd_file, "w") as f: + f.write("/c/whatever/from/bash") + + # Translate the synthetic MSYS string to the real native dir. + def fake_translate(p): + if p == "/c/whatever/from/bash": + return str(new_dir) + return p + + with patch.object(local_mod, "_msys_to_windows_path", side_effect=fake_translate): + env._update_cwd({"output": "", "returncode": 0}) + + assert env.cwd == str(new_dir) + + +# --------------------------------------------------------------------------- +# End-to-end: _extract_cwd_from_output rollback when marker is invalid +# --------------------------------------------------------------------------- + +class TestExtractCwdFromOutputWindowsMsys: + def test_stale_msys_marker_does_not_clobber_cwd(self, monkeypatch, tmp_path): + """When the cwd marker in stdout points at a non-existent path, + ``LocalEnvironment._extract_cwd_from_output`` must roll back to + the previous cwd instead of propagating a bad value.""" + original = tmp_path / "starting" + original.mkdir() + + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + + with patch.object( + LocalEnvironment, "init_session", autospec=True, return_value=None + ): + env = LocalEnvironment(cwd=str(original), timeout=10) + + marker = env._cwd_marker + result = { + "output": f"some command output\n{marker}/c/no/such/path{marker}\n", + "returncode": 0, + } + + # Translation produces a path that doesn't exist on disk → rollback. + with patch.object( + local_mod, + "_msys_to_windows_path", + return_value=str(tmp_path / "definitely-does-not-exist"), + ): + env._extract_cwd_from_output(result) + + assert env.cwd == str(original) + + def test_valid_msys_marker_normalized_to_native(self, monkeypatch, tmp_path): + original = tmp_path / "starting" + original.mkdir() + new_dir = tmp_path / "next" + new_dir.mkdir() + + monkeypatch.setattr(local_mod, "_IS_WINDOWS", True) + + with patch.object( + LocalEnvironment, "init_session", autospec=True, return_value=None + ): + env = LocalEnvironment(cwd=str(original), timeout=10) + + marker = env._cwd_marker + result = { + "output": f"x\n{marker}/c/whatever{marker}\n", + "returncode": 0, + } + + with patch.object(local_mod, "_msys_to_windows_path", return_value=str(new_dir)): + env._extract_cwd_from_output(result) + + assert env.cwd == str(new_dir) diff --git a/tests/tools/test_tirith_security.py b/tests/tools/test_tirith_security.py index 20d20ccfa11..ecaf4f4e639 100644 --- a/tests/tools/test_tirith_security.py +++ b/tests/tools/test_tirith_security.py @@ -1007,3 +1007,120 @@ class TestHermesHomeIsolation: expected = os.path.join(os.path.expanduser("~"), ".hermes") result = _get_hermes_home() assert result == expected + + +# --------------------------------------------------------------------------- +# Warn-once dedupe (issue: tirith spawn failed spamming on Windows) +# --------------------------------------------------------------------------- + +class TestSpawnWarningDedup: + """When tirith isn't installed yet (background install in flight, or + install marked failed), every terminal command spammed an identical + ``tirith spawn failed: [WinError 2]`` warning to ``errors.log``. The + dedupe set in ``_warn_once`` collapses repeats by ``(exc class, errno)`` + while still surfacing the first occurrence so users see the failure. + """ + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_repeated_spawn_failure_logs_once(self, mock_cfg, mock_run, caplog): + mock_cfg.return_value = { + "tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True, + } + mock_run.side_effect = FileNotFoundError("[WinError 2]") + # Fresh dedupe state — clear any keys left by other tests. + _tirith_mod._reset_spawn_warning_state() + + with caplog.at_level("WARNING", logger="tools.tirith_security"): + for _ in range(15): + result = check_command_security("echo hi") + # Behavior must remain the same on every call — + # fail-open allow, with the exception captured in summary. + assert result["action"] == "allow" + assert "unavailable" in result["summary"] + + spawn_warnings = [ + rec for rec in caplog.records + if "tirith spawn failed" in rec.message + ] + assert len(spawn_warnings) == 1, ( + f"expected exactly 1 spawn-failed warning across 15 commands, " + f"got {len(spawn_warnings)}: {[r.message for r in spawn_warnings]}" + ) + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_distinct_exception_types_each_log_once(self, mock_cfg, mock_run, caplog): + """``FileNotFoundError`` and ``PermissionError`` are distinct + failure modes and each deserves its own first-occurrence log + line; the dedupe key includes the exception class.""" + mock_cfg.return_value = { + "tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True, + } + _tirith_mod._reset_spawn_warning_state() + + with caplog.at_level("WARNING", logger="tools.tirith_security"): + mock_run.side_effect = FileNotFoundError("[WinError 2]") + for _ in range(3): + check_command_security("a") + mock_run.side_effect = PermissionError("denied") + for _ in range(3): + check_command_security("b") + + spawn_warnings = [ + rec for rec in caplog.records + if "tirith spawn failed" in rec.message + ] + assert len(spawn_warnings) == 2, ( + f"expected 2 distinct first-occurrence warnings, " + f"got {len(spawn_warnings)}" + ) + + @patch("tools.tirith_security.subprocess.run") + @patch("tools.tirith_security._load_security_config") + def test_repeated_timeout_logs_once(self, mock_cfg, mock_run, caplog): + mock_cfg.return_value = { + "tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True, + } + mock_run.side_effect = subprocess.TimeoutExpired(cmd="tirith", timeout=5) + _tirith_mod._reset_spawn_warning_state() + + with caplog.at_level("WARNING", logger="tools.tirith_security"): + for _ in range(10): + result = check_command_security("slow") + assert result["action"] == "allow" + + timeout_warnings = [ + rec for rec in caplog.records + if "tirith timed out" in rec.message + ] + assert len(timeout_warnings) == 1 + + @patch("tools.tirith_security._load_security_config") + def test_path_none_logs_once(self, mock_cfg, caplog): + """``_resolve_tirith_path`` returning ``None`` (explicit path set + but resolver returned None — unusual) should not spam the log + either.""" + mock_cfg.return_value = { + "tirith_enabled": True, "tirith_path": "tirith", + "tirith_timeout": 5, "tirith_fail_open": True, + } + _tirith_mod._reset_spawn_warning_state() + + with patch( + "tools.tirith_security._resolve_tirith_path", return_value=None + ): + with caplog.at_level("WARNING", logger="tools.tirith_security"): + for _ in range(10): + result = check_command_security("echo") + assert result["action"] == "allow" + assert "tirith path unavailable" in result["summary"] + + none_warnings = [ + rec for rec in caplog.records + if "tirith path resolved to None" in rec.message + ] + assert len(none_warnings) == 1 diff --git a/tools/environments/local.py b/tools/environments/local.py index 7aa75a62d0c..3b9d65449fa 100644 --- a/tools/environments/local.py +++ b/tools/environments/local.py @@ -18,18 +18,44 @@ _IS_WINDOWS = platform.system() == "Windows" logger = logging.getLogger(__name__) +def _msys_to_windows_path(cwd: str) -> str: + """Translate a Git Bash / MSYS-style POSIX path (``/c/Users/x``) to the + native Windows form (``C:\\Users\\x``) so ``os.path.isdir`` and + ``subprocess.Popen(..., cwd=...)`` can find it. + + No-ops on non-Windows hosts or for paths that aren't in MSYS form. + Returns the input unchanged when no translation applies. This is + idempotent — calling it on an already-Windows path returns it as-is. + """ + if not _IS_WINDOWS or not cwd: + return cwd + # Match leading "//" or exactly "/" (bare drive root). + m = re.match(r'^/([a-zA-Z])(/.*)?$', cwd) + if not m: + return cwd + drive = m.group(1).upper() + tail = (m.group(2) or "").replace('/', '\\') + return f"{drive}:{tail or chr(92)}" # chr(92) = backslash, avoid raw-string escape + + def _resolve_safe_cwd(cwd: str) -> str: """Return ``cwd`` if it exists as a directory, else the nearest existing ancestor. Falls back to ``tempfile.gettempdir()`` only if walking up the path can't find any existing directory (effectively never on a healthy filesystem, but cheap belt-and-braces). + On Windows, also normalizes Git Bash / MSYS-style POSIX paths + (``/c/Users/x``) to native Windows form before the isdir check so a + perfectly valid ``pwd -P`` result from bash doesn't get rejected as + "missing" (see ``_msys_to_windows_path``). + Used by ``_run_bash`` to recover when the configured cwd is gone — most commonly because a previous tool call deleted its own working directory (issue #17558). Without this guard, ``subprocess.Popen(..., cwd=...)`` raises ``FileNotFoundError`` before bash starts, wedging every subsequent terminal call until the gateway restarts. """ + cwd = _msys_to_windows_path(cwd) if _IS_WINDOWS else cwd if cwd and os.path.isdir(cwd): return cwd parent = os.path.dirname(cwd) if cwd else "" @@ -455,21 +481,27 @@ class LocalEnvironment(BaseEnvironment): # (issue #17558). Popen would otherwise raise FileNotFoundError on # the cwd before bash starts, wedging every subsequent call until the # gateway restarts. + # + # On Windows, ``_resolve_safe_cwd`` also normalises Git Bash-style + # POSIX paths (``/c/Users/...``) to native form so a perfectly valid + # ``pwd -P`` result from bash isn't mistakenly treated as "missing" + # and spammed as a warning on every command. safe_cwd = _resolve_safe_cwd(self.cwd) if safe_cwd != self.cwd: - logger.warning( - "LocalEnvironment cwd %r is missing on disk; " - "falling back to %r so terminal commands keep working.", - self.cwd, - safe_cwd, - ) + # MSYS → Windows translation alone shouldn't surface as a warning + # (it's a benign normalization, not a recovery). Only warn when + # the directory really doesn't exist on disk. + normalized = _msys_to_windows_path(self.cwd) if _IS_WINDOWS else self.cwd + if safe_cwd != normalized: + logger.warning( + "LocalEnvironment cwd %r is missing on disk; " + "falling back to %r so terminal commands keep working.", + self.cwd, + safe_cwd, + ) self.cwd = safe_cwd - # On Windows, self.cwd may be a Git Bash-style path (/c/Users/...) - # from pwd output. subprocess.Popen needs a native Windows path. _popen_cwd = self.cwd - if _IS_WINDOWS and _popen_cwd and re.match(r'^/[a-zA-Z]/', _popen_cwd): - _popen_cwd = _popen_cwd[1].upper() + ':' + _popen_cwd[2:].replace('/', '\\') proc = subprocess.Popen( args, @@ -571,10 +603,19 @@ class LocalEnvironment(BaseEnvironment): ``pwd -P`` on a deleted cwd can leave a stale value in the marker file, and propagating it would re-wedge the next ``Popen``. The ``_run_bash`` recovery path will resolve a safe fallback if needed. + + On Windows, the value written by Git Bash's ``pwd -P`` is in + MSYS form (``/c/Users/x``). Translate it to native Windows form + before validating with ``os.path.isdir`` and before storing on + ``self.cwd``; otherwise the isdir check rejects every valid + result and ``_run_bash`` later prints a misleading "cwd is + missing" warning on every command. """ try: with open(self._cwd_file, encoding="utf-8") as f: cwd_path = f.read().strip() + if _IS_WINDOWS: + cwd_path = _msys_to_windows_path(cwd_path) if cwd_path and os.path.isdir(cwd_path): self.cwd = cwd_path except (OSError, FileNotFoundError): @@ -583,6 +624,30 @@ class LocalEnvironment(BaseEnvironment): # Still strip the marker from output so it's not visible self._extract_cwd_from_output(result) + def _extract_cwd_from_output(self, result: dict): + """Same semantics as the base class, but on Windows the value + emitted by ``pwd -P`` inside Git Bash is in MSYS form + (``/c/Users/x``). Normalize to native Windows form and validate + the directory exists before assigning to ``self.cwd`` — otherwise + ``_run_bash``'s safe-cwd recovery would warn on every subsequent + command. + + Always defers to the base class for stripping the marker text from + ``result["output"]`` so output formatting is identical. + """ + # Snapshot pre-existing cwd, defer to base for parsing + marker + # stripping, then validate / normalize whatever it assigned. + prev_cwd = self.cwd + super()._extract_cwd_from_output(result) + if self.cwd != prev_cwd: + normalized = _msys_to_windows_path(self.cwd) if _IS_WINDOWS else self.cwd + if normalized and os.path.isdir(normalized): + self.cwd = normalized + else: + # Stale / non-existent path — keep previous cwd; _run_bash + # will resolve a safe fallback on the next call if needed. + self.cwd = prev_cwd + def cleanup(self): """Clean up temp files.""" for f in (self._snapshot_path, self._cwd_file): diff --git a/tools/tirith_security.py b/tools/tirith_security.py index 350265d33a1..1c79892f424 100644 --- a/tools/tirith_security.py +++ b/tools/tirith_security.py @@ -101,6 +101,34 @@ _install_failure_reason: str = "" # reason tag when _resolved_path is _INSTALL_ _install_lock = threading.Lock() _install_thread: threading.Thread | None = None +# Warning de-duplication. The spawn/path warnings live in the hot path — +# without this dedupe set, a Windows install where ``tirith`` isn't on PATH +# (e.g. background install thread still running, or install marked failed) +# spams ``tirith spawn failed: [WinError 2]...`` once per terminal command, +# easily filling errors.log with hundreds of identical lines. +_warned_messages: set[str] = set() +_warned_lock = threading.Lock() + + +def _warn_once(key: str, message: str, *args) -> None: + """``logger.warning`` but at-most-once per ``key`` for the process + lifetime. Used to avoid drowning the log when a fail-open tirith + misconfiguration fires on every command.""" + with _warned_lock: + if key in _warned_messages: + return + _warned_messages.add(key) + logger.warning(message, *args) + + +def _reset_spawn_warning_state() -> None: + """Clear the warn-once dedupe set. Called when tirith is freshly + (re)installed so a subsequent failure surfaces again — e.g. user + deletes the binary mid-session. + """ + with _warned_lock: + _warned_messages.clear() + # Disk-persistent failure marker — avoids retry across process restarts _MARKER_TTL = 86400 # 24 hours @@ -168,6 +196,10 @@ def _mark_install_failed(reason: str = ""): def _clear_install_failed(): """Remove the failure marker after successful install.""" + # Reset the warn-once dedupe set so a subsequent failure (e.g. user + # deletes the binary) surfaces in the log again instead of being + # silently suppressed by a stale dedupe key from before the fix. + _reset_spawn_warning_state() try: os.unlink(_failure_marker_path()) except OSError: @@ -632,7 +664,10 @@ def check_command_security(command: str) -> dict: fail_open = cfg["tirith_fail_open"] if tirith_path is None: - logger.warning("tirith path resolved to None; scanning disabled") + _warn_once( + "tirith_path_none", + "tirith path resolved to None; scanning disabled", + ) if fail_open: return {"action": "allow", "findings": [], "summary": "tirith path unavailable"} return {"action": "block", "findings": [], "summary": "tirith path unavailable (fail-closed)"} @@ -646,13 +681,23 @@ def check_command_security(command: str) -> dict: timeout=timeout, ) except OSError as exc: - # Covers FileNotFoundError, PermissionError, exec format error - logger.warning("tirith spawn failed: %s", exc) + # Covers FileNotFoundError, PermissionError, exec format error. + # Dedupe by ``(errno, exc class)`` so a transient failure mode + # surfaces once but doesn't drown the log on every command — + # commonly seen on Windows when the configured path "tirith" + # isn't on PATH yet (background install still running, or + # install marked failed for the day). + spawn_key = f"tirith_spawn_failed:{type(exc).__name__}:{getattr(exc, 'errno', '')}" + _warn_once(spawn_key, "tirith spawn failed: %s", exc) if fail_open: return {"action": "allow", "findings": [], "summary": f"tirith unavailable: {exc}"} return {"action": "block", "findings": [], "summary": f"tirith spawn failed (fail-closed): {exc}"} except subprocess.TimeoutExpired: - logger.warning("tirith timed out after %ds", timeout) + _warn_once( + f"tirith_timeout:{timeout}", + "tirith timed out after %ds", + timeout, + ) if fail_open: return {"action": "allow", "findings": [], "summary": f"tirith timed out ({timeout}s)"} return {"action": "block", "findings": [], "summary": "tirith timed out (fail-closed)"}