fix(security): close TOCTOU window when saving MCP OAuth credentials

_write_json (the persistence helper used by HermesTokenStorage for both
tokens and client_info) created the temp file via Path.write_text and
only chmod'd it to 0o600 afterward. Between create and chmod the file
existed on disk at the process umask (commonly 0o644 = world-readable),
briefly exposing MCP OAuth access/refresh tokens to other local users.

Use os.open with O_WRONLY|O_CREAT|O_EXCL and an explicit S_IRUSR|S_IWUSR
mode so the file is created atomically at 0o600, plus tighten the parent
dir to 0o700 so siblings can't traverse to the creds file. The temp name
also gains a per-process random suffix to avoid collisions between
concurrent writers and stale leftovers from a crashed prior write.

Mirrors the fix shipped for agent/google_oauth.py in #19673.

Adds a regression test asserting the resulting file mode is 0o600 and
the parent directory is 0o700 (skipped on Windows where POSIX mode bits
aren't enforced).
This commit is contained in:
Gutslabs 2026-05-07 13:45:10 +03:00 committed by Teknium
parent a5c9c83b78
commit 7d36e8346b
2 changed files with 67 additions and 6 deletions

View file

@ -2,6 +2,8 @@
import json import json
import os import os
import stat
import sys
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
from unittest.mock import patch, MagicMock, AsyncMock from unittest.mock import patch, MagicMock, AsyncMock
@ -50,6 +52,37 @@ class TestHermesTokenStorage:
data = json.loads(token_path.read_text()) data = json.loads(token_path.read_text())
assert data["access_token"] == "abc123" assert data["access_token"] == "abc123"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="POSIX mode bits not enforced on Windows")
def test_token_file_created_with_0o600(self, tmp_path, monkeypatch):
"""Tokens must land on disk at 0o600 with no umask-default exposure window.
Regression for the TOCTOU race where ``write_text`` + post-write
``chmod`` briefly left credentials at the process umask (commonly
0o644 = world-readable) before tightening to owner-only. Mirrors
the fix shipped for ``agent/google_oauth.py`` in #19673.
"""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("perm-test-server")
import asyncio
mock_token = MagicMock()
mock_token.model_dump.return_value = {
"access_token": "secret-abc",
"token_type": "Bearer",
"refresh_token": "secret-ref",
}
asyncio.run(storage.set_tokens(mock_token))
token_path = tmp_path / "mcp-tokens" / "perm-test-server.json"
assert token_path.exists()
mode = stat.S_IMODE(token_path.stat().st_mode)
assert mode == 0o600, f"token file mode {oct(mode)} != 0o600 — TOCTOU race regressed"
parent_mode = stat.S_IMODE(token_path.parent.stat().st_mode)
assert parent_mode == 0o700, (
f"token parent dir mode {oct(parent_mode)} != 0o700 — siblings can traverse"
)
def test_roundtrip_client_info(self, tmp_path, monkeypatch): def test_roundtrip_client_info(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path)) monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("test-server") storage = HermesTokenStorage("test-server")

View file

@ -37,7 +37,9 @@ import json
import logging import logging
import os import os
import re import re
import secrets
import socket import socket
import stat
import sys import sys
import threading import threading
import time import time
@ -160,15 +162,41 @@ def _read_json(path: Path) -> dict | None:
def _write_json(path: Path, data: dict) -> None: def _write_json(path: Path, data: dict) -> None:
"""Write a dict as JSON with restricted permissions (0o600).""" """Write a dict as JSON with restricted permissions (0o600).
Uses ``os.open`` with ``O_EXCL`` and an explicit mode so the file is
created atomically at 0o600. The previous ``write_text`` + post-write
``chmod`` opened a TOCTOU window where the temp file briefly inherited
the process umask (commonly 0o644 = world-readable), exposing OAuth
tokens to other local users between create and chmod. Mirrors the fix
in ``agent/google_oauth.py`` (#19673).
"""
path.parent.mkdir(parents=True, exist_ok=True) path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(".tmp") # Tighten parent dir to 0o700 so siblings can't traverse to the creds.
# No-op on Windows (POSIX mode bits aren't enforced); ignore failures.
try: try:
tmp.write_text(json.dumps(data, indent=2, default=str), encoding="utf-8") os.chmod(path.parent, 0o700)
os.chmod(tmp, 0o600)
tmp.rename(path)
except OSError: except OSError:
tmp.unlink(missing_ok=True) pass
# Per-process random suffix avoids collisions between concurrent
# writers and stale leftovers from a prior crashed write.
tmp = path.with_suffix(f".tmp.{os.getpid()}.{secrets.token_hex(4)}")
try:
fd = os.open(
str(tmp),
os.O_WRONLY | os.O_CREAT | os.O_EXCL,
stat.S_IRUSR | stat.S_IWUSR,
)
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2, default=str)
fh.flush()
os.fsync(fh.fileno())
os.replace(tmp, path)
except OSError:
try:
tmp.unlink(missing_ok=True)
except OSError:
pass
raise raise