hermes-agent/hermes_agent/cli/web_server.py
alt-glitch 4b16341975 refactor(restructure): rewrite all imports for hermes_agent package
Rewrite all import statements, patch() targets, sys.modules keys,
importlib.import_module() strings, and subprocess -m references to use
hermes_agent.* paths.

Strip sys.path.insert hacks from production code (rely on editable install).
Update COMPONENT_PREFIXES for logger filtering.
Fix 3 hardcoded getLogger() calls to use __name__.
Update transport and tool registry discovery paths.
Update plugin module path strings.
Add legacy process-name patterns for gateway PID detection.
Add main() to skills_sync for console_script entry point.
Fix _get_bundled_dir() path traversal after move.

Part of #14182, #14183
2026-04-23 08:35:34 +05:30

2556 lines
95 KiB
Python

"""
Hermes Agent — Web UI server.
Provides a FastAPI backend serving the Vite/React frontend and REST API
endpoints for managing configuration, environment variables, and sessions.
Usage:
python -m hermes_agent.cli.main web # Start on http://127.0.0.1:9119
python -m hermes_agent.cli.main web --port 8080
"""
import asyncio
import hmac
import importlib.util
import json
import logging
import os
import secrets
import subprocess
import sys
import threading
import time
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
from hermes_agent.cli import __version__, __release_date__
from hermes_agent.cli.config import (
DEFAULT_CONFIG,
OPTIONAL_ENV_VARS,
get_config_path,
get_env_path,
get_hermes_home,
load_config,
load_env,
save_config,
save_env_value,
remove_env_value,
check_config_version,
redact_key,
)
from hermes_agent.gateway.status import get_running_pid, read_runtime_status
try:
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
except ImportError:
raise SystemExit(
"Web UI requires fastapi and uvicorn.\n"
f"Install with: {sys.executable} -m pip install 'fastapi' 'uvicorn[standard]'"
)
# Directory holding the frontend assets. The HERMES_WEB_DIST environment
# variable overrides the default ``web_dist`` folder next to this module.
WEB_DIST = Path(os.environ["HERMES_WEB_DIST"]) if "HERMES_WEB_DIST" in os.environ else Path(__file__).parent / "web_dist"
# Module-level logger, named after this module's import path.
_log = logging.getLogger(__name__)
# The FastAPI application object; the middlewares and routes in this module
# register themselves on it.
app = FastAPI(title="Hermes Agent", version=__version__)
# ---------------------------------------------------------------------------
# Session token for protecting sensitive endpoints (reveal).
# Generated fresh on every server start — dies when the process exits.
# Injected into the SPA HTML so only the legitimate web UI can use it.
# ---------------------------------------------------------------------------
# Random URL-safe token built from 32 bytes of entropy; held in memory only.
_SESSION_TOKEN = secrets.token_urlsafe(32)
# Simple rate limiter for the reveal endpoint.
# ``time.time()`` stamps of recent reveal calls; the reveal handler prunes
# entries older than the window on every request.
_reveal_timestamps: List[float] = []
# Maximum number of reveals allowed inside one window.
_REVEAL_MAX_PER_WINDOW = 5
# Length of the rate-limit window, in seconds.
_REVEAL_WINDOW_SECONDS = 30
# CORS: restrict to localhost origins only. The web UI is intended to run
# locally; binding to 0.0.0.0 with allow_origins=["*"] would let any website
# read/modify config and secrets.
app.add_middleware(
    CORSMiddleware,
    # Matches only http/https origins on ``localhost`` or ``127.0.0.1``,
    # with an optional port. Any other origin gets no CORS headers.
    allow_origin_regex=r"^https?://(localhost|127\.0\.0\.1)(:\d+)?$",
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Endpoints that do NOT require the session token. Everything else under
# /api/ is gated by the auth middleware below. Keep this list minimal —
# only truly non-sensitive, read-only endpoints belong here.
# ---------------------------------------------------------------------------
# Exact request paths that the auth middleware lets through without a token.
# Matching is by full-path equality, so sub-paths are NOT implicitly public.
_PUBLIC_API_PATHS: frozenset[str] = frozenset({
    "/api/status",
    "/api/config/defaults",
    "/api/config/schema",
    "/api/model/info",
    "/api/dashboard/themes",
    "/api/dashboard/plugins",
    "/api/dashboard/plugins/rescan",
})
def _require_token(request: Request) -> None:
    """Reject the request unless it carries the current session token.

    The ``Authorization`` header must equal ``Bearer <token>`` exactly.
    The comparison goes through ``hmac.compare_digest`` so the time taken
    does not leak how much of the token matched.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    supplied = request.headers.get("authorization", "").encode()
    wanted = f"Bearer {_SESSION_TOKEN}".encode()
    if hmac.compare_digest(supplied, wanted):
        return
    raise HTTPException(status_code=401, detail="Unauthorized")
# Accepted Host header values for loopback binds. DNS rebinding attacks
# point a victim browser at an attacker-controlled hostname (evil.test)
# which resolves to 127.0.0.1 after a TTL flip — bypassing same-origin
# checks because the browser now considers evil.test and our dashboard
# "same origin". Validating the Host header at the app layer rejects any
# request whose Host isn't one we bound for. See GHSA-ppp5-vxwm-4cf7.
# Lower-case host names treated as interchangeable when the server is bound
# to a loopback address. IPv6 loopback is stored without brackets; the Host
# parser strips brackets before comparing.
_LOOPBACK_HOST_VALUES: frozenset[str] = frozenset({
    "localhost", "127.0.0.1", "::1",
})
def _is_accepted_host(host_header: str, bound_host: str) -> bool:
"""True if the Host header targets the interface we bound to.
Accepts:
- Exact bound host (with or without port suffix)
- Loopback aliases when bound to loopback
- Any host when bound to 0.0.0.0 (explicit opt-in to non-loopback,
no protection possible at this layer)
"""
if not host_header:
return False
# Strip port suffix. IPv6 addresses use bracket notation:
# [::1] — no port
# [::1]:9119 — with port
# Plain hosts/v4:
# localhost:9119
# 127.0.0.1:9119
h = host_header.strip()
if h.startswith("["):
# IPv6 bracketed — port (if any) follows "]:"
close = h.find("]")
if close != -1:
host_only = h[1:close] # strip brackets
else:
host_only = h.strip("[]")
else:
host_only = h.rsplit(":", 1)[0] if ":" in h else h
host_only = host_only.lower()
# 0.0.0.0 bind means operator explicitly opted into all-interfaces
# (requires --insecure per web_server.start_server). No Host-layer
# defence can protect that mode; rely on operator network controls.
if bound_host in ("0.0.0.0", "::"):
return True
# Loopback bind: accept the loopback names
bound_lc = bound_host.lower()
if bound_lc in _LOOPBACK_HOST_VALUES:
return host_only in _LOOPBACK_HOST_VALUES
# Explicit non-loopback bind: require exact host match
return host_only == bound_lc
@app.middleware("http")
async def host_header_middleware(request: Request, call_next):
    """Answer 400 when the Host header does not match the bound interface.

    This is the DNS-rebinding defence: a browser tricked into resolving an
    attacker-controlled hostname to 127.0.0.1 still sends the attacker's
    hostname in ``Host``, which this check refuses. CORS and same-origin
    rules cannot help there because the browser treats the attacker origin
    as same-origin with the dashboard. See GHSA-ppp5-vxwm-4cf7.

    The bound host is read from ``app.state.bound_host``. While that
    attribute is unset or empty, every request is passed through unchecked.
    """
    bound_host = getattr(app.state, "bound_host", None)
    if not bound_host or _is_accepted_host(request.headers.get("host", ""), bound_host):
        return await call_next(request)

    rejection = {
        "detail": (
            "Invalid Host header. Dashboard requests must use "
            "the hostname the server was bound to."
        ),
    }
    return JSONResponse(status_code=400, content=rejection)
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    """Enforce the session token on protected ``/api/`` routes.

    A path is protected when it starts with ``/api/``, is not listed in
    ``_PUBLIC_API_PATHS`` and does not live under ``/api/plugins/``.
    Protected requests without a matching ``Authorization: Bearer <token>``
    header get a 401 JSON response; everything else is forwarded.
    """
    path = request.url.path
    needs_token = (
        path.startswith("/api/")
        and path not in _PUBLIC_API_PATHS
        and not path.startswith("/api/plugins/")
    )
    if needs_token:
        presented = request.headers.get("authorization", "").encode()
        wanted = f"Bearer {_SESSION_TOKEN}".encode()
        # Constant-time comparison to avoid a timing side-channel.
        if not hmac.compare_digest(presented, wanted):
            return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
    return await call_next(request)
# ---------------------------------------------------------------------------
# Config schema — auto-generated from DEFAULT_CONFIG
# ---------------------------------------------------------------------------
# Manual overrides for fields that need select options or custom types
_SCHEMA_OVERRIDES: Dict[str, Dict[str, Any]] = {
"model": {
"type": "string",
"description": "Default model (e.g. anthropic/claude-sonnet-4.6)",
"category": "general",
},
"model_context_length": {
"type": "number",
"description": "Context window override (0 = auto-detect from model metadata)",
"category": "general",
},
"terminal.backend": {
"type": "select",
"description": "Terminal execution backend",
"options": ["local", "docker", "ssh", "modal", "daytona", "singularity"],
},
"terminal.modal_mode": {
"type": "select",
"description": "Modal sandbox mode",
"options": ["sandbox", "function"],
},
"tts.provider": {
"type": "select",
"description": "Text-to-speech provider",
"options": ["edge", "elevenlabs", "openai", "neutts"],
},
"stt.provider": {
"type": "select",
"description": "Speech-to-text provider",
"options": ["local", "openai", "mistral"],
},
"display.skin": {
"type": "select",
"description": "CLI visual theme",
"options": ["default", "ares", "mono", "slate"],
},
"dashboard.theme": {
"type": "select",
"description": "Web dashboard visual theme",
"options": ["default", "midnight", "ember", "mono", "cyberpunk", "rose"],
},
"display.resume_display": {
"type": "select",
"description": "How resumed sessions display history",
"options": ["minimal", "full", "off"],
},
"display.busy_input_mode": {
"type": "select",
"description": "Input behavior while agent is running",
"options": ["queue", "interrupt", "block"],
},
"memory.provider": {
"type": "select",
"description": "Memory provider plugin",
"options": ["builtin", "honcho"],
},
"approvals.mode": {
"type": "select",
"description": "Dangerous command approval mode",
"options": ["ask", "yolo", "deny"],
},
"context.engine": {
"type": "select",
"description": "Context management engine",
"options": ["default", "custom"],
},
"human_delay.mode": {
"type": "select",
"description": "Simulated typing delay mode",
"options": ["off", "typing", "fixed"],
},
"logging.level": {
"type": "select",
"description": "Log level for agent.log",
"options": ["DEBUG", "INFO", "WARNING", "ERROR"],
},
"hermes_agent.agent.service_tier": {
"type": "select",
"description": "API service tier (OpenAI/Anthropic)",
"options": ["", "auto", "default", "flex"],
},
"delegation.reasoning_effort": {
"type": "select",
"description": "Reasoning effort for delegated subagents",
"options": ["", "low", "medium", "high"],
},
}
# Categories with only a few fields are folded into a larger, related
# category (source → target) to avoid tab sprawl.
# Maps an original category name to the category it is displayed under.
# Looked up once per schema entry; categories absent from this mapping keep
# their own name.
_CATEGORY_MERGE: Dict[str, str] = {
    "privacy": "security",
    "context": "agent",
    "skills": "agent",
    "cron": "agent",
    "network": "agent",
    "checkpoints": "agent",
    "approvals": "security",
    "human_delay": "display",
    "dashboard": "display",
    "code_execution": "agent",
}
# Display order for tabs — unlisted categories sort alphabetically after these.
# Returned to the frontend as ``category_order`` by the schema endpoint.
_CATEGORY_ORDER: List[str] = [
    "general", "agent", "terminal", "display", "delegation",
    "memory", "compression", "security", "browser", "voice",
    "tts", "stt", "logging", "discord", "auxiliary",
]
def _infer_type(value: Any) -> str:
"""Infer a UI field type from a Python value."""
if isinstance(value, bool):
return "boolean"
if isinstance(value, int):
return "number"
if isinstance(value, float):
return "number"
if isinstance(value, list):
return "list"
if isinstance(value, dict):
return "object"
return "string"
def _build_schema_from_config(
config: Dict[str, Any],
prefix: str = "",
) -> Dict[str, Dict[str, Any]]:
"""Walk DEFAULT_CONFIG and produce a flat dot-path → field schema dict."""
schema: Dict[str, Dict[str, Any]] = {}
for key, value in config.items():
full_key = f"{prefix}.{key}" if prefix else key
# Skip internal / version keys
if full_key in ("_config_version",):
continue
# Category is the first path component for nested keys, or "general"
# for top-level scalar fields (model, toolsets, timezone, etc.).
if prefix:
category = prefix.split(".")[0]
elif isinstance(value, dict):
category = key
else:
category = "general"
if isinstance(value, dict):
# Recurse into nested dicts
schema.update(_build_schema_from_config(value, full_key))
else:
entry: Dict[str, Any] = {
"type": _infer_type(value),
"description": full_key.replace(".", "").replace("_", " ").title(),
"category": category,
}
# Apply manual overrides
if full_key in _SCHEMA_OVERRIDES:
entry.update(_SCHEMA_OVERRIDES[full_key])
# Merge small categories
entry["category"] = _CATEGORY_MERGE.get(entry["category"], entry["category"])
schema[full_key] = entry
return schema
# Flat dot-path schema derived from the default config.
CONFIG_SCHEMA = _build_schema_from_config(DEFAULT_CONFIG)
# Inject virtual fields that don't live in DEFAULT_CONFIG but are surfaced
# by the normalize/denormalize cycle. Insert model_context_length right after
# the "model" key so it renders adjacent in the frontend.
_mcl_entry = _SCHEMA_OVERRIDES["model_context_length"]
# Rebuild the dict rather than mutate it: dicts preserve insertion order, so
# copying entry by entry is how the new key lands directly after "model".
# If the schema has no "model" key, the virtual field is never inserted.
_ordered_schema: Dict[str, Dict[str, Any]] = {}
for _k, _v in CONFIG_SCHEMA.items():
    _ordered_schema[_k] = _v
    if _k == "model":
        _ordered_schema["model_context_length"] = _mcl_entry
CONFIG_SCHEMA = _ordered_schema
class ConfigUpdate(BaseModel):
    """Request body for ``PUT /api/config``."""

    # The config mapping as edited in the web UI (web-normalised form).
    config: dict
class EnvVarUpdate(BaseModel):
    """Request body for ``PUT /api/env``: one variable name and its value."""

    key: str
    value: str
class EnvVarDelete(BaseModel):
    """Request body for ``DELETE /api/env``: the variable name to remove."""

    key: str
class EnvVarReveal(BaseModel):
    """Request body for ``POST /api/env/reveal``: the variable to reveal."""

    key: str
# Optional URL of a gateway health endpoint. ``None`` when the environment
# variable is unset, in which case no HTTP probing takes place.
_GATEWAY_HEALTH_URL = os.getenv("GATEWAY_HEALTH_URL")
# Per-request timeout for the health probe, in seconds (default 3). Read
# once at import time; a non-numeric value raises ValueError on import.
_GATEWAY_HEALTH_TIMEOUT = float(os.getenv("GATEWAY_HEALTH_TIMEOUT", "3"))
def _probe_gateway_health() -> tuple[bool, dict | None]:
    """Probe the gateway via its HTTP health endpoint (cross-container).

    Tries ``/health/detailed`` first (full state), then falls back to the
    simpler ``/health``. ``GATEWAY_HEALTH_URL`` may be given as any of:

    - ``http://gateway:8642``                   (base URL — recommended)
    - ``http://gateway:8642/health``            (explicit health path)
    - ``http://gateway:8642/health/detailed``   (explicit detailed path)

    This is a **blocking** call — run it via ``run_in_executor`` from
    async code.

    Returns:
        ``(is_alive, body)``. ``body`` is the decoded JSON object of the
        first endpoint that answered 200, and ``None`` when nothing
        answered or no URL is configured. A 200 response whose JSON is not
        an object still counts as alive but yields ``{}``, so callers can
        always treat a non-``None`` body as a mapping.
    """
    if not _GATEWAY_HEALTH_URL:
        return False, None

    # Normalise to the base URL so both paths are probed regardless of
    # whether the env var already included /health or /health/detailed.
    base = _GATEWAY_HEALTH_URL.rstrip("/")
    for suffix in ("/health/detailed", "/health"):
        if base.endswith(suffix):
            base = base[: -len(suffix)]
            break

    for url in (f"{base}/health/detailed", f"{base}/health"):
        try:
            req = urllib.request.Request(url, method="GET")
            with urllib.request.urlopen(req, timeout=_GATEWAY_HEALTH_TIMEOUT) as resp:
                if resp.status != 200:
                    continue
                body = json.loads(resp.read())
        except Exception:
            # Best-effort probe: unreachable host, HTTP error, bad JSON —
            # all just mean "try the next path".
            continue
        # Honour the declared return type: only JSON objects are handed
        # back, anything else (list, string, number) becomes an empty dict.
        return True, (body if isinstance(body, dict) else {})
    return False, None
def _count_active_sessions(limit: int = 50, idle_seconds: float = 300) -> int:
    """Count recent sessions that are still open and were active lately.

    Looks at the ``limit`` sessions returned by ``list_sessions_rich`` and
    counts those with no ``ended_at`` whose ``last_active`` (falling back
    to ``started_at``) is less than ``idle_seconds`` old. Best-effort:
    any failure is logged at debug level and reported as 0.
    """
    try:
        from hermes_agent.state import SessionDB
        db = SessionDB()
        try:
            sessions = db.list_sessions_rich(limit=limit)
            now = time.time()
            return sum(
                1 for s in sessions
                if s.get("ended_at") is None
                and (now - s.get("last_active", s.get("started_at", 0))) < idle_seconds
            )
        finally:
            db.close()
    except Exception:
        _log.debug("Could not count active sessions", exc_info=True)
        return 0


@app.get("/api/status")
async def get_status():
    """Report version, paths, gateway liveness/state and active sessions.

    Gateway liveness is determined by a local PID check first. If that
    finds nothing and ``GATEWAY_HEALTH_URL`` is configured, the gateway is
    probed over HTTP (in a worker thread) so the dashboard also works when
    the gateway runs in a separate container.
    """
    current_ver, latest_ver = check_config_version()

    # --- Gateway liveness detection ---
    gateway_pid = get_running_pid()
    gateway_running = gateway_pid is not None
    remote_health_body: dict | None = None

    if not gateway_running and _GATEWAY_HEALTH_URL:
        # We are inside a coroutine, so a loop is guaranteed to be running.
        loop = asyncio.get_running_loop()
        alive, remote_health_body = await loop.run_in_executor(
            None, _probe_gateway_health
        )
        if alive:
            gateway_running = True
            # PID from the remote container (display only — not locally valid)
            if remote_health_body:
                gateway_pid = remote_health_body.get("pid")

    gateway_state = None
    gateway_platforms: dict = {}
    gateway_exit_reason = None
    gateway_updated_at = None

    # Names of the platforms enabled in the gateway config; None when the
    # config cannot be loaded, in which case no filtering is applied.
    configured_gateway_platforms: set[str] | None = None
    try:
        from hermes_agent.gateway.config import load_gateway_config
        gateway_config = load_gateway_config()
        configured_gateway_platforms = {
            platform.value for platform in gateway_config.get_connected_platforms()
        }
    except Exception:
        _log.debug("Could not load gateway config for platform filtering", exc_info=True)
        configured_gateway_platforms = None

    # Prefer the detailed health endpoint response (has full state) when the
    # local runtime status file is absent (cross-container).
    runtime = read_runtime_status()
    if runtime is None and remote_health_body and remote_health_body.get("gateway_state"):
        runtime = remote_health_body

    if runtime:
        gateway_state = runtime.get("gateway_state")
        gateway_platforms = runtime.get("platforms") or {}
        if configured_gateway_platforms is not None:
            gateway_platforms = {
                key: value
                for key, value in gateway_platforms.items()
                if key in configured_gateway_platforms
            }
        gateway_exit_reason = runtime.get("exit_reason")
        gateway_updated_at = runtime.get("updated_at")
        if not gateway_running:
            # A dead gateway can only be "stopped" or "startup_failed".
            gateway_state = gateway_state if gateway_state in ("stopped", "startup_failed") else "stopped"
            gateway_platforms = {}
        elif remote_health_body is not None:
            # The health probe confirmed the gateway is alive, but the local
            # runtime status file may be stale (cross-container). Override
            # stopped/None state so the dashboard shows the correct badge.
            if gateway_state in (None, "stopped"):
                gateway_state = "running"

    # If there was no runtime info at all but the health probe confirmed alive,
    # ensure we still report the gateway as running (no shared volume scenario).
    if gateway_running and gateway_state is None and remote_health_body is not None:
        gateway_state = "running"

    active_sessions = _count_active_sessions()

    return {
        "version": __version__,
        "release_date": __release_date__,
        "hermes_home": str(get_hermes_home()),
        "config_path": str(get_config_path()),
        "env_path": str(get_env_path()),
        "config_version": current_ver,
        "latest_config_version": latest_ver,
        "gateway_running": gateway_running,
        "gateway_pid": gateway_pid,
        "gateway_health_url": _GATEWAY_HEALTH_URL,
        "gateway_state": gateway_state,
        "gateway_platforms": gateway_platforms,
        "gateway_exit_reason": gateway_exit_reason,
        "gateway_updated_at": gateway_updated_at,
        "active_sessions": active_sessions,
    }
# ---------------------------------------------------------------------------
# Gateway + update actions (invoked from the Status page).
#
# Both commands are spawned as detached subprocesses so the HTTP request
# returns immediately. stdin is closed (``DEVNULL``) so any stray ``input()``
# calls fail fast with EOF rather than hanging forever. stdout/stderr are
# streamed to a per-action log file under ``~/.hermes/logs/<action>.log`` so
# the dashboard can tail them back to the user.
# ---------------------------------------------------------------------------
# Directory that receives one log file per action.
_ACTION_LOG_DIR: Path = get_hermes_home() / "logs"
# Short ``name`` (from the URL) → log file *name* inside ``_ACTION_LOG_DIR``.
# The values are bare file names, not absolute paths; callers join them with
# the directory. This mapping also acts as the allow-list of action names.
_ACTION_LOG_FILES: Dict[str, str] = {
    "gateway-restart": "gateway-restart.log",
    "hermes-update": "hermes-update.log",
}
# ``name`` → most recently spawned Popen handle. Used so ``status`` can
# report liveness and exit code without shelling out to ``ps``.
_ACTION_PROCS: Dict[str, subprocess.Popen] = {}
def _spawn_hermes_action(subcommand: List[str], name: str) -> subprocess.Popen:
    """Spawn ``hermes <subcommand>`` detached and record the Popen handle.

    Uses the running interpreter's ``hermes_agent.cli.main`` module so the
    action inherits the same venv/PYTHONPATH the web server is using.

    Args:
        subcommand: Arguments appended after ``-m hermes_agent.cli.main``.
        name: Action name; must be a key of ``_ACTION_LOG_FILES``.

    Returns:
        The ``Popen`` handle, also stored in ``_ACTION_PROCS[name]``.

    Raises:
        KeyError: ``name`` is not a known action.
        OSError: The log file or the process could not be created.
    """
    log_file_name = _ACTION_LOG_FILES[name]
    _ACTION_LOG_DIR.mkdir(parents=True, exist_ok=True)
    log_path = _ACTION_LOG_DIR / log_file_name

    cmd = [sys.executable, "-m", "hermes_agent.cli.main", *subcommand]

    popen_kwargs: Dict[str, Any] = {
        "cwd": str(PROJECT_ROOT),
        # Closed stdin makes any stray input() fail fast instead of hanging.
        "stdin": subprocess.DEVNULL,
        "stderr": subprocess.STDOUT,
        "env": {**os.environ, "HERMES_NONINTERACTIVE": "1"},
    }
    # Detach the child so it is not tied to this process's group/session.
    if sys.platform == "win32":
        popen_kwargs["creationflags"] = (
            subprocess.CREATE_NEW_PROCESS_GROUP  # type: ignore[attr-defined]
            | getattr(subprocess, "DETACHED_PROCESS", 0)
        )
    else:
        popen_kwargs["start_new_session"] = True

    # The child receives its own copy of the log descriptor when it is
    # spawned, so the parent's handle is closed as soon as Popen returns
    # (or fails). Leaving it open would leak one descriptor per action.
    with open(log_path, "ab", buffering=0) as log_file:
        log_file.write(
            f"\n=== {name} started {time.strftime('%Y-%m-%d %H:%M:%S')} ===\n".encode()
        )
        proc = subprocess.Popen(cmd, stdout=log_file, **popen_kwargs)

    _ACTION_PROCS[name] = proc
    return proc
def _tail_lines(path: Path, n: int) -> List[str]:
"""Return the last ``n`` lines of ``path``. Reads the whole file — fine
for our small per-action logs. Binary-decoded with ``errors='replace'``
so log corruption doesn't 500 the endpoint."""
if not path.exists():
return []
try:
text = path.read_text(errors="replace")
except OSError:
return []
lines = text.splitlines()
return lines[-n:] if n > 0 else lines
@app.post("/api/gateway/restart")
async def restart_gateway():
    """Start ``hermes gateway restart`` as a background process.

    Returns immediately with the child's PID and the action name. A spawn
    failure is logged and reported as HTTP 500.
    """
    action = "gateway-restart"
    try:
        child = _spawn_hermes_action(["gateway", "restart"], action)
    except Exception as exc:
        _log.exception("Failed to spawn gateway restart")
        raise HTTPException(status_code=500, detail=f"Failed to restart gateway: {exc}")
    return {"ok": True, "pid": child.pid, "name": action}
@app.post("/api/hermes/update")
async def update_hermes():
    """Start ``hermes update`` as a background process.

    Returns immediately with the child's PID and the action name. A spawn
    failure is logged and reported as HTTP 500.
    """
    action = "hermes-update"
    try:
        child = _spawn_hermes_action(["update"], action)
    except Exception as exc:
        _log.exception("Failed to spawn hermes update")
        raise HTTPException(status_code=500, detail=f"Failed to start update: {exc}")
    return {"ok": True, "pid": child.pid, "name": action}
@app.get("/api/actions/{name}/status")
async def get_action_status(name: str, lines: int = 200):
    """Return the tail of an action's log plus its process state.

    ``lines`` is clamped to the range 1..2000. ``running``, ``exit_code``
    and ``pid`` describe the most recently spawned process for this action
    in the current server process; when none was spawned they are
    ``False``/``None``/``None``. Unknown action names yield HTTP 404.
    """
    log_file_name = _ACTION_LOG_FILES.get(name)
    if log_file_name is None:
        raise HTTPException(status_code=404, detail=f"Unknown action: {name}")

    wanted = min(max(lines, 1), 2000)
    tail = _tail_lines(_ACTION_LOG_DIR / log_file_name, wanted)

    proc = _ACTION_PROCS.get(name)
    known = proc is not None
    # poll() returns None while the child is still alive.
    exit_code: Optional[int] = proc.poll() if known else None
    pid: Optional[int] = proc.pid if known else None

    return {
        "name": name,
        "running": known and exit_code is None,
        "exit_code": exit_code,
        "pid": pid,
        "lines": tail,
    }
@app.get("/api/sessions")
async def get_sessions(limit: int = 20, offset: int = 0):
    """Return one page of sessions, each tagged with ``is_active``.

    A session is active when it has no ``ended_at`` and its
    ``last_active`` (falling back to ``started_at``) is under 300 seconds
    old. Any failure is logged and reported as HTTP 500.
    """
    try:
        from hermes_agent.state import SessionDB
        db = SessionDB()
        try:
            page = db.list_sessions_rich(limit=limit, offset=offset)
            total = db.session_count()
            now = time.time()
            for row in page:
                last_seen = row.get("last_active", row.get("started_at", 0))
                row["is_active"] = (
                    row.get("ended_at") is None and (now - last_seen) < 300
                )
            return {"sessions": page, "total": total, "limit": limit, "offset": offset}
        finally:
            db.close()
    except Exception:
        _log.exception("GET /api/sessions failed")
        raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/api/sessions/search")
async def search_sessions(q: str = "", limit: int = 20):
    """Full-text search over session messages, one result per session.

    Each bare word in ``q`` gets a trailing ``*`` so partial words match
    (``nimb`` finds ``nimby``). Double-quoted phrases and words that
    already end in ``*`` are passed through unchanged. Only the first hit
    returned for each session is kept. A blank query returns no results;
    any failure is logged and reported as HTTP 500.
    """
    if not q or not q.strip():
        return {"results": []}
    try:
        from hermes_agent.state import SessionDB
        db = SessionDB()
        try:
            import re
            tokens = re.findall(r'"[^"]*"|\S+', q.strip())
            fts_query = " ".join(
                tok if tok.startswith('"') or tok.endswith("*") else tok + "*"
                for tok in tokens
            )
            hits = db.search_messages(query=fts_query, limit=limit)

            # First hit per session wins; later hits are dropped.
            by_session: dict = {}
            for hit in hits:
                sid = hit["session_id"]
                if sid in by_session:
                    continue
                by_session[sid] = {
                    "session_id": sid,
                    "snippet": hit.get("snippet", ""),
                    "role": hit.get("role"),
                    "source": hit.get("source"),
                    "model": hit.get("model"),
                    "session_started": hit.get("session_started"),
                }
            return {"results": list(by_session.values())}
        finally:
            db.close()
    except Exception:
        _log.exception("GET /api/sessions/search failed")
        raise HTTPException(status_code=500, detail="Search failed")
def _normalize_config_for_web(config: Dict[str, Any]) -> Dict[str, Any]:
"""Normalize config for the web UI.
Hermes supports ``model`` as either a bare string (``"anthropic/claude-sonnet-4"``)
or a dict (``{default: ..., provider: ..., base_url: ...}``). The schema is built
from DEFAULT_CONFIG where ``model`` is a string, but user configs often have the
dict form. Normalize to the string form so the frontend schema matches.
Also surfaces ``model_context_length`` as a top-level field so the web UI can
display and edit it. A value of 0 means "auto-detect".
"""
config = dict(config) # shallow copy
model_val = config.get("model")
if isinstance(model_val, dict):
# Extract context_length before flattening the dict
ctx_len = model_val.get("context_length", 0)
config["model"] = model_val.get("default", model_val.get("name", ""))
config["model_context_length"] = ctx_len if isinstance(ctx_len, int) else 0
else:
config["model_context_length"] = 0
return config
@app.get("/api/config")
async def get_config():
    """Return the current config in web-normalised form.

    Top-level keys starting with ``_`` are internal and are left out so
    the frontend neither sees them nor sends them back.
    """
    visible = {}
    for key, value in _normalize_config_for_web(load_config()).items():
        if key.startswith("_"):
            continue
        visible[key] = value
    return visible
@app.get("/api/config/defaults")
async def get_defaults():
    """Return the built-in default configuration unchanged."""
    return DEFAULT_CONFIG
@app.get("/api/config/schema")
async def get_schema():
    """Return the flat field schema and the preferred category tab order."""
    return {"fields": CONFIG_SCHEMA, "category_order": _CATEGORY_ORDER}
# Fallback payload for the model-info endpoint, used when no model is
# configured or resolution fails. It is always copied with ``dict(...)``
# before being returned, so this template itself is never handed out.
_EMPTY_MODEL_INFO: dict = {
    "model": "",
    "provider": "",
    "auto_context_length": 0,
    "config_context_length": 0,
    "effective_context_length": 0,
    "capabilities": {},
}
@app.get("/api/model/info")
def get_model_info():
    """Describe the configured model: context lengths and capabilities.

    Reports three context lengths:
      - ``auto_context_length``: what metadata resolution yields with the
        config override ignored (0 when resolution fails),
      - ``config_context_length``: the positive integer override from the
        config, else 0,
      - ``effective_context_length``: the override when set, else the
        auto-detected value.

    ``capabilities`` is filled when capability metadata is available and
    stays empty otherwise. This endpoint never raises: unexpected failures
    are logged and answered with an empty payload.
    """
    try:
        model_cfg = load_config().get("model", "")

        # The model may be stored as a dict with sub-keys or as a bare string.
        if isinstance(model_cfg, dict):
            model_name = model_cfg.get("default", model_cfg.get("name", ""))
            provider = model_cfg.get("provider", "")
            base_url = model_cfg.get("base_url", "")
            config_ctx = model_cfg.get("context_length")
        else:
            model_name = str(model_cfg) if model_cfg else ""
            provider, base_url, config_ctx = "", "", None

        if not model_name:
            return dict(_EMPTY_MODEL_INFO, provider=provider)

        # Auto-detected value: resolve with the override deliberately
        # withheld, then report the override separately.
        try:
            from hermes_agent.providers.metadata import get_model_context_length
            auto_ctx = get_model_context_length(
                model=model_name,
                base_url=base_url,
                provider=provider,
                config_context_length=None,  # ignore override — we want auto value
            )
        except Exception:
            auto_ctx = 0

        has_override = isinstance(config_ctx, int) and config_ctx > 0
        config_ctx_int = config_ctx if has_override else 0

        # Effective is what the agent actually uses.
        effective_ctx = config_ctx_int if has_override else auto_ctx

        # Capabilities are optional extras; failures here are ignored.
        caps = {}
        try:
            from hermes_agent.providers.metadata_dev import get_model_capabilities
            mc = get_model_capabilities(provider=provider, model=model_name)
            if mc is not None:
                caps = {
                    "supports_tools": mc.supports_tools,
                    "supports_vision": mc.supports_vision,
                    "supports_reasoning": mc.supports_reasoning,
                    "context_window": mc.context_window,
                    "max_output_tokens": mc.max_output_tokens,
                    "model_family": mc.model_family,
                }
        except Exception:
            pass

        return {
            "model": model_name,
            "provider": provider,
            "auto_context_length": auto_ctx,
            "config_context_length": config_ctx_int,
            "effective_context_length": effective_ctx,
            "capabilities": caps,
        }
    except Exception:
        _log.exception("GET /api/model/info failed")
        return dict(_EMPTY_MODEL_INFO)
def _denormalize_config_from_web(config: Dict[str, Any]) -> Dict[str, Any]:
"""Reverse _normalize_config_for_web before saving.
Reconstructs ``model`` as a dict by reading the current on-disk config
to recover model subkeys (provider, base_url, api_mode, etc.) that were
stripped from the GET response. The frontend only sees model as a flat
string; the rest is preserved transparently.
Also handles ``model_context_length`` — writes it back into the model dict
as ``context_length``. A value of 0 or absent means "auto-detect" (omitted
from the dict so get_model_context_length() uses its normal resolution).
"""
config = dict(config)
# Remove any _model_meta that might have leaked in (shouldn't happen
# with the stripped GET response, but be defensive)
config.pop("_model_meta", None)
# Extract and remove model_context_length before processing model
ctx_override = config.pop("model_context_length", 0)
if not isinstance(ctx_override, int):
try:
ctx_override = int(ctx_override)
except (TypeError, ValueError):
ctx_override = 0
model_val = config.get("model")
if isinstance(model_val, str) and model_val:
# Read the current disk config to recover model subkeys
try:
disk_config = load_config()
disk_model = disk_config.get("model")
if isinstance(disk_model, dict):
# Preserve all subkeys, update default with the new value
disk_model["default"] = model_val
# Write context_length into the model dict (0 = remove/auto)
if ctx_override > 0:
disk_model["context_length"] = ctx_override
else:
disk_model.pop("context_length", None)
config["model"] = disk_model
else:
# Model was previously a bare string — upgrade to dict if
# user is setting a context_length override
if ctx_override > 0:
config["model"] = {
"default": model_val,
"context_length": ctx_override,
}
except Exception:
pass # can't read disk config — just use the string form
return config
@app.put("/api/config")
async def update_config(body: ConfigUpdate):
    """Persist the config sent by the web UI.

    The payload is converted back to the on-disk shape before saving.
    Any failure is logged and reported as HTTP 500.
    """
    try:
        on_disk_shape = _denormalize_config_from_web(body.config)
        save_config(on_disk_shape)
    except Exception:
        _log.exception("PUT /api/config failed")
        raise HTTPException(status_code=500, detail="Internal server error")
    return {"ok": True}
@app.get("/api/env")
async def get_env_vars():
    """List every known optional env var with its metadata and set-state.

    Real values are never returned here: each entry carries only
    ``is_set`` and a redacted preview (``None`` when unset or empty).
    """
    stored = load_env()

    def _entry(var_name: str, info: dict) -> dict:
        current = stored.get(var_name)
        return {
            "is_set": bool(current),
            "redacted_value": redact_key(current) if current else None,
            "description": info.get("description", ""),
            "url": info.get("url"),
            "category": info.get("category", ""),
            "is_password": info.get("password", False),
            "tools": info.get("tools", []),
            "advanced": info.get("advanced", False),
        }

    return {
        var_name: _entry(var_name, info)
        for var_name, info in OPTIONAL_ENV_VARS.items()
    }
@app.put("/api/env")
async def set_env_var(body: EnvVarUpdate):
    """Store one environment variable via ``save_env_value``.

    Any failure is logged and reported as HTTP 500.
    """
    try:
        save_env_value(body.key, body.value)
    except Exception:
        _log.exception("PUT /api/env failed")
        raise HTTPException(status_code=500, detail="Internal server error")
    return {"ok": True, "key": body.key}
@app.delete("/api/env")
async def remove_env_var(body: EnvVarDelete):
    """Remove one environment variable via ``remove_env_value``.

    Responds 404 when the helper reports that nothing was removed, and
    500 (after logging) on any other failure.
    """
    try:
        removed = remove_env_value(body.key)
    except HTTPException:
        # Already an HTTP error — pass it through untouched.
        raise
    except Exception:
        _log.exception("DELETE /api/env failed")
        raise HTTPException(status_code=500, detail="Internal server error")
    if not removed:
        raise HTTPException(status_code=404, detail=f"{body.key} not found in .env")
    return {"ok": True, "key": body.key}
@app.post("/api/env/reveal")
async def reveal_env_var(body: EnvVarReveal, request: Request):
    """Return the real (unredacted) value of one env var.

    Safeguards, applied in this order:
      1. The ephemeral session token must be presented (401 otherwise).
      2. At most ``_REVEAL_MAX_PER_WINDOW`` reveals are allowed per
         ``_REVEAL_WINDOW_SECONDS`` window (429 otherwise). An attempt is
         counted even when the key turns out not to exist.
      3. Each successful reveal is logged by key name; the value itself
         is not logged.

    Responds 404 when the key is absent from the loaded env.
    """
    _require_token(request)

    # Sliding window: discard stamps that have aged out, then check room.
    now = time.time()
    oldest_allowed = now - _REVEAL_WINDOW_SECONDS
    _reveal_timestamps[:] = [
        stamp for stamp in _reveal_timestamps if stamp > oldest_allowed
    ]
    if len(_reveal_timestamps) >= _REVEAL_MAX_PER_WINDOW:
        raise HTTPException(status_code=429, detail="Too many reveal requests. Try again shortly.")
    _reveal_timestamps.append(now)

    secret = load_env().get(body.key)
    if secret is None:
        raise HTTPException(status_code=404, detail=f"{body.key} not found in .env")

    _log.info("env/reveal: %s", body.key)
    return {"key": body.key, "value": secret}
# ---------------------------------------------------------------------------
# OAuth provider endpoints — status + disconnect (Phase 1)
# ---------------------------------------------------------------------------
#
# Phase 1 surfaces *which OAuth providers exist* and whether each is
# connected, plus a disconnect button. The actual login flow (PKCE for
# Anthropic, device-code for Nous/Codex) still runs in the CLI for now;
# Phase 2 will add in-browser flows. For unconnected providers we return
# the canonical ``hermes auth add <provider>`` command so the dashboard
# can surface a one-click copy.
def _truncate_token(value: Optional[str], visible: int = 6) -> str:
    """Return at most the last ``visible`` characters of a token for UI display.

    No prefix or ellipsis is added; the caller gets the bare suffix. When the
    value contains at least two dots it is treated as a JWT and only the part
    after the final dot (the signature segment) is considered, so the visible
    suffix never comes from the header or payload.

    Args:
        value: The token to preview. ``None`` or an empty string yields ``""``.
        visible: Maximum number of trailing characters to expose. Zero or a
            negative number exposes nothing.

    Returns:
        The trailing ``visible`` characters (or the whole remaining string
        when it is shorter than that), or ``""`` when there is nothing to show.
    """
    if not value:
        return ""
    s = str(value)
    if s.count(".") >= 2:
        # Looks like a JWT — show the trailing piece of the signature only.
        s = s.rsplit(".", 1)[-1]
    if visible <= 0:
        # Guard: s[-0:] is the whole string and a negative count would slice
        # from the front, both of which would leak more than requested.
        return ""
    return s[-visible:]
def _anthropic_oauth_status() -> Dict[str, Any]:
    """Report the highest-priority Anthropic credential source that is present.

    Sources are probed in this order and the first one holding a token wins:
      1. Hermes-managed OAuth file, via ``read_hermes_oauth_credentials``
      2. Claude Code CLI credentials, via ``read_claude_code_credentials``
      3. ``ANTHROPIC_TOKEN``, then ``CLAUDE_CODE_OAUTH_TOKEN`` env vars

    Returns:
        A status dict with ``logged_in``, ``source``, ``source_label``,
        ``token_preview``, ``expires_at`` and ``has_refresh_token`` when a
        credential is found, otherwise ``{"logged_in": False, "source": None}``.
    """
    try:
        from hermes_agent.providers.anthropic_adapter import (
            read_hermes_oauth_credentials,
            read_claude_code_credentials,
            _HERMES_OAUTH_FILE,
        )
    except ImportError:
        # Adapter unavailable: skip the file-based sources, still check env vars.
        read_claude_code_credentials = None  # type: ignore
        read_hermes_oauth_credentials = None  # type: ignore
        _HERMES_OAUTH_FILE = None  # type: ignore

    hermes_creds = None
    if read_hermes_oauth_credentials:
        try:
            hermes_creds = read_hermes_oauth_credentials()
        except Exception:
            # An unreadable credential file is treated the same as a missing one.
            hermes_creds = None
    if hermes_creds and hermes_creds.get("accessToken"):
        return {
            "logged_in": True,
            "source": "hermes_pkce",
            "source_label": f"Hermes PKCE ({_HERMES_OAUTH_FILE})",
            "token_preview": _truncate_token(hermes_creds.get("accessToken")),
            "expires_at": hermes_creds.get("expiresAt"),
            "has_refresh_token": bool(hermes_creds.get("refreshToken")),
        }

    cc_creds = None
    if read_claude_code_credentials:
        try:
            cc_creds = read_claude_code_credentials()
        except Exception:
            cc_creds = None
    if cc_creds and cc_creds.get("accessToken"):
        return {
            "logged_in": True,
            "source": "claude_code",
            "source_label": "Claude Code (~/.claude/.credentials.json)",
            "token_preview": _truncate_token(cc_creds.get("accessToken")),
            "expires_at": cc_creds.get("expiresAt"),
            "has_refresh_token": bool(cc_creds.get("refreshToken")),
        }

    # Name the variable that actually supplied the token so the dashboard
    # label does not claim ANTHROPIC_TOKEN when the fallback var was used.
    for env_name in ("ANTHROPIC_TOKEN", "CLAUDE_CODE_OAUTH_TOKEN"):
        env_token = os.getenv(env_name)
        if env_token:
            return {
                "logged_in": True,
                "source": "env_var",
                "source_label": f"{env_name} environment variable",
                "token_preview": _truncate_token(env_token),
                "expires_at": None,
                "has_refresh_token": False,
            }
    return {"logged_in": False, "source": None}
def _claude_code_only_status() -> Dict[str, Any]:
    """Report Claude Code CLI credentials as a provider entry of their own.

    Kept separate from the combined Anthropic status so the dashboard can
    show whether Claude Code subscription tokens are available even when a
    Hermes-managed PKCE login also exists.

    Returns:
        A populated status dict when ``read_claude_code_credentials`` yields
        an ``accessToken``; ``{"logged_in": False, "source": None}`` when it
        does not, or when the adapter import / read raises.
    """
    cli_creds = None
    try:
        from hermes_agent.providers.anthropic_adapter import read_claude_code_credentials
        cli_creds = read_claude_code_credentials()
    except Exception:
        # Missing adapter or unreadable file both count as "not connected".
        cli_creds = None

    if not cli_creds or not cli_creds.get("accessToken"):
        return {"logged_in": False, "source": None}

    return {
        "logged_in": True,
        "source": "claude_code_cli",
        "source_label": "~/.claude/.credentials.json",
        "token_preview": _truncate_token(cli_creds.get("accessToken")),
        "expires_at": cli_creds.get("expiresAt"),
        "has_refresh_token": bool(cli_creds.get("refreshToken")),
    }
# Provider catalog. The order matters — it's how we render the UI list.
# ``cli_command`` is what the dashboard surfaces as the copy-to-clipboard
# fallback while Phase 2 (in-browser flows) isn't built yet.
# ``flow`` describes the OAuth shape so the future modal can pick the
# right UI: ``pkce`` = open URL + paste callback code, ``device_code`` =
# show code + verification URL + poll, ``external`` = read-only (delegated
# to a third-party CLI like Claude Code or Qwen).
# One entry per OAuth-capable provider, in display order. ``status_fn`` is a
# zero-argument callable returning the provider's status dict, or ``None``
# when the status is looked up by provider id through the auth helpers.
_OAUTH_PROVIDER_CATALOG: tuple[Dict[str, Any], ...] = (
    dict(
        id="anthropic",
        name="Anthropic (Claude API)",
        flow="pkce",
        cli_command="hermes auth add anthropic",
        docs_url="https://docs.claude.com/en/api/getting-started",
        status_fn=_anthropic_oauth_status,
    ),
    dict(
        id="claude-code",
        name="Claude Code (subscription)",
        flow="external",
        cli_command="claude setup-token",
        docs_url="https://docs.claude.com/en/docs/claude-code",
        status_fn=_claude_code_only_status,
    ),
    dict(
        id="nous",
        name="Nous Portal",
        flow="device_code",
        cli_command="hermes auth add nous",
        docs_url="https://portal.nousresearch.com",
        status_fn=None,  # dispatched via auth.get_nous_auth_status
    ),
    dict(
        id="openai-codex",
        name="OpenAI Codex (ChatGPT)",
        flow="device_code",
        cli_command="hermes auth add openai-codex",
        docs_url="https://platform.openai.com/docs",
        status_fn=None,  # dispatched via auth.get_codex_auth_status
    ),
    dict(
        id="qwen-oauth",
        name="Qwen (via Qwen CLI)",
        flow="external",
        cli_command="hermes auth add qwen-oauth",
        docs_url="https://github.com/QwenLM/qwen-code",
        status_fn=None,  # dispatched via auth.get_qwen_auth_status
    ),
)
def _resolve_provider_status(provider_id: str, status_fn) -> Dict[str, Any]:
    """Produce the status dict for one OAuth provider catalog entry.

    Args:
        provider_id: Catalog id; only consulted when ``status_fn`` is ``None``.
        status_fn: Optional zero-argument callable that returns the status
            dict directly.

    Returns:
        The provider's status dict. Any exception raised while resolving it
        is reported as ``{"logged_in": False, "error": <message>}``, and a
        provider id with no known helper yields ``{"logged_in": False}``.
    """
    if status_fn is not None:
        # The catalog entry carries its own helper; use it and stop here.
        try:
            return status_fn()
        except Exception as exc:
            return {"logged_in": False, "error": str(exc)}

    try:
        from hermes_agent.cli import auth as hauth

        if provider_id == "nous":
            info = hauth.get_nous_auth_status()
            return {
                "logged_in": bool(info.get("logged_in")),
                "source": "nous_portal",
                "source_label": info.get("portal_base_url") or "Nous Portal",
                "token_preview": _truncate_token(info.get("access_token")),
                "expires_at": info.get("access_expires_at"),
                "has_refresh_token": bool(info.get("has_refresh_token")),
            }
        elif provider_id == "openai-codex":
            info = hauth.get_codex_auth_status()
            return {
                "logged_in": bool(info.get("logged_in")),
                "source": info.get("source") or "openai_codex",
                "source_label": info.get("auth_mode") or "OpenAI Codex",
                "token_preview": _truncate_token(info.get("api_key")),
                "expires_at": None,
                "has_refresh_token": False,
                "last_refresh": info.get("last_refresh"),
            }
        elif provider_id == "qwen-oauth":
            info = hauth.get_qwen_auth_status()
            return {
                "logged_in": bool(info.get("logged_in")),
                "source": "qwen_cli",
                "source_label": info.get("auth_store_path") or "Qwen CLI",
                "token_preview": _truncate_token(info.get("access_token")),
                "expires_at": info.get("expires_at"),
                "has_refresh_token": bool(info.get("has_refresh_token")),
            }
    except Exception as exc:
        return {"logged_in": False, "error": str(exc)}

    # No helper matched this provider id.
    return {"logged_in": False}
@app.get("/api/providers/oauth")
async def list_oauth_providers():
    """List every catalogued OAuth provider together with its current status.

    Each element of ``providers`` contains:
        id            stable identifier (used in the DELETE path)
        name          human label
        flow          "pkce" | "device_code" | "external"
        cli_command   CLI command the user can run manually
        docs_url      external docs/portal link
        status        dict from ``_resolve_provider_status``; on the happy
                      path it carries ``logged_in``, ``source``,
                      ``source_label``, ``token_preview`` (a trailing
                      fragment, never the full token), ``expires_at`` and
                      ``has_refresh_token``
    """
    public_fields = ("id", "name", "flow", "cli_command", "docs_url")
    listing = []
    for entry in _OAUTH_PROVIDER_CATALOG:
        current = _resolve_provider_status(entry["id"], entry.get("status_fn"))
        item = {field: entry[field] for field in public_fields}
        item["status"] = current
        listing.append(item)
    return {"providers": listing}
@app.delete("/api/providers/oauth/{provider_id}")
async def disconnect_oauth_provider(provider_id: str, request: Request):
    """Disconnect an OAuth provider. Token-protected (matches /env/reveal).

    For ``anthropic`` and ``claude-code`` the Hermes-managed OAuth file is
    removed and the ``anthropic`` provider auth entry is cleared; both steps
    are best-effort and the response is ``ok: True`` regardless, with any
    failure logged as a warning. For every other provider the result of
    ``clear_provider_auth`` decides ``ok``.

    Raises:
        HTTPException: 400 for an id that is not in the catalog; 500 when
            clearing a non-Anthropic provider raises.
    """
    _require_token(request)
    valid_ids = {p["id"] for p in _OAUTH_PROVIDER_CATALOG}
    if provider_id not in valid_ids:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown provider: {provider_id}. "
            f"Available: {', '.join(sorted(valid_ids))}",
        )

    # Anthropic and claude-code clear the same Hermes-managed PKCE file
    # AND forget the Claude Code import. We don't touch ~/.claude/* directly
    # — that's owned by the Claude Code CLI; users can re-auth there if they
    # want to undo a disconnect.
    if provider_id in ("anthropic", "claude-code"):
        try:
            from hermes_agent.providers.anthropic_adapter import _HERMES_OAUTH_FILE
            # missing_ok avoids the exists()/unlink() race when the file
            # disappears between the check and the delete.
            _HERMES_OAUTH_FILE.unlink(missing_ok=True)
        except Exception as e:
            # Still best-effort, but leave a trace instead of failing silently.
            _log.warning("oauth/disconnect: could not remove Hermes OAuth file: %s", e)
        # Also clear the credential pool entry if present.
        try:
            from hermes_agent.cli.auth.auth import clear_provider_auth
            clear_provider_auth("anthropic")
        except Exception as e:
            _log.warning("oauth/disconnect: could not clear anthropic provider auth: %s", e)
        _log.info("oauth/disconnect: %s", provider_id)
        return {"ok": True, "provider": provider_id}

    try:
        from hermes_agent.cli.auth.auth import clear_provider_auth
        cleared = clear_provider_auth(provider_id)
        _log.info("oauth/disconnect: %s (cleared=%s)", provider_id, cleared)
        return {"ok": bool(cleared), "provider": provider_id}
    except Exception as e:
        _log.exception("disconnect %s failed", provider_id)
        raise HTTPException(status_code=500, detail=str(e))
# ---------------------------------------------------------------------------
# OAuth Phase 2 — in-browser PKCE & device-code flows
# ---------------------------------------------------------------------------
#
# Two flow shapes are supported:
#
# PKCE (Anthropic):
# 1. POST /api/providers/oauth/anthropic/start
# → server generates code_verifier + challenge, builds claude.ai
# authorize URL, stashes verifier in _oauth_sessions[session_id]
# → returns { session_id, flow: "pkce", auth_url }
# 2. UI opens auth_url in a new tab. User authorizes, copies code.
# 3. POST /api/providers/oauth/anthropic/submit { session_id, code }
# → server exchanges (code + verifier) → tokens at console.anthropic.com
# → persists to ~/.hermes/.anthropic_oauth.json AND credential pool
# → returns { ok: true, status: "approved" }
#
# Device code (Nous, OpenAI Codex):
# 1. POST /api/providers/oauth/{nous|openai-codex}/start
# → server hits provider's device-auth endpoint
# → gets { user_code, verification_url, device_code, interval, expires_in }
# → spawns background poller thread that polls the token endpoint
# every `interval` seconds until approved/expired
# → stores poll status in _oauth_sessions[session_id]
# → returns { session_id, flow: "device_code", user_code,
# verification_url, expires_in, poll_interval }
# 2. UI opens verification_url in a new tab and shows user_code.
# 3. UI polls GET /api/providers/oauth/{provider}/poll/{session_id}
# every 2s until status != "pending".
# 4. On "approved" the background thread has already saved creds; UI
# refreshes the providers list.
#
# Sessions are kept in-memory only (single-process FastAPI) and expire
# after 15 minutes. Cleanup is opportunistic rather than timer-driven:
# each /start call garbage-collects expired sessions so the dict doesn't
# grow without bound.
# Sessions older than this many seconds are dropped by the /start-time GC.
_OAUTH_SESSION_TTL_SECONDS = 15 * 60
# session_id -> mutable session dict, shared between request handlers and
# the background poller threads; _oauth_sessions_lock guards access to it.
_oauth_sessions: Dict[str, Dict[str, Any]] = {}
_oauth_sessions_lock = threading.Lock()

