diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py index 1bcb1af77f..f0cbf8c256 100644 --- a/hermes_cli/auth.py +++ b/hermes_cli/auth.py @@ -985,12 +985,27 @@ def _load_auth_store(auth_file: Optional[Path] = None) -> Dict[str, Any]: def _save_auth_store(auth_store: Dict[str, Any]) -> Path: auth_file = _auth_file_path() auth_file.parent.mkdir(parents=True, exist_ok=True) + # Tighten parent dir to 0o700 so siblings can't traverse to creds. + # No-op on Windows (POSIX mode bits not enforced); ignore failures. + try: + os.chmod(auth_file.parent, 0o700) + except OSError: + pass auth_store["version"] = AUTH_STORE_VERSION auth_store["updated_at"] = datetime.now(timezone.utc).isoformat() payload = json.dumps(auth_store, indent=2) + "\n" tmp_path = auth_file.with_name(f"{auth_file.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}") try: - with tmp_path.open("w", encoding="utf-8") as handle: + # Create with 0o600 atomically via os.open(O_EXCL) + fdopen to close + # the TOCTOU window where default umask (often 0o644) briefly exposed + # OAuth tokens to other local users between open() and chmod(). + # Mirrors agent/google_oauth.py (#19673) and tools/mcp_oauth.py (#21148). + fd = os.open( + str(tmp_path), + os.O_WRONLY | os.O_CREAT | os.O_EXCL, + stat.S_IRUSR | stat.S_IWUSR, + ) + with os.fdopen(fd, "w", encoding="utf-8") as handle: handle.write(payload) handle.flush() os.fsync(handle.fileno()) @@ -1554,10 +1569,33 @@ def _read_qwen_cli_tokens() -> Dict[str, Any]: def _save_qwen_cli_tokens(tokens: Dict[str, Any]) -> Path: auth_path = _qwen_cli_auth_path() auth_path.parent.mkdir(parents=True, exist_ok=True) - tmp_path = auth_path.with_suffix(".tmp") - tmp_path.write_text(json.dumps(tokens, indent=2, sort_keys=True) + "\n", encoding="utf-8") - os.chmod(tmp_path, stat.S_IRUSR | stat.S_IWUSR) - tmp_path.replace(auth_path) + try: + os.chmod(auth_path.parent, 0o700) + except OSError: + pass + # Per-process random temp suffix avoids collisions between concurrent + # writers and stale leftovers from a crashed prior write. + tmp_path = auth_path.with_name(f"{auth_path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}") + # Create with 0o600 atomically via os.open(O_EXCL) — closes the TOCTOU + # window where write_text() + post-write chmod briefly exposed tokens + # at process umask (typically 0o644). See #19673, #21148. + fd = os.open( + str(tmp_path), + os.O_WRONLY | os.O_CREAT | os.O_EXCL, + stat.S_IRUSR | stat.S_IWUSR, + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + fh.write(json.dumps(tokens, indent=2, sort_keys=True) + "\n") + fh.flush() + os.fsync(fh.fileno()) + atomic_replace(tmp_path, auth_path) + finally: + try: + if tmp_path.exists(): + tmp_path.unlink() + except OSError: + pass return auth_path @@ -2938,13 +2976,31 @@ def _write_shared_nous_state(state: Dict[str, Any]) -> None: with _nous_shared_store_lock(): path = _nous_shared_store_path() path.parent.mkdir(parents=True, exist_ok=True) - tmp = path.with_suffix(path.suffix + ".tmp") - tmp.write_text(json.dumps(shared, indent=2, sort_keys=True)) try: - os.chmod(tmp, 0o600) + os.chmod(path.parent, 0o700) except OSError: pass - os.replace(tmp, path) + tmp = path.with_name(f"{path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}") + # Create with 0o600 atomically via os.open(O_EXCL) — closes the TOCTOU + # window where write_text() + post-write chmod briefly exposed Nous + # refresh_token at process umask. See #19673, #21148. + fd = os.open( + str(tmp), + os.O_WRONLY | os.O_CREAT | os.O_EXCL, + stat.S_IRUSR | stat.S_IWUSR, + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + fh.write(json.dumps(shared, indent=2, sort_keys=True)) + fh.flush() + os.fsync(fh.fileno()) + os.replace(tmp, path) + finally: + try: + if tmp.exists(): + tmp.unlink() + except OSError: + pass _oauth_trace( "nous_shared_store_written", path=str(path), diff --git a/tests/hermes_cli/test_auth_toctou_file_modes.py b/tests/hermes_cli/test_auth_toctou_file_modes.py new file mode 100644 index 0000000000..c89bafebfe --- /dev/null +++ b/tests/hermes_cli/test_auth_toctou_file_modes.py @@ -0,0 +1,198 @@ +"""Regression tests for TOCTOU-safe credential file writers in ``hermes_cli.auth``. + +Background +========== +The three writers below used to create a temp file via ``Path.write_text`` / +``Path.open('w')`` and only ``chmod``'d it to ``0o600`` afterward. Between +create and chmod the file existed at the process umask (typically ``0o644``), +briefly exposing OAuth tokens to other local users on multi-user hosts. The +fix switches them to ``os.open(O_EXCL, mode=0o600)`` + ``os.fdopen`` + +``fsync`` so the file is atomic at ``0o600`` on creation. Mirrors the fixes +shipped for ``agent/google_oauth.py`` (#19673) and ``tools/mcp_oauth.py`` +(#21148). + +These tests stay green only while the token file and its parent directory +end up at ``0o600`` / ``0o700`` after every write. POSIX-only — the mode-bit +enforcement does not exist on Windows. +""" + +from __future__ import annotations + +import json +import os +import stat +import sys +from unittest.mock import patch + +import pytest + + +pytestmark = pytest.mark.skipif( + sys.platform.startswith("win"), + reason="POSIX mode bits not enforced on Windows", +) + + +# --------------------------------------------------------------------------- +# _save_auth_store (~/.hermes/auth.json — every native OAuth provider) +# --------------------------------------------------------------------------- + + +def test_save_auth_store_writes_0o600_with_0o700_parent(tmp_path, monkeypatch): + """``_save_auth_store`` must land ``auth.json`` at 0o600 and parent at 0o700.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + old_umask = os.umask(0o022) # make the race observable if it regresses + try: + from hermes_cli import auth as auth_mod + + auth_store = { + "version": auth_mod.AUTH_STORE_VERSION, + "providers": {"openai-codex": {"tokens": {"access_token": "secret-x"}}}, + "active_provider": "openai-codex", + } + auth_path = auth_mod._save_auth_store(auth_store) + finally: + os.umask(old_umask) + + mode = stat.S_IMODE(auth_path.stat().st_mode) + parent_mode = stat.S_IMODE(auth_path.parent.stat().st_mode) + + assert mode == 0o600, ( + f"auth.json mode 0o{mode:o} != 0o600 — TOCTOU race regressed" + ) + assert parent_mode == 0o700, ( + f"auth.json parent dir mode 0o{parent_mode:o} != 0o700 — siblings can traverse" + ) + + # Content survived the rewrite + data = json.loads(auth_path.read_text()) + assert data["providers"]["openai-codex"]["tokens"]["access_token"] == "secret-x" + + +# --------------------------------------------------------------------------- +# _save_qwen_cli_tokens (Qwen CLI OAuth tokens) +# --------------------------------------------------------------------------- + + +def test_save_qwen_cli_tokens_writes_0o600_with_0o700_parent(tmp_path, monkeypatch): + """``_save_qwen_cli_tokens`` must land the token file at 0o600 and parent at 0o700.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + # The Qwen CLI auth path lives under $HOME/.qwen by default — isolate it. + monkeypatch.setenv("HOME", str(tmp_path)) + old_umask = os.umask(0o022) + try: + from hermes_cli import auth as auth_mod + + tokens = { + "access_token": "qwen-secret", + "refresh_token": "qwen-refresh", + "token_type": "Bearer", + "expiry_date": 123, + } + auth_path = auth_mod._save_qwen_cli_tokens(tokens) + finally: + os.umask(old_umask) + + mode = stat.S_IMODE(auth_path.stat().st_mode) + parent_mode = stat.S_IMODE(auth_path.parent.stat().st_mode) + + assert mode == 0o600, ( + f"Qwen token file mode 0o{mode:o} != 0o600 — TOCTOU race regressed" + ) + assert parent_mode == 0o700, ( + f"Qwen token parent dir mode 0o{parent_mode:o} != 0o700" + ) + + data = json.loads(auth_path.read_text()) + assert data["access_token"] == "qwen-secret" + + +# --------------------------------------------------------------------------- +# Nous shared-credential store write (inside _write_shared_nous_state) +# --------------------------------------------------------------------------- + + +def test_shared_nous_store_writes_0o600_with_0o700_parent(tmp_path, monkeypatch): + """The Nous shared-credential store must land at 0o600 / parent 0o700.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + # _nous_shared_store_path() refuses to touch the real shared store during + # pytest runs; redirect it into tmp_path explicitly. + monkeypatch.setenv("HERMES_SHARED_AUTH_DIR", str(tmp_path / "shared")) + old_umask = os.umask(0o022) + try: + from hermes_cli import auth as auth_mod + + state = { + "access_token": "nous-access-xxx", + "refresh_token": "nous-refresh-xxx", + "token_type": "Bearer", + "scope": "openid profile", + "client_id": "test-client", + "obtained_at": "2026-01-01T00:00:00Z", + "expires_at": "2026-01-01T01:00:00Z", + } + auth_mod._write_shared_nous_state(state) + path = auth_mod._nous_shared_store_path() + finally: + os.umask(old_umask) + + assert path.exists(), "shared Nous store was not written" + mode = stat.S_IMODE(path.stat().st_mode) + parent_mode = stat.S_IMODE(path.parent.stat().st_mode) + + assert mode == 0o600, ( + f"Nous shared store mode 0o{mode:o} != 0o600 — TOCTOU race regressed" + ) + assert parent_mode == 0o700, ( + f"Nous shared store parent dir mode 0o{parent_mode:o} != 0o700" + ) + + data = json.loads(path.read_text()) + assert data["refresh_token"] == "nous-refresh-xxx" + + +# --------------------------------------------------------------------------- +# Atomicity: verify ``os.open`` is called with an explicit 0o600 mode. +# --------------------------------------------------------------------------- + + +def test_save_auth_store_uses_os_open_with_0o600_mode(tmp_path, monkeypatch): + """Regression: the writer must call ``os.open`` with an explicit restricted + mode so the file is created at 0o600 atomically — closing the TOCTOU + window the previous ``Path.open('w')`` left open (fd inherited process + umask and was briefly 0o644 before post-write chmod).""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + observed_opens: list[tuple[str, int, int]] = [] + real_os_open = os.open + + def spying_os_open(path, flags, mode=0o777, *args, **kwargs): + observed_opens.append((str(path), flags, mode)) + return real_os_open(path, flags, mode, *args, **kwargs) + + with patch.object(os, "open", spying_os_open): + from hermes_cli import auth as auth_mod + + auth_mod._save_auth_store( + {"version": auth_mod.AUTH_STORE_VERSION, "providers": {}} + ) + + auth_tmp_opens = [ + (p, fl, m) for (p, fl, m) in observed_opens if "auth.json.tmp" in p + ] + assert auth_tmp_opens, ( + f"os.open was never called for the auth.json temp file; " + f"observed={observed_opens!r}" + ) + for path, flags, mode in auth_tmp_opens: + assert flags & os.O_CREAT, f"auth.json temp open missing O_CREAT: path={path}" + assert flags & os.O_EXCL, ( + f"auth.json temp open missing O_EXCL — TOCTOU-safe pattern regressed: " + f"path={path}, flags={flags}" + ) + # Must be exactly S_IRUSR | S_IWUSR (0o600) — no group/other bits. + expected = stat.S_IRUSR | stat.S_IWUSR + assert mode == expected, ( + f"auth.json temp open mode 0o{mode:o} != 0o{expected:o} — " + f"umask would apply and potentially expose tokens" + )