#!/usr/bin/env python3 """Google Workspace OAuth2 setup for Hermes Agent. Fully non-interactive — designed to be driven by the agent via terminal commands. The agent mediates between this script and the user (works on CLI, Telegram, Discord, etc.) Commands: setup.py --check # Is auth valid? Exit 0 = yes, 1 = no setup.py --client-secret /path/to.json # Store OAuth client credentials setup.py --auth-url # Print the OAuth URL for user to visit setup.py --auth-code CODE # Exchange auth code for token setup.py --revoke # Revoke and delete stored token setup.py --install-deps # Install Python dependencies only Agent workflow: 1. Run --check. If exit 0, auth is good — skip setup. 2. Ask user for client_secret.json path. Run --client-secret PATH. 3. Run --auth-url. Send the printed URL to the user. 4. User opens URL, authorizes, gets redirected to a page with a code. 5. User pastes the code. Agent runs --auth-code CODE. 6. Run --check to verify. Done. """ import argparse import json import os import subprocess import sys from pathlib import Path HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) TOKEN_PATH = HERMES_HOME / "google_token.json" CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json" PENDING_AUTH_PATH = HERMES_HOME / "google_oauth_pending.json" SCOPES = [ "https://www.googleapis.com/auth/gmail.readonly", "https://www.googleapis.com/auth/gmail.send", "https://www.googleapis.com/auth/gmail.modify", "https://www.googleapis.com/auth/calendar", "https://www.googleapis.com/auth/drive.readonly", "https://www.googleapis.com/auth/contacts.readonly", "https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/documents.readonly", ] REQUIRED_PACKAGES = ["google-api-python-client", "google-auth-oauthlib", "google-auth-httplib2"] # OAuth redirect for "out of band" manual code copy flow. # Google deprecated OOB, so we use a localhost redirect and tell the user to # copy the code from the browser's URL bar (or the page body). REDIRECT_URI = "http://localhost:1" def install_deps(): """Install Google API packages if missing. Returns True on success.""" try: import googleapiclient # noqa: F401 import google_auth_oauthlib # noqa: F401 print("Dependencies already installed.") return True except ImportError: pass print("Installing Google API dependencies...") try: subprocess.check_call( [sys.executable, "-m", "pip", "install", "--quiet"] + REQUIRED_PACKAGES, stdout=subprocess.DEVNULL, ) print("Dependencies installed.") return True except subprocess.CalledProcessError as e: print(f"ERROR: Failed to install dependencies: {e}") print(f"Try manually: {sys.executable} -m pip install {' '.join(REQUIRED_PACKAGES)}") return False def _ensure_deps(): """Check deps are available, install if not, exit on failure.""" try: import googleapiclient # noqa: F401 import google_auth_oauthlib # noqa: F401 except ImportError: if not install_deps(): sys.exit(1) def check_auth(): """Check if stored credentials are valid. Prints status, exits 0 or 1.""" if not TOKEN_PATH.exists(): print(f"NOT_AUTHENTICATED: No token at {TOKEN_PATH}") return False _ensure_deps() from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request try: creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES) except Exception as e: print(f"TOKEN_CORRUPT: {e}") return False if creds.valid: print(f"AUTHENTICATED: Token valid at {TOKEN_PATH}") return True if creds.expired and creds.refresh_token: try: creds.refresh(Request()) TOKEN_PATH.write_text(creds.to_json()) print(f"AUTHENTICATED: Token refreshed at {TOKEN_PATH}") return True except Exception as e: print(f"REFRESH_FAILED: {e}") return False print("TOKEN_INVALID: Re-run setup.") return False def store_client_secret(path: str): """Copy and validate client_secret.json to Hermes home.""" src = Path(path).expanduser().resolve() if not src.exists(): print(f"ERROR: File not found: {src}") sys.exit(1) try: data = json.loads(src.read_text()) except json.JSONDecodeError: print("ERROR: File is not valid JSON.") sys.exit(1) if "installed" not in data and "web" not in data: print("ERROR: Not a Google OAuth client secret file (missing 'installed' key).") print("Download the correct file from: https://console.cloud.google.com/apis/credentials") sys.exit(1) CLIENT_SECRET_PATH.write_text(json.dumps(data, indent=2)) print(f"OK: Client secret saved to {CLIENT_SECRET_PATH}") def _save_pending_auth(*, state: str, code_verifier: str): """Persist the OAuth session bits needed for a later token exchange.""" PENDING_AUTH_PATH.write_text( json.dumps( { "state": state, "code_verifier": code_verifier, "redirect_uri": REDIRECT_URI, }, indent=2, ) ) def _load_pending_auth() -> dict: """Load the pending OAuth session created by get_auth_url().""" if not PENDING_AUTH_PATH.exists(): print("ERROR: No pending OAuth session found. Run --auth-url first.") sys.exit(1) try: data = json.loads(PENDING_AUTH_PATH.read_text()) except Exception as e: print(f"ERROR: Could not read pending OAuth session: {e}") print("Run --auth-url again to start a fresh OAuth session.") sys.exit(1) if not data.get("state") or not data.get("code_verifier"): print("ERROR: Pending OAuth session is missing PKCE data.") print("Run --auth-url again to start a fresh OAuth session.") sys.exit(1) return data def _extract_code_and_state(code_or_url: str) -> tuple[str, str | None]: """Accept either a raw auth code or the full redirect URL pasted by the user.""" if not code_or_url.startswith("http"): return code_or_url, None from urllib.parse import parse_qs, urlparse parsed = urlparse(code_or_url) params = parse_qs(parsed.query) if "code" not in params: print("ERROR: No 'code' parameter found in URL.") sys.exit(1) state = params.get("state", [None])[0] return params["code"][0], state def get_auth_url(): """Print the OAuth authorization URL. User visits this in a browser.""" if not CLIENT_SECRET_PATH.exists(): print("ERROR: No client secret stored. Run --client-secret first.") sys.exit(1) _ensure_deps() from google_auth_oauthlib.flow import Flow flow = Flow.from_client_secrets_file( str(CLIENT_SECRET_PATH), scopes=SCOPES, redirect_uri=REDIRECT_URI, autogenerate_code_verifier=True, ) auth_url, state = flow.authorization_url( access_type="offline", prompt="consent", ) _save_pending_auth(state=state, code_verifier=flow.code_verifier) # Print just the URL so the agent can extract it cleanly print(auth_url) def exchange_auth_code(code: str): """Exchange the authorization code for a token and save it.""" if not CLIENT_SECRET_PATH.exists(): print("ERROR: No client secret stored. Run --client-secret first.") sys.exit(1) pending_auth = _load_pending_auth() code, returned_state = _extract_code_and_state(code) if returned_state and returned_state != pending_auth["state"]: print("ERROR: OAuth state mismatch. Run --auth-url again to start a fresh session.") sys.exit(1) _ensure_deps() from google_auth_oauthlib.flow import Flow flow = Flow.from_client_secrets_file( str(CLIENT_SECRET_PATH), scopes=SCOPES, redirect_uri=pending_auth.get("redirect_uri", REDIRECT_URI), state=pending_auth["state"], code_verifier=pending_auth["code_verifier"], ) try: flow.fetch_token(code=code) except Exception as e: print(f"ERROR: Token exchange failed: {e}") print("The code may have expired. Run --auth-url to get a fresh URL.") sys.exit(1) creds = flow.credentials TOKEN_PATH.write_text(creds.to_json()) PENDING_AUTH_PATH.unlink(missing_ok=True) print(f"OK: Authenticated. Token saved to {TOKEN_PATH}") def revoke(): """Revoke stored token and delete it.""" if not TOKEN_PATH.exists(): print("No token to revoke.") return _ensure_deps() from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request try: creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES) if creds.expired and creds.refresh_token: creds.refresh(Request()) import urllib.request urllib.request.urlopen( urllib.request.Request( f"https://oauth2.googleapis.com/revoke?token={creds.token}", method="POST", headers={"Content-Type": "application/x-www-form-urlencoded"}, ) ) print("Token revoked with Google.") except Exception as e: print(f"Remote revocation failed (token may already be invalid): {e}") TOKEN_PATH.unlink(missing_ok=True) PENDING_AUTH_PATH.unlink(missing_ok=True) print(f"Deleted {TOKEN_PATH}") def main(): parser = argparse.ArgumentParser(description="Google Workspace OAuth setup for Hermes") group = parser.add_mutually_exclusive_group(required=True) group.add_argument("--check", action="store_true", help="Check if auth is valid (exit 0=yes, 1=no)") group.add_argument("--client-secret", metavar="PATH", help="Store OAuth client_secret.json") group.add_argument("--auth-url", action="store_true", help="Print OAuth URL for user to visit") group.add_argument("--auth-code", metavar="CODE", help="Exchange auth code for token") group.add_argument("--revoke", action="store_true", help="Revoke and delete stored token") group.add_argument("--install-deps", action="store_true", help="Install Python dependencies") args = parser.parse_args() if args.check: sys.exit(0 if check_auth() else 1) elif args.client_secret: store_client_secret(args.client_secret) elif args.auth_url: get_auth_url() elif args.auth_code: exchange_auth_code(args.auth_code) elif args.revoke: revoke() elif args.install_deps: sys.exit(0 if install_deps() else 1) if __name__ == "__main__": main()