# The OAuth constants live in anthropic_adapter; they are imported rather
# than copied. The import is guarded so the web server still starts when the
# adapter is missing — the PKCE start helper answers 501 in that case.
try:
    from hermes_agent.providers.anthropic_adapter import (
        _OAUTH_CLIENT_ID as _ANTHROPIC_OAUTH_CLIENT_ID,
        _OAUTH_TOKEN_URL as _ANTHROPIC_OAUTH_TOKEN_URL,
        _OAUTH_REDIRECT_URI as _ANTHROPIC_OAUTH_REDIRECT_URI,
        _OAUTH_SCOPES as _ANTHROPIC_OAUTH_SCOPES,
        _generate_pkce as _generate_pkce_pair,
    )
except ImportError:
    _ANTHROPIC_OAUTH_AVAILABLE = False
else:
    _ANTHROPIC_OAUTH_AVAILABLE = True

_ANTHROPIC_OAUTH_AUTHORIZE_URL = "https://claude.ai/oauth/authorize"
def _gc_oauth_sessions() -> None:
    """Remove sessions whose ``created_at`` is older than the session TTL.

    Runs under the session lock. Expired ids are collected first so the
    dict is never mutated while it is being iterated.
    """
    oldest_allowed = time.time() - _OAUTH_SESSION_TTL_SECONDS
    with _oauth_sessions_lock:
        expired_ids = [
            session_id
            for session_id, session in _oauth_sessions.items()
            if session["created_at"] < oldest_allowed
        ]
        for session_id in expired_ids:
            _oauth_sessions.pop(session_id, None)
