fix(google_chat): harden oauth credential persistence with atomic private writes (#24788)

This commit is contained in:
Zyrix 2026-05-25 03:58:52 +03:00 committed by GitHub
parent bf2f3b2469
commit 782681f904
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 129 additions and 22 deletions

View file

@ -61,6 +61,8 @@ import json
import logging
import os
import re
import secrets
import stat
import subprocess
import sys
from pathlib import Path
@ -89,6 +91,8 @@ except (ModuleNotFoundError, ImportError):
except ValueError:
return str(home)
from utils import atomic_replace
def _hermes_home() -> Path:
"""Resolve HERMES_HOME at call time (NOT module import).
@ -296,14 +300,11 @@ def list_authorized_emails() -> List[str]:
def _persist_credentials(creds: Any, token_path: Path) -> None:
"""Atomic-ish JSON write of refreshed credentials."""
"""Persist refreshed credentials atomically with private permissions."""
try:
token_path.parent.mkdir(parents=True, exist_ok=True)
token_path.write_text(
json.dumps(
_normalize_authorized_user_payload(json.loads(creds.to_json())),
indent=2,
)
_write_private_json(
token_path,
_normalize_authorized_user_payload(json.loads(creds.to_json())),
)
except Exception:
logger.debug(
@ -325,6 +326,38 @@ def _normalize_authorized_user_payload(payload: dict) -> dict:
return normalized
def _write_private_json(path: Path, data: Any) -> None:
"""Atomically write JSON with 0o600 permissions where supported."""
path.parent.mkdir(parents=True, exist_ok=True)
try:
os.chmod(path.parent, 0o700)
except OSError:
pass
tmp_path = path.with_suffix(f".tmp.{os.getpid()}.{secrets.token_hex(4)}")
try:
fd = os.open(
str(tmp_path),
os.O_WRONLY | os.O_CREAT | os.O_EXCL,
stat.S_IRUSR | stat.S_IWUSR,
)
with os.fdopen(fd, "w", encoding="utf-8") as fh:
json.dump(data, fh, indent=2, ensure_ascii=False)
fh.flush()
os.fsync(fh.fileno())
atomic_replace(tmp_path, path)
try:
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
except OSError:
pass
finally:
try:
if tmp_path.exists():
tmp_path.unlink()
except OSError:
pass
def _ensure_deps() -> None:
"""Check deps available; install if not; exit on failure."""
try:
@ -402,25 +435,21 @@ def store_client_secret(path: str) -> None:
sys.exit(1)
target = _client_secret_path()
target.parent.mkdir(parents=True, exist_ok=True)
target.write_text(json.dumps(data, indent=2))
_write_private_json(target, data)
print(f"OK: Client secret saved to {target}")
def _save_pending_auth(*, state: str, code_verifier: str,
email: Optional[str] = None) -> None:
pending = _pending_auth_path(email)
pending.parent.mkdir(parents=True, exist_ok=True)
pending.write_text(
json.dumps(
{
"state": state,
"code_verifier": code_verifier,
"redirect_uri": _REDIRECT_URI,
"email": email or "",
},
indent=2,
)
_write_private_json(
pending,
{
"state": state,
"code_verifier": code_verifier,
"redirect_uri": _REDIRECT_URI,
"email": email or "",
},
)
@ -548,8 +577,7 @@ def exchange_auth_code(code: str, email: Optional[str] = None) -> None:
token_payload["scopes"] = granted_scopes
token_path = _token_path(email)
token_path.parent.mkdir(parents=True, exist_ok=True)
token_path.write_text(json.dumps(token_payload, indent=2))
_write_private_json(token_path, token_payload)
_pending_auth_path(email).unlink(missing_ok=True)
print(f"OK: Authenticated. Token saved to {token_path}")