fix: protect profile-scoped google workspace oauth tokens

This commit is contained in:
kshitijk4poor 2026-04-03 11:55:45 +05:30 committed by Teknium
parent 92dcdbff66
commit 37e2ef6c3f
6 changed files with 250 additions and 10 deletions

View file

@ -23,12 +23,13 @@ Agent workflow:
import argparse
import json
import os
import subprocess
import sys
from pathlib import Path
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
from hermes_constants import display_hermes_home, get_hermes_home
HERMES_HOME = get_hermes_home()
TOKEN_PATH = HERMES_HOME / "google_token.json"
CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
PENDING_AUTH_PATH = HERMES_HOME / "google_oauth_pending.json"
@ -52,6 +53,39 @@ REQUIRED_PACKAGES = ["google-api-python-client", "google-auth-oauthlib", "google
REDIRECT_URI = "http://localhost:1"
def _load_token_payload(path: Path = TOKEN_PATH) -> dict:
try:
return json.loads(path.read_text())
except FileNotFoundError:
return {}
except Exception:
return {}
def _normalize_scope_values(values) -> set[str]:
if not values:
return set()
if isinstance(values, str):
values = values.split()
return {str(value).strip() for value in values if str(value).strip()}
def _missing_scopes_from_payload(payload: dict) -> list[str]:
granted = _normalize_scope_values(payload.get("scopes") or payload.get("scope"))
if not granted:
return []
return sorted(scope for scope in SCOPES if scope not in granted)
def _format_missing_scopes(missing_scopes: list[str]) -> str:
bullets = "\n".join(f" - {scope}" for scope in missing_scopes)
return (
"Token is valid but missing required Google Workspace scopes:\n"
f"{bullets}\n"
"Run the Google Workspace setup again from this same Hermes profile to refresh consent."
)
def install_deps():
"""Install Google API packages if missing. Returns True on success."""
try:
@ -102,7 +136,12 @@ def check_auth():
print(f"TOKEN_CORRUPT: {e}")
return False
payload = _load_token_payload(TOKEN_PATH)
if creds.valid:
missing_scopes = _missing_scopes_from_payload(payload)
if missing_scopes:
print(f"AUTH_SCOPE_MISMATCH: {_format_missing_scopes(missing_scopes)}")
return False
print(f"AUTHENTICATED: Token valid at {TOKEN_PATH}")
return True
@ -110,6 +149,10 @@ def check_auth():
try:
creds.refresh(Request())
TOKEN_PATH.write_text(creds.to_json())
missing_scopes = _missing_scopes_from_payload(_load_token_payload(TOKEN_PATH))
if missing_scopes:
print(f"AUTH_SCOPE_MISMATCH: {_format_missing_scopes(missing_scopes)}")
return False
print(f"AUTHENTICATED: Token refreshed at {TOKEN_PATH}")
return True
except Exception as e:
@ -249,9 +292,17 @@ def exchange_auth_code(code: str):
sys.exit(1)
creds = flow.credentials
TOKEN_PATH.write_text(creds.to_json())
token_payload = json.loads(creds.to_json())
missing_scopes = _missing_scopes_from_payload(token_payload)
if missing_scopes:
print(f"ERROR: Refusing to save incomplete Google Workspace token. {_format_missing_scopes(missing_scopes)}")
print(f"Existing token at {TOKEN_PATH} was left unchanged.")
sys.exit(1)
TOKEN_PATH.write_text(json.dumps(token_payload, indent=2))
PENDING_AUTH_PATH.unlink(missing_ok=True)
print(f"OK: Authenticated. Token saved to {TOKEN_PATH}")
print(f"Profile-scoped token location: {display_hermes_home()}/google_token.json")
def revoke():