def _new_oauth_session(provider_id: str, flow: str) -> tuple[str, Dict[str, Any]]:
    """Build a fresh pending session, register it, and return it.

    Args:
        provider_id: Provider the session belongs to.
        flow: Flow name stored on the session (e.g. "pkce", "device_code").

    Returns:
        ``(session_id, session_dict)``. The dict is the same object stored
        in ``_oauth_sessions``, so later mutations are visible to readers.
    """
    session_id = secrets.token_urlsafe(16)
    session: Dict[str, Any] = dict(
        session_id=session_id,
        provider=provider_id,
        flow=flow,
        created_at=time.time(),
        status="pending",  # pending | approved | denied | expired | error
        error_message=None,
    )
    with _oauth_sessions_lock:
        _oauth_sessions[session_id] = session
    return session_id, session
def _save_anthropic_oauth_creds(access_token: str, refresh_token: str, expires_at_ms: int) -> None:
    """Persist Anthropic PKCE creds to both the Hermes file AND the credential pool.

    The file is written as JSON with keys ``accessToken``, ``refreshToken``
    and ``expiresAt`` and is restricted to owner read/write, since it holds
    live tokens. The credential-pool insert afterwards is best-effort.

    Args:
        access_token: OAuth access token.
        refresh_token: OAuth refresh token (may be empty).
        expires_at_ms: Stored verbatim as ``expiresAt`` / ``expires_at_ms``.

    Raises:
        ImportError: If the Anthropic adapter cannot be imported.
        OSError: If the credentials file cannot be created or written.
    """
    from hermes_agent.providers.anthropic_adapter import _HERMES_OAUTH_FILE
    payload = {
        "accessToken": access_token,
        "refreshToken": refresh_token,
        "expiresAt": expires_at_ms,
    }
    _HERMES_OAUTH_FILE.parent.mkdir(parents=True, exist_ok=True)
    # Create the file with mode 0600 up front so the tokens are never
    # readable by other local users, even briefly.
    fd = os.open(str(_HERMES_OAUTH_FILE), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(payload, indent=2))
    try:
        # The mode passed to os.open only applies to newly created files;
        # tighten a pre-existing file explicitly.
        os.chmod(_HERMES_OAUTH_FILE, 0o600)
    except OSError as e:
        _log.debug("could not restrict permissions on %s: %s", _HERMES_OAUTH_FILE, e)

    # Best-effort credential-pool insert. Failure here doesn't invalidate
    # the file write — pool registration only matters for the rotation
    # strategy, not for runtime credential resolution.
    try:
        from hermes_agent.providers.credential_pool import (
            PooledCredential,
            load_pool,
            AUTH_TYPE_OAUTH,
            SOURCE_MANUAL,
        )
        import uuid
        pool = load_pool("anthropic")
        dashboard_source = f"{SOURCE_MANUAL}:dashboard_pkce"
        # Avoid duplicate entries: delete any prior dashboard-issued OAuth entry.
        # ``or ""`` tolerates entries whose source is None instead of aborting
        # the whole registration on an AttributeError.
        stale = [
            item for item in pool.entries()
            if (getattr(item, "source", "") or "").startswith(dashboard_source)
        ]
        for item in stale:
            try:
                pool.remove_entry(getattr(item, "id", ""))
            except Exception as remove_err:
                _log.debug("anthropic pool: could not remove stale dashboard entry: %s", remove_err)
        entry = PooledCredential(
            provider="anthropic",
            id=uuid.uuid4().hex[:6],
            label="dashboard PKCE",
            auth_type=AUTH_TYPE_OAUTH,
            priority=0,
            source=dashboard_source,
            access_token=access_token,
            refresh_token=refresh_token,
            expires_at_ms=expires_at_ms,
        )
        pool.add_entry(entry)
    except Exception as e:
        _log.warning("anthropic pool add (dashboard) failed: %s", e)
def _start_anthropic_pkce() -> Dict[str, Any]:
    """Open a PKCE session and build the authorize URL for the UI.

    Returns:
        ``session_id``, ``flow`` ("pkce"), ``auth_url`` and ``expires_in``
        (the session TTL in seconds).

    Raises:
        HTTPException: 501 when the Anthropic adapter constants could not
            be imported at module load.
    """
    if not _ANTHROPIC_OAUTH_AVAILABLE:
        raise HTTPException(status_code=501, detail="Anthropic OAuth not available (missing adapter)")

    verifier, challenge = _generate_pkce_pair()
    session_id, session = _new_oauth_session("anthropic", "pkce")
    # Anthropic round-trips verifier as state, so both keys hold the same value.
    session.update(verifier=verifier, state=verifier)

    query = urllib.parse.urlencode({
        "code": "true",
        "client_id": _ANTHROPIC_OAUTH_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": _ANTHROPIC_OAUTH_REDIRECT_URI,
        "scope": _ANTHROPIC_OAUTH_SCOPES,
        "code_challenge": challenge,
        "code_challenge_method": "S256",
        "state": verifier,
    })
    return {
        "session_id": session_id,
        "flow": "pkce",
        "auth_url": f"{_ANTHROPIC_OAUTH_AUTHORIZE_URL}?{query}",
        "expires_in": _OAUTH_SESSION_TTL_SECONDS,
    }
def _submit_anthropic_pkce(session_id: str, code_input: str) -> Dict[str, Any]:
    """Exchange an authorization code for tokens and persist them on success.

    Args:
        session_id: Id of a pending Anthropic PKCE session.
        code_input: What the user pasted, either ``<code>`` or ``<code>#<state>``.

    Returns:
        ``{"ok": True, "status": "approved"}`` on success. Otherwise
        ``{"ok": False, "status": ..., "message": ...}``. An empty code
        leaves the session pending so the user can retry; exchange, response
        and save failures mark the session as ``error``.

    Raises:
        HTTPException: 404 when the session is unknown or is not an
            Anthropic PKCE session.
    """
    with _oauth_sessions_lock:
        sess = _oauth_sessions.get(session_id)
    if not sess or sess["provider"] != "anthropic" or sess["flow"] != "pkce":
        raise HTTPException(status_code=404, detail="Unknown or expired session")
    if sess["status"] != "pending":
        return {"ok": False, "status": sess["status"], "message": sess.get("error_message")}

    def _fail(message: str) -> Dict[str, Any]:
        # Record a terminal failure under the lock, like the poller threads do.
        with _oauth_sessions_lock:
            sess["status"] = "error"
            sess["error_message"] = message
        return {"ok": False, "status": "error", "message": message}

    # Anthropic's redirect callback page formats the code as `<code>#<state>`.
    # Split the state suffix off if present (the verifier is already held
    # server-side).
    code, _, state_from_callback = code_input.strip().partition("#")
    code = code.strip()
    if not code:
        return {"ok": False, "status": "error", "message": "No code provided"}

    exchange_data = json.dumps({
        "grant_type": "authorization_code",
        "client_id": _ANTHROPIC_OAUTH_CLIENT_ID,
        "code": code,
        "state": state_from_callback or sess["state"],
        "redirect_uri": _ANTHROPIC_OAUTH_REDIRECT_URI,
        "code_verifier": sess["verifier"],
    }).encode()
    req = urllib.request.Request(
        _ANTHROPIC_OAUTH_TOKEN_URL,
        data=exchange_data,
        headers={
            "Content-Type": "application/json",
            "User-Agent": "hermes-dashboard/1.0",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=20) as resp:
            result = json.loads(resp.read().decode())
    except Exception as e:
        return _fail(f"Token exchange failed: {e}")
    if not isinstance(result, dict):
        # Valid JSON that is not an object would otherwise crash on .get().
        return _fail("Token exchange failed: unexpected response format")

    access_token = result.get("access_token", "")
    refresh_token = result.get("refresh_token", "")
    if not access_token:
        return _fail("No access token returned")
    try:
        expires_in = int(result.get("expires_in") or 3600)
    except (TypeError, ValueError):
        # Same one-hour default that is used when the field is absent.
        expires_in = 3600
    expires_at_ms = int(time.time() * 1000) + (expires_in * 1000)

    try:
        _save_anthropic_oauth_creds(access_token, refresh_token, expires_at_ms)
    except Exception as e:
        return _fail(f"Save failed: {e}")

    with _oauth_sessions_lock:
        sess["status"] = "approved"
    _log.info("oauth/pkce: anthropic login completed (session=%s)", session_id)
    return {"ok": True, "status": "approved"}
async def _start_device_code_flow(provider_id: str) -> Dict[str, Any]:
    """Initiate a device-code flow (Nous or OpenAI Codex).

    Nous: requests a device code through the existing CLI helper (run in
    the default executor so the event loop is not blocked), registers a
    session and spawns ``_nous_poller``. Codex: registers a session, spawns
    ``_codex_full_login_worker`` and waits up to 10 seconds for it to
    publish a user code on the session.

    Returns:
        ``session_id``, ``flow``, ``user_code``, ``verification_url``,
        ``expires_in`` and ``poll_interval`` for the UI to display.

    Raises:
        HTTPException: 400 for a provider without a device-code flow; 500
            when the Codex worker reported an error; 504 when the Codex
            worker produced no user code in time.
    """
    if provider_id == "nous":
        from hermes_agent.cli.auth.auth import _request_device_code, PROVIDER_REGISTRY
        import httpx
        pconfig = PROVIDER_REGISTRY["nous"]
        portal_base_url = (
            os.getenv("HERMES_PORTAL_BASE_URL")
            or os.getenv("NOUS_PORTAL_BASE_URL")
            or pconfig.portal_base_url
        ).rstrip("/")
        client_id = pconfig.client_id
        scope = pconfig.scope

        def _do_nous_device_request():
            with httpx.Client(timeout=httpx.Timeout(15.0), headers={"Accept": "application/json"}) as client:
                return _request_device_code(
                    client=client,
                    portal_base_url=portal_base_url,
                    client_id=client_id,
                    scope=scope,
                )

        # get_running_loop() is the recommended accessor inside a coroutine.
        device_data = await asyncio.get_running_loop().run_in_executor(None, _do_nous_device_request)
        sid, sess = _new_oauth_session("nous", "device_code")
        sess["device_code"] = str(device_data["device_code"])
        sess["interval"] = int(device_data["interval"])
        sess["expires_at"] = time.time() + int(device_data["expires_in"])
        sess["portal_base_url"] = portal_base_url
        sess["client_id"] = client_id
        threading.Thread(
            target=_nous_poller, args=(sid,), daemon=True, name=f"oauth-poll-{sid[:6]}"
        ).start()
        return {
            "session_id": sid,
            "flow": "device_code",
            "user_code": str(device_data["user_code"]),
            "verification_url": str(device_data["verification_uri_complete"]),
            "expires_in": int(device_data["expires_in"]),
            "poll_interval": int(device_data["interval"]),
        }

    if provider_id == "openai-codex":
        # The worker runs the whole Codex flow in a thread and proxies the
        # user_code + verification_url back through the session dict.
        sid, _ = _new_oauth_session("openai-codex", "device_code")
        threading.Thread(
            target=_codex_full_login_worker, args=(sid,), daemon=True,
            name=f"oauth-codex-{sid[:6]}",
        ).start()
        # Block briefly until the worker has populated the user_code, OR error.
        deadline = time.time() + 10
        while time.time() < deadline:
            with _oauth_sessions_lock:
                s = _oauth_sessions.get(sid)
                if s and (s.get("user_code") or s["status"] != "pending"):
                    break
            await asyncio.sleep(0.1)
        with _oauth_sessions_lock:
            snapshot = dict(_oauth_sessions.get(sid) or {})
            if snapshot.get("status") == "error" or not snapshot.get("user_code"):
                # The caller never learns this session id, so drop the
                # session now; the worker stops once it finds it missing.
                _oauth_sessions.pop(sid, None)
        if snapshot.get("status") == "error":
            raise HTTPException(status_code=500, detail=snapshot.get("error_message") or "device-auth failed")
        if not snapshot.get("user_code"):
            raise HTTPException(status_code=504, detail="device-auth timed out before returning a user code")
        return {
            "session_id": sid,
            "flow": "device_code",
            "user_code": snapshot["user_code"],
            "verification_url": snapshot["verification_url"],
            "expires_in": int(snapshot.get("expires_in") or 900),
            "poll_interval": int(snapshot.get("interval") or 5),
        }

    raise HTTPException(status_code=400, detail=f"Provider {provider_id} does not support device-code flow")
def _nous_poller(session_id: str) -> None:
    """Thread target: drive one Nous device-code session to a final status.

    Polls for the token with the parameters stored on the session, builds
    the auth state, refreshes it via ``refresh_nous_oauth_from_state``,
    persists the result and marks the session ``approved``. Any exception
    marks the session ``error`` with the exception text. Returns
    immediately when the session is not registered.
    """
    from hermes_agent.cli.auth.auth import _poll_for_token, refresh_nous_oauth_from_state
    from datetime import datetime, timezone
    import httpx

    with _oauth_sessions_lock:
        sess = _oauth_sessions.get(session_id)
    if not sess:
        return

    portal_base_url = sess["portal_base_url"]
    client_id = sess["client_id"]
    device_code = sess["device_code"]
    interval = sess["interval"]
    # Always allow at least a minute of polling, however little time remains.
    remaining = max(60, int(sess["expires_at"] - time.time()))

    try:
        http_timeout = httpx.Timeout(15.0)
        with httpx.Client(timeout=http_timeout, headers={"Accept": "application/json"}) as client:
            token_data = _poll_for_token(
                client=client,
                portal_base_url=portal_base_url,
                client_id=client_id,
                device_code=device_code,
                expires_in=remaining,
                poll_interval=interval,
            )

        # Same post-processing as _nous_device_code_login (mint agent key)
        now = datetime.now(timezone.utc)
        token_ttl = int(token_data.get("expires_in") or 0)
        if token_ttl:
            expires_at_iso = datetime.fromtimestamp(
                now.timestamp() + token_ttl, tz=timezone.utc
            ).isoformat()
        else:
            expires_at_iso = None
        auth_state = {
            "portal_base_url": portal_base_url,
            "inference_base_url": token_data.get("inference_base_url"),
            "client_id": client_id,
            "scope": token_data.get("scope"),
            "token_type": token_data.get("token_type", "Bearer"),
            "access_token": token_data["access_token"],
            "refresh_token": token_data.get("refresh_token"),
            "obtained_at": now.isoformat(),
            "expires_at": expires_at_iso,
            "expires_in": token_ttl,
        }
        full_state = refresh_nous_oauth_from_state(
            auth_state, min_key_ttl_seconds=300, timeout_seconds=15.0,
            force_refresh=False, force_mint=True,
        )
        from hermes_agent.cli.auth.auth import persist_nous_credentials
        persist_nous_credentials(full_state)

        with _oauth_sessions_lock:
            sess["status"] = "approved"
        _log.info("oauth/device: nous login completed (session=%s)", session_id)
    except Exception as exc:
        _log.warning("nous device-code poll failed (session=%s): %s", session_id, exc)
        with _oauth_sessions_lock:
            sess["status"] = "error"
            sess["error_message"] = str(exc)
def _codex_full_login_worker(session_id: str) -> None:
    """Thread target: run the complete OpenAI Codex device-code flow.

    Codex doesn't use the standard OAuth device-code endpoints; it has its
    own ``/api/accounts/deviceauth/usercode`` (JSON body, returns
    ``device_auth_id``) and ``/api/accounts/deviceauth/token`` (JSON body
    polled until 200). On success the response carries an
    ``authorization_code`` + ``code_verifier`` that get exchanged at
    CODEX_OAUTH_TOKEN_URL with grant_type=authorization_code.

    The flow is replicated inline (rather than calling
    _codex_device_code_login) because that helper prints/blocks/polls in a
    single function — the user_code must reach the dashboard the moment it
    is received, well before polling completes.

    Session outcome: ``approved`` after the credential is added to the pool,
    ``expired`` when the deadline passes without approval, ``error`` (with
    the exception text) on any failure. If the session is removed from
    ``_oauth_sessions`` while polling (for example, cancelled by the user),
    the worker stops without saving anything.
    """
    try:
        import httpx
        from hermes_agent.cli.auth.auth import (
            CODEX_OAUTH_CLIENT_ID,
            CODEX_OAUTH_TOKEN_URL,
            DEFAULT_CODEX_BASE_URL,
        )
        issuer = "https://auth.openai.com"

        # Step 1: request device code
        with httpx.Client(timeout=httpx.Timeout(15.0)) as client:
            resp = client.post(
                f"{issuer}/api/accounts/deviceauth/usercode",
                json={"client_id": CODEX_OAUTH_CLIENT_ID},
                headers={"Content-Type": "application/json"},
            )
        if resp.status_code != 200:
            raise RuntimeError(f"deviceauth/usercode returned {resp.status_code}")
        device_data = resp.json()
        user_code = device_data.get("user_code", "")
        device_auth_id = device_data.get("device_auth_id", "")
        # Never poll more often than every 3 seconds.
        poll_interval = max(3, int(device_data.get("interval", "5")))
        if not user_code or not device_auth_id:
            raise RuntimeError("device-code response missing user_code or device_auth_id")
        verification_url = f"{issuer}/codex/device"

        with _oauth_sessions_lock:
            sess = _oauth_sessions.get(session_id)
            if not sess:
                # Session vanished before the code arrived; nothing to drive.
                return
            sess["user_code"] = user_code
            sess["verification_url"] = verification_url
            sess["device_auth_id"] = device_auth_id
            sess["interval"] = poll_interval
            sess["expires_in"] = 15 * 60  # OpenAI's effective limit
            sess["expires_at"] = time.time() + sess["expires_in"]

        # Step 2: poll until authorized
        deadline = time.time() + sess["expires_in"]
        code_resp = None
        with httpx.Client(timeout=httpx.Timeout(15.0)) as client:
            while time.time() < deadline:
                time.sleep(poll_interval)
                with _oauth_sessions_lock:
                    still_registered = session_id in _oauth_sessions
                if not still_registered:
                    # Cancelled or garbage-collected: stop hitting the provider
                    # and do not persist credentials for an abandoned login.
                    _log.info(
                        "oauth/device: openai-codex session %s no longer registered; stopping poll",
                        session_id,
                    )
                    return
                poll = client.post(
                    f"{issuer}/api/accounts/deviceauth/token",
                    json={"device_auth_id": device_auth_id, "user_code": user_code},
                    headers={"Content-Type": "application/json"},
                )
                if poll.status_code == 200:
                    code_resp = poll.json()
                    break
                if poll.status_code in (403, 404):
                    continue  # user hasn't authorized yet
                raise RuntimeError(f"deviceauth/token poll returned {poll.status_code}")

        if code_resp is None:
            with _oauth_sessions_lock:
                sess["status"] = "expired"
                sess["error_message"] = "Device code expired before approval"
            return

        # Step 3: exchange authorization_code for tokens
        authorization_code = code_resp.get("authorization_code", "")
        code_verifier = code_resp.get("code_verifier", "")
        if not authorization_code or not code_verifier:
            raise RuntimeError("device-auth response missing authorization_code/code_verifier")
        with httpx.Client(timeout=httpx.Timeout(15.0)) as client:
            token_resp = client.post(
                CODEX_OAUTH_TOKEN_URL,
                data={
                    "grant_type": "authorization_code",
                    "code": authorization_code,
                    "redirect_uri": f"{issuer}/deviceauth/callback",
                    "client_id": CODEX_OAUTH_CLIENT_ID,
                    "code_verifier": code_verifier,
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"},
            )
        if token_resp.status_code != 200:
            raise RuntimeError(f"token exchange returned {token_resp.status_code}")
        tokens = token_resp.json()
        access_token = tokens.get("access_token", "")
        refresh_token = tokens.get("refresh_token", "")
        if not access_token:
            raise RuntimeError("token exchange did not return access_token")

        # Persist via credential pool — same shape as auth_commands.add_command
        from hermes_agent.providers.credential_pool import (
            PooledCredential,
            load_pool,
            AUTH_TYPE_OAUTH,
            SOURCE_MANUAL,
        )
        import uuid as _uuid
        pool = load_pool("openai-codex")
        base_url = (
            os.getenv("HERMES_CODEX_BASE_URL", "").strip().rstrip("/")
            or DEFAULT_CODEX_BASE_URL
        )
        entry = PooledCredential(
            provider="openai-codex",
            id=_uuid.uuid4().hex[:6],
            label="dashboard device_code",
            auth_type=AUTH_TYPE_OAUTH,
            priority=0,
            source=f"{SOURCE_MANUAL}:dashboard_device_code",
            access_token=access_token,
            refresh_token=refresh_token,
            base_url=base_url,
        )
        pool.add_entry(entry)

        with _oauth_sessions_lock:
            sess["status"] = "approved"
        _log.info("oauth/device: openai-codex login completed (session=%s)", session_id)
    except Exception as e:
        _log.warning("codex device-code worker failed (session=%s): %s", session_id, e)
        with _oauth_sessions_lock:
            # Look the session up again: the failure may have happened before
            # the local reference was bound, or after the session was removed.
            s = _oauth_sessions.get(session_id)
            if s:
                s["status"] = "error"
                s["error_message"] = str(e)
@app.post("/api/providers/oauth/{provider_id}/start")
async def start_oauth_login(provider_id: str, request: Request):
    """Kick off an OAuth login for a catalogued provider. Token-protected.

    Expired sessions are garbage-collected first. ``pkce`` providers get the
    Anthropic PKCE start payload; ``device_code`` providers get the
    device-code payload.

    Raises:
        HTTPException: 400 for an unknown provider, an ``external`` flow or
            an unsupported flow; 500 (with the exception text) when the
            start helper fails unexpectedly. HTTP errors raised by the
            helpers pass through unchanged.
    """
    _require_token(request)
    _gc_oauth_sessions()

    entry = next((p for p in _OAUTH_PROVIDER_CATALOG if p["id"] == provider_id), None)
    if entry is None:
        raise HTTPException(status_code=400, detail=f"Unknown provider {provider_id}")

    flow = entry["flow"]
    if flow == "external":
        raise HTTPException(
            status_code=400,
            detail=f"{provider_id} uses an external CLI; run `{entry['cli_command']}` manually",
        )

    try:
        if flow == "pkce":
            return _start_anthropic_pkce()
        if flow == "device_code":
            return await _start_device_code_flow(provider_id)
    except HTTPException:
        raise
    except Exception as exc:
        _log.exception("oauth/start %s failed", provider_id)
        raise HTTPException(status_code=500, detail=str(exc))

    # Catalog entry with a flow value this endpoint does not know.
    raise HTTPException(status_code=400, detail="Unsupported flow")
class OAuthSubmitBody(BaseModel):
    """Request body for the PKCE code-submission endpoint."""

    # Id of the OAuth session the code belongs to.
    session_id: str
    # Authorization code as pasted by the user; the Anthropic handler also
    # accepts the ``<code>#<state>`` form.
    code: str
@app.post("/api/providers/oauth/{provider_id}/submit")
async def submit_oauth_code(provider_id: str, body: OAuthSubmitBody, request: Request):
    """Accept the pasted authorization code for a PKCE flow. Token-protected.

    Only ``anthropic`` is supported; any other provider gets a 400. The
    token exchange does blocking network I/O, so it is run in the default
    executor instead of on the event loop.
    """
    _require_token(request)
    if provider_id != "anthropic":
        raise HTTPException(status_code=400, detail=f"submit not supported for {provider_id}")
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None, _submit_anthropic_pkce, body.session_id, body.code,
    )
@app.get("/api/providers/oauth/{provider_id}/poll/{session_id}")
async def poll_oauth_session(provider_id: str, session_id: str):
    """Report the current state of an OAuth session (no auth — read-only).

    Returns ``session_id``, ``status``, ``error_message`` and ``expires_at``.

    Raises:
        HTTPException: 404 when the session is not registered; 400 when it
            belongs to a different provider than the one in the path.
    """
    with _oauth_sessions_lock:
        session = _oauth_sessions.get(session_id)

    if not session:
        raise HTTPException(status_code=404, detail="Session not found or expired")
    if session["provider"] != provider_id:
        raise HTTPException(status_code=400, detail="Provider mismatch for session")

    return dict(
        session_id=session_id,
        status=session["status"],
        error_message=session.get("error_message"),
        expires_at=session.get("expires_at"),
    )
@app.delete("/api/providers/oauth/sessions/{session_id}")
async def cancel_oauth_session(session_id: str, request: Request):
    """Drop an OAuth session from the registry. Token-protected.

    Returns ``{"ok": True, "session_id": ...}`` when a session was removed
    and ``{"ok": False, "message": "session not found"}`` otherwise.
    """
    _require_token(request)
    with _oauth_sessions_lock:
        removed = _oauth_sessions.pop(session_id, None)
    if removed is not None:
        return {"ok": True, "session_id": session_id}
    return {"ok": False, "message": "session not found"}
# ---------------------------------------------------------------------------
# Session detail endpoints
# ---------------------------------------------------------------------------
@app.get("/api/sessions/{session_id}")
async def get_session_detail(session_id: str):
    """Return the stored record for one session.

    The id from the path is first passed through
    ``SessionDB.resolve_session_id``. A 404 is raised when it does not
    resolve or no record is found. The DB handle is always closed.
    """
    from hermes_agent.state import SessionDB

    db = SessionDB()
    try:
        resolved_id = db.resolve_session_id(session_id)
        record = None
        if resolved_id:
            record = db.get_session(resolved_id)
        if record:
            return record
        raise HTTPException(status_code=404, detail="Session not found")
    finally:
        db.close()
@app.get("/api/sessions/{session_id}/messages")
async def get_session_messages(session_id: str):
    """Return the messages of one session.

    Responds with ``{"session_id": <resolved id>, "messages": [...]}``, or
    404 when ``SessionDB.resolve_session_id`` cannot resolve the id. The DB
    handle is always closed.
    """
    from hermes_agent.state import SessionDB

    db = SessionDB()
    try:
        resolved_id = db.resolve_session_id(session_id)
        if resolved_id:
            return {"session_id": resolved_id, "messages": db.get_messages(resolved_id)}
        raise HTTPException(status_code=404, detail="Session not found")
    finally:
        db.close()
@app.delete("/api/sessions/{session_id}")
async def delete_session_endpoint(session_id: str):
    """Delete one session by the id given in the path.

    Responds with ``{"ok": True}`` when ``SessionDB.delete_session`` reports
    success and 404 otherwise. The DB handle is always closed.
    """
    from hermes_agent.state import SessionDB

    db = SessionDB()
    try:
        deleted = db.delete_session(session_id)
        if deleted:
            return {"ok": True}
        raise HTTPException(status_code=404, detail="Session not found")
    finally:
        db.close()
# ---------------------------------------------------------------------------
# Log viewer endpoint
# ---------------------------------------------------------------------------
@app.get("/api/logs")
async def get_logs(
    file: str = "agent",
    lines: int = 100,
    level: Optional[str] = None,
    component: Optional[str] = None,
    search: Optional[str] = None,
):
    """Return the tail of a log file, optionally filtered.

    Args:
        file: Key into ``LOG_FILES`` selecting which log to read.
        lines: Maximum number of lines to return, capped at 500. Zero or a
            negative value yields an empty list.
        level: Minimum level filter; ``"ALL"`` (any case) or empty disables it.
        component: Key into ``COMPONENT_PREFIXES``; ``"all"`` (any case) or
            empty disables it.
        search: Case-insensitive substring a line must contain.

    Returns:
        ``{"file": file, "lines": [...]}``; the list is empty when the log
        file does not exist.

    Raises:
        HTTPException: 400 for an unknown log file or unknown component.
    """
    from hermes_agent.cli.logs import _read_tail, LOG_FILES

    log_name = LOG_FILES.get(file)
    if not log_name:
        raise HTTPException(status_code=400, detail=f"Unknown log file: {file}")
    log_path = get_hermes_home() / "logs" / log_name
    if not log_path.exists():
        return {"file": file, "lines": []}

    try:
        from hermes_agent.logging import COMPONENT_PREFIXES
    except ImportError:
        COMPONENT_PREFIXES = {}

    # Normalize "ALL" / "all" / empty → no filter. _matches_filters treats an
    # empty tuple as "must match a prefix" (startswith(()) is always False),
    # so passing () instead of None silently drops every line.
    min_level = level if level and level.upper() != "ALL" else None
    if component and component.lower() != "all":
        comp_prefixes = COMPONENT_PREFIXES.get(component)
        if comp_prefixes is None:
            raise HTTPException(
                status_code=400,
                detail=f"Unknown component: {component}. "
                f"Available: {', '.join(sorted(COMPONENT_PREFIXES))}",
            )
    else:
        comp_prefixes = None

    limit = min(lines, 500)
    if limit <= 0:
        # result[-0:] would return every line and a negative limit would
        # slice from the wrong end, so answer "no lines" explicitly.
        return {"file": file, "lines": []}

    has_filters = bool(min_level or comp_prefixes or search)
    # With a search term, read a larger window so the post-filter below has
    # enough candidates to fill the requested count.
    result = _read_tail(
        log_path, limit if not search else 2000,
        has_filters=has_filters,
        min_level=min_level,
        component_prefixes=comp_prefixes,
    )

    # Post-filter by search term (case-insensitive substring match).
    # _read_tail doesn't support free-text search, so we filter here and
    # trim to the requested line count afterward.
    if search:
        needle = search.lower()
        result = [line for line in result if needle in line.lower()][-limit:]
    return {"file": file, "lines": result}
# ---------------------------------------------------------------------------
# Cron job management endpoints
# ---------------------------------------------------------------------------
class CronJobCreate(BaseModel):
    # Request body for POST /api/cron/jobs.  The handler forwards all four
    # fields unchanged to create_job() as keyword arguments.
    prompt: str
    schedule: str
    name: str = ""  # optional; empty string when omitted
    deliver: str = "local"  # optional; "local" when omitted
class CronJobUpdate(BaseModel):
    # Request body for PUT /api/cron/jobs/{job_id}.  The mapping is handed
    # as-is to update_job(); its accepted keys are defined there, not here.
    updates: dict
@app.get("/api/cron/jobs")
async def list_cron_jobs():
    """Return all cron jobs, disabled ones included."""
    from hermes_agent.cron.jobs import list_jobs

    all_jobs = list_jobs(include_disabled=True)
    return all_jobs
@app.get("/api/cron/jobs/{job_id}")
async def get_cron_job(job_id: str):
    """Return a single cron job, or 404 when ``get_job`` finds nothing."""
    from hermes_agent.cron.jobs import get_job

    found = get_job(job_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Job not found")
@app.post("/api/cron/jobs")
async def create_cron_job(body: CronJobCreate):
    """Create a cron job from the request body and return it.

    Any exception raised while creating the job is logged with its
    traceback and reported to the client as a 400 carrying the exception
    text.
    """
    from hermes_agent.cron.jobs import create_job

    try:
        created = create_job(
            prompt=body.prompt,
            schedule=body.schedule,
            name=body.name,
            deliver=body.deliver,
        )
    except Exception as e:
        _log.exception("POST /api/cron/jobs failed")
        raise HTTPException(status_code=400, detail=str(e))
    else:
        return created
@app.put("/api/cron/jobs/{job_id}")
async def update_cron_job(job_id: str, body: CronJobUpdate):
    """Apply ``body.updates`` to an existing cron job and return the result.

    Raises:
        HTTPException: 400 when ``update_job`` raises (the exception text is
            returned as the detail, mirroring ``POST /api/cron/jobs``);
            404 when ``update_job`` returns a falsy value.
    """
    from hermes_agent.cron.jobs import update_job

    # ``updates`` is client-supplied, so a failure inside update_job() is
    # reported as a bad request rather than bubbling up as a bare 500.
    # This matches the error handling used when creating a job.
    try:
        job = update_job(job_id, body.updates)
    except Exception as e:
        _log.exception("PUT /api/cron/jobs/%s failed", job_id)
        raise HTTPException(status_code=400, detail=str(e))
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    return job
@app.post("/api/cron/jobs/{job_id}/pause")
async def pause_cron_job(job_id: str):
    """Pause a cron job and return what ``pause_job`` yields; 404 if falsy."""
    from hermes_agent.cron.jobs import pause_job

    paused = pause_job(job_id)
    if paused:
        return paused
    raise HTTPException(status_code=404, detail="Job not found")
@app.post("/api/cron/jobs/{job_id}/resume")
async def resume_cron_job(job_id: str):
    """Resume a cron job and return what ``resume_job`` yields; 404 if falsy."""
    from hermes_agent.cron.jobs import resume_job

    resumed = resume_job(job_id)
    if resumed:
        return resumed
    raise HTTPException(status_code=404, detail="Job not found")
@app.post("/api/cron/jobs/{job_id}/trigger")
async def trigger_cron_job(job_id: str):
    """Trigger a cron job and return what ``trigger_job`` yields; 404 if falsy."""
    from hermes_agent.cron.jobs import trigger_job

    triggered = trigger_job(job_id)
    if triggered:
        return triggered
    raise HTTPException(status_code=404, detail="Job not found")
@app.delete("/api/cron/jobs/{job_id}")
async def delete_cron_job(job_id: str):
    """Remove a cron job; 404 when ``remove_job`` reports nothing removed."""
    from hermes_agent.cron.jobs import remove_job

    removed = remove_job(job_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="Job not found")
# ---------------------------------------------------------------------------
# Skills & Tools endpoints
# ---------------------------------------------------------------------------
class SkillToggle(BaseModel):
    # Request body for PUT /api/skills/toggle.
    name: str  # skill name added to / removed from the disabled set
    enabled: bool  # True removes the name from the disabled set, False adds it
@app.get("/api/skills")
async def get_skills():
    """List skills, each annotated with an ``enabled`` flag.

    Skills come from ``_find_all_skills(skip_disabled=True)``. A skill is
    reported as enabled when its name is not in the disabled set read from
    the current config.
    """
    from hermes_agent.tools.skills.tool import _find_all_skills
    from hermes_agent.cli.skills_config import get_disabled_skills

    disabled_names = get_disabled_skills(load_config())
    catalog = _find_all_skills(skip_disabled=True)
    for entry in catalog:
        entry["enabled"] = entry["name"] not in disabled_names
    return catalog
@app.put("/api/skills/toggle")
async def toggle_skill(body: SkillToggle):
    """Enable or disable one skill and persist the updated disabled set.

    Returns ``{"ok": True, "name": ..., "enabled": ...}`` echoing the request.
    """
    from hermes_agent.cli.skills_config import get_disabled_skills, save_disabled_skills

    cfg = load_config()
    disabled_names = get_disabled_skills(cfg)
    if not body.enabled:
        disabled_names.add(body.name)
    else:
        # discard() is a no-op when the skill was not disabled to begin with.
        disabled_names.discard(body.name)
    save_disabled_skills(cfg, disabled_names)
    return {"ok": True, "name": body.name, "enabled": body.enabled}
@app.get("/api/tools/toolsets")
async def get_toolsets():
    """Describe every configurable toolset for the dashboard.

    Each entry carries the toolset's name, label and description, whether
    it is enabled for the ``"cli"`` platform (``available`` mirrors
    ``enabled``), whether ``_toolset_has_keys`` considers it configured,
    and the sorted, de-duplicated tool names it resolves to.
    """
    from hermes_agent.cli.tools_config import (
        _get_effective_configurable_toolsets,
        _get_platform_tools,
        _toolset_has_keys,
    )
    from hermes_agent.tools.toolsets import resolve_toolset

    def _tool_names(toolset_name):
        # Best effort: a toolset that fails to resolve is listed with no tools.
        try:
            return sorted(set(resolve_toolset(toolset_name)))
        except Exception:
            return []

    cfg = load_config()
    active = _get_platform_tools(
        cfg,
        "cli",
        include_default_mcp_servers=False,
    )

    payload = []
    for name, label, desc in _get_effective_configurable_toolsets():
        tool_names = _tool_names(name)
        enabled = name in active
        payload.append({
            "name": name,
            "label": label,
            "description": desc,
            "enabled": enabled,
            "available": enabled,
            "configured": _toolset_has_keys(name, cfg),
            "tools": tool_names,
        })
    return payload
# ---------------------------------------------------------------------------
# Raw YAML config endpoint
# ---------------------------------------------------------------------------
class RawConfigUpdate(BaseModel):
    # Request body for PUT /api/config/raw.
    yaml_text: str  # full YAML document; must parse to a mapping
@app.get("/api/config/raw")
async def get_config_raw():
    """Return the config file's text as ``{"yaml": ...}``.

    The value is an empty string when the config file does not exist.
    """
    config_file = get_config_path()
    text = config_file.read_text(encoding="utf-8") if config_file.exists() else ""
    return {"yaml": text}
@app.put("/api/config/raw")
async def update_config_raw(body: RawConfigUpdate):
    """Parse the submitted YAML and save it as the new config.

    Raises a 400 when the text is not valid YAML or does not parse to a
    mapping; otherwise the parsed mapping is passed to ``save_config``.
    """
    try:
        document = yaml.safe_load(body.yaml_text)
        if isinstance(document, dict):
            save_config(document)
            return {"ok": True}
        # Covers empty input (None), scalars and lists alike.
        raise HTTPException(status_code=400, detail="YAML must be a mapping")
    except yaml.YAMLError as e:
        raise HTTPException(status_code=400, detail=f"Invalid YAML: {e}")
# ---------------------------------------------------------------------------
# Token / cost analytics endpoint
# ---------------------------------------------------------------------------
@app.get("/api/analytics/usage")
async def get_usage_analytics(days: int = 30):
    """Aggregate token and cost usage for sessions started in the last ``days`` days.

    Runs three queries against the ``sessions`` table (per-day, per-model
    and grand totals) directly on the ``SessionDB`` connection, and adds
    the ``skills`` section of the ``InsightsEngine`` report, falling back to
    an all-zero summary when the report has none.

    Returns a dict with keys ``daily``, ``by_model``, ``totals``,
    ``period_days`` and ``skills``.
    """
    from hermes_agent.state import SessionDB
    from hermes_agent.agent.insights import InsightsEngine

    daily_sql = """
            SELECT date(started_at, 'unixepoch') as day,
                   SUM(input_tokens) as input_tokens,
                   SUM(output_tokens) as output_tokens,
                   SUM(cache_read_tokens) as cache_read_tokens,
                   SUM(reasoning_tokens) as reasoning_tokens,
                   COALESCE(SUM(estimated_cost_usd), 0) as estimated_cost,
                   COALESCE(SUM(actual_cost_usd), 0) as actual_cost,
                   COUNT(*) as sessions
            FROM sessions WHERE started_at > ?
            GROUP BY day ORDER BY day
        """
    by_model_sql = """
            SELECT model,
                   SUM(input_tokens) as input_tokens,
                   SUM(output_tokens) as output_tokens,
                   COALESCE(SUM(estimated_cost_usd), 0) as estimated_cost,
                   COUNT(*) as sessions
            FROM sessions WHERE started_at > ? AND model IS NOT NULL
            GROUP BY model ORDER BY SUM(input_tokens) + SUM(output_tokens) DESC
        """
    totals_sql = """
            SELECT SUM(input_tokens) as total_input,
                   SUM(output_tokens) as total_output,
                   SUM(cache_read_tokens) as total_cache_read,
                   SUM(reasoning_tokens) as total_reasoning,
                   COALESCE(SUM(estimated_cost_usd), 0) as total_estimated_cost,
                   COALESCE(SUM(actual_cost_usd), 0) as total_actual_cost,
                   COUNT(*) as total_sessions
            FROM sessions WHERE started_at > ?
        """

    db = SessionDB()
    try:
        # started_at is compared against a Unix timestamp, in seconds.
        cutoff = time.time() - (days * 86400)

        def _run(sql):
            return db._conn.execute(sql, (cutoff,))

        # Rows are converted with dict(); presumably the connection uses a
        # mapping-style row factory — confirm in SessionDB.
        daily = [dict(row) for row in _run(daily_sql).fetchall()]
        by_model = [dict(row) for row in _run(by_model_sql).fetchall()]
        totals = dict(_run(totals_sql).fetchone())

        report = InsightsEngine(db).generate(days=days)
        no_skill_data = {
            "summary": {
                "total_skill_loads": 0,
                "total_skill_edits": 0,
                "total_skill_actions": 0,
                "distinct_skills_used": 0,
            },
            "top_skills": [],
        }
        skills = report.get("skills", no_skill_data)

        return {
            "daily": daily,
            "by_model": by_model,
            "totals": totals,
            "period_days": days,
            "skills": skills,
        }
    finally:
        db.close()
def mount_spa(application: FastAPI):
    """Mount the built SPA.  Falls back to index.html for client-side routing.

    The session token is injected into index.html via a ``<script>`` tag so
    the SPA can authenticate against protected API endpoints without a
    separate (unauthenticated) token-dispensing endpoint.

    When ``WEB_DIST`` does not exist, a catch-all route is registered that
    answers every GET with a 404 JSON hint on how to build the frontend.

    Args:
        application: The FastAPI app to register the static mount and the
            catch-all GET route on.
    """
    if not WEB_DIST.exists():
        @application.get("/{full_path:path}")
        async def no_frontend(full_path: str):
            return JSONResponse(
                {"error": "Frontend not built. Run: cd web && npm run build"},
                status_code=404,
            )
        return

    _index_path = WEB_DIST / "index.html"

    def _serve_index():
        """Return index.html with the session token injected."""
        # Read explicitly as UTF-8: without an encoding, read_text() uses
        # the locale default, which mangles or fails on non-ASCII bytes on
        # platforms where that default is not UTF-8.
        html = _index_path.read_text(encoding="utf-8")
        token_script = (
            f'<script>window.__HERMES_SESSION_TOKEN__="{_SESSION_TOKEN}";</script>'
        )
        # Only the first </head> is touched.
        html = html.replace("</head>", f"{token_script}</head>", 1)
        return HTMLResponse(
            html,
            # The page embeds a token, so it must never be served from cache.
            headers={"Cache-Control": "no-store, no-cache, must-revalidate"},
        )

    application.mount("/assets", StaticFiles(directory=WEB_DIST / "assets"), name="assets")

    @application.get("/{full_path:path}")
    async def serve_spa(full_path: str):
        file_path = WEB_DIST / full_path
        # Prevent path traversal via url-encoded sequences (%2e%2e/)
        if (
            full_path
            and file_path.resolve().is_relative_to(WEB_DIST.resolve())
            and file_path.exists()
            and file_path.is_file()
        ):
            return FileResponse(file_path)
        # Anything that is not a real file inside WEB_DIST is treated as a
        # client-side route and gets the SPA shell.
        return _serve_index()
# ---------------------------------------------------------------------------
# Dashboard theme endpoints
# ---------------------------------------------------------------------------
# Metadata (name, label, description) for the dashboard themes that ship
# with Hermes.  Colors are not defined here: the actual color definitions
# live in the frontend (web/src/themes/presets.ts).
_BUILTIN_DASHBOARD_THEMES = [
    {"name": theme_name, "label": theme_label, "description": theme_description}
    for theme_name, theme_label, theme_description in (
        ("default", "Hermes Teal", "Classic dark teal — the canonical Hermes look"),
        ("midnight", "Midnight", "Deep blue-violet with cool accents"),
        ("ember", "Ember", "Warm crimson and bronze — forge vibes"),
        ("mono", "Mono", "Clean grayscale — minimal and focused"),
        ("cyberpunk", "Cyberpunk", "Neon green on black — matrix terminal"),
        ("rose", "Rosé", "Soft pink and warm ivory — easy on the eyes"),
    )
]
def _discover_user_themes() -> list:
    """Scan ~/.hermes/dashboard-themes/*.yaml for user-created themes.

    Returns:
        One ``{"name", "label", "description"}`` dict per theme file, in
        sorted filename order.  ``label`` defaults to the theme name and
        ``description`` to an empty string.  Files that do not parse to a
        mapping with a non-empty ``name`` are skipped.  Returns an empty
        list when the themes directory does not exist.
    """
    themes_dir = get_hermes_home() / "dashboard-themes"
    if not themes_dir.is_dir():
        return []
    result = []
    for theme_file in sorted(themes_dir.glob("*.yaml")):
        # Discovery stays best-effort (one broken file must not hide the
        # others), but the failure is logged so a theme does not vanish
        # from the dashboard without any trace.
        try:
            data = yaml.safe_load(theme_file.read_text(encoding="utf-8"))
        except Exception as exc:
            _log.warning("Bad dashboard theme file %s: %s", theme_file, exc)
            continue
        if not isinstance(data, dict) or not data.get("name"):
            continue
        result.append({
            "name": data["name"],
            "label": data.get("label", data["name"]),
            "description": data.get("description", ""),
        })
    return result
@app.get("/api/dashboard/themes")
async def get_dashboard_themes():
    """Return available themes and the currently active one.

    Returns:
        ``{"themes": [...], "active": <name>}``.  ``active`` is the
        ``dashboard.theme`` config value, or ``"default"`` when the
        ``dashboard`` section is missing, empty or not a mapping.
    """
    config = load_config()
    # An empty ``dashboard:`` key in YAML loads as None, so do not assume a
    # dict here.
    dashboard_cfg = config.get("dashboard")
    if isinstance(dashboard_cfg, dict):
        active = dashboard_cfg.get("theme", "default")
    else:
        active = "default"

    # Merge built-in + user themes.  Built-ins are listed first and take
    # precedence: a user theme that reuses an already-listed name (built-in
    # or earlier user theme) is ignored.
    themes = list(_BUILTIN_DASHBOARD_THEMES)
    seen = {theme["name"] for theme in themes}
    for theme in _discover_user_themes():
        if theme["name"] not in seen:
            themes.append(theme)
            seen.add(theme["name"])
    return {"themes": themes, "active": active}
class ThemeSetBody(BaseModel):
    # Request body for PUT /api/dashboard/theme.
    name: str  # theme name stored under dashboard.theme in the config
@app.put("/api/dashboard/theme")
async def set_dashboard_theme(body: ThemeSetBody):
    """Set the active dashboard theme (persists to config.yaml).

    The name is stored as given; it is not checked against the list of
    known themes.

    Returns:
        ``{"ok": True, "theme": <name>}``.
    """
    config = load_config()
    # Create the section when it is missing, and also when it exists but is
    # not a mapping (an empty ``dashboard:`` key loads as None), which would
    # otherwise make the item assignment below raise TypeError.
    if not isinstance(config.get("dashboard"), dict):
        config["dashboard"] = {}
    config["dashboard"]["theme"] = body.name
    save_config(config)
    return {"ok": True, "theme": body.name}
# ---------------------------------------------------------------------------
# Dashboard plugin system
# ---------------------------------------------------------------------------
def _discover_dashboard_plugins() -> list:
    """Collect dashboard extensions from ``<plugin>/dashboard/manifest.json`` files.

    Plugin roots are searched in this order; the first manifest seen for a
    given plugin name wins:

    1. User plugins:    ~/.hermes/plugins/<name>/dashboard/manifest.json
    2. Bundled plugins: <PROJECT_ROOT>/plugins/memory/<name>/... and then
                        <PROJECT_ROOT>/plugins/<name>/...
    3. Project plugins: ./.hermes/plugins/ (only if the
                        HERMES_ENABLE_PROJECT_PLUGINS environment variable
                        is set to a non-empty value)

    Returns:
        A list of plugin descriptors.  Keys starting with ``_`` (``_dir``,
        ``_api_file``) are for server-side use only.
    """
    roots = [
        (get_hermes_home() / "plugins", "user"),
        (PROJECT_ROOT / "plugins" / "memory", "bundled"),
        (PROJECT_ROOT / "plugins", "bundled"),
    ]
    if os.environ.get("HERMES_ENABLE_PROJECT_PLUGINS"):
        roots.append((Path.cwd() / ".hermes" / "plugins", "project"))

    discovered = []
    known_names: set = set()

    for root, origin in roots:
        if not root.is_dir():
            continue
        for entry in sorted(root.iterdir()):
            if not entry.is_dir():
                continue
            dashboard_dir = entry / "dashboard"
            manifest_path = dashboard_dir / "manifest.json"
            if not manifest_path.exists():
                continue
            try:
                manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
                # The directory name is the fallback plugin name.
                plugin_name = manifest.get("name", entry.name)
                if plugin_name in known_names:
                    continue
                known_names.add(plugin_name)
                default_tab = {"path": f"/{plugin_name}", "position": "end"}
                api_file = manifest.get("api")
                discovered.append({
                    "name": plugin_name,
                    "label": manifest.get("label", plugin_name),
                    "description": manifest.get("description", ""),
                    "icon": manifest.get("icon", "Puzzle"),
                    "version": manifest.get("version", "0.0.0"),
                    "tab": manifest.get("tab", default_tab),
                    "entry": manifest.get("entry", "dist/index.js"),
                    "css": manifest.get("css"),
                    "has_api": bool(api_file),
                    "source": origin,
                    "_dir": str(dashboard_dir),
                    "_api_file": api_file,
                })
            except Exception as exc:
                # A broken manifest is reported and skipped; discovery goes on.
                _log.warning("Bad dashboard plugin manifest %s: %s", manifest_path, exc)
                continue
    return discovered
# Process-wide cache of discovered plugins; None until the first scan and
# replaced whenever a re-scan is explicitly requested.
_dashboard_plugins_cache: Optional[list] = None


def _get_dashboard_plugins(force_rescan: bool = False) -> list:
    """Return the cached plugin list, scanning first if needed.

    A scan runs when nothing has been cached yet or when ``force_rescan``
    is true; its result replaces the cache.
    """
    global _dashboard_plugins_cache
    needs_scan = force_rescan or _dashboard_plugins_cache is None
    if needs_scan:
        _dashboard_plugins_cache = _discover_dashboard_plugins()
    return _dashboard_plugins_cache
@app.get("/api/dashboard/plugins")
async def get_dashboard_plugins():
    """List discovered dashboard plugins without their internal fields."""
    public_view = []
    for plugin in _get_dashboard_plugins():
        # Keys beginning with "_" are server-side details and are not
        # sent to the frontend.
        public_view.append({
            key: value
            for key, value in plugin.items()
            if not key.startswith("_")
        })
    return public_view
@app.get("/api/dashboard/plugins/rescan")
async def rescan_dashboard_plugins():
    """Refresh the plugin cache and report how many plugins were found."""
    refreshed = _get_dashboard_plugins(force_rescan=True)
    plugin_count = len(refreshed)
    return {"ok": True, "count": plugin_count}
@app.get("/dashboard-plugins/{plugin_name}/{file_path:path}")
async def serve_plugin_asset(plugin_name: str, file_path: str):
    """Serve a static file that belongs to a dashboard plugin.

    Files are served only from inside the plugin's ``dashboard/``
    directory: the requested path is resolved and must stay relative to
    that directory, otherwise the request is rejected with a 403.  An
    unknown plugin or a missing file yields a 404.
    """
    for candidate in _get_dashboard_plugins():
        if candidate["name"] == plugin_name:
            plugin_root = Path(candidate["_dir"])
            break
    else:
        raise HTTPException(status_code=404, detail="Plugin not found")

    resolved = (plugin_root / file_path).resolve()
    if not resolved.is_relative_to(plugin_root.resolve()):
        raise HTTPException(status_code=403, detail="Path traversal blocked")
    if not (resolved.exists() and resolved.is_file()):
        raise HTTPException(status_code=404, detail="File not found")

    # Content type is chosen from the file extension; anything not listed
    # is served as a generic binary stream.
    known_types = {
        ".js": "application/javascript",
        ".mjs": "application/javascript",
        ".css": "text/css",
        ".json": "application/json",
        ".html": "text/html",
        ".svg": "image/svg+xml",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".woff2": "font/woff2",
        ".woff": "font/woff",
    }
    extension = resolved.suffix.lower()
    return FileResponse(
        resolved,
        media_type=known_types.get(extension, "application/octet-stream"),
    )
def _mount_plugin_api_routes():
    """Load and mount the backend routes declared by dashboard plugins.

    A plugin's manifest ``api`` entry names a Python file inside its
    ``dashboard/`` directory.  That file is executed as a module and must
    define ``router`` (a FastAPI APIRouter), which is included under
    ``/api/plugins/<name>/``.  Plugins whose file is missing, has no
    ``router`` or fails to load are skipped with a warning.
    """
    for plugin in _get_dashboard_plugins():
        declared_api = plugin.get("_api_file")
        if not declared_api:
            continue

        module_file = Path(plugin["_dir"]) / declared_api
        if not module_file.exists():
            _log.warning("Plugin %s declares api=%s but file not found", plugin["name"], declared_api)
            continue

        try:
            module_name = f"hermes_dashboard_plugin_{plugin['name']}"
            spec = importlib.util.spec_from_file_location(module_name, module_file)
            if spec is None or spec.loader is None:
                continue
            module = importlib.util.module_from_spec(spec)
            # Runs the plugin file's top-level code.
            spec.loader.exec_module(module)

            plugin_router = getattr(module, "router", None)
            if plugin_router is None:
                _log.warning("Plugin %s api file has no 'router' attribute", plugin["name"])
                continue

            app.include_router(plugin_router, prefix=f"/api/plugins/{plugin['name']}")
            _log.info("Mounted plugin API routes: /api/plugins/%s/", plugin["name"])
        except Exception as exc:
            # One misbehaving plugin must not stop the others from mounting.
            _log.warning("Failed to load plugin %s API routes: %s", plugin["name"], exc)
# Mount plugin API routes before the SPA catch-all: mount_spa() registers a
# "/{full_path:path}" GET route, so the plugin routers are added first.
_mount_plugin_api_routes()
mount_spa(app)
def start_server(
    host: str = "127.0.0.1",
    port: int = 9119,
    open_browser: bool = True,
    allow_public: bool = False,
):
    """Start the web UI server.

    Args:
        host: Interface to bind to.  Anything other than ``127.0.0.1``,
            ``localhost`` or ``::1`` requires ``allow_public``.
        port: TCP port to listen on.
        open_browser: When true, open the dashboard URL in the default
            browser about one second after startup begins.
        allow_public: Permit binding to a non-loopback host (the messages
            below refer to this as ``--insecure``).

    Raises:
        SystemExit: If ``host`` is not a loopback address and
            ``allow_public`` is false.
    """
    import uvicorn

    _LOCALHOST = ("127.0.0.1", "localhost", "::1")
    if host not in _LOCALHOST and not allow_public:
        raise SystemExit(
            f"Refusing to bind to {host} — the dashboard exposes API keys "
            f"and config without robust authentication.\n"
            f"Use --insecure to override (NOT recommended on untrusted networks)."
        )
    if host not in _LOCALHOST:
        _log.warning(
            "Binding to %s with --insecure — the dashboard has no robust "
            "authentication.  Only use on trusted networks.", host,
        )

    # Record the bound host so host_header_middleware can validate incoming
    # Host headers against it.  Defends against DNS rebinding (GHSA-ppp5-vxwm-4cf7).
    app.state.bound_host = host

    # An IPv6 literal such as "::1" must be wrapped in brackets inside a URL
    # (RFC 3986); "http://::1:9119" is not a valid address.  uvicorn itself
    # still receives the bare host.
    url_host = f"[{host}]" if ":" in host and not host.startswith("[") else host
    url = f"http://{url_host}:{port}"

    if open_browser:
        import webbrowser

        def _open():
            # Give uvicorn a moment to start listening before the browser
            # makes its first request.
            time.sleep(1.0)
            webbrowser.open(url)

        # Daemon thread: must never keep the process alive on shutdown.
        threading.Thread(target=_open, daemon=True).start()

    print(f" Hermes Web UI → {url}")
    uvicorn.run(app, host=host, port=port, log_level="warning")