""" Hermes Agent — Web UI server. Provides a FastAPI backend serving the Vite/React frontend and REST API endpoints for managing configuration, environment variables, and sessions. Usage: python -m hermes_cli.main web # Start on http://127.0.0.1:9119 python -m hermes_cli.main web --port 8080 """ import asyncio import hmac import importlib.util import json import logging import os import secrets import subprocess import sys import threading import time import urllib.parse import urllib.request from pathlib import Path from typing import Any, Dict, List, Optional import yaml PROJECT_ROOT = Path(__file__).parent.parent.resolve() if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from hermes_cli import __version__, __release_date__ from hermes_cli.config import ( DEFAULT_CONFIG, OPTIONAL_ENV_VARS, get_config_path, get_env_path, get_hermes_home, load_config, load_env, save_config, save_env_value, remove_env_value, check_config_version, redact_key, ) from gateway.status import get_running_pid, read_runtime_status try: from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse, HTMLResponse, JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel except ImportError: raise SystemExit( "Web UI requires fastapi and uvicorn.\n" f"Install with: {sys.executable} -m pip install 'fastapi' 'uvicorn[standard]'" ) WEB_DIST = Path(os.environ["HERMES_WEB_DIST"]) if "HERMES_WEB_DIST" in os.environ else Path(__file__).parent / "web_dist" _log = logging.getLogger(__name__) app = FastAPI(title="Hermes Agent", version=__version__) # --------------------------------------------------------------------------- # Session token for protecting sensitive endpoints (reveal). # Generated fresh on every server start — dies when the process exits. # Injected into the SPA HTML so only the legitimate web UI can use it. # --------------------------------------------------------------------------- _SESSION_TOKEN = secrets.token_urlsafe(32) _SESSION_HEADER_NAME = "X-Hermes-Session-Token" # Simple rate limiter for the reveal endpoint _reveal_timestamps: List[float] = [] _REVEAL_MAX_PER_WINDOW = 5 _REVEAL_WINDOW_SECONDS = 30 # CORS: restrict to localhost origins only. The web UI is intended to run # locally; binding to 0.0.0.0 with allow_origins=["*"] would let any website # read/modify config and secrets. app.add_middleware( CORSMiddleware, allow_origin_regex=r"^https?://(localhost|127\.0\.0\.1)(:\d+)?$", allow_methods=["*"], allow_headers=["*"], ) # --------------------------------------------------------------------------- # Endpoints that do NOT require the session token. Everything else under # /api/ is gated by the auth middleware below. Keep this list minimal — # only truly non-sensitive, read-only endpoints belong here. # --------------------------------------------------------------------------- _PUBLIC_API_PATHS: frozenset = frozenset({ "/api/status", "/api/config/defaults", "/api/config/schema", "/api/model/info", "/api/dashboard/themes", "/api/dashboard/plugins", "/api/dashboard/plugins/rescan", }) def _has_valid_session_token(request: Request) -> bool: """True if the request carries a valid dashboard session token. The dedicated session header avoids collisions with reverse proxies that already use ``Authorization`` (for example Caddy ``basic_auth``). We still accept the legacy Bearer path for backward compatibility with older dashboard bundles. """ session_header = request.headers.get(_SESSION_HEADER_NAME, "") if session_header and hmac.compare_digest( session_header.encode(), _SESSION_TOKEN.encode(), ): return True auth = request.headers.get("authorization", "") expected = f"Bearer {_SESSION_TOKEN}" return hmac.compare_digest(auth.encode(), expected.encode()) def _require_token(request: Request) -> None: """Validate the ephemeral session token. Raises 401 on mismatch.""" if not _has_valid_session_token(request): raise HTTPException(status_code=401, detail="Unauthorized") # Accepted Host header values for loopback binds. DNS rebinding attacks # point a victim browser at an attacker-controlled hostname (evil.test) # which resolves to 127.0.0.1 after a TTL flip — bypassing same-origin # checks because the browser now considers evil.test and our dashboard # "same origin". Validating the Host header at the app layer rejects any # request whose Host isn't one we bound for. See GHSA-ppp5-vxwm-4cf7. _LOOPBACK_HOST_VALUES: frozenset = frozenset({ "localhost", "127.0.0.1", "::1", }) def _is_accepted_host(host_header: str, bound_host: str) -> bool: """True if the Host header targets the interface we bound to. Accepts: - Exact bound host (with or without port suffix) - Loopback aliases when bound to loopback - Any host when bound to 0.0.0.0 (explicit opt-in to non-loopback, no protection possible at this layer) """ if not host_header: return False # Strip port suffix. IPv6 addresses use bracket notation: # [::1] — no port # [::1]:9119 — with port # Plain hosts/v4: # localhost:9119 # 127.0.0.1:9119 h = host_header.strip() if h.startswith("["): # IPv6 bracketed — port (if any) follows "]:" close = h.find("]") if close != -1: host_only = h[1:close] # strip brackets else: host_only = h.strip("[]") else: host_only = h.rsplit(":", 1)[0] if ":" in h else h host_only = host_only.lower() # 0.0.0.0 bind means operator explicitly opted into all-interfaces # (requires --insecure per web_server.start_server). No Host-layer # defence can protect that mode; rely on operator network controls. if bound_host in ("0.0.0.0", "::"): return True # Loopback bind: accept the loopback names bound_lc = bound_host.lower() if bound_lc in _LOOPBACK_HOST_VALUES: return host_only in _LOOPBACK_HOST_VALUES # Explicit non-loopback bind: require exact host match return host_only == bound_lc @app.middleware("http") async def host_header_middleware(request: Request, call_next): """Reject requests whose Host header doesn't match the bound interface. Defends against DNS rebinding: a victim browser on a localhost dashboard is tricked into fetching from an attacker hostname that TTL-flips to 127.0.0.1. CORS and same-origin checks don't help — the browser now treats the attacker origin as same-origin with the dashboard. Host-header validation at the app layer catches it. See GHSA-ppp5-vxwm-4cf7. """ # Store the bound host on app.state so this middleware can read it — # set by start_server() at listen time. bound_host = getattr(app.state, "bound_host", None) if bound_host: host_header = request.headers.get("host", "") if not _is_accepted_host(host_header, bound_host): return JSONResponse( status_code=400, content={ "detail": ( "Invalid Host header. Dashboard requests must use " "the hostname the server was bound to." ), }, ) return await call_next(request) @app.middleware("http") async def auth_middleware(request: Request, call_next): """Require the session token on all /api/ routes except the public list.""" path = request.url.path if path.startswith("/api/") and path not in _PUBLIC_API_PATHS and not path.startswith("/api/plugins/"): if not _has_valid_session_token(request): return JSONResponse( status_code=401, content={"detail": "Unauthorized"}, ) return await call_next(request) # --------------------------------------------------------------------------- # Config schema — auto-generated from DEFAULT_CONFIG # --------------------------------------------------------------------------- # Manual overrides for fields that need select options or custom types _SCHEMA_OVERRIDES: Dict[str, Dict[str, Any]] = { "model": { "type": "string", "description": "Default model (e.g. anthropic/claude-sonnet-4.6)", "category": "general", }, "model_context_length": { "type": "number", "description": "Context window override (0 = auto-detect from model metadata)", "category": "general", }, "terminal.backend": { "type": "select", "description": "Terminal execution backend", "options": ["local", "docker", "ssh", "modal", "daytona", "singularity"], }, "terminal.modal_mode": { "type": "select", "description": "Modal sandbox mode", "options": ["sandbox", "function"], }, "tts.provider": { "type": "select", "description": "Text-to-speech provider", "options": ["edge", "elevenlabs", "openai", "neutts"], }, "stt.provider": { "type": "select", "description": "Speech-to-text provider", "options": ["local", "openai", "mistral"], }, "display.skin": { "type": "select", "description": "CLI visual theme", "options": ["default", "ares", "mono", "slate"], }, "dashboard.theme": { "type": "select", "description": "Web dashboard visual theme", "options": ["default", "midnight", "ember", "mono", "cyberpunk", "rose"], }, "display.resume_display": { "type": "select", "description": "How resumed sessions display history", "options": ["minimal", "full", "off"], }, "display.busy_input_mode": { "type": "select", "description": "Input behavior while agent is running", "options": ["queue", "interrupt", "block"], }, "memory.provider": { "type": "select", "description": "Memory provider plugin", "options": ["builtin", "honcho"], }, "approvals.mode": { "type": "select", "description": "Dangerous command approval mode", "options": ["ask", "yolo", "deny"], }, "context.engine": { "type": "select", "description": "Context management engine", "options": ["default", "custom"], }, "human_delay.mode": { "type": "select", "description": "Simulated typing delay mode", "options": ["off", "typing", "fixed"], }, "logging.level": { "type": "select", "description": "Log level for agent.log", "options": ["DEBUG", "INFO", "WARNING", "ERROR"], }, "agent.service_tier": { "type": "select", "description": "API service tier (OpenAI/Anthropic)", "options": ["", "auto", "default", "flex"], }, "delegation.reasoning_effort": { "type": "select", "description": "Reasoning effort for delegated subagents", "options": ["", "low", "medium", "high"], }, } # Categories with fewer fields get merged into "general" to avoid tab sprawl. _CATEGORY_MERGE: Dict[str, str] = { "privacy": "security", "context": "agent", "skills": "agent", "cron": "agent", "network": "agent", "checkpoints": "agent", "approvals": "security", "human_delay": "display", "dashboard": "display", "code_execution": "agent", } # Display order for tabs — unlisted categories sort alphabetically after these. _CATEGORY_ORDER = [ "general", "agent", "terminal", "display", "delegation", "memory", "compression", "security", "browser", "voice", "tts", "stt", "logging", "discord", "auxiliary", ] def _infer_type(value: Any) -> str: """Infer a UI field type from a Python value.""" if isinstance(value, bool): return "boolean" if isinstance(value, int): return "number" if isinstance(value, float): return "number" if isinstance(value, list): return "list" if isinstance(value, dict): return "object" return "string" def _build_schema_from_config( config: Dict[str, Any], prefix: str = "", ) -> Dict[str, Dict[str, Any]]: """Walk DEFAULT_CONFIG and produce a flat dot-path → field schema dict.""" schema: Dict[str, Dict[str, Any]] = {} for key, value in config.items(): full_key = f"{prefix}.{key}" if prefix else key # Skip internal / version keys if full_key in ("_config_version",): continue # Category is the first path component for nested keys, or "general" # for top-level scalar fields (model, toolsets, timezone, etc.). if prefix: category = prefix.split(".")[0] elif isinstance(value, dict): category = key else: category = "general" if isinstance(value, dict): # Recurse into nested dicts schema.update(_build_schema_from_config(value, full_key)) else: entry: Dict[str, Any] = { "type": _infer_type(value), "description": full_key.replace(".", " → ").replace("_", " ").title(), "category": category, } # Apply manual overrides if full_key in _SCHEMA_OVERRIDES: entry.update(_SCHEMA_OVERRIDES[full_key]) # Merge small categories entry["category"] = _CATEGORY_MERGE.get(entry["category"], entry["category"]) schema[full_key] = entry return schema CONFIG_SCHEMA = _build_schema_from_config(DEFAULT_CONFIG) # Inject virtual fields that don't live in DEFAULT_CONFIG but are surfaced # by the normalize/denormalize cycle. Insert model_context_length right after # the "model" key so it renders adjacent in the frontend. _mcl_entry = _SCHEMA_OVERRIDES["model_context_length"] _ordered_schema: Dict[str, Dict[str, Any]] = {} for _k, _v in CONFIG_SCHEMA.items(): _ordered_schema[_k] = _v if _k == "model": _ordered_schema["model_context_length"] = _mcl_entry CONFIG_SCHEMA = _ordered_schema class ConfigUpdate(BaseModel): config: dict class EnvVarUpdate(BaseModel): key: str value: str class EnvVarDelete(BaseModel): key: str class EnvVarReveal(BaseModel): key: str _GATEWAY_HEALTH_URL = os.getenv("GATEWAY_HEALTH_URL") try: _GATEWAY_HEALTH_TIMEOUT = float(os.getenv("GATEWAY_HEALTH_TIMEOUT", "3")) except (ValueError, TypeError): _log.warning( "Invalid GATEWAY_HEALTH_TIMEOUT value %r — using default 3.0s", os.getenv("GATEWAY_HEALTH_TIMEOUT"), ) _GATEWAY_HEALTH_TIMEOUT = 3.0 def _probe_gateway_health() -> tuple[bool, dict | None]: """Probe the gateway via its HTTP health endpoint (cross-container). Uses ``/health/detailed`` first (returns full state), falling back to the simpler ``/health`` endpoint. Returns ``(is_alive, body_dict)``. Accepts any of these as ``GATEWAY_HEALTH_URL``: - ``http://gateway:8642`` (base URL — recommended) - ``http://gateway:8642/health`` (explicit health path) - ``http://gateway:8642/health/detailed`` (explicit detailed path) This is a **blocking** call — run via ``run_in_executor`` from async code. """ if not _GATEWAY_HEALTH_URL: return False, None # Normalise to base URL so we always probe the right paths regardless of # whether the user included /health or /health/detailed in the env var. base = _GATEWAY_HEALTH_URL.rstrip("/") if base.endswith("/health/detailed"): base = base[: -len("/health/detailed")] elif base.endswith("/health"): base = base[: -len("/health")] for path in (f"{base}/health/detailed", f"{base}/health"): try: req = urllib.request.Request(path, method="GET") with urllib.request.urlopen(req, timeout=_GATEWAY_HEALTH_TIMEOUT) as resp: if resp.status == 200: body = json.loads(resp.read()) return True, body except Exception: continue return False, None @app.get("/api/status") async def get_status(): current_ver, latest_ver = check_config_version() # --- Gateway liveness detection --- # Try local PID check first (same-host). If that fails and a remote # GATEWAY_HEALTH_URL is configured, probe the gateway over HTTP so the # dashboard works when the gateway runs in a separate container. gateway_pid = get_running_pid() gateway_running = gateway_pid is not None remote_health_body: dict | None = None if not gateway_running and _GATEWAY_HEALTH_URL: loop = asyncio.get_event_loop() alive, remote_health_body = await loop.run_in_executor( None, _probe_gateway_health ) if alive: gateway_running = True # PID from the remote container (display only — not locally valid) if remote_health_body: gateway_pid = remote_health_body.get("pid") gateway_state = None gateway_platforms: dict = {} gateway_exit_reason = None gateway_updated_at = None configured_gateway_platforms: set[str] | None = None try: from gateway.config import load_gateway_config gateway_config = load_gateway_config() configured_gateway_platforms = { platform.value for platform in gateway_config.get_connected_platforms() } except Exception: configured_gateway_platforms = None # Prefer the detailed health endpoint response (has full state) when the # local runtime status file is absent or stale (cross-container). runtime = read_runtime_status() if runtime is None and remote_health_body and remote_health_body.get("gateway_state"): runtime = remote_health_body if runtime: gateway_state = runtime.get("gateway_state") gateway_platforms = runtime.get("platforms") or {} if configured_gateway_platforms is not None: gateway_platforms = { key: value for key, value in gateway_platforms.items() if key in configured_gateway_platforms } gateway_exit_reason = runtime.get("exit_reason") gateway_updated_at = runtime.get("updated_at") if not gateway_running: gateway_state = gateway_state if gateway_state in ("stopped", "startup_failed") else "stopped" gateway_platforms = {} elif gateway_running and remote_health_body is not None: # The health probe confirmed the gateway is alive, but the local # runtime status file may be stale (cross-container). Override # stopped/None state so the dashboard shows the correct badge. if gateway_state in (None, "stopped"): gateway_state = "running" # If there was no runtime info at all but the health probe confirmed alive, # ensure we still report the gateway as running (no shared volume scenario). if gateway_running and gateway_state is None and remote_health_body is not None: gateway_state = "running" active_sessions = 0 try: from hermes_state import SessionDB db = SessionDB() try: sessions = db.list_sessions_rich(limit=50) now = time.time() active_sessions = sum( 1 for s in sessions if s.get("ended_at") is None and (now - s.get("last_active", s.get("started_at", 0))) < 300 ) finally: db.close() except Exception: pass return { "version": __version__, "release_date": __release_date__, "hermes_home": str(get_hermes_home()), "config_path": str(get_config_path()), "env_path": str(get_env_path()), "config_version": current_ver, "latest_config_version": latest_ver, "gateway_running": gateway_running, "gateway_pid": gateway_pid, "gateway_health_url": _GATEWAY_HEALTH_URL, "gateway_state": gateway_state, "gateway_platforms": gateway_platforms, "gateway_exit_reason": gateway_exit_reason, "gateway_updated_at": gateway_updated_at, "active_sessions": active_sessions, } # --------------------------------------------------------------------------- # Gateway + update actions (invoked from the Status page). # # Both commands are spawned as detached subprocesses so the HTTP request # returns immediately. stdin is closed (``DEVNULL``) so any stray ``input()`` # calls fail fast with EOF rather than hanging forever. stdout/stderr are # streamed to a per-action log file under ``~/.hermes/logs/.log`` so # the dashboard can tail them back to the user. # --------------------------------------------------------------------------- _ACTION_LOG_DIR: Path = get_hermes_home() / "logs" # Short ``name`` (from the URL) → absolute log file path. _ACTION_LOG_FILES: Dict[str, str] = { "gateway-restart": "gateway-restart.log", "hermes-update": "hermes-update.log", } # ``name`` → most recently spawned Popen handle. Used so ``status`` can # report liveness and exit code without shelling out to ``ps``. _ACTION_PROCS: Dict[str, subprocess.Popen] = {} def _spawn_hermes_action(subcommand: List[str], name: str) -> subprocess.Popen: """Spawn ``hermes `` detached and record the Popen handle. Uses the running interpreter's ``hermes_cli.main`` module so the action inherits the same venv/PYTHONPATH the web server is using. """ log_file_name = _ACTION_LOG_FILES[name] _ACTION_LOG_DIR.mkdir(parents=True, exist_ok=True) log_path = _ACTION_LOG_DIR / log_file_name log_file = open(log_path, "ab", buffering=0) log_file.write( f"\n=== {name} started {time.strftime('%Y-%m-%d %H:%M:%S')} ===\n".encode() ) cmd = [sys.executable, "-m", "hermes_cli.main", *subcommand] popen_kwargs: Dict[str, Any] = { "cwd": str(PROJECT_ROOT), "stdin": subprocess.DEVNULL, "stdout": log_file, "stderr": subprocess.STDOUT, "env": {**os.environ, "HERMES_NONINTERACTIVE": "1"}, } if sys.platform == "win32": popen_kwargs["creationflags"] = ( subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore[attr-defined] | getattr(subprocess, "DETACHED_PROCESS", 0) ) else: popen_kwargs["start_new_session"] = True proc = subprocess.Popen(cmd, **popen_kwargs) _ACTION_PROCS[name] = proc return proc def _tail_lines(path: Path, n: int) -> List[str]: """Return the last ``n`` lines of ``path``. Reads the whole file — fine for our small per-action logs. Binary-decoded with ``errors='replace'`` so log corruption doesn't 500 the endpoint.""" if not path.exists(): return [] try: text = path.read_text(errors="replace") except OSError: return [] lines = text.splitlines() return lines[-n:] if n > 0 else lines @app.post("/api/gateway/restart") async def restart_gateway(): """Kick off a ``hermes gateway restart`` in the background.""" try: proc = _spawn_hermes_action(["gateway", "restart"], "gateway-restart") except Exception as exc: _log.exception("Failed to spawn gateway restart") raise HTTPException(status_code=500, detail=f"Failed to restart gateway: {exc}") return { "ok": True, "pid": proc.pid, "name": "gateway-restart", } @app.post("/api/hermes/update") async def update_hermes(): """Kick off ``hermes update`` in the background.""" try: proc = _spawn_hermes_action(["update"], "hermes-update") except Exception as exc: _log.exception("Failed to spawn hermes update") raise HTTPException(status_code=500, detail=f"Failed to start update: {exc}") return { "ok": True, "pid": proc.pid, "name": "hermes-update", } @app.get("/api/actions/{name}/status") async def get_action_status(name: str, lines: int = 200): """Tail an action log and report whether the process is still running.""" log_file_name = _ACTION_LOG_FILES.get(name) if log_file_name is None: raise HTTPException(status_code=404, detail=f"Unknown action: {name}") log_path = _ACTION_LOG_DIR / log_file_name tail = _tail_lines(log_path, min(max(lines, 1), 2000)) proc = _ACTION_PROCS.get(name) if proc is None: running = False exit_code: Optional[int] = None pid: Optional[int] = None else: exit_code = proc.poll() running = exit_code is None pid = proc.pid return { "name": name, "running": running, "exit_code": exit_code, "pid": pid, "lines": tail, } @app.get("/api/sessions") async def get_sessions(limit: int = 20, offset: int = 0): try: from hermes_state import SessionDB db = SessionDB() try: sessions = db.list_sessions_rich(limit=limit, offset=offset) total = db.session_count() now = time.time() for s in sessions: s["is_active"] = ( s.get("ended_at") is None and (now - s.get("last_active", s.get("started_at", 0))) < 300 ) return {"sessions": sessions, "total": total, "limit": limit, "offset": offset} finally: db.close() except Exception as e: _log.exception("GET /api/sessions failed") raise HTTPException(status_code=500, detail="Internal server error") @app.get("/api/sessions/search") async def search_sessions(q: str = "", limit: int = 20): """Full-text search across session message content using FTS5.""" if not q or not q.strip(): return {"results": []} try: from hermes_state import SessionDB db = SessionDB() try: # Auto-add prefix wildcards so partial words match # e.g. "nimb" → "nimb*" matches "nimby" # Preserve quoted phrases and existing wildcards as-is import re terms = [] for token in re.findall(r'"[^"]*"|\S+', q.strip()): if token.startswith('"') or token.endswith("*"): terms.append(token) else: terms.append(token + "*") prefix_query = " ".join(terms) matches = db.search_messages(query=prefix_query, limit=limit) # Group by session_id — return unique sessions with their best snippet seen: dict = {} for m in matches: sid = m["session_id"] if sid not in seen: seen[sid] = { "session_id": sid, "snippet": m.get("snippet", ""), "role": m.get("role"), "source": m.get("source"), "model": m.get("model"), "session_started": m.get("session_started"), } return {"results": list(seen.values())} finally: db.close() except Exception: _log.exception("GET /api/sessions/search failed") raise HTTPException(status_code=500, detail="Search failed") def _normalize_config_for_web(config: Dict[str, Any]) -> Dict[str, Any]: """Normalize config for the web UI. Hermes supports ``model`` as either a bare string (``"anthropic/claude-sonnet-4"``) or a dict (``{default: ..., provider: ..., base_url: ...}``). The schema is built from DEFAULT_CONFIG where ``model`` is a string, but user configs often have the dict form. Normalize to the string form so the frontend schema matches. Also surfaces ``model_context_length`` as a top-level field so the web UI can display and edit it. A value of 0 means "auto-detect". """ config = dict(config) # shallow copy model_val = config.get("model") if isinstance(model_val, dict): # Extract context_length before flattening the dict ctx_len = model_val.get("context_length", 0) config["model"] = model_val.get("default", model_val.get("name", "")) config["model_context_length"] = ctx_len if isinstance(ctx_len, int) else 0 else: config["model_context_length"] = 0 return config @app.get("/api/config") async def get_config(): config = _normalize_config_for_web(load_config()) # Strip internal keys that the frontend shouldn't see or send back return {k: v for k, v in config.items() if not k.startswith("_")} @app.get("/api/config/defaults") async def get_defaults(): return DEFAULT_CONFIG @app.get("/api/config/schema") async def get_schema(): return {"fields": CONFIG_SCHEMA, "category_order": _CATEGORY_ORDER} _EMPTY_MODEL_INFO: dict = { "model": "", "provider": "", "auto_context_length": 0, "config_context_length": 0, "effective_context_length": 0, "capabilities": {}, } @app.get("/api/model/info") def get_model_info(): """Return resolved model metadata for the currently configured model. Calls the same context-length resolution chain the agent uses, so the frontend can display "Auto-detected: 200K" alongside the override field. Also returns model capabilities (vision, reasoning, tools) when available. """ try: cfg = load_config() model_cfg = cfg.get("model", "") # Extract model name and provider from the config if isinstance(model_cfg, dict): model_name = model_cfg.get("default", model_cfg.get("name", "")) provider = model_cfg.get("provider", "") base_url = model_cfg.get("base_url", "") config_ctx = model_cfg.get("context_length") else: model_name = str(model_cfg) if model_cfg else "" provider = "" base_url = "" config_ctx = None if not model_name: return dict(_EMPTY_MODEL_INFO, provider=provider) # Resolve auto-detected context length (pass config_ctx=None to get # purely auto-detected value, then separately report the override) try: from agent.model_metadata import get_model_context_length auto_ctx = get_model_context_length( model=model_name, base_url=base_url, provider=provider, config_context_length=None, # ignore override — we want auto value ) except Exception: auto_ctx = 0 config_ctx_int = 0 if isinstance(config_ctx, int) and config_ctx > 0: config_ctx_int = config_ctx # Effective is what the agent actually uses effective_ctx = config_ctx_int if config_ctx_int > 0 else auto_ctx # Try to get model capabilities from models.dev caps = {} try: from agent.models_dev import get_model_capabilities mc = get_model_capabilities(provider=provider, model=model_name) if mc is not None: caps = { "supports_tools": mc.supports_tools, "supports_vision": mc.supports_vision, "supports_reasoning": mc.supports_reasoning, "context_window": mc.context_window, "max_output_tokens": mc.max_output_tokens, "model_family": mc.model_family, } except Exception: pass return { "model": model_name, "provider": provider, "auto_context_length": auto_ctx, "config_context_length": config_ctx_int, "effective_context_length": effective_ctx, "capabilities": caps, } except Exception: _log.exception("GET /api/model/info failed") return dict(_EMPTY_MODEL_INFO) def _denormalize_config_from_web(config: Dict[str, Any]) -> Dict[str, Any]: """Reverse _normalize_config_for_web before saving. Reconstructs ``model`` as a dict by reading the current on-disk config to recover model subkeys (provider, base_url, api_mode, etc.) that were stripped from the GET response. The frontend only sees model as a flat string; the rest is preserved transparently. Also handles ``model_context_length`` — writes it back into the model dict as ``context_length``. A value of 0 or absent means "auto-detect" (omitted from the dict so get_model_context_length() uses its normal resolution). """ config = dict(config) # Remove any _model_meta that might have leaked in (shouldn't happen # with the stripped GET response, but be defensive) config.pop("_model_meta", None) # Extract and remove model_context_length before processing model ctx_override = config.pop("model_context_length", 0) if not isinstance(ctx_override, int): try: ctx_override = int(ctx_override) except (TypeError, ValueError): ctx_override = 0 model_val = config.get("model") if isinstance(model_val, str) and model_val: # Read the current disk config to recover model subkeys try: disk_config = load_config() disk_model = disk_config.get("model") if isinstance(disk_model, dict): # Preserve all subkeys, update default with the new value disk_model["default"] = model_val # Write context_length into the model dict (0 = remove/auto) if ctx_override > 0: disk_model["context_length"] = ctx_override else: disk_model.pop("context_length", None) config["model"] = disk_model else: # Model was previously a bare string — upgrade to dict if # user is setting a context_length override if ctx_override > 0: config["model"] = { "default": model_val, "context_length": ctx_override, } except Exception: pass # can't read disk config — just use the string form return config @app.put("/api/config") async def update_config(body: ConfigUpdate): try: save_config(_denormalize_config_from_web(body.config)) return {"ok": True} except Exception as e: _log.exception("PUT /api/config failed") raise HTTPException(status_code=500, detail="Internal server error") @app.get("/api/env") async def get_env_vars(): env_on_disk = load_env() result = {} for var_name, info in OPTIONAL_ENV_VARS.items(): value = env_on_disk.get(var_name) result[var_name] = { "is_set": bool(value), "redacted_value": redact_key(value) if value else None, "description": info.get("description", ""), "url": info.get("url"), "category": info.get("category", ""), "is_password": info.get("password", False), "tools": info.get("tools", []), "advanced": info.get("advanced", False), } return result @app.put("/api/env") async def set_env_var(body: EnvVarUpdate): try: save_env_value(body.key, body.value) return {"ok": True, "key": body.key} except Exception as e: _log.exception("PUT /api/env failed") raise HTTPException(status_code=500, detail="Internal server error") @app.delete("/api/env") async def remove_env_var(body: EnvVarDelete): try: removed = remove_env_value(body.key) if not removed: raise HTTPException(status_code=404, detail=f"{body.key} not found in .env") return {"ok": True, "key": body.key} except HTTPException: raise except Exception as e: _log.exception("DELETE /api/env failed") raise HTTPException(status_code=500, detail="Internal server error") @app.post("/api/env/reveal") async def reveal_env_var(body: EnvVarReveal, request: Request): """Return the real (unredacted) value of a single env var. Protected by: - Ephemeral session token (generated per server start, injected into SPA) - Rate limiting (max 5 reveals per 30s window) - Audit logging """ # --- Token check --- _require_token(request) # --- Rate limit --- now = time.time() cutoff = now - _REVEAL_WINDOW_SECONDS _reveal_timestamps[:] = [t for t in _reveal_timestamps if t > cutoff] if len(_reveal_timestamps) >= _REVEAL_MAX_PER_WINDOW: raise HTTPException(status_code=429, detail="Too many reveal requests. Try again shortly.") _reveal_timestamps.append(now) # --- Reveal --- env_on_disk = load_env() value = env_on_disk.get(body.key) if value is None: raise HTTPException(status_code=404, detail=f"{body.key} not found in .env") _log.info("env/reveal: %s", body.key) return {"key": body.key, "value": value} # --------------------------------------------------------------------------- # OAuth provider endpoints — status + disconnect (Phase 1) # --------------------------------------------------------------------------- # # Phase 1 surfaces *which OAuth providers exist* and whether each is # connected, plus a disconnect button. The actual login flow (PKCE for # Anthropic, device-code for Nous/Codex) still runs in the CLI for now; # Phase 2 will add in-browser flows. For unconnected providers we return # the canonical ``hermes auth add `` command so the dashboard # can surface a one-click copy. def _truncate_token(value: Optional[str], visible: int = 6) -> str: """Return ``...XXXXXX`` (last N chars) for safe display in the UI. We never expose more than the trailing ``visible`` characters of an OAuth access token. JWT prefixes (the part before the first dot) are stripped first when present so the visible suffix is always part of the signing region rather than a meaningless header chunk. """ if not value: return "" s = str(value) if "." in s and s.count(".") >= 2: # Looks like a JWT — show the trailing piece of the signature only. s = s.rsplit(".", 1)[-1] if len(s) <= visible: return s return f"…{s[-visible:]}" def _anthropic_oauth_status() -> Dict[str, Any]: """Combined status across the three Anthropic credential sources we read. Hermes resolves Anthropic creds in this order at runtime: 1. ``~/.hermes/.anthropic_oauth.json`` — Hermes-managed PKCE flow 2. ``~/.claude/.credentials.json`` — Claude Code CLI credentials (auto) 3. ``ANTHROPIC_TOKEN`` / ``ANTHROPIC_API_KEY`` env vars The dashboard reports the highest-priority source that's actually present. """ try: from agent.anthropic_adapter import ( read_hermes_oauth_credentials, read_claude_code_credentials, _HERMES_OAUTH_FILE, ) except ImportError: read_claude_code_credentials = None # type: ignore read_hermes_oauth_credentials = None # type: ignore _HERMES_OAUTH_FILE = None # type: ignore hermes_creds = None if read_hermes_oauth_credentials: try: hermes_creds = read_hermes_oauth_credentials() except Exception: hermes_creds = None if hermes_creds and hermes_creds.get("accessToken"): return { "logged_in": True, "source": "hermes_pkce", "source_label": f"Hermes PKCE ({_HERMES_OAUTH_FILE})", "token_preview": _truncate_token(hermes_creds.get("accessToken")), "expires_at": hermes_creds.get("expiresAt"), "has_refresh_token": bool(hermes_creds.get("refreshToken")), } cc_creds = None if read_claude_code_credentials: try: cc_creds = read_claude_code_credentials() except Exception: cc_creds = None if cc_creds and cc_creds.get("accessToken"): return { "logged_in": True, "source": "claude_code", "source_label": "Claude Code (~/.claude/.credentials.json)", "token_preview": _truncate_token(cc_creds.get("accessToken")), "expires_at": cc_creds.get("expiresAt"), "has_refresh_token": bool(cc_creds.get("refreshToken")), } env_token = os.getenv("ANTHROPIC_TOKEN") or os.getenv("CLAUDE_CODE_OAUTH_TOKEN") if env_token: return { "logged_in": True, "source": "env_var", "source_label": "ANTHROPIC_TOKEN environment variable", "token_preview": _truncate_token(env_token), "expires_at": None, "has_refresh_token": False, } return {"logged_in": False, "source": None} def _claude_code_only_status() -> Dict[str, Any]: """Surface Claude Code CLI credentials as their own provider entry. Independent of the Anthropic entry above so users can see whether their Claude Code subscription tokens are actively flowing into Hermes even when they also have a separate Hermes-managed PKCE login. """ try: from agent.anthropic_adapter import read_claude_code_credentials creds = read_claude_code_credentials() except Exception: creds = None if creds and creds.get("accessToken"): return { "logged_in": True, "source": "claude_code_cli", "source_label": "~/.claude/.credentials.json", "token_preview": _truncate_token(creds.get("accessToken")), "expires_at": creds.get("expiresAt"), "has_refresh_token": bool(creds.get("refreshToken")), } return {"logged_in": False, "source": None} # Provider catalog. The order matters — it's how we render the UI list. # ``cli_command`` is what the dashboard surfaces as the copy-to-clipboard # fallback while Phase 2 (in-browser flows) isn't built yet. # ``flow`` describes the OAuth shape so the future modal can pick the # right UI: ``pkce`` = open URL + paste callback code, ``device_code`` = # show code + verification URL + poll, ``external`` = read-only (delegated # to a third-party CLI like Claude Code or Qwen). _OAUTH_PROVIDER_CATALOG: tuple[Dict[str, Any], ...] = ( { "id": "anthropic", "name": "Anthropic (Claude API)", "flow": "pkce", "cli_command": "hermes auth add anthropic", "docs_url": "https://docs.claude.com/en/api/getting-started", "status_fn": _anthropic_oauth_status, }, { "id": "claude-code", "name": "Claude Code (subscription)", "flow": "external", "cli_command": "claude setup-token", "docs_url": "https://docs.claude.com/en/docs/claude-code", "status_fn": _claude_code_only_status, }, { "id": "nous", "name": "Nous Portal", "flow": "device_code", "cli_command": "hermes auth add nous", "docs_url": "https://portal.nousresearch.com", "status_fn": None, # dispatched via auth.get_nous_auth_status }, { "id": "openai-codex", "name": "OpenAI Codex (ChatGPT)", "flow": "device_code", "cli_command": "hermes auth add openai-codex", "docs_url": "https://platform.openai.com/docs", "status_fn": None, # dispatched via auth.get_codex_auth_status }, { "id": "qwen-oauth", "name": "Qwen (via Qwen CLI)", "flow": "external", "cli_command": "hermes auth add qwen-oauth", "docs_url": "https://github.com/QwenLM/qwen-code", "status_fn": None, # dispatched via auth.get_qwen_auth_status }, ) def _resolve_provider_status(provider_id: str, status_fn) -> Dict[str, Any]: """Dispatch to the right status helper for an OAuth provider entry.""" if status_fn is not None: try: return status_fn() except Exception as e: return {"logged_in": False, "error": str(e)} try: from hermes_cli import auth as hauth if provider_id == "nous": raw = hauth.get_nous_auth_status() return { "logged_in": bool(raw.get("logged_in")), "source": "nous_portal", "source_label": raw.get("portal_base_url") or "Nous Portal", "token_preview": _truncate_token(raw.get("access_token")), "expires_at": raw.get("access_expires_at"), "has_refresh_token": bool(raw.get("has_refresh_token")), } if provider_id == "openai-codex": raw = hauth.get_codex_auth_status() return { "logged_in": bool(raw.get("logged_in")), "source": raw.get("source") or "openai_codex", "source_label": raw.get("auth_mode") or "OpenAI Codex", "token_preview": _truncate_token(raw.get("api_key")), "expires_at": None, "has_refresh_token": False, "last_refresh": raw.get("last_refresh"), } if provider_id == "qwen-oauth": raw = hauth.get_qwen_auth_status() return { "logged_in": bool(raw.get("logged_in")), "source": "qwen_cli", "source_label": raw.get("auth_store_path") or "Qwen CLI", "token_preview": _truncate_token(raw.get("access_token")), "expires_at": raw.get("expires_at"), "has_refresh_token": bool(raw.get("has_refresh_token")), } except Exception as e: return {"logged_in": False, "error": str(e)} return {"logged_in": False} @app.get("/api/providers/oauth") async def list_oauth_providers(): """Enumerate every OAuth-capable LLM provider with current status. Response shape (per provider): id stable identifier (used in DELETE path) name human label flow "pkce" | "device_code" | "external" cli_command fallback CLI command for users to run manually docs_url external docs/portal link for the "Learn more" link status: logged_in bool — currently has usable creds source short slug ("hermes_pkce", "claude_code", ...) source_label human-readable origin (file path, env var name) token_preview last N chars of the token, never the full token expires_at ISO timestamp string or null has_refresh_token bool """ providers = [] for p in _OAUTH_PROVIDER_CATALOG: status = _resolve_provider_status(p["id"], p.get("status_fn")) providers.append({ "id": p["id"], "name": p["name"], "flow": p["flow"], "cli_command": p["cli_command"], "docs_url": p["docs_url"], "status": status, }) return {"providers": providers} @app.delete("/api/providers/oauth/{provider_id}") async def disconnect_oauth_provider(provider_id: str, request: Request): """Disconnect an OAuth provider. Token-protected (matches /env/reveal).""" _require_token(request) valid_ids = {p["id"] for p in _OAUTH_PROVIDER_CATALOG} if provider_id not in valid_ids: raise HTTPException( status_code=400, detail=f"Unknown provider: {provider_id}. " f"Available: {', '.join(sorted(valid_ids))}", ) # Anthropic and claude-code clear the same Hermes-managed PKCE file # AND forget the Claude Code import. We don't touch ~/.claude/* directly # — that's owned by the Claude Code CLI; users can re-auth there if they # want to undo a disconnect. if provider_id in ("anthropic", "claude-code"): try: from agent.anthropic_adapter import _HERMES_OAUTH_FILE if _HERMES_OAUTH_FILE.exists(): _HERMES_OAUTH_FILE.unlink() except Exception: pass # Also clear the credential pool entry if present. try: from hermes_cli.auth import clear_provider_auth clear_provider_auth("anthropic") except Exception: pass _log.info("oauth/disconnect: %s", provider_id) return {"ok": True, "provider": provider_id} try: from hermes_cli.auth import clear_provider_auth cleared = clear_provider_auth(provider_id) _log.info("oauth/disconnect: %s (cleared=%s)", provider_id, cleared) return {"ok": bool(cleared), "provider": provider_id} except Exception as e: _log.exception("disconnect %s failed", provider_id) raise HTTPException(status_code=500, detail=str(e)) # --------------------------------------------------------------------------- # OAuth Phase 2 — in-browser PKCE & device-code flows # --------------------------------------------------------------------------- # # Two flow shapes are supported: # # PKCE (Anthropic): # 1. POST /api/providers/oauth/anthropic/start # → server generates code_verifier + challenge, builds claude.ai # authorize URL, stashes verifier in _oauth_sessions[session_id] # → returns { session_id, flow: "pkce", auth_url } # 2. UI opens auth_url in a new tab. User authorizes, copies code. # 3. POST /api/providers/oauth/anthropic/submit { session_id, code } # → server exchanges (code + verifier) → tokens at console.anthropic.com # → persists to ~/.hermes/.anthropic_oauth.json AND credential pool # → returns { ok: true, status: "approved" } # # Device code (Nous, OpenAI Codex): # 1. POST /api/providers/oauth/{nous|openai-codex}/start # → server hits provider's device-auth endpoint # → gets { user_code, verification_url, device_code, interval, expires_in } # → spawns background poller thread that polls the token endpoint # every `interval` seconds until approved/expired # → stores poll status in _oauth_sessions[session_id] # → returns { session_id, flow: "device_code", user_code, # verification_url, expires_in, poll_interval } # 2. UI opens verification_url in a new tab and shows user_code. # 3. UI polls GET /api/providers/oauth/{provider}/poll/{session_id} # every 2s until status != "pending". # 4. On "approved" the background thread has already saved creds; UI # refreshes the providers list. # # Sessions are kept in-memory only (single-process FastAPI) and time out # after 15 minutes. A periodic cleanup runs on each /start call to GC # expired sessions so the dict doesn't grow without bound. _OAUTH_SESSION_TTL_SECONDS = 15 * 60 _oauth_sessions: Dict[str, Dict[str, Any]] = {} _oauth_sessions_lock = threading.Lock() # Import OAuth constants from canonical source instead of duplicating. # Guarded so hermes web still starts if anthropic_adapter is unavailable; # Phase 2 endpoints will return 501 in that case. try: from agent.anthropic_adapter import ( _OAUTH_CLIENT_ID as _ANTHROPIC_OAUTH_CLIENT_ID, _OAUTH_TOKEN_URL as _ANTHROPIC_OAUTH_TOKEN_URL, _OAUTH_REDIRECT_URI as _ANTHROPIC_OAUTH_REDIRECT_URI, _OAUTH_SCOPES as _ANTHROPIC_OAUTH_SCOPES, _generate_pkce as _generate_pkce_pair, ) _ANTHROPIC_OAUTH_AVAILABLE = True except ImportError: _ANTHROPIC_OAUTH_AVAILABLE = False _ANTHROPIC_OAUTH_AUTHORIZE_URL = "https://claude.ai/oauth/authorize" def _gc_oauth_sessions() -> None: """Drop expired sessions. Called opportunistically on /start.""" cutoff = time.time() - _OAUTH_SESSION_TTL_SECONDS with _oauth_sessions_lock: stale = [sid for sid, sess in _oauth_sessions.items() if sess["created_at"] < cutoff] for sid in stale: _oauth_sessions.pop(sid, None) def _new_oauth_session(provider_id: str, flow: str) -> tuple[str, Dict[str, Any]]: """Create + register a new OAuth session, return (session_id, session_dict).""" sid = secrets.token_urlsafe(16) sess = { "session_id": sid, "provider": provider_id, "flow": flow, "created_at": time.time(), "status": "pending", # pending | approved | denied | expired | error "error_message": None, } with _oauth_sessions_lock: _oauth_sessions[sid] = sess return sid, sess def _save_anthropic_oauth_creds(access_token: str, refresh_token: str, expires_at_ms: int) -> None: """Persist Anthropic PKCE creds to both Hermes file AND credential pool. Mirrors what auth_commands.add_command does so the dashboard flow leaves the system in the same state as ``hermes auth add anthropic``. """ from agent.anthropic_adapter import _HERMES_OAUTH_FILE payload = { "accessToken": access_token, "refreshToken": refresh_token, "expiresAt": expires_at_ms, } _HERMES_OAUTH_FILE.parent.mkdir(parents=True, exist_ok=True) _HERMES_OAUTH_FILE.write_text(json.dumps(payload, indent=2), encoding="utf-8") # Best-effort credential-pool insert. Failure here doesn't invalidate # the file write — pool registration only matters for the rotation # strategy, not for runtime credential resolution. try: from agent.credential_pool import ( PooledCredential, load_pool, AUTH_TYPE_OAUTH, SOURCE_MANUAL, ) import uuid pool = load_pool("anthropic") # Avoid duplicate entries: delete any prior dashboard-issued OAuth entry existing = [e for e in pool.entries() if getattr(e, "source", "").startswith(f"{SOURCE_MANUAL}:dashboard_pkce")] for e in existing: try: pool.remove_entry(getattr(e, "id", "")) except Exception: pass entry = PooledCredential( provider="anthropic", id=uuid.uuid4().hex[:6], label="dashboard PKCE", auth_type=AUTH_TYPE_OAUTH, priority=0, source=f"{SOURCE_MANUAL}:dashboard_pkce", access_token=access_token, refresh_token=refresh_token, expires_at_ms=expires_at_ms, ) pool.add_entry(entry) except Exception as e: _log.warning("anthropic pool add (dashboard) failed: %s", e) def _start_anthropic_pkce() -> Dict[str, Any]: """Begin PKCE flow. Returns the auth URL the UI should open.""" if not _ANTHROPIC_OAUTH_AVAILABLE: raise HTTPException(status_code=501, detail="Anthropic OAuth not available (missing adapter)") verifier, challenge = _generate_pkce_pair() sid, sess = _new_oauth_session("anthropic", "pkce") sess["verifier"] = verifier sess["state"] = verifier # Anthropic round-trips verifier as state params = { "code": "true", "client_id": _ANTHROPIC_OAUTH_CLIENT_ID, "response_type": "code", "redirect_uri": _ANTHROPIC_OAUTH_REDIRECT_URI, "scope": _ANTHROPIC_OAUTH_SCOPES, "code_challenge": challenge, "code_challenge_method": "S256", "state": verifier, } auth_url = f"{_ANTHROPIC_OAUTH_AUTHORIZE_URL}?{urllib.parse.urlencode(params)}" return { "session_id": sid, "flow": "pkce", "auth_url": auth_url, "expires_in": _OAUTH_SESSION_TTL_SECONDS, } def _submit_anthropic_pkce(session_id: str, code_input: str) -> Dict[str, Any]: """Exchange authorization code for tokens. Persists on success.""" with _oauth_sessions_lock: sess = _oauth_sessions.get(session_id) if not sess or sess["provider"] != "anthropic" or sess["flow"] != "pkce": raise HTTPException(status_code=404, detail="Unknown or expired session") if sess["status"] != "pending": return {"ok": False, "status": sess["status"], "message": sess.get("error_message")} # Anthropic's redirect callback page formats the code as `#`. # Strip the state suffix if present (we already have the verifier server-side). parts = code_input.strip().split("#", 1) code = parts[0].strip() if not code: return {"ok": False, "status": "error", "message": "No code provided"} state_from_callback = parts[1] if len(parts) > 1 else "" exchange_data = json.dumps({ "grant_type": "authorization_code", "client_id": _ANTHROPIC_OAUTH_CLIENT_ID, "code": code, "state": state_from_callback or sess["state"], "redirect_uri": _ANTHROPIC_OAUTH_REDIRECT_URI, "code_verifier": sess["verifier"], }).encode() req = urllib.request.Request( _ANTHROPIC_OAUTH_TOKEN_URL, data=exchange_data, headers={ "Content-Type": "application/json", "User-Agent": "hermes-dashboard/1.0", }, method="POST", ) try: with urllib.request.urlopen(req, timeout=20) as resp: result = json.loads(resp.read().decode()) except Exception as e: sess["status"] = "error" sess["error_message"] = f"Token exchange failed: {e}" return {"ok": False, "status": "error", "message": sess["error_message"]} access_token = result.get("access_token", "") refresh_token = result.get("refresh_token", "") expires_in = int(result.get("expires_in") or 3600) if not access_token: sess["status"] = "error" sess["error_message"] = "No access token returned" return {"ok": False, "status": "error", "message": sess["error_message"]} expires_at_ms = int(time.time() * 1000) + (expires_in * 1000) try: _save_anthropic_oauth_creds(access_token, refresh_token, expires_at_ms) except Exception as e: sess["status"] = "error" sess["error_message"] = f"Save failed: {e}" return {"ok": False, "status": "error", "message": sess["error_message"]} sess["status"] = "approved" _log.info("oauth/pkce: anthropic login completed (session=%s)", session_id) return {"ok": True, "status": "approved"} async def _start_device_code_flow(provider_id: str) -> Dict[str, Any]: """Initiate a device-code flow (Nous or OpenAI Codex). Calls the provider's device-auth endpoint via the existing CLI helpers, then spawns a background poller. Returns the user-facing display fields so the UI can render the verification page link + user code. """ from hermes_cli import auth as hauth if provider_id == "nous": from hermes_cli.auth import _request_device_code, PROVIDER_REGISTRY import httpx pconfig = PROVIDER_REGISTRY["nous"] portal_base_url = ( os.getenv("HERMES_PORTAL_BASE_URL") or os.getenv("NOUS_PORTAL_BASE_URL") or pconfig.portal_base_url ).rstrip("/") client_id = pconfig.client_id scope = pconfig.scope def _do_nous_device_request(): with httpx.Client(timeout=httpx.Timeout(15.0), headers={"Accept": "application/json"}) as client: return _request_device_code( client=client, portal_base_url=portal_base_url, client_id=client_id, scope=scope, ) device_data = await asyncio.get_event_loop().run_in_executor(None, _do_nous_device_request) sid, sess = _new_oauth_session("nous", "device_code") sess["device_code"] = str(device_data["device_code"]) sess["interval"] = int(device_data["interval"]) sess["expires_at"] = time.time() + int(device_data["expires_in"]) sess["portal_base_url"] = portal_base_url sess["client_id"] = client_id threading.Thread( target=_nous_poller, args=(sid,), daemon=True, name=f"oauth-poll-{sid[:6]}" ).start() return { "session_id": sid, "flow": "device_code", "user_code": str(device_data["user_code"]), "verification_url": str(device_data["verification_uri_complete"]), "expires_in": int(device_data["expires_in"]), "poll_interval": int(device_data["interval"]), } if provider_id == "openai-codex": # Codex uses fixed OpenAI device-auth endpoints; reuse the helper. sid, _ = _new_oauth_session("openai-codex", "device_code") # Use the helper but in a thread because it polls inline. # We can't extract just the start step without refactoring auth.py, # so we run the full helper in a worker and proxy the user_code + # verification_url back via the session dict. The helper prints # to stdout — we capture nothing here, just status. threading.Thread( target=_codex_full_login_worker, args=(sid,), daemon=True, name=f"oauth-codex-{sid[:6]}", ).start() # Block briefly until the worker has populated the user_code, OR error. deadline = time.time() + 10 while time.time() < deadline: with _oauth_sessions_lock: s = _oauth_sessions.get(sid) if s and (s.get("user_code") or s["status"] != "pending"): break await asyncio.sleep(0.1) with _oauth_sessions_lock: s = _oauth_sessions.get(sid, {}) if s.get("status") == "error": raise HTTPException(status_code=500, detail=s.get("error_message") or "device-auth failed") if not s.get("user_code"): raise HTTPException(status_code=504, detail="device-auth timed out before returning a user code") return { "session_id": sid, "flow": "device_code", "user_code": s["user_code"], "verification_url": s["verification_url"], "expires_in": int(s.get("expires_in") or 900), "poll_interval": int(s.get("interval") or 5), } raise HTTPException(status_code=400, detail=f"Provider {provider_id} does not support device-code flow") def _nous_poller(session_id: str) -> None: """Background poller that drives a Nous device-code flow to completion.""" from hermes_cli.auth import _poll_for_token, refresh_nous_oauth_from_state from datetime import datetime, timezone import httpx with _oauth_sessions_lock: sess = _oauth_sessions.get(session_id) if not sess: return portal_base_url = sess["portal_base_url"] client_id = sess["client_id"] device_code = sess["device_code"] interval = sess["interval"] expires_in = max(60, int(sess["expires_at"] - time.time())) try: with httpx.Client(timeout=httpx.Timeout(15.0), headers={"Accept": "application/json"}) as client: token_data = _poll_for_token( client=client, portal_base_url=portal_base_url, client_id=client_id, device_code=device_code, expires_in=expires_in, poll_interval=interval, ) # Same post-processing as _nous_device_code_login (mint agent key) now = datetime.now(timezone.utc) token_ttl = int(token_data.get("expires_in") or 0) auth_state = { "portal_base_url": portal_base_url, "inference_base_url": token_data.get("inference_base_url"), "client_id": client_id, "scope": token_data.get("scope"), "token_type": token_data.get("token_type", "Bearer"), "access_token": token_data["access_token"], "refresh_token": token_data.get("refresh_token"), "obtained_at": now.isoformat(), "expires_at": ( datetime.fromtimestamp(now.timestamp() + token_ttl, tz=timezone.utc).isoformat() if token_ttl else None ), "expires_in": token_ttl, } full_state = refresh_nous_oauth_from_state( auth_state, min_key_ttl_seconds=300, timeout_seconds=15.0, force_refresh=False, force_mint=True, ) from hermes_cli.auth import persist_nous_credentials persist_nous_credentials(full_state) with _oauth_sessions_lock: sess["status"] = "approved" _log.info("oauth/device: nous login completed (session=%s)", session_id) except Exception as e: _log.warning("nous device-code poll failed (session=%s): %s", session_id, e) with _oauth_sessions_lock: sess["status"] = "error" sess["error_message"] = str(e) def _codex_full_login_worker(session_id: str) -> None: """Run the complete OpenAI Codex device-code flow. Codex doesn't use the standard OAuth device-code endpoints; it has its own ``/api/accounts/deviceauth/usercode`` (JSON body, returns ``device_auth_id``) and ``/api/accounts/deviceauth/token`` (JSON body polled until 200). On success the response carries an ``authorization_code`` + ``code_verifier`` that get exchanged at CODEX_OAUTH_TOKEN_URL with grant_type=authorization_code. The flow is replicated inline (rather than calling _codex_device_code_login) because that helper prints/blocks/polls in a single function — we need to surface the user_code to the dashboard the moment we receive it, well before polling completes. """ try: import httpx from hermes_cli.auth import ( CODEX_OAUTH_CLIENT_ID, CODEX_OAUTH_TOKEN_URL, DEFAULT_CODEX_BASE_URL, ) issuer = "https://auth.openai.com" # Step 1: request device code with httpx.Client(timeout=httpx.Timeout(15.0)) as client: resp = client.post( f"{issuer}/api/accounts/deviceauth/usercode", json={"client_id": CODEX_OAUTH_CLIENT_ID}, headers={"Content-Type": "application/json"}, ) if resp.status_code != 200: raise RuntimeError(f"deviceauth/usercode returned {resp.status_code}") device_data = resp.json() user_code = device_data.get("user_code", "") device_auth_id = device_data.get("device_auth_id", "") poll_interval = max(3, int(device_data.get("interval", "5"))) if not user_code or not device_auth_id: raise RuntimeError("device-code response missing user_code or device_auth_id") verification_url = f"{issuer}/codex/device" with _oauth_sessions_lock: sess = _oauth_sessions.get(session_id) if not sess: return sess["user_code"] = user_code sess["verification_url"] = verification_url sess["device_auth_id"] = device_auth_id sess["interval"] = poll_interval sess["expires_in"] = 15 * 60 # OpenAI's effective limit sess["expires_at"] = time.time() + sess["expires_in"] # Step 2: poll until authorized deadline = time.time() + sess["expires_in"] code_resp = None with httpx.Client(timeout=httpx.Timeout(15.0)) as client: while time.time() < deadline: time.sleep(poll_interval) poll = client.post( f"{issuer}/api/accounts/deviceauth/token", json={"device_auth_id": device_auth_id, "user_code": user_code}, headers={"Content-Type": "application/json"}, ) if poll.status_code == 200: code_resp = poll.json() break if poll.status_code in (403, 404): continue # user hasn't authorized yet raise RuntimeError(f"deviceauth/token poll returned {poll.status_code}") if code_resp is None: with _oauth_sessions_lock: sess["status"] = "expired" sess["error_message"] = "Device code expired before approval" return # Step 3: exchange authorization_code for tokens authorization_code = code_resp.get("authorization_code", "") code_verifier = code_resp.get("code_verifier", "") if not authorization_code or not code_verifier: raise RuntimeError("device-auth response missing authorization_code/code_verifier") with httpx.Client(timeout=httpx.Timeout(15.0)) as client: token_resp = client.post( CODEX_OAUTH_TOKEN_URL, data={ "grant_type": "authorization_code", "code": authorization_code, "redirect_uri": f"{issuer}/deviceauth/callback", "client_id": CODEX_OAUTH_CLIENT_ID, "code_verifier": code_verifier, }, headers={"Content-Type": "application/x-www-form-urlencoded"}, ) if token_resp.status_code != 200: raise RuntimeError(f"token exchange returned {token_resp.status_code}") tokens = token_resp.json() access_token = tokens.get("access_token", "") refresh_token = tokens.get("refresh_token", "") if not access_token: raise RuntimeError("token exchange did not return access_token") # Persist via credential pool — same shape as auth_commands.add_command from agent.credential_pool import ( PooledCredential, load_pool, AUTH_TYPE_OAUTH, SOURCE_MANUAL, ) import uuid as _uuid pool = load_pool("openai-codex") base_url = ( os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/") or DEFAULT_CODEX_BASE_URL ) entry = PooledCredential( provider="openai-codex", id=_uuid.uuid4().hex[:6], label="dashboard device_code", auth_type=AUTH_TYPE_OAUTH, priority=0, source=f"{SOURCE_MANUAL}:dashboard_device_code", access_token=access_token, refresh_token=refresh_token, base_url=base_url, ) pool.add_entry(entry) with _oauth_sessions_lock: sess["status"] = "approved" _log.info("oauth/device: openai-codex login completed (session=%s)", session_id) except Exception as e: _log.warning("codex device-code worker failed (session=%s): %s", session_id, e) with _oauth_sessions_lock: s = _oauth_sessions.get(session_id) if s: s["status"] = "error" s["error_message"] = str(e) @app.post("/api/providers/oauth/{provider_id}/start") async def start_oauth_login(provider_id: str, request: Request): """Initiate an OAuth login flow. Token-protected.""" _require_token(request) _gc_oauth_sessions() valid = {p["id"] for p in _OAUTH_PROVIDER_CATALOG} if provider_id not in valid: raise HTTPException(status_code=400, detail=f"Unknown provider {provider_id}") catalog_entry = next(p for p in _OAUTH_PROVIDER_CATALOG if p["id"] == provider_id) if catalog_entry["flow"] == "external": raise HTTPException( status_code=400, detail=f"{provider_id} uses an external CLI; run `{catalog_entry['cli_command']}` manually", ) try: if catalog_entry["flow"] == "pkce": return _start_anthropic_pkce() if catalog_entry["flow"] == "device_code": return await _start_device_code_flow(provider_id) except HTTPException: raise except Exception as e: _log.exception("oauth/start %s failed", provider_id) raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=400, detail="Unsupported flow") class OAuthSubmitBody(BaseModel): session_id: str code: str @app.post("/api/providers/oauth/{provider_id}/submit") async def submit_oauth_code(provider_id: str, body: OAuthSubmitBody, request: Request): """Submit the auth code for PKCE flows. Token-protected.""" _require_token(request) if provider_id == "anthropic": return await asyncio.get_event_loop().run_in_executor( None, _submit_anthropic_pkce, body.session_id, body.code, ) raise HTTPException(status_code=400, detail=f"submit not supported for {provider_id}") @app.get("/api/providers/oauth/{provider_id}/poll/{session_id}") async def poll_oauth_session(provider_id: str, session_id: str): """Poll a device-code session's status (no auth — read-only state).""" with _oauth_sessions_lock: sess = _oauth_sessions.get(session_id) if not sess: raise HTTPException(status_code=404, detail="Session not found or expired") if sess["provider"] != provider_id: raise HTTPException(status_code=400, detail="Provider mismatch for session") return { "session_id": session_id, "status": sess["status"], "error_message": sess.get("error_message"), "expires_at": sess.get("expires_at"), } @app.delete("/api/providers/oauth/sessions/{session_id}") async def cancel_oauth_session(session_id: str, request: Request): """Cancel a pending OAuth session. Token-protected.""" _require_token(request) with _oauth_sessions_lock: sess = _oauth_sessions.pop(session_id, None) if sess is None: return {"ok": False, "message": "session not found"} return {"ok": True, "session_id": session_id} # --------------------------------------------------------------------------- # Session detail endpoints # --------------------------------------------------------------------------- @app.get("/api/sessions/{session_id}") async def get_session_detail(session_id: str): from hermes_state import SessionDB db = SessionDB() try: sid = db.resolve_session_id(session_id) session = db.get_session(sid) if sid else None if not session: raise HTTPException(status_code=404, detail="Session not found") return session finally: db.close() @app.get("/api/sessions/{session_id}/messages") async def get_session_messages(session_id: str): from hermes_state import SessionDB db = SessionDB() try: sid = db.resolve_session_id(session_id) if not sid: raise HTTPException(status_code=404, detail="Session not found") messages = db.get_messages(sid) return {"session_id": sid, "messages": messages} finally: db.close() @app.delete("/api/sessions/{session_id}") async def delete_session_endpoint(session_id: str): from hermes_state import SessionDB db = SessionDB() try: if not db.delete_session(session_id): raise HTTPException(status_code=404, detail="Session not found") return {"ok": True} finally: db.close() # --------------------------------------------------------------------------- # Log viewer endpoint # --------------------------------------------------------------------------- @app.get("/api/logs") async def get_logs( file: str = "agent", lines: int = 100, level: Optional[str] = None, component: Optional[str] = None, search: Optional[str] = None, ): from hermes_cli.logs import _read_tail, LOG_FILES log_name = LOG_FILES.get(file) if not log_name: raise HTTPException(status_code=400, detail=f"Unknown log file: {file}") log_path = get_hermes_home() / "logs" / log_name if not log_path.exists(): return {"file": file, "lines": []} try: from hermes_logging import COMPONENT_PREFIXES except ImportError: COMPONENT_PREFIXES = {} # Normalize "ALL" / "all" / empty → no filter. _matches_filters treats an # empty tuple as "must match a prefix" (startswith(()) is always False), # so passing () instead of None silently drops every line. min_level = level if level and level.upper() != "ALL" else None if component and component.lower() != "all": comp_prefixes = COMPONENT_PREFIXES.get(component) if comp_prefixes is None: raise HTTPException( status_code=400, detail=f"Unknown component: {component}. " f"Available: {', '.join(sorted(COMPONENT_PREFIXES))}", ) else: comp_prefixes = None has_filters = bool(min_level or comp_prefixes or search) result = _read_tail( log_path, min(lines, 500) if not search else 2000, has_filters=has_filters, min_level=min_level, component_prefixes=comp_prefixes, ) # Post-filter by search term (case-insensitive substring match). # _read_tail doesn't support free-text search, so we filter here and # trim to the requested line count afterward. if search: needle = search.lower() result = [l for l in result if needle in l.lower()][-min(lines, 500):] return {"file": file, "lines": result} # --------------------------------------------------------------------------- # Cron job management endpoints # --------------------------------------------------------------------------- class CronJobCreate(BaseModel): prompt: str schedule: str name: str = "" deliver: str = "local" class CronJobUpdate(BaseModel): updates: dict @app.get("/api/cron/jobs") async def list_cron_jobs(): from cron.jobs import list_jobs return list_jobs(include_disabled=True) @app.get("/api/cron/jobs/{job_id}") async def get_cron_job(job_id: str): from cron.jobs import get_job job = get_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") return job @app.post("/api/cron/jobs") async def create_cron_job(body: CronJobCreate): from cron.jobs import create_job try: job = create_job(prompt=body.prompt, schedule=body.schedule, name=body.name, deliver=body.deliver) return job except Exception as e: _log.exception("POST /api/cron/jobs failed") raise HTTPException(status_code=400, detail=str(e)) @app.put("/api/cron/jobs/{job_id}") async def update_cron_job(job_id: str, body: CronJobUpdate): from cron.jobs import update_job job = update_job(job_id, body.updates) if not job: raise HTTPException(status_code=404, detail="Job not found") return job @app.post("/api/cron/jobs/{job_id}/pause") async def pause_cron_job(job_id: str): from cron.jobs import pause_job job = pause_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") return job @app.post("/api/cron/jobs/{job_id}/resume") async def resume_cron_job(job_id: str): from cron.jobs import resume_job job = resume_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") return job @app.post("/api/cron/jobs/{job_id}/trigger") async def trigger_cron_job(job_id: str): from cron.jobs import trigger_job job = trigger_job(job_id) if not job: raise HTTPException(status_code=404, detail="Job not found") return job @app.delete("/api/cron/jobs/{job_id}") async def delete_cron_job(job_id: str): from cron.jobs import remove_job if not remove_job(job_id): raise HTTPException(status_code=404, detail="Job not found") return {"ok": True} # --------------------------------------------------------------------------- # Skills & Tools endpoints # --------------------------------------------------------------------------- class SkillToggle(BaseModel): name: str enabled: bool @app.get("/api/skills") async def get_skills(): from tools.skills_tool import _find_all_skills from hermes_cli.skills_config import get_disabled_skills config = load_config() disabled = get_disabled_skills(config) skills = _find_all_skills(skip_disabled=True) for s in skills: s["enabled"] = s["name"] not in disabled return skills @app.put("/api/skills/toggle") async def toggle_skill(body: SkillToggle): from hermes_cli.skills_config import get_disabled_skills, save_disabled_skills config = load_config() disabled = get_disabled_skills(config) if body.enabled: disabled.discard(body.name) else: disabled.add(body.name) save_disabled_skills(config, disabled) return {"ok": True, "name": body.name, "enabled": body.enabled} @app.get("/api/tools/toolsets") async def get_toolsets(): from hermes_cli.tools_config import ( _get_effective_configurable_toolsets, _get_platform_tools, _toolset_has_keys, ) from toolsets import resolve_toolset config = load_config() enabled_toolsets = _get_platform_tools( config, "cli", include_default_mcp_servers=False, ) result = [] for name, label, desc in _get_effective_configurable_toolsets(): try: tools = sorted(set(resolve_toolset(name))) except Exception: tools = [] is_enabled = name in enabled_toolsets result.append({ "name": name, "label": label, "description": desc, "enabled": is_enabled, "available": is_enabled, "configured": _toolset_has_keys(name, config), "tools": tools, }) return result # --------------------------------------------------------------------------- # Raw YAML config endpoint # --------------------------------------------------------------------------- class RawConfigUpdate(BaseModel): yaml_text: str @app.get("/api/config/raw") async def get_config_raw(): path = get_config_path() if not path.exists(): return {"yaml": ""} return {"yaml": path.read_text(encoding="utf-8")} @app.put("/api/config/raw") async def update_config_raw(body: RawConfigUpdate): try: parsed = yaml.safe_load(body.yaml_text) if not isinstance(parsed, dict): raise HTTPException(status_code=400, detail="YAML must be a mapping") save_config(parsed) return {"ok": True} except yaml.YAMLError as e: raise HTTPException(status_code=400, detail=f"Invalid YAML: {e}") # --------------------------------------------------------------------------- # Token / cost analytics endpoint # --------------------------------------------------------------------------- @app.get("/api/analytics/usage") async def get_usage_analytics(days: int = 30): from hermes_state import SessionDB from agent.insights import InsightsEngine db = SessionDB() try: cutoff = time.time() - (days * 86400) cur = db._conn.execute(""" SELECT date(started_at, 'unixepoch') as day, SUM(input_tokens) as input_tokens, SUM(output_tokens) as output_tokens, SUM(cache_read_tokens) as cache_read_tokens, SUM(reasoning_tokens) as reasoning_tokens, COALESCE(SUM(estimated_cost_usd), 0) as estimated_cost, COALESCE(SUM(actual_cost_usd), 0) as actual_cost, COUNT(*) as sessions, SUM(COALESCE(api_call_count, 0)) as api_calls FROM sessions WHERE started_at > ? GROUP BY day ORDER BY day """, (cutoff,)) daily = [dict(r) for r in cur.fetchall()] cur2 = db._conn.execute(""" SELECT model, SUM(input_tokens) as input_tokens, SUM(output_tokens) as output_tokens, COALESCE(SUM(estimated_cost_usd), 0) as estimated_cost, COUNT(*) as sessions, SUM(COALESCE(api_call_count, 0)) as api_calls FROM sessions WHERE started_at > ? AND model IS NOT NULL GROUP BY model ORDER BY SUM(input_tokens) + SUM(output_tokens) DESC """, (cutoff,)) by_model = [dict(r) for r in cur2.fetchall()] cur3 = db._conn.execute(""" SELECT SUM(input_tokens) as total_input, SUM(output_tokens) as total_output, SUM(cache_read_tokens) as total_cache_read, SUM(reasoning_tokens) as total_reasoning, COALESCE(SUM(estimated_cost_usd), 0) as total_estimated_cost, COALESCE(SUM(actual_cost_usd), 0) as total_actual_cost, COUNT(*) as total_sessions, SUM(COALESCE(api_call_count, 0)) as total_api_calls FROM sessions WHERE started_at > ? """, (cutoff,)) totals = dict(cur3.fetchone()) insights_report = InsightsEngine(db).generate(days=days) skills = insights_report.get("skills", { "summary": { "total_skill_loads": 0, "total_skill_edits": 0, "total_skill_actions": 0, "distinct_skills_used": 0, }, "top_skills": [], }) return { "daily": daily, "by_model": by_model, "totals": totals, "period_days": days, "skills": skills, } finally: db.close() def mount_spa(application: FastAPI): """Mount the built SPA. Falls back to index.html for client-side routing. The session token is injected into index.html via a ``' ) html = html.replace("", f"{token_script}", 1) return HTMLResponse( html, headers={"Cache-Control": "no-store, no-cache, must-revalidate"}, ) application.mount("/assets", StaticFiles(directory=WEB_DIST / "assets"), name="assets") @application.get("/{full_path:path}") async def serve_spa(full_path: str): file_path = WEB_DIST / full_path # Prevent path traversal via url-encoded sequences (%2e%2e/) if ( full_path and file_path.resolve().is_relative_to(WEB_DIST.resolve()) and file_path.exists() and file_path.is_file() ): return FileResponse(file_path) return _serve_index() # --------------------------------------------------------------------------- # Dashboard theme endpoints # --------------------------------------------------------------------------- # Built-in dashboard themes — label + description only. The actual color # definitions live in the frontend (web/src/themes/presets.ts). _BUILTIN_DASHBOARD_THEMES = [ {"name": "default", "label": "Hermes Teal", "description": "Classic dark teal — the canonical Hermes look"}, {"name": "midnight", "label": "Midnight", "description": "Deep blue-violet with cool accents"}, {"name": "ember", "label": "Ember", "description": "Warm crimson and bronze — forge vibes"}, {"name": "mono", "label": "Mono", "description": "Clean grayscale — minimal and focused"}, {"name": "cyberpunk", "label": "Cyberpunk", "description": "Neon green on black — matrix terminal"}, {"name": "rose", "label": "Rosé", "description": "Soft pink and warm ivory — easy on the eyes"}, ] def _parse_theme_layer(value: Any, default_hex: str, default_alpha: float = 1.0) -> Optional[Dict[str, Any]]: """Normalise a theme layer spec from YAML into `{hex, alpha}` form. Accepts shorthand (a bare hex string) or full dict form. Returns ``None`` on garbage input so the caller can fall back to a built-in default rather than blowing up. """ if value is None: return {"hex": default_hex, "alpha": default_alpha} if isinstance(value, str): return {"hex": value, "alpha": default_alpha} if isinstance(value, dict): hex_val = value.get("hex", default_hex) alpha_val = value.get("alpha", default_alpha) if not isinstance(hex_val, str): return None try: alpha_f = float(alpha_val) except (TypeError, ValueError): alpha_f = default_alpha return {"hex": hex_val, "alpha": max(0.0, min(1.0, alpha_f))} return None _THEME_DEFAULT_TYPOGRAPHY: Dict[str, str] = { "fontSans": 'system-ui, -apple-system, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif', "fontMono": 'ui-monospace, "SF Mono", "Cascadia Mono", Menlo, Consolas, monospace', "baseSize": "15px", "lineHeight": "1.55", "letterSpacing": "0", } _THEME_DEFAULT_LAYOUT: Dict[str, str] = { "radius": "0.5rem", "density": "comfortable", } _THEME_OVERRIDE_KEYS = { "card", "cardForeground", "popover", "popoverForeground", "primary", "primaryForeground", "secondary", "secondaryForeground", "muted", "mutedForeground", "accent", "accentForeground", "destructive", "destructiveForeground", "success", "warning", "border", "input", "ring", } # Well-known named asset slots themes can populate. Any other keys under # ``assets.custom`` are exposed as ``--theme-asset-custom-`` CSS vars # for plugin/shell use. _THEME_NAMED_ASSET_KEYS = {"bg", "hero", "logo", "crest", "sidebar", "header"} # Component-style buckets themes can override. The value under each bucket # is a mapping from camelCase property name to CSS string; each pair emits # ``--component--`` on :root. The frontend's shell # components (Card, App header, Backdrop, etc.) consume these vars so themes # can restyle chrome (clip-path, border-image, segmented progress, etc.) # without shipping their own CSS. _THEME_COMPONENT_BUCKETS = { "card", "header", "footer", "sidebar", "tab", "progress", "badge", "backdrop", "page", } _THEME_LAYOUT_VARIANTS = {"standard", "cockpit", "tiled"} # Cap on customCSS length so a malformed/oversized theme YAML can't blow up # the response payload or the