mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(windows): stop spamming cwd-missing + tirith-spawn warnings on every terminal call
Two log-spam fixes surfaced by a Windows user (Git Bash + Python 3.11.9):
1. LocalEnvironment cwd warn spam
============================
Git Bash's `pwd -P` emits paths like `/c/Users/x`. The base-class
`_extract_cwd_from_output` was assigning this verbatim to `self.cwd`
without validation, then `_resolve_safe_cwd`'s `os.path.isdir(/c/...)`
returned False on Windows, triggering:
LocalEnvironment cwd '/c/Users/NVIDIA' is missing on disk;
falling back to '/' so terminal commands keep working.
...on every terminal call. The pre-existing Windows-path translation
inside `_run_bash` ran AFTER the safe-cwd check, so it could never
prevent the warning.
Fix:
- New `_msys_to_windows_path` helper (idempotent, no-op off Windows).
- `_resolve_safe_cwd` normalizes before `isdir`, so a valid MSYS path
is recognized as the real directory it points at.
- `LocalEnvironment._update_cwd` and a new override of
`_extract_cwd_from_output` translate + validate before mutating
`self.cwd`. Stale / non-existent marker paths roll back to the
previous cwd instead of clobbering it.
- The fallback warning still fires when the directory really is gone
(deletion-recovery scenario from #17558 still covered).
2. tirith spawn-failed warn spam
=============================
When tirith isn't installed (background install in flight, or marked
failed for the day) and the configured path stays as the bare string
`tirith`, every `subprocess.run([tirith_path, ...])` raises OSError
and logged:
tirith spawn failed: [WinError 2] The system cannot find the file specified
...on every command. fail_open=True means behaviour is correct, but
the log noise is severe.
Fix:
- `_warn_once(key, ...)` thread-safe dedupe helper.
- Three hot-path warnings (`tirith path resolved to None`,
`tirith spawn failed: ...`, `tirith timed out after Ns`) now log
once per (exception class, errno) / timeout-value / path-none key.
- Dedupe set is cleared on `_clear_install_failed` so a successful
install lets a subsequent failure surface again.
Tests
=====
- `tests/tools/test_local_env_windows_msys.py`: 12 tests covering the
MSYS→Windows translator, the resolve fast-path, update_cwd validation,
and extract_cwd_from_output rollback.
- `tests/tools/test_tirith_security.py`: 4 new dedupe tests (15 spawn
failures → 1 log line; distinct exc types → 2 lines; timeout dedupe;
path-None dedupe).
Targeted runs:
test_local_env_windows_msys.py 12 passed
test_local_env_cwd_recovery.py 7 passed (pre-existing, no regressions)
test_tirith_security.py 67 passed (63 pre-existing + 4 new)
test_base_environment + local_* 37 passed (no regressions)
test_local_env_blocklist + neighbours 114 passed
Reported via Hermes log capture: 19× cwd warnings + 15× tirith warnings
in a single short session.
This commit is contained in:
parent
7fee1f61eb
commit
4aec25bc44
4 changed files with 441 additions and 14 deletions
200
tests/tools/test_local_env_windows_msys.py
Normal file
200
tests/tools/test_local_env_windows_msys.py
Normal file
|
|
@ -0,0 +1,200 @@
|
|||
"""Tests for the Windows / Git Bash MSYS-path normalization in
|
||||
``LocalEnvironment``.
|
||||
|
||||
Background
|
||||
----------
|
||||
On Windows, ``pwd -P`` inside Git Bash emits paths like
|
||||
``/c/Users/NVIDIA``. ``subprocess.Popen(..., cwd=...)`` only accepts
|
||||
native Windows paths (``C:\\Users\\NVIDIA``), and the validation done
|
||||
by ``_resolve_safe_cwd`` was also checking the MSYS form against
|
||||
``os.path.isdir``, which returns ``False`` on Windows. The combined
|
||||
effect was a warning logged on every single terminal call:
|
||||
|
||||
LocalEnvironment cwd '/c/Users/NVIDIA' is missing on disk;
|
||||
falling back to '/' so terminal commands keep working.
|
||||
|
||||
These tests fake the Windows env on Linux CI by patching ``_IS_WINDOWS``
|
||||
and ``os.path.isdir`` so the MSYS path tests as "missing" exactly like
|
||||
on the real OS.
|
||||
"""
|
||||
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.environments import local as local_mod
|
||||
from tools.environments.local import (
|
||||
LocalEnvironment,
|
||||
_msys_to_windows_path,
|
||||
_resolve_safe_cwd,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _msys_to_windows_path — pure-function unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMsysToWindowsPath:
|
||||
def test_noop_on_non_windows(self, monkeypatch):
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", False)
|
||||
# On a non-Windows host the function must never rewrite the path
|
||||
# — POSIX-style paths are real paths there.
|
||||
assert _msys_to_windows_path("/c/Users/NVIDIA") == "/c/Users/NVIDIA"
|
||||
assert _msys_to_windows_path("/home/teknium") == "/home/teknium"
|
||||
|
||||
def test_translates_drive_path(self, monkeypatch):
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
assert _msys_to_windows_path("/c/Users/NVIDIA") == r"C:\Users\NVIDIA"
|
||||
assert _msys_to_windows_path("/d/Projects/foo bar") == r"D:\Projects\foo bar"
|
||||
|
||||
def test_translates_bare_drive_root(self, monkeypatch):
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
# Bare "/c" alone should resolve to the drive root.
|
||||
assert _msys_to_windows_path("/c") == "C:\\"
|
||||
# Trailing slash on the drive letter is also a root.
|
||||
assert _msys_to_windows_path("/c/") == "C:\\"
|
||||
|
||||
def test_idempotent_on_already_windows_path(self, monkeypatch):
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
assert _msys_to_windows_path(r"C:\Users\NVIDIA") == r"C:\Users\NVIDIA"
|
||||
|
||||
def test_does_not_translate_multi_char_first_segment(self, monkeypatch):
|
||||
"""``/tmp/foo`` and ``/home/x`` must NOT be misread as drive paths
|
||||
just because they start with ``/`` and a single letter — the regex
|
||||
only matches when the first segment is exactly one character."""
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
assert _msys_to_windows_path("/tmp/foo") == "/tmp/foo"
|
||||
assert _msys_to_windows_path("/home/x") == "/home/x"
|
||||
|
||||
def test_empty_string(self, monkeypatch):
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
assert _msys_to_windows_path("") == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _resolve_safe_cwd — Windows fast path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestResolveSafeCwdWindows:
|
||||
def test_msys_path_resolves_to_native_when_native_exists(
|
||||
self, monkeypatch, tmp_path,
|
||||
):
|
||||
"""The whole point of this fix: a Git Bash ``/c/Users/x`` value
|
||||
should resolve to its native equivalent if that native dir exists,
|
||||
WITHOUT falling back to the temp dir."""
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
|
||||
# tmp_path is a real native dir on the test host. Build a fake
|
||||
# MSYS form pointing at it and prove the resolver finds it.
|
||||
native = str(tmp_path)
|
||||
# Construct a synthetic MSYS form for whatever tmp_path is.
|
||||
# On Linux CI tmp_path is /tmp/... ; the resolver shouldn't even
|
||||
# try to translate that (regex won't match), so emulate the
|
||||
# mapping by pointing the translator at the real native dir.
|
||||
with patch.object(
|
||||
local_mod, "_msys_to_windows_path", return_value=native
|
||||
):
|
||||
assert _resolve_safe_cwd("/c/whatever") == native
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# End-to-end: _update_cwd via marker file (Windows simulation)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestUpdateCwdWindowsMsys:
|
||||
def test_marker_file_msys_path_stored_in_native_form(
|
||||
self, monkeypatch, tmp_path,
|
||||
):
|
||||
"""When Git Bash writes ``/c/Users/x`` to the cwd marker file on
|
||||
Windows, ``_update_cwd`` must translate to native form before
|
||||
validating and storing — otherwise ``os.path.isdir`` rejects a
|
||||
perfectly real directory."""
|
||||
original = tmp_path / "starting"
|
||||
original.mkdir()
|
||||
|
||||
# Fake Windows for the test
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
|
||||
with patch.object(
|
||||
LocalEnvironment, "init_session", autospec=True, return_value=None
|
||||
):
|
||||
env = LocalEnvironment(cwd=str(original), timeout=10)
|
||||
|
||||
# Pretend Git Bash wrote an MSYS path that maps to tmp_path/"next"
|
||||
new_dir = tmp_path / "next"
|
||||
new_dir.mkdir()
|
||||
|
||||
with open(env._cwd_file, "w") as f:
|
||||
f.write("/c/whatever/from/bash")
|
||||
|
||||
# Translate the synthetic MSYS string to the real native dir.
|
||||
def fake_translate(p):
|
||||
if p == "/c/whatever/from/bash":
|
||||
return str(new_dir)
|
||||
return p
|
||||
|
||||
with patch.object(local_mod, "_msys_to_windows_path", side_effect=fake_translate):
|
||||
env._update_cwd({"output": "", "returncode": 0})
|
||||
|
||||
assert env.cwd == str(new_dir)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# End-to-end: _extract_cwd_from_output rollback when marker is invalid
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestExtractCwdFromOutputWindowsMsys:
|
||||
def test_stale_msys_marker_does_not_clobber_cwd(self, monkeypatch, tmp_path):
|
||||
"""When the cwd marker in stdout points at a non-existent path,
|
||||
``LocalEnvironment._extract_cwd_from_output`` must roll back to
|
||||
the previous cwd instead of propagating a bad value."""
|
||||
original = tmp_path / "starting"
|
||||
original.mkdir()
|
||||
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
|
||||
with patch.object(
|
||||
LocalEnvironment, "init_session", autospec=True, return_value=None
|
||||
):
|
||||
env = LocalEnvironment(cwd=str(original), timeout=10)
|
||||
|
||||
marker = env._cwd_marker
|
||||
result = {
|
||||
"output": f"some command output\n{marker}/c/no/such/path{marker}\n",
|
||||
"returncode": 0,
|
||||
}
|
||||
|
||||
# Translation produces a path that doesn't exist on disk → rollback.
|
||||
with patch.object(
|
||||
local_mod,
|
||||
"_msys_to_windows_path",
|
||||
return_value=str(tmp_path / "definitely-does-not-exist"),
|
||||
):
|
||||
env._extract_cwd_from_output(result)
|
||||
|
||||
assert env.cwd == str(original)
|
||||
|
||||
def test_valid_msys_marker_normalized_to_native(self, monkeypatch, tmp_path):
|
||||
original = tmp_path / "starting"
|
||||
original.mkdir()
|
||||
new_dir = tmp_path / "next"
|
||||
new_dir.mkdir()
|
||||
|
||||
monkeypatch.setattr(local_mod, "_IS_WINDOWS", True)
|
||||
|
||||
with patch.object(
|
||||
LocalEnvironment, "init_session", autospec=True, return_value=None
|
||||
):
|
||||
env = LocalEnvironment(cwd=str(original), timeout=10)
|
||||
|
||||
marker = env._cwd_marker
|
||||
result = {
|
||||
"output": f"x\n{marker}/c/whatever{marker}\n",
|
||||
"returncode": 0,
|
||||
}
|
||||
|
||||
with patch.object(local_mod, "_msys_to_windows_path", return_value=str(new_dir)):
|
||||
env._extract_cwd_from_output(result)
|
||||
|
||||
assert env.cwd == str(new_dir)
|
||||
|
|
@ -1007,3 +1007,120 @@ class TestHermesHomeIsolation:
|
|||
expected = os.path.join(os.path.expanduser("~"), ".hermes")
|
||||
result = _get_hermes_home()
|
||||
assert result == expected
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Warn-once dedupe (issue: tirith spawn failed spamming on Windows)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSpawnWarningDedup:
|
||||
"""When tirith isn't installed yet (background install in flight, or
|
||||
install marked failed), every terminal command spammed an identical
|
||||
``tirith spawn failed: [WinError 2]`` warning to ``errors.log``. The
|
||||
dedupe set in ``_warn_once`` collapses repeats by ``(exc class, errno)``
|
||||
while still surfacing the first occurrence so users see the failure.
|
||||
"""
|
||||
|
||||
@patch("tools.tirith_security.subprocess.run")
|
||||
@patch("tools.tirith_security._load_security_config")
|
||||
def test_repeated_spawn_failure_logs_once(self, mock_cfg, mock_run, caplog):
|
||||
mock_cfg.return_value = {
|
||||
"tirith_enabled": True, "tirith_path": "tirith",
|
||||
"tirith_timeout": 5, "tirith_fail_open": True,
|
||||
}
|
||||
mock_run.side_effect = FileNotFoundError("[WinError 2]")
|
||||
# Fresh dedupe state — clear any keys left by other tests.
|
||||
_tirith_mod._reset_spawn_warning_state()
|
||||
|
||||
with caplog.at_level("WARNING", logger="tools.tirith_security"):
|
||||
for _ in range(15):
|
||||
result = check_command_security("echo hi")
|
||||
# Behavior must remain the same on every call —
|
||||
# fail-open allow, with the exception captured in summary.
|
||||
assert result["action"] == "allow"
|
||||
assert "unavailable" in result["summary"]
|
||||
|
||||
spawn_warnings = [
|
||||
rec for rec in caplog.records
|
||||
if "tirith spawn failed" in rec.message
|
||||
]
|
||||
assert len(spawn_warnings) == 1, (
|
||||
f"expected exactly 1 spawn-failed warning across 15 commands, "
|
||||
f"got {len(spawn_warnings)}: {[r.message for r in spawn_warnings]}"
|
||||
)
|
||||
|
||||
@patch("tools.tirith_security.subprocess.run")
|
||||
@patch("tools.tirith_security._load_security_config")
|
||||
def test_distinct_exception_types_each_log_once(self, mock_cfg, mock_run, caplog):
|
||||
"""``FileNotFoundError`` and ``PermissionError`` are distinct
|
||||
failure modes and each deserves its own first-occurrence log
|
||||
line; the dedupe key includes the exception class."""
|
||||
mock_cfg.return_value = {
|
||||
"tirith_enabled": True, "tirith_path": "tirith",
|
||||
"tirith_timeout": 5, "tirith_fail_open": True,
|
||||
}
|
||||
_tirith_mod._reset_spawn_warning_state()
|
||||
|
||||
with caplog.at_level("WARNING", logger="tools.tirith_security"):
|
||||
mock_run.side_effect = FileNotFoundError("[WinError 2]")
|
||||
for _ in range(3):
|
||||
check_command_security("a")
|
||||
mock_run.side_effect = PermissionError("denied")
|
||||
for _ in range(3):
|
||||
check_command_security("b")
|
||||
|
||||
spawn_warnings = [
|
||||
rec for rec in caplog.records
|
||||
if "tirith spawn failed" in rec.message
|
||||
]
|
||||
assert len(spawn_warnings) == 2, (
|
||||
f"expected 2 distinct first-occurrence warnings, "
|
||||
f"got {len(spawn_warnings)}"
|
||||
)
|
||||
|
||||
@patch("tools.tirith_security.subprocess.run")
|
||||
@patch("tools.tirith_security._load_security_config")
|
||||
def test_repeated_timeout_logs_once(self, mock_cfg, mock_run, caplog):
|
||||
mock_cfg.return_value = {
|
||||
"tirith_enabled": True, "tirith_path": "tirith",
|
||||
"tirith_timeout": 5, "tirith_fail_open": True,
|
||||
}
|
||||
mock_run.side_effect = subprocess.TimeoutExpired(cmd="tirith", timeout=5)
|
||||
_tirith_mod._reset_spawn_warning_state()
|
||||
|
||||
with caplog.at_level("WARNING", logger="tools.tirith_security"):
|
||||
for _ in range(10):
|
||||
result = check_command_security("slow")
|
||||
assert result["action"] == "allow"
|
||||
|
||||
timeout_warnings = [
|
||||
rec for rec in caplog.records
|
||||
if "tirith timed out" in rec.message
|
||||
]
|
||||
assert len(timeout_warnings) == 1
|
||||
|
||||
@patch("tools.tirith_security._load_security_config")
|
||||
def test_path_none_logs_once(self, mock_cfg, caplog):
|
||||
"""``_resolve_tirith_path`` returning ``None`` (explicit path set
|
||||
but resolver returned None — unusual) should not spam the log
|
||||
either."""
|
||||
mock_cfg.return_value = {
|
||||
"tirith_enabled": True, "tirith_path": "tirith",
|
||||
"tirith_timeout": 5, "tirith_fail_open": True,
|
||||
}
|
||||
_tirith_mod._reset_spawn_warning_state()
|
||||
|
||||
with patch(
|
||||
"tools.tirith_security._resolve_tirith_path", return_value=None
|
||||
):
|
||||
with caplog.at_level("WARNING", logger="tools.tirith_security"):
|
||||
for _ in range(10):
|
||||
result = check_command_security("echo")
|
||||
assert result["action"] == "allow"
|
||||
assert "tirith path unavailable" in result["summary"]
|
||||
|
||||
none_warnings = [
|
||||
rec for rec in caplog.records
|
||||
if "tirith path resolved to None" in rec.message
|
||||
]
|
||||
assert len(none_warnings) == 1
|
||||
|
|
|
|||
|
|
@ -18,18 +18,44 @@ _IS_WINDOWS = platform.system() == "Windows"
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _msys_to_windows_path(cwd: str) -> str:
|
||||
"""Translate a Git Bash / MSYS-style POSIX path (``/c/Users/x``) to the
|
||||
native Windows form (``C:\\Users\\x``) so ``os.path.isdir`` and
|
||||
``subprocess.Popen(..., cwd=...)`` can find it.
|
||||
|
||||
No-ops on non-Windows hosts or for paths that aren't in MSYS form.
|
||||
Returns the input unchanged when no translation applies. This is
|
||||
idempotent — calling it on an already-Windows path returns it as-is.
|
||||
"""
|
||||
if not _IS_WINDOWS or not cwd:
|
||||
return cwd
|
||||
# Match leading "/<single letter>/" or exactly "/<letter>" (bare drive root).
|
||||
m = re.match(r'^/([a-zA-Z])(/.*)?$', cwd)
|
||||
if not m:
|
||||
return cwd
|
||||
drive = m.group(1).upper()
|
||||
tail = (m.group(2) or "").replace('/', '\\')
|
||||
return f"{drive}:{tail or chr(92)}" # chr(92) = backslash, avoid raw-string escape
|
||||
|
||||
|
||||
def _resolve_safe_cwd(cwd: str) -> str:
|
||||
"""Return ``cwd`` if it exists as a directory, else the nearest existing
|
||||
ancestor. Falls back to ``tempfile.gettempdir()`` only if walking up the
|
||||
path can't find any existing directory (effectively never on a healthy
|
||||
filesystem, but cheap belt-and-braces).
|
||||
|
||||
On Windows, also normalizes Git Bash / MSYS-style POSIX paths
|
||||
(``/c/Users/x``) to native Windows form before the isdir check so a
|
||||
perfectly valid ``pwd -P`` result from bash doesn't get rejected as
|
||||
"missing" (see ``_msys_to_windows_path``).
|
||||
|
||||
Used by ``_run_bash`` to recover when the configured cwd is gone — most
|
||||
commonly because a previous tool call deleted its own working directory
|
||||
(issue #17558). Without this guard, ``subprocess.Popen(..., cwd=...)``
|
||||
raises ``FileNotFoundError`` before bash starts, wedging every subsequent
|
||||
terminal call until the gateway restarts.
|
||||
"""
|
||||
cwd = _msys_to_windows_path(cwd) if _IS_WINDOWS else cwd
|
||||
if cwd and os.path.isdir(cwd):
|
||||
return cwd
|
||||
parent = os.path.dirname(cwd) if cwd else ""
|
||||
|
|
@ -455,21 +481,27 @@ class LocalEnvironment(BaseEnvironment):
|
|||
# (issue #17558). Popen would otherwise raise FileNotFoundError on
|
||||
# the cwd before bash starts, wedging every subsequent call until the
|
||||
# gateway restarts.
|
||||
#
|
||||
# On Windows, ``_resolve_safe_cwd`` also normalises Git Bash-style
|
||||
# POSIX paths (``/c/Users/...``) to native form so a perfectly valid
|
||||
# ``pwd -P`` result from bash isn't mistakenly treated as "missing"
|
||||
# and spammed as a warning on every command.
|
||||
safe_cwd = _resolve_safe_cwd(self.cwd)
|
||||
if safe_cwd != self.cwd:
|
||||
logger.warning(
|
||||
"LocalEnvironment cwd %r is missing on disk; "
|
||||
"falling back to %r so terminal commands keep working.",
|
||||
self.cwd,
|
||||
safe_cwd,
|
||||
)
|
||||
# MSYS → Windows translation alone shouldn't surface as a warning
|
||||
# (it's a benign normalization, not a recovery). Only warn when
|
||||
# the directory really doesn't exist on disk.
|
||||
normalized = _msys_to_windows_path(self.cwd) if _IS_WINDOWS else self.cwd
|
||||
if safe_cwd != normalized:
|
||||
logger.warning(
|
||||
"LocalEnvironment cwd %r is missing on disk; "
|
||||
"falling back to %r so terminal commands keep working.",
|
||||
self.cwd,
|
||||
safe_cwd,
|
||||
)
|
||||
self.cwd = safe_cwd
|
||||
|
||||
# On Windows, self.cwd may be a Git Bash-style path (/c/Users/...)
|
||||
# from pwd output. subprocess.Popen needs a native Windows path.
|
||||
_popen_cwd = self.cwd
|
||||
if _IS_WINDOWS and _popen_cwd and re.match(r'^/[a-zA-Z]/', _popen_cwd):
|
||||
_popen_cwd = _popen_cwd[1].upper() + ':' + _popen_cwd[2:].replace('/', '\\')
|
||||
|
||||
proc = subprocess.Popen(
|
||||
args,
|
||||
|
|
@ -571,10 +603,19 @@ class LocalEnvironment(BaseEnvironment):
|
|||
``pwd -P`` on a deleted cwd can leave a stale value in the marker
|
||||
file, and propagating it would re-wedge the next ``Popen``. The
|
||||
``_run_bash`` recovery path will resolve a safe fallback if needed.
|
||||
|
||||
On Windows, the value written by Git Bash's ``pwd -P`` is in
|
||||
MSYS form (``/c/Users/x``). Translate it to native Windows form
|
||||
before validating with ``os.path.isdir`` and before storing on
|
||||
``self.cwd``; otherwise the isdir check rejects every valid
|
||||
result and ``_run_bash`` later prints a misleading "cwd is
|
||||
missing" warning on every command.
|
||||
"""
|
||||
try:
|
||||
with open(self._cwd_file, encoding="utf-8") as f:
|
||||
cwd_path = f.read().strip()
|
||||
if _IS_WINDOWS:
|
||||
cwd_path = _msys_to_windows_path(cwd_path)
|
||||
if cwd_path and os.path.isdir(cwd_path):
|
||||
self.cwd = cwd_path
|
||||
except (OSError, FileNotFoundError):
|
||||
|
|
@ -583,6 +624,30 @@ class LocalEnvironment(BaseEnvironment):
|
|||
# Still strip the marker from output so it's not visible
|
||||
self._extract_cwd_from_output(result)
|
||||
|
||||
def _extract_cwd_from_output(self, result: dict):
|
||||
"""Same semantics as the base class, but on Windows the value
|
||||
emitted by ``pwd -P`` inside Git Bash is in MSYS form
|
||||
(``/c/Users/x``). Normalize to native Windows form and validate
|
||||
the directory exists before assigning to ``self.cwd`` — otherwise
|
||||
``_run_bash``'s safe-cwd recovery would warn on every subsequent
|
||||
command.
|
||||
|
||||
Always defers to the base class for stripping the marker text from
|
||||
``result["output"]`` so output formatting is identical.
|
||||
"""
|
||||
# Snapshot pre-existing cwd, defer to base for parsing + marker
|
||||
# stripping, then validate / normalize whatever it assigned.
|
||||
prev_cwd = self.cwd
|
||||
super()._extract_cwd_from_output(result)
|
||||
if self.cwd != prev_cwd:
|
||||
normalized = _msys_to_windows_path(self.cwd) if _IS_WINDOWS else self.cwd
|
||||
if normalized and os.path.isdir(normalized):
|
||||
self.cwd = normalized
|
||||
else:
|
||||
# Stale / non-existent path — keep previous cwd; _run_bash
|
||||
# will resolve a safe fallback on the next call if needed.
|
||||
self.cwd = prev_cwd
|
||||
|
||||
def cleanup(self):
|
||||
"""Clean up temp files."""
|
||||
for f in (self._snapshot_path, self._cwd_file):
|
||||
|
|
|
|||
|
|
@ -101,6 +101,34 @@ _install_failure_reason: str = "" # reason tag when _resolved_path is _INSTALL_
|
|||
_install_lock = threading.Lock()
|
||||
_install_thread: threading.Thread | None = None
|
||||
|
||||
# Warning de-duplication. The spawn/path warnings live in the hot path —
|
||||
# without this dedupe set, a Windows install where ``tirith`` isn't on PATH
|
||||
# (e.g. background install thread still running, or install marked failed)
|
||||
# spams ``tirith spawn failed: [WinError 2]...`` once per terminal command,
|
||||
# easily filling errors.log with hundreds of identical lines.
|
||||
_warned_messages: set[str] = set()
|
||||
_warned_lock = threading.Lock()
|
||||
|
||||
|
||||
def _warn_once(key: str, message: str, *args) -> None:
|
||||
"""``logger.warning`` but at-most-once per ``key`` for the process
|
||||
lifetime. Used to avoid drowning the log when a fail-open tirith
|
||||
misconfiguration fires on every command."""
|
||||
with _warned_lock:
|
||||
if key in _warned_messages:
|
||||
return
|
||||
_warned_messages.add(key)
|
||||
logger.warning(message, *args)
|
||||
|
||||
|
||||
def _reset_spawn_warning_state() -> None:
|
||||
"""Clear the warn-once dedupe set. Called when tirith is freshly
|
||||
(re)installed so a subsequent failure surfaces again — e.g. user
|
||||
deletes the binary mid-session.
|
||||
"""
|
||||
with _warned_lock:
|
||||
_warned_messages.clear()
|
||||
|
||||
# Disk-persistent failure marker — avoids retry across process restarts
|
||||
_MARKER_TTL = 86400 # 24 hours
|
||||
|
||||
|
|
@ -168,6 +196,10 @@ def _mark_install_failed(reason: str = ""):
|
|||
|
||||
def _clear_install_failed():
|
||||
"""Remove the failure marker after successful install."""
|
||||
# Reset the warn-once dedupe set so a subsequent failure (e.g. user
|
||||
# deletes the binary) surfaces in the log again instead of being
|
||||
# silently suppressed by a stale dedupe key from before the fix.
|
||||
_reset_spawn_warning_state()
|
||||
try:
|
||||
os.unlink(_failure_marker_path())
|
||||
except OSError:
|
||||
|
|
@ -632,7 +664,10 @@ def check_command_security(command: str) -> dict:
|
|||
fail_open = cfg["tirith_fail_open"]
|
||||
|
||||
if tirith_path is None:
|
||||
logger.warning("tirith path resolved to None; scanning disabled")
|
||||
_warn_once(
|
||||
"tirith_path_none",
|
||||
"tirith path resolved to None; scanning disabled",
|
||||
)
|
||||
if fail_open:
|
||||
return {"action": "allow", "findings": [], "summary": "tirith path unavailable"}
|
||||
return {"action": "block", "findings": [], "summary": "tirith path unavailable (fail-closed)"}
|
||||
|
|
@ -646,13 +681,23 @@ def check_command_security(command: str) -> dict:
|
|||
timeout=timeout,
|
||||
)
|
||||
except OSError as exc:
|
||||
# Covers FileNotFoundError, PermissionError, exec format error
|
||||
logger.warning("tirith spawn failed: %s", exc)
|
||||
# Covers FileNotFoundError, PermissionError, exec format error.
|
||||
# Dedupe by ``(errno, exc class)`` so a transient failure mode
|
||||
# surfaces once but doesn't drown the log on every command —
|
||||
# commonly seen on Windows when the configured path "tirith"
|
||||
# isn't on PATH yet (background install still running, or
|
||||
# install marked failed for the day).
|
||||
spawn_key = f"tirith_spawn_failed:{type(exc).__name__}:{getattr(exc, 'errno', '')}"
|
||||
_warn_once(spawn_key, "tirith spawn failed: %s", exc)
|
||||
if fail_open:
|
||||
return {"action": "allow", "findings": [], "summary": f"tirith unavailable: {exc}"}
|
||||
return {"action": "block", "findings": [], "summary": f"tirith spawn failed (fail-closed): {exc}"}
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning("tirith timed out after %ds", timeout)
|
||||
_warn_once(
|
||||
f"tirith_timeout:{timeout}",
|
||||
"tirith timed out after %ds",
|
||||
timeout,
|
||||
)
|
||||
if fail_open:
|
||||
return {"action": "allow", "findings": [], "summary": f"tirith timed out ({timeout}s)"}
|
||||
return {"action": "block", "findings": [], "summary": "tirith timed out (fail-closed)"}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue