fix(security): close TOCTOU window in hermes_cli/auth.py credential writers (#21194)

`_save_auth_store`, `_save_qwen_cli_tokens`, and `_write_shared_nous_state`
all created the temp file via `Path.open('w')` / `Path.write_text` and only
tightened permissions to 0o600 afterward. Between create and chmod the file
existed at the process umask (commonly 0o644 = world-readable on multi-user
hosts), briefly exposing OAuth access/refresh tokens for Nous, Codex,
Copilot, Claude, Qwen, Gemini, and every other native OAuth provider that
flows through auth.json.

Switch all three to `os.open(O_WRONLY|O_CREAT|O_EXCL, 0o600)` + `os.fdopen`
+ `fsync` so the file is atomic at 0o600 on creation. Tighten each parent
directory (`~/.hermes/`, Qwen auth dir, Nous shared auth dir) to 0o700 so
siblings can't traverse to the creds. `_save_auth_store` also gains a
per-process random temp suffix to match `agent/google_oauth.py` (#19673)
and `tools/mcp_oauth.py` (#21148).

Adds `tests/hermes_cli/test_auth_toctou_file_modes.py` asserting final
file mode 0o600 and parent dir mode 0o700 across all three writers, plus
an explicit `os.open(flags, mode)` check on the main auth.json writer
that would fail if anyone reintroduces the `Path.open('w')` pattern.
POSIX-only (mode bits skipped on Windows).
This commit is contained in:
Teknium 2026-05-07 05:12:05 -07:00 committed by GitHub
parent 991df4ef81
commit 042eb930e2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 263 additions and 9 deletions

View file

@ -985,12 +985,27 @@ def _load_auth_store(auth_file: Optional[Path] = None) -> Dict[str, Any]:
def _save_auth_store(auth_store: Dict[str, Any]) -> Path:
auth_file = _auth_file_path()
auth_file.parent.mkdir(parents=True, exist_ok=True)
# Tighten parent dir to 0o700 so siblings can't traverse to creds.
# No-op on Windows (POSIX mode bits not enforced); ignore failures.
try:
os.chmod(auth_file.parent, 0o700)
except OSError:
pass
auth_store["version"] = AUTH_STORE_VERSION
auth_store["updated_at"] = datetime.now(timezone.utc).isoformat()
payload = json.dumps(auth_store, indent=2) + "\n"
tmp_path = auth_file.with_name(f"{auth_file.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}")
try:
with tmp_path.open("w", encoding="utf-8") as handle:
# Create with 0o600 atomically via os.open(O_EXCL) + fdopen to close
# the TOCTOU window where default umask (often 0o644) briefly exposed
# OAuth tokens to other local users between open() and chmod().
# Mirrors agent/google_oauth.py (#19673) and tools/mcp_oauth.py (#21148).
fd = os.open(
str(tmp_path),
os.O_WRONLY | os.O_CREAT | os.O_EXCL,
stat.S_IRUSR | stat.S_IWUSR,
)
with os.fdopen(fd, "w", encoding="utf-8") as handle:
handle.write(payload)
handle.flush()
os.fsync(handle.fileno())
@ -1554,10 +1569,33 @@ def _read_qwen_cli_tokens() -> Dict[str, Any]:
def _save_qwen_cli_tokens(tokens: Dict[str, Any]) -> Path:
auth_path = _qwen_cli_auth_path()
auth_path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = auth_path.with_suffix(".tmp")
tmp_path.write_text(json.dumps(tokens, indent=2, sort_keys=True) + "\n", encoding="utf-8")
os.chmod(tmp_path, stat.S_IRUSR | stat.S_IWUSR)
tmp_path.replace(auth_path)
try:
os.chmod(auth_path.parent, 0o700)
except OSError:
pass
# Per-process random temp suffix avoids collisions between concurrent
# writers and stale leftovers from a crashed prior write.
tmp_path = auth_path.with_name(f"{auth_path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}")
# Create with 0o600 atomically via os.open(O_EXCL) — closes the TOCTOU
# window where write_text() + post-write chmod briefly exposed tokens
# at process umask (typically 0o644). See #19673, #21148.
fd = os.open(
str(tmp_path),
os.O_WRONLY | os.O_CREAT | os.O_EXCL,
stat.S_IRUSR | stat.S_IWUSR,
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
fh.write(json.dumps(tokens, indent=2, sort_keys=True) + "\n")
fh.flush()
os.fsync(fh.fileno())
atomic_replace(tmp_path, auth_path)
finally:
try:
if tmp_path.exists():
tmp_path.unlink()
except OSError:
pass
return auth_path
@ -2938,13 +2976,31 @@ def _write_shared_nous_state(state: Dict[str, Any]) -> None:
with _nous_shared_store_lock():
path = _nous_shared_store_path()
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(shared, indent=2, sort_keys=True))
try:
os.chmod(tmp, 0o600)
os.chmod(path.parent, 0o700)
except OSError:
pass
os.replace(tmp, path)
tmp = path.with_name(f"{path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}")
# Create with 0o600 atomically via os.open(O_EXCL) — closes the TOCTOU
# window where write_text() + post-write chmod briefly exposed Nous
# refresh_token at process umask. See #19673, #21148.
fd = os.open(
str(tmp),
os.O_WRONLY | os.O_CREAT | os.O_EXCL,
stat.S_IRUSR | stat.S_IWUSR,
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as fh:
fh.write(json.dumps(shared, indent=2, sort_keys=True))
fh.flush()
os.fsync(fh.fileno())
os.replace(tmp, path)
finally:
try:
if tmp.exists():
tmp.unlink()
except OSError:
pass
_oauth_trace(
"nous_shared_store_written",
path=str(path),