Merge branch 'main' of github.com:NousResearch/hermes-agent into feat/ink-refactor

This commit is contained in:
Brooklyn Nicholson 2026-04-14 19:33:03 -05:00
commit 77cd5bf565
28 changed files with 2378 additions and 541 deletions

View file

@ -36,6 +36,7 @@ _PROVIDER_PREFIXES: frozenset[str] = frozenset({
"opencode", "zen", "go", "vercel", "kilo", "dashscope", "aliyun", "qwen",
"mimo", "xiaomi-mimo",
"arcee-ai", "arceeai",
"xai", "x-ai", "x.ai", "grok",
"qwen-portal",
})

19
cli.py
View file

@ -1027,6 +1027,7 @@ def _prune_orphaned_branches(repo_root: str) -> None:
_ACCENT_ANSI_DEFAULT = "\033[1;38;2;255;215;0m" # True-color #FFD700 bold — fallback
_BOLD = "\033[1m"
_RST = "\033[0m"
_STREAM_PAD = " " # 4-space indent for streamed response text (matches Panel padding)
def _hex_to_ansi(hex_color: str, *, bold: bool = False) -> str:
@ -1756,9 +1757,9 @@ class HermesCLI:
# Parse and validate toolsets
self.enabled_toolsets = toolsets
if toolsets and "all" not in toolsets and "*" not in toolsets:
# Validate each toolset — MCP server names are added by
# _get_platform_tools() but aren't registered in TOOLSETS yet
# (that happens later in _sync_mcp_toolsets), so exclude them.
# Validate each toolset — MCP server names are resolved via
# live registry aliases (registered during discover_mcp_tools),
# but discovery hasn't run yet at this point, so exclude them.
mcp_names = set((CLI_CONFIG.get("mcp_servers") or {}).keys())
invalid = [t for t in toolsets if not validate_toolset(t) and t not in mcp_names]
if invalid:
@ -2624,7 +2625,7 @@ class HermesCLI:
_tc = getattr(self, "_stream_text_ansi", "")
while "\n" in self._stream_buf:
line, self._stream_buf = self._stream_buf.split("\n", 1)
_cprint(f"{_tc}{line}{_RST}" if _tc else line)
_cprint(f"{_STREAM_PAD}{_tc}{line}{_RST}" if _tc else f"{_STREAM_PAD}{line}")
def _flush_stream(self) -> None:
"""Emit any remaining partial line from the stream buffer and close the box."""
@ -2641,7 +2642,7 @@ class HermesCLI:
if self._stream_buf:
_tc = getattr(self, "_stream_text_ansi", "")
_cprint(f"{_tc}{self._stream_buf}{_RST}" if _tc else self._stream_buf)
_cprint(f"{_STREAM_PAD}{_tc}{self._stream_buf}{_RST}" if _tc else f"{_STREAM_PAD}{self._stream_buf}")
self._stream_buf = ""
# Close the response box
@ -5869,7 +5870,7 @@ class HermesCLI:
border_style=_resp_color,
style=_resp_text,
box=rich_box.HORIZONTALS,
padding=(1, 2),
padding=(1, 4),
))
else:
_cprint(" (No response generated)")
@ -5993,7 +5994,7 @@ class HermesCLI:
title_align="left",
border_style=_resp_color,
box=rich_box.HORIZONTALS,
padding=(1, 2),
padding=(1, 4),
))
else:
_cprint(" 💬 /btw: (no response)")
@ -7756,7 +7757,7 @@ class HermesCLI:
label = " ⚕ Hermes "
fill = w - 2 - len(label)
_cprint(f"\n{_ACCENT}╭─{label}{'' * max(fill - 1, 0)}{_RST}")
_cprint(sentence.rstrip())
_cprint(f"{_STREAM_PAD}{sentence.rstrip()}")
tts_thread = threading.Thread(
target=stream_tts_to_speaker,
@ -7987,7 +7988,7 @@ class HermesCLI:
border_style=_resp_color,
style=_resp_text,
box=rich_box.HORIZONTALS,
padding=(1, 2),
padding=(1, 4),
))

View file

@ -1696,6 +1696,10 @@ class DiscordAdapter(BasePlatformAdapter):
async def slash_update(interaction: discord.Interaction):
await self._run_simple_slash(interaction, "/update", "Update initiated~")
@tree.command(name="restart", description="Gracefully restart the Hermes gateway")
async def slash_restart(interaction: discord.Interaction):
await self._run_simple_slash(interaction, "/restart", "Restart requested~")
@tree.command(name="approve", description="Approve a pending dangerous command")
@discord.app_commands.describe(scope="Optional: 'all', 'session', 'always', 'all session', 'all always'")
async def slash_approve(interaction: discord.Interaction, scope: str = ""):
@ -1736,46 +1740,90 @@ class DiscordAdapter(BasePlatformAdapter):
async def slash_btw(interaction: discord.Interaction, question: str):
await self._run_simple_slash(interaction, f"/btw {question}")
# Register installed skills as native slash commands (parity with
# Telegram, which uses telegram_menu_commands() in commands.py).
# Discord allows up to 100 application commands globally.
_DISCORD_CMD_LIMIT = 100
# Register skills under a single /skill command group with category
# subcommand groups. This uses 1 top-level slot instead of N,
# supporting up to 25 categories × 25 skills = 625 skills.
self._register_skill_group(tree)
def _register_skill_group(self, tree) -> None:
"""Register a ``/skill`` command group with category subcommand groups.
Skills are organized by their directory category under ``SKILLS_DIR``.
Each category becomes a subcommand group; root-level skills become
direct subcommands. Discord supports 25 subcommand groups × 25
subcommands each = 625 skills well beyond the old 100-command cap.
"""
try:
from hermes_cli.commands import discord_skill_commands
from hermes_cli.commands import discord_skill_commands_by_category
existing_names = {cmd.name for cmd in tree.get_commands()}
remaining_slots = max(0, _DISCORD_CMD_LIMIT - len(existing_names))
existing_names = set()
try:
existing_names = {cmd.name for cmd in tree.get_commands()}
except Exception:
pass
skill_entries, skipped = discord_skill_commands(
max_slots=remaining_slots,
categories, uncategorized, hidden = discord_skill_commands_by_category(
reserved_names=existing_names,
)
for discord_name, description, cmd_key in skill_entries:
# Closure factory to capture cmd_key per iteration
def _make_skill_handler(_key: str):
async def _skill_slash(interaction: discord.Interaction, args: str = ""):
await self._run_simple_slash(interaction, f"{_key} {args}".strip())
return _skill_slash
if not categories and not uncategorized:
return
handler = _make_skill_handler(cmd_key)
handler.__name__ = f"skill_{discord_name.replace('-', '_')}"
skill_group = discord.app_commands.Group(
name="skill",
description="Run a Hermes skill",
)
# ── Helper: build a callback for a skill command key ──
def _make_handler(_key: str):
@discord.app_commands.describe(args="Optional arguments for the skill")
async def _handler(interaction: discord.Interaction, args: str = ""):
await self._run_simple_slash(interaction, f"{_key} {args}".strip())
_handler.__name__ = f"skill_{_key.lstrip('/').replace('-', '_')}"
return _handler
# ── Uncategorized (root-level) skills → direct subcommands ──
for discord_name, description, cmd_key in uncategorized:
cmd = discord.app_commands.Command(
name=discord_name,
description=description,
callback=handler,
description=description or f"Run the {discord_name} skill",
callback=_make_handler(cmd_key),
)
discord.app_commands.describe(args="Optional arguments for the skill")(cmd)
tree.add_command(cmd)
skill_group.add_command(cmd)
if skipped:
# ── Category subcommand groups ──
for cat_name in sorted(categories):
cat_desc = f"{cat_name.replace('-', ' ').title()} skills"
if len(cat_desc) > 100:
cat_desc = cat_desc[:97] + "..."
cat_group = discord.app_commands.Group(
name=cat_name,
description=cat_desc,
parent=skill_group,
)
for discord_name, description, cmd_key in categories[cat_name]:
cmd = discord.app_commands.Command(
name=discord_name,
description=description or f"Run the {discord_name} skill",
callback=_make_handler(cmd_key),
)
cat_group.add_command(cmd)
tree.add_command(skill_group)
total = sum(len(v) for v in categories.values()) + len(uncategorized)
logger.info(
"[%s] Registered /skill group: %d skill(s) across %d categories"
" + %d uncategorized",
self.name, total, len(categories), len(uncategorized),
)
if hidden:
logger.warning(
"[%s] Discord slash command limit reached (%d): %d skill(s) not registered",
self.name, _DISCORD_CMD_LIMIT, skipped,
"[%s] %d skill(s) not registered (Discord subcommand limits)",
self.name, hidden,
)
except Exception as exc:
logger.warning("[%s] Failed to register skill slash commands: %s", self.name, exc)
logger.warning("[%s] Failed to register /skill group: %s", self.name, exc)
def _build_slash_event(self, interaction: discord.Interaction, text: str) -> MessageEvent:
"""Build a MessageEvent from a Discord slash command interaction."""

View file

@ -1405,7 +1405,7 @@ class GatewayRunner:
action = "restarting" if self._restart_requested else "shutting down"
hint = (
"Your current task will be interrupted. "
"Use /retry after restart to continue."
"Send any message after restart to resume where it left off."
if self._restart_requested
else "Your current task will be interrupted."
)
@ -1475,6 +1475,106 @@ class GatewayRunner:
except Exception:
pass
_STUCK_LOOP_THRESHOLD = 3 # restarts while active before auto-suspend
_STUCK_LOOP_FILE = ".restart_failure_counts"
def _increment_restart_failure_counts(self, active_session_keys: set) -> None:
    """Increment restart-failure counters for sessions active at shutdown.

    Persists to a JSON file (``_STUCK_LOOP_FILE`` under the Hermes home
    directory) so counters survive across restarts.

    Sessions NOT in *active_session_keys* are removed (they completed
    successfully, so the loop is broken).

    Args:
        active_session_keys: Keys of sessions that still had an agent
            running when the gateway shut down.
    """
    import json
    path = _hermes_home / self._STUCK_LOOP_FILE
    try:
        counts = json.loads(path.read_text()) if path.exists() else {}
    except Exception:
        # Corrupt/unreadable counter file — start over rather than crash
        # the shutdown path.
        counts = {}
    # Rebuild the map from scratch: increment counters for sessions that
    # were active at shutdown; any session absent from active_session_keys
    # is implicitly dropped (its absence is the "loop broken" signal).
    new_counts = {}
    for key in active_session_keys:
        new_counts[key] = counts.get(key, 0) + 1
    try:
        path.write_text(json.dumps(new_counts))
    except Exception:
        # Best-effort persistence — losing a counter only delays
        # stuck-loop detection by one restart.
        pass
def _suspend_stuck_loop_sessions(self) -> int:
    """Suspend sessions that have been active across too many restarts.

    Returns the number of sessions suspended. Called on gateway startup
    AFTER suspend_recently_active() to catch the stuck-loop pattern:
    session loads -> agent gets stuck -> gateway restarts -> repeat.
    """
    import json
    path = _hermes_home / self._STUCK_LOOP_FILE
    if not path.exists():
        return 0
    try:
        counts = json.loads(path.read_text())
    except Exception:
        # Unreadable counter file — nothing reliable to act on.
        return 0
    suspended = 0
    # Only sessions at or above the threshold are considered stuck.
    stuck_keys = [k for k, v in counts.items() if v >= self._STUCK_LOOP_THRESHOLD]
    for session_key in stuck_keys:
        try:
            # NOTE(review): reaches into SessionStore private state
            # (_entries/_save) — assumes entries expose a `suspended`
            # flag; confirm against the store implementation.
            entry = self.session_store._entries.get(session_key)
            if entry and not entry.suspended:
                entry.suspended = True
                suspended += 1
                logger.warning(
                    "Auto-suspended stuck session %s (active across %d "
                    "consecutive restarts — likely a stuck loop)",
                    session_key[:30], counts[session_key],
                )
        except Exception:
            pass
    if suspended:
        try:
            self.session_store._save()
        except Exception:
            pass
    # Clear the file — counters start fresh after suspension
    try:
        path.unlink(missing_ok=True)
    except Exception:
        pass
    return suspended
def _clear_restart_failure_count(self, session_key: str) -> None:
    """Drop the restart-failure counter for a session that completed OK.

    Invoked after a successful agent turn — a completed turn means the
    session is no longer in a stuck loop, so its counter must not carry
    over to the next restart.
    """
    import json
    path = _hermes_home / self._STUCK_LOOP_FILE
    if not path.exists():
        return
    try:
        counts = json.loads(path.read_text())
        # Remove this session's counter if present; tolerate its absence.
        counts.pop(session_key, None)
        if counts:
            path.write_text(json.dumps(counts))
        else:
            # No counters left — remove the file entirely.
            path.unlink(missing_ok=True)
    except Exception:
        # Best-effort cleanup; never let bookkeeping break a turn.
        pass
async def _launch_detached_restart_command(self) -> None:
import shutil
import subprocess
@ -1618,6 +1718,17 @@ class GatewayRunner:
except Exception as e:
logger.warning("Session suspension on startup failed: %s", e)
# Stuck-loop detection (#7536): if a session has been active across
# 3+ consecutive restarts, it's probably stuck in a loop (the same
# history keeps causing the agent to hang). Auto-suspend it so the
# user gets a clean slate on the next message.
try:
stuck = self._suspend_stuck_loop_sessions()
if stuck:
logger.warning("Auto-suspended %d stuck-loop session(s)", stuck)
except Exception as e:
logger.debug("Stuck-loop detection failed: %s", e)
connected_count = 0
enabled_platform_count = 0
startup_nonretryable_errors: list[str] = []
@ -2169,6 +2280,14 @@ class GatewayRunner:
"active sessions."
)
# Track sessions that were active at shutdown for stuck-loop
# detection (#7536). On each restart, the counter increments
# for sessions that were running. If a session hits the
# threshold (3 consecutive restarts while active), the next
# startup auto-suspends it — breaking the loop.
if active_agents:
self._increment_restart_failure_counts(set(active_agents.keys()))
if self._restart_requested and self._restart_via_service:
self._exit_code = GATEWAY_SERVICE_RESTART_EXIT_CODE
self._exit_reason = self._exit_reason or "Gateway restart requested"
@ -3674,6 +3793,12 @@ class GatewayRunner:
_response_time, _api_calls, _resp_len,
)
# Successful turn — clear any stuck-loop counter for this session.
# This ensures the counter only accumulates across CONSECUTIVE
# restarts where the session was active (never completed).
if session_key:
self._clear_restart_failure_count(session_key)
# Surface error details when the agent failed silently (final_response=None)
if not response and agent_result.get("failed"):
error_detail = agent_result.get("error", "unknown error")
@ -8547,6 +8672,21 @@ class GatewayRunner:
if _msn:
message = _msn + "\n\n" + message
# Auto-continue: if the loaded history ends with a tool result,
# the previous agent turn was interrupted mid-work (gateway
# restart, crash, SIGTERM). Prepend a system note so the model
# finishes processing the pending tool results before addressing
# the user's new message. (#4493)
if agent_history and agent_history[-1].get("role") == "tool":
message = (
"[System note: Your previous turn was interrupted before you could "
"process the last tool result(s). The conversation history contains "
"tool outputs you haven't responded to yet. Please finish processing "
"those results and summarize what was accomplished, then address the "
"user's new message below.]\n\n"
+ message
)
_approval_session_key = session_key or ""
_approval_session_token = set_current_session_key(_approval_session_key)
register_gateway_notify(_approval_session_key, _approval_notify_sync)
@ -9370,6 +9510,29 @@ async def start_gateway(config: Optional[GatewayConfig] = None, replace: bool =
nonlocal _signal_initiated_shutdown
_signal_initiated_shutdown = True
logger.info("Received SIGTERM/SIGINT — initiating shutdown")
# Diagnostic: log all hermes-related processes so we can identify
# what triggered the signal (hermes update, hermes gateway restart,
# a stale detached subprocess, etc.).
try:
import subprocess as _sp
_ps = _sp.run(
["ps", "aux"],
capture_output=True, text=True, timeout=3,
)
_hermes_procs = [
line for line in _ps.stdout.splitlines()
if ("hermes" in line.lower() or "gateway" in line.lower())
and str(os.getpid()) not in line.split()[1:2] # exclude self
]
if _hermes_procs:
logger.warning(
"Shutdown diagnostic — other hermes processes running:\n %s",
"\n ".join(_hermes_procs),
)
else:
logger.info("Shutdown diagnostic — no other hermes processes found")
except Exception:
pass
asyncio.create_task(runner.stop())
def restart_signal_handler():

View file

@ -586,6 +586,116 @@ def discord_skill_commands(
)
def discord_skill_commands_by_category(
    reserved_names: set[str],
) -> tuple[dict[str, list[tuple[str, str, str]]], list[tuple[str, str, str]], int]:
    """Return skill entries organized by category for Discord ``/skill`` subcommand groups.

    Skills whose directory is nested at least 2 levels under ``SKILLS_DIR``
    (e.g. ``creative/ascii-art/SKILL.md``) are grouped by their top-level
    category. Root-level skills (e.g. ``dogfood/SKILL.md``) are returned as
    *uncategorized* — the caller should register them as direct subcommands
    of the ``/skill`` group.

    The same filtering as :func:`discord_skill_commands` is applied: hub
    skills excluded, per-platform disabled excluded, names clamped.

    Args:
        reserved_names: Command names already taken on the tree; colliding
            skill names are silently skipped.

    Returns:
        ``(categories, uncategorized, hidden_count)``

        - *categories*: ``{category_name: [(name, description, cmd_key), ...]}``
        - *uncategorized*: ``[(name, description, cmd_key), ...]``
        - *hidden_count*: skills dropped due to Discord group limits
          (25 subcommand groups, 25 subcommands per group)
    """
    from pathlib import Path as _P

    _platform_disabled: set[str] = set()
    try:
        from agent.skill_utils import get_disabled_skill_names
        _platform_disabled = get_disabled_skill_names(platform="discord")
    except Exception:
        pass

    # Collect raw skill data --------------------------------------------------
    categories: dict[str, list[tuple[str, str, str]]] = {}
    uncategorized: list[tuple[str, str, str]] = []
    _names_used: set[str] = set(reserved_names)
    hidden = 0
    try:
        from agent.skill_commands import get_skill_commands
        from tools.skills_tool import SKILLS_DIR
        _skills_dir = SKILLS_DIR.resolve()
        _hub_dir = (SKILLS_DIR / ".hub").resolve()
        skill_cmds = get_skill_commands()
        for cmd_key in sorted(skill_cmds):
            info = skill_cmds[cmd_key]
            skill_path = info.get("skill_md_path", "")
            if not skill_path:
                continue
            sp = _P(skill_path).resolve()
            # Skip skills outside SKILLS_DIR or from the hub. Use proper
            # path-ancestry checks instead of str.startswith(), which
            # false-positives on sibling directories (e.g. ".../skills2"
            # would match the prefix ".../skills").
            if not sp.is_relative_to(_skills_dir):
                continue
            if sp.is_relative_to(_hub_dir):
                continue
            skill_name = info.get("name", "")
            if skill_name in _platform_disabled:
                continue
            raw_name = cmd_key.lstrip("/")
            # Clamp to 32 chars (Discord command-name limit)
            discord_name = raw_name[:32]
            if discord_name in _names_used:
                continue
            _names_used.add(discord_name)
            desc = info.get("description", "")
            # Clamp to Discord's 100-char description limit
            if len(desc) > 100:
                desc = desc[:97] + "..."
            # Determine category from the relative path within SKILLS_DIR.
            # e.g. creative/ascii-art/SKILL.md -> parts = ("creative", "ascii-art")
            try:
                rel = sp.parent.relative_to(_skills_dir)
            except ValueError:
                continue
            parts = rel.parts
            if len(parts) >= 2:
                cat = parts[0]
                categories.setdefault(cat, []).append((discord_name, desc, cmd_key))
            else:
                uncategorized.append((discord_name, desc, cmd_key))
    except Exception:
        # Skill discovery is best-effort; an import/scan failure simply
        # yields no skill commands.
        pass

    # Enforce Discord limits: 25 subcommand groups, 25 subcommands each ------
    _MAX_GROUPS = 25
    _MAX_PER_GROUP = 25
    trimmed_categories: dict[str, list[tuple[str, str, str]]] = {}
    group_count = 0
    for cat in sorted(categories):
        if group_count >= _MAX_GROUPS:
            hidden += len(categories[cat])
            continue
        entries = categories[cat][:_MAX_PER_GROUP]
        hidden += max(0, len(categories[cat]) - _MAX_PER_GROUP)
        trimmed_categories[cat] = entries
        group_count += 1
    # Uncategorized skills also count against the 25 top-level limit
    remaining_slots = _MAX_GROUPS - group_count
    if len(uncategorized) > remaining_slots:
        hidden += len(uncategorized) - remaining_slots
        uncategorized = uncategorized[:remaining_slots]

    return trimmed_categories, uncategorized, hidden
def slack_subcommand_map() -> dict[str, str]:
"""Return subcommand -> /command mapping for Slack /hermes handler.

View file

@ -1128,7 +1128,62 @@ def systemd_restart(system: bool = False):
pid = get_running_pid()
if pid is not None and _request_gateway_self_restart(pid):
print(f"{_service_scope_label(system).capitalize()} service restart requested")
# SIGUSR1 sent — the gateway will drain active agents, exit with
# code 75, and systemd will restart it after RestartSec (30s).
# Wait for the old process to die and the new one to become active
# so the CLI doesn't return while the service is still restarting.
import time
scope_label = _service_scope_label(system).capitalize()
svc = get_service_name()
scope_cmd = _systemctl_cmd(system)
# Phase 1: wait for old process to exit (drain + shutdown)
print(f"{scope_label} service draining active work...")
deadline = time.time() + 90
while time.time() < deadline:
try:
os.kill(pid, 0)
time.sleep(1)
except (ProcessLookupError, PermissionError):
break # old process is gone
else:
print(f"⚠ Old process (PID {pid}) still alive after 90s")
# Phase 2: wait for systemd to start the new process
print(f"⏳ Waiting for {svc} to restart...")
deadline = time.time() + 60
while time.time() < deadline:
try:
result = subprocess.run(
scope_cmd + ["is-active", svc],
capture_output=True, text=True, timeout=5,
)
if result.stdout.strip() == "active":
# Verify it's a NEW process, not the old one somehow
new_pid = get_running_pid()
if new_pid and new_pid != pid:
print(f"{scope_label} service restarted (PID {new_pid})")
return
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
time.sleep(2)
# Timed out — check final state
try:
result = subprocess.run(
scope_cmd + ["is-active", svc],
capture_output=True, text=True, timeout=5,
)
if result.stdout.strip() == "active":
print(f"{scope_label} service restarted")
return
except Exception:
pass
print(
f"{scope_label} service did not become active within 60s.\n"
f" Check status: {'sudo ' if system else ''}hermes gateway status\n"
f" Check logs: journalctl {'--user ' if not system else ''}-u {svc} --since '2 min ago'"
)
return
_run_systemctl(["reload-or-restart", get_service_name()], system=system, check=True, timeout=90)
print(f"{_service_scope_label(system).capitalize()} service restarted")

View file

@ -121,6 +121,7 @@ TOOL_CATEGORIES = {
"providers": [
{
"name": "Nous Subscription",
"badge": "subscription",
"tag": "Managed OpenAI TTS billed to your subscription",
"env_vars": [],
"tts_provider": "openai",
@ -130,13 +131,15 @@ TOOL_CATEGORIES = {
},
{
"name": "Microsoft Edge TTS",
"tag": "Free - no API key needed",
"badge": "★ recommended · free",
"tag": "Good quality, no API key needed",
"env_vars": [],
"tts_provider": "edge",
},
{
"name": "OpenAI TTS",
"tag": "Premium - high quality voices",
"badge": "paid",
"tag": "High quality voices",
"env_vars": [
{"key": "VOICE_TOOLS_OPENAI_KEY", "prompt": "OpenAI API key", "url": "https://platform.openai.com/api-keys"},
],
@ -144,7 +147,8 @@ TOOL_CATEGORIES = {
},
{
"name": "ElevenLabs",
"tag": "Premium - most natural voices",
"badge": "paid",
"tag": "Most natural voices",
"env_vars": [
{"key": "ELEVENLABS_API_KEY", "prompt": "ElevenLabs API key", "url": "https://elevenlabs.io/app/settings/api-keys"},
],
@ -152,7 +156,8 @@ TOOL_CATEGORIES = {
},
{
"name": "Mistral (Voxtral TTS)",
"tag": "Multilingual, native Opus, needs MISTRAL_API_KEY",
"badge": "paid",
"tag": "Multilingual, native Opus",
"env_vars": [
{"key": "MISTRAL_API_KEY", "prompt": "Mistral API key", "url": "https://console.mistral.ai/"},
],
@ -168,6 +173,7 @@ TOOL_CATEGORIES = {
"providers": [
{
"name": "Nous Subscription",
"badge": "subscription",
"tag": "Managed Firecrawl billed to your subscription",
"web_backend": "firecrawl",
"env_vars": [],
@ -177,7 +183,8 @@ TOOL_CATEGORIES = {
},
{
"name": "Firecrawl Cloud",
"tag": "Hosted service - search, extract, and crawl",
"badge": "★ recommended",
"tag": "Full-featured search, extract, and crawl",
"web_backend": "firecrawl",
"env_vars": [
{"key": "FIRECRAWL_API_KEY", "prompt": "Firecrawl API key", "url": "https://firecrawl.dev"},
@ -185,7 +192,8 @@ TOOL_CATEGORIES = {
},
{
"name": "Exa",
"tag": "AI-native search and contents",
"badge": "paid",
"tag": "Neural search with semantic understanding",
"web_backend": "exa",
"env_vars": [
{"key": "EXA_API_KEY", "prompt": "Exa API key", "url": "https://exa.ai"},
@ -193,7 +201,8 @@ TOOL_CATEGORIES = {
},
{
"name": "Parallel",
"tag": "AI-native search and extract",
"badge": "paid",
"tag": "AI-powered search and extract",
"web_backend": "parallel",
"env_vars": [
{"key": "PARALLEL_API_KEY", "prompt": "Parallel API key", "url": "https://parallel.ai"},
@ -201,7 +210,8 @@ TOOL_CATEGORIES = {
},
{
"name": "Tavily",
"tag": "AI-native search, extract, and crawl",
"badge": "free tier",
"tag": "Search, extract, and crawl — 1000 free searches/mo",
"web_backend": "tavily",
"env_vars": [
{"key": "TAVILY_API_KEY", "prompt": "Tavily API key", "url": "https://app.tavily.com/home"},
@ -209,7 +219,8 @@ TOOL_CATEGORIES = {
},
{
"name": "Firecrawl Self-Hosted",
"tag": "Free - run your own instance",
"badge": "free · self-hosted",
"tag": "Run your own Firecrawl instance (Docker)",
"web_backend": "firecrawl",
"env_vars": [
{"key": "FIRECRAWL_API_URL", "prompt": "Your Firecrawl instance URL (e.g., http://localhost:3002)"},
@ -223,6 +234,7 @@ TOOL_CATEGORIES = {
"providers": [
{
"name": "Nous Subscription",
"badge": "subscription",
"tag": "Managed FAL image generation billed to your subscription",
"env_vars": [],
"requires_nous_auth": True,
@ -231,6 +243,7 @@ TOOL_CATEGORIES = {
},
{
"name": "FAL.ai",
"badge": "paid",
"tag": "FLUX 2 Pro with auto-upscaling",
"env_vars": [
{"key": "FAL_KEY", "prompt": "FAL API key", "url": "https://fal.ai/dashboard/keys"},
@ -244,6 +257,7 @@ TOOL_CATEGORIES = {
"providers": [
{
"name": "Nous Subscription (Browser Use cloud)",
"badge": "subscription",
"tag": "Managed Browser Use billed to your subscription",
"env_vars": [],
"browser_provider": "browser-use",
@ -254,14 +268,16 @@ TOOL_CATEGORIES = {
},
{
"name": "Local Browser",
"tag": "Free headless Chromium (no API key needed)",
"badge": "★ recommended · free",
"tag": "Headless Chromium, no API key needed",
"env_vars": [],
"browser_provider": "local",
"post_setup": "agent_browser",
},
{
"name": "Browserbase",
"tag": "Cloud browser with stealth & proxies",
"badge": "paid",
"tag": "Cloud browser with stealth and proxies",
"env_vars": [
{"key": "BROWSERBASE_API_KEY", "prompt": "Browserbase API key", "url": "https://browserbase.com"},
{"key": "BROWSERBASE_PROJECT_ID", "prompt": "Browserbase project ID"},
@ -271,6 +287,7 @@ TOOL_CATEGORIES = {
},
{
"name": "Browser Use",
"badge": "paid",
"tag": "Cloud browser with remote execution",
"env_vars": [
{"key": "BROWSER_USE_API_KEY", "prompt": "Browser Use API key", "url": "https://browser-use.com"},
@ -280,6 +297,7 @@ TOOL_CATEGORIES = {
},
{
"name": "Firecrawl",
"badge": "paid",
"tag": "Cloud browser with remote execution",
"env_vars": [
{"key": "FIRECRAWL_API_KEY", "prompt": "Firecrawl API key", "url": "https://firecrawl.dev"},
@ -289,7 +307,8 @@ TOOL_CATEGORIES = {
},
{
"name": "Camofox",
"tag": "Local anti-detection browser (Firefox/Camoufox)",
"badge": "free · local",
"tag": "Anti-detection browser (Firefox/Camoufox)",
"env_vars": [
{"key": "CAMOFOX_URL", "prompt": "Camofox server URL", "default": "http://localhost:9377",
"url": "https://github.com/jo-inc/camofox-browser"},
@ -838,7 +857,8 @@ def _configure_tool_category(ts_key: str, cat: dict, config: dict):
# Plain text labels only (no ANSI codes in menu items)
provider_choices = []
for p in providers:
tag = f" ({p['tag']})" if p.get("tag") else ""
badge = f" [{p['badge']}]" if p.get("badge") else ""
tag = f"{p['tag']}" if p.get("tag") else ""
configured = ""
env_vars = p.get("env_vars", [])
if not env_vars or all(get_env_value(v["key"]) for v in env_vars):
@ -848,7 +868,7 @@ def _configure_tool_category(ts_key: str, cat: dict, config: dict):
configured = ""
else:
configured = " [configured]"
provider_choices.append(f"{p['name']}{tag}{configured}")
provider_choices.append(f"{p['name']}{badge}{tag}{configured}")
# Add skip option
provider_choices.append("Skip — keep defaults / configure later")
@ -1104,7 +1124,8 @@ def _configure_tool_category_for_reconfig(ts_key: str, cat: dict, config: dict):
provider_choices = []
for p in providers:
tag = f" ({p['tag']})" if p.get("tag") else ""
badge = f" [{p['badge']}]" if p.get("badge") else ""
tag = f"{p['tag']}" if p.get("tag") else ""
configured = ""
env_vars = p.get("env_vars", [])
if not env_vars or all(get_env_value(v["key"]) for v in env_vars):
@ -1114,7 +1135,7 @@ def _configure_tool_category_for_reconfig(ts_key: str, cat: dict, config: dict):
configured = ""
else:
configured = " [configured]"
provider_choices.append(f"{p['name']}{tag}{configured}")
provider_choices.append(f"{p['name']}{badge}{tag}{configured}")
default_idx = _detect_active_provider_index(providers, config)

View file

@ -6976,6 +6976,31 @@ class AIAgent:
skip_pre_tool_call_hook=True,
)
@staticmethod
def _wrap_verbose(label: str, text: str, indent: str = " ") -> str:
"""Word-wrap verbose tool output to fit the terminal width.
Splits *text* on existing newlines and wraps each line individually,
preserving intentional line breaks (e.g. pretty-printed JSON).
Returns a ready-to-print string with *label* on the first line and
continuation lines indented.
"""
import shutil as _shutil
import textwrap as _tw
cols = _shutil.get_terminal_size((120, 24)).columns
wrap_width = max(40, cols - len(indent))
out_lines: list[str] = []
for raw_line in text.split("\n"):
if len(raw_line) <= wrap_width:
out_lines.append(raw_line)
else:
wrapped = _tw.wrap(raw_line, width=wrap_width,
break_long_words=True,
break_on_hyphens=False)
out_lines.extend(wrapped or [raw_line])
body = ("\n" + indent).join(out_lines)
return f"{indent}{label}{body}"
def _execute_tool_calls_concurrent(self, assistant_message, messages: list, effective_task_id: str, api_call_count: int = 0) -> None:
"""Execute multiple tool calls concurrently using a thread pool.
@ -7046,7 +7071,7 @@ class AIAgent:
args_str = json.dumps(args, ensure_ascii=False)
if self.verbose_logging:
print(f" 📞 Tool {i}: {name}({list(args.keys())})")
print(f" Args: {args_str}")
print(self._wrap_verbose("Args: ", json.dumps(args, indent=2, ensure_ascii=False)))
else:
args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
print(f" 📞 Tool {i}: {name}({list(args.keys())}) - {args_preview}")
@ -7144,7 +7169,7 @@ class AIAgent:
elif not self.quiet_mode:
if self.verbose_logging:
print(f" ✅ Tool {i+1} completed in {tool_duration:.2f}s")
print(f" Result: {function_result}")
print(self._wrap_verbose("Result: ", function_result))
else:
response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
print(f" ✅ Tool {i+1} completed in {tool_duration:.2f}s - {response_preview}")
@ -7237,7 +7262,7 @@ class AIAgent:
args_str = json.dumps(function_args, ensure_ascii=False)
if self.verbose_logging:
print(f" 📞 Tool {i}: {function_name}({list(function_args.keys())})")
print(f" Args: {args_str}")
print(self._wrap_verbose("Args: ", json.dumps(function_args, indent=2, ensure_ascii=False)))
else:
args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
print(f" 📞 Tool {i}: {function_name}({list(function_args.keys())}) - {args_preview}")
@ -7525,7 +7550,7 @@ class AIAgent:
if not self.quiet_mode:
if self.verbose_logging:
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s")
print(f" Result: {function_result}")
print(self._wrap_verbose("Result: ", function_result))
else:
response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s - {response_preview}")

View file

@ -95,7 +95,9 @@ AUTHOR_MAP = {
"vincentcharlebois@gmail.com": "vincentcharlebois",
"aryan@synvoid.com": "aryansingh",
"johnsonblake1@gmail.com": "blakejohnson",
"greer.guthrie@gmail.com": "g-guthrie",
"kennyx102@gmail.com": "bobashopcashier",
"shokatalishaikh95@gmail.com": "areu01or00",
"bryan@intertwinesys.com": "bryanyoung",
"christo.mitov@gmail.com": "christomitov",
"hermes@nousresearch.com": "NousResearch",

View file

@ -1,35 +1,19 @@
---
name: google-workspace
description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration via gws CLI (googleworkspace/cli). Uses OAuth2 with automatic token refresh via bridge script. Requires gws binary.
version: 2.0.0
description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration for Hermes. Uses Hermes-managed OAuth2 setup, prefers the Google Workspace CLI (`gws`) when available for broader API coverage, and falls back to the Python client libraries otherwise.
version: 1.0.0
author: Nous Research
license: MIT
required_credential_files:
- path: google_token.json
description: Google OAuth2 token (created by setup script)
- path: google_client_secret.json
description: Google OAuth2 client credentials (downloaded from Google Cloud Console)
metadata:
hermes:
tags: [Google, Gmail, Calendar, Drive, Sheets, Docs, Contacts, Email, OAuth, gws]
tags: [Google, Gmail, Calendar, Drive, Sheets, Docs, Contacts, Email, OAuth]
homepage: https://github.com/NousResearch/hermes-agent
related_skills: [himalaya]
---
# Google Workspace
Gmail, Calendar, Drive, Contacts, Sheets, and Docs — powered by `gws` (Google's official Rust CLI). The skill provides a backward-compatible Python wrapper that handles OAuth token refresh and delegates to `gws`.
## Architecture
```
google_api.py → gws_bridge.py → gws CLI
(argparse compat) (token refresh) (Google APIs)
```
- `setup.py` handles OAuth2 (headless-compatible, works on CLI/Telegram/Discord)
- `gws_bridge.py` refreshes the Hermes token and injects it into `gws` via `GOOGLE_WORKSPACE_CLI_TOKEN`
- `google_api.py` provides the same CLI interface as v1 but delegates to `gws`
Gmail, Calendar, Drive, Contacts, Sheets, and Docs — through Hermes-managed OAuth and a thin CLI wrapper. When `gws` is installed, the skill uses it as the execution backend for broader Google Workspace coverage; otherwise it falls back to the bundled Python client implementation.
## References
@ -38,22 +22,7 @@ google_api.py → gws_bridge.py → gws CLI
## Scripts
- `scripts/setup.py` — OAuth2 setup (run once to authorize)
- `scripts/gws_bridge.py` — Token refresh bridge to gws CLI
- `scripts/google_api.py` — Backward-compatible API wrapper (delegates to gws)
## Prerequisites
Install `gws`:
```bash
cargo install google-workspace-cli
# or via npm (recommended, downloads prebuilt binary):
npm install -g @googleworkspace/cli
# or via Homebrew:
brew install googleworkspace-cli
```
Verify: `gws --version`
- `scripts/google_api.py` — compatibility wrapper CLI. It prefers `gws` for operations when available, while preserving Hermes' existing JSON output contract.
## First-Time Setup
@ -63,13 +32,7 @@ on CLI, Telegram, Discord, or any platform.
Define a shorthand first:
```bash
HERMES_HOME="${HERMES_HOME:-$HOME/.hermes}"
GWORKSPACE_SKILL_DIR="$HERMES_HOME/skills/productivity/google-workspace"
PYTHON_BIN="${HERMES_PYTHON:-python3}"
if [ -x "$HERMES_HOME/hermes-agent/venv/bin/python" ]; then
PYTHON_BIN="$HERMES_HOME/hermes-agent/venv/bin/python"
fi
GSETUP="$PYTHON_BIN $GWORKSPACE_SKILL_DIR/scripts/setup.py"
GSETUP="python ~/.hermes/skills/productivity/google-workspace/scripts/setup.py"
```
### Step 0: Check if already set up
@ -82,88 +45,166 @@ If it prints `AUTHENTICATED`, skip to Usage — setup is already done.
### Step 1: Triage — ask the user what they need
Before starting OAuth setup, ask the user TWO questions:
**Question 1: "What Google services do you need? Just email, or also
Calendar/Drive/Sheets/Docs?"**
- **Email only** → Use the `himalaya` skill instead — simpler setup.
- **Calendar, Drive, Sheets, Docs (or email + these)** → Continue below.
- **Email only** → They don't need this skill at all. Use the `himalaya` skill
instead — it works with a Gmail App Password (Settings → Security → App
Passwords) and takes 2 minutes to set up. No Google Cloud project needed.
Load the himalaya skill and follow its setup instructions.
**Partial scopes**: Users can authorize only a subset of services. The setup
script accepts partial scopes and warns about missing ones.
- **Email + Calendar** → Continue with this skill, but use
`--services email,calendar` during auth so the consent screen only asks for
the scopes they actually need.
**Question 2: "Does your Google account use Advanced Protection?"**
- **Calendar/Drive/Sheets/Docs only** → Continue with this skill and use a
narrower `--services` set like `calendar,drive,sheets,docs`.
- **No / Not sure** → Normal setup.
- **Yes** → Workspace admin must add the OAuth client ID to allowed apps first.
- **Full Workspace access** → Continue with this skill and use the default
`all` service set.
**Question 2: "Does your Google account use Advanced Protection (hardware
security keys required to sign in)? If you're not sure, you probably don't
— it's something you would have explicitly enrolled in."**
- **No / Not sure** → Normal setup. Continue below.
- **Yes** → Their Workspace admin must add the OAuth client ID to the org's
allowed apps list before Step 4 will work. Let them know upfront.
### Step 2: Create OAuth credentials (one-time, ~5 minutes)
Tell the user:
> 1. Go to https://console.cloud.google.com/apis/credentials
> 2. Create a project (or use an existing one)
> 3. Enable the APIs you need (Gmail, Calendar, Drive, Sheets, Docs, People)
> 4. Credentials → Create Credentials → OAuth 2.0 Client ID → Desktop app
> 5. Download JSON and tell me the file path
> You need a Google Cloud OAuth client. This is a one-time setup:
>
> 1. Create or select a project:
> https://console.cloud.google.com/projectselector2/home/dashboard
> 2. Enable the required APIs from the API Library:
> https://console.cloud.google.com/apis/library
> Enable: Gmail API, Google Calendar API, Google Drive API,
> Google Sheets API, Google Docs API, People API
> 3. Create the OAuth client here:
> https://console.cloud.google.com/apis/credentials
> Credentials → Create Credentials → OAuth 2.0 Client ID
> 4. Application type: "Desktop app" → Create
> 5. If the app is still in Testing, add the user's Google account as a test user here:
> https://console.cloud.google.com/auth/audience
> Audience → Test users → Add users
> 6. Download the JSON file and tell me the file path
>
> Important Hermes CLI note: if the file path starts with `/`, do NOT send only the bare path as its own message in the CLI, because it can be mistaken for a slash command. Send it in a sentence instead, like:
> `The JSON file path is: /home/user/Downloads/client_secret_....json`
Once they provide the path:
```bash
$GSETUP --client-secret /path/to/client_secret.json
```
If they paste the raw client ID / client secret values instead of a file path,
write a valid Desktop OAuth JSON file for them yourself, save it somewhere
explicit (for example `~/Downloads/hermes-google-client-secret.json`), then run
`--client-secret` against that file.
### Step 3: Get authorization URL
Use the service set chosen in Step 1. Examples:
```bash
$GSETUP --auth-url
$GSETUP --auth-url --services email,calendar --format json
$GSETUP --auth-url --services calendar,drive,sheets,docs --format json
$GSETUP --auth-url --services all --format json
```
Send the URL to the user. After authorizing, they paste back the redirect URL or code.
This returns JSON with an `auth_url` field and also saves the exact URL to
`~/.hermes/google_oauth_last_url.txt`.
Agent rules for this step:
- Extract the `auth_url` field and send that exact URL to the user as a single line.
- Tell the user that the browser will likely fail on `http://localhost:1` after approval, and that this is expected.
- Tell them to copy the ENTIRE redirected URL from the browser address bar.
- If the user gets `Error 403: access_denied`, send them directly to `https://console.cloud.google.com/auth/audience` to add themselves as a test user.
### Step 4: Exchange the code
The user will paste back either a URL like `http://localhost:1/?code=4/0A...&scope=...`
or just the code string. Either works. The `--auth-url` step stores a temporary
pending OAuth session locally so `--auth-code` can complete the PKCE exchange
later, even on headless systems:
```bash
$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED"
$GSETUP --auth-code "THE_URL_OR_CODE_THE_USER_PASTED" --format json
```
If `--auth-code` fails because the code expired, was already used, or came from
an older browser tab, it now returns a fresh `fresh_auth_url`. In that case,
immediately send the new URL to the user and have them retry with the newest
browser redirect only.
### Step 5: Verify
```bash
$GSETUP --check
```
Should print `AUTHENTICATED`. Token refreshes automatically from now on.
Should print `AUTHENTICATED`. Setup is complete — token refreshes automatically from now on.
### Notes
- Token is stored at `~/.hermes/google_token.json` and auto-refreshes.
- Pending OAuth session state/verifier are stored temporarily at `~/.hermes/google_oauth_pending.json` until exchange completes.
- If `gws` is installed, `google_api.py` points it at the same `~/.hermes/google_token.json` credentials file. Users do not need to run a separate `gws auth login` flow.
- To revoke: `$GSETUP --revoke`
## Usage
All commands go through the API script:
All commands go through the API script. Set `GAPI` as a shorthand:
```bash
HERMES_HOME="${HERMES_HOME:-$HOME/.hermes}"
GWORKSPACE_SKILL_DIR="$HERMES_HOME/skills/productivity/google-workspace"
PYTHON_BIN="${HERMES_PYTHON:-python3}"
if [ -x "$HERMES_HOME/hermes-agent/venv/bin/python" ]; then
PYTHON_BIN="$HERMES_HOME/hermes-agent/venv/bin/python"
fi
GAPI="$PYTHON_BIN $GWORKSPACE_SKILL_DIR/scripts/google_api.py"
GAPI="python ~/.hermes/skills/productivity/google-workspace/scripts/google_api.py"
```
### Gmail
```bash
# Search (returns JSON array with id, from, subject, date, snippet)
$GAPI gmail search "is:unread" --max 10
$GAPI gmail search "from:boss@company.com newer_than:1d"
$GAPI gmail search "has:attachment filename:pdf newer_than:7d"
# Read full message (returns JSON with body text)
$GAPI gmail get MESSAGE_ID
# Send
$GAPI gmail send --to user@example.com --subject "Hello" --body "Message text"
$GAPI gmail send --to user@example.com --subject "Report" --body "<h1>Q4</h1>" --html
$GAPI gmail send --to user@example.com --subject "Report" --body "<h1>Q4</h1><p>Details...</p>" --html
$GAPI gmail send --to user@example.com --subject "Hello" --from '"Research Agent" <user@example.com>' --body "Message text"
# Reply (automatically threads and sets In-Reply-To)
$GAPI gmail reply MESSAGE_ID --body "Thanks, that works for me."
$GAPI gmail reply MESSAGE_ID --from '"Support Bot" <user@example.com>' --body "Thanks"
# Labels
$GAPI gmail labels
$GAPI gmail modify MESSAGE_ID --add-labels LABEL_ID
$GAPI gmail modify MESSAGE_ID --remove-labels UNREAD
```
### Calendar
```bash
# List events (defaults to next 7 days)
$GAPI calendar list
$GAPI calendar create --summary "Standup" --start 2026-03-01T10:00:00+01:00 --end 2026-03-01T10:30:00+01:00
$GAPI calendar create --summary "Review" --start ... --end ... --attendees "alice@co.com,bob@co.com"
$GAPI calendar list --start 2026-03-01T00:00:00Z --end 2026-03-07T23:59:59Z
# Create event (ISO 8601 with timezone required)
$GAPI calendar create --summary "Team Standup" --start 2026-03-01T10:00:00-06:00 --end 2026-03-01T10:30:00-06:00
$GAPI calendar create --summary "Lunch" --start 2026-03-01T12:00:00Z --end 2026-03-01T13:00:00Z --location "Cafe"
$GAPI calendar create --summary "Review" --start 2026-03-01T14:00:00Z --end 2026-03-01T15:00:00Z --attendees "alice@co.com,bob@co.com"
# Delete event
$GAPI calendar delete EVENT_ID
```
@ -183,8 +224,13 @@ $GAPI contacts list --max 20
### Sheets
```bash
# Read
$GAPI sheets get SHEET_ID "Sheet1!A1:D10"
# Write
$GAPI sheets update SHEET_ID "Sheet1!A1:B2" --values '[["Name","Score"],["Alice","95"]]'
# Append rows
$GAPI sheets append SHEET_ID "Sheet1!A:C" --values '[["new","row","data"]]'
```
@ -194,52 +240,37 @@ $GAPI sheets append SHEET_ID "Sheet1!A:C" --values '[["new","row","data"]]'
$GAPI docs get DOC_ID
```
### Direct gws access (advanced)
For operations not covered by the wrapper, use `gws_bridge.py` directly:
```bash
GBRIDGE="$PYTHON_BIN $GWORKSPACE_SKILL_DIR/scripts/gws_bridge.py"
$GBRIDGE calendar +agenda --today --format table
$GBRIDGE gmail +triage --labels --format json
$GBRIDGE drive +upload ./report.pdf
$GBRIDGE sheets +read --spreadsheet SHEET_ID --range "Sheet1!A1:D10"
```
## Output Format
All commands return JSON via `gws --format json`. Key output shapes:
All commands return JSON. Parse with `jq` or read directly. Key fields:
- **Gmail search/triage**: Array of message summaries (sender, subject, date, snippet)
- **Gmail get/read**: Message object with headers and body text
- **Gmail send/reply**: Confirmation with message ID
- **Calendar list/agenda**: Array of event objects (summary, start, end, location)
- **Calendar create**: Confirmation with event ID and htmlLink
- **Drive search**: Array of file objects (id, name, mimeType, webViewLink)
- **Sheets get/read**: 2D array of cell values
- **Docs get**: Full document JSON (use `body.content` for text extraction)
- **Contacts list**: Array of person objects with names, emails, phones
Parse output with `jq` or read JSON directly.
- **Gmail search**: `[{id, threadId, from, to, subject, date, snippet, labels}]`
- **Gmail get**: `{id, threadId, from, to, subject, date, labels, body}`
- **Gmail send/reply**: `{status: "sent", id, threadId}`
- **Calendar list**: `[{id, summary, start, end, location, description, htmlLink}]`
- **Calendar create**: `{status: "created", id, summary, htmlLink}`
- **Drive search**: `[{id, name, mimeType, modifiedTime, webViewLink}]`
- **Contacts list**: `[{name, emails: [...], phones: [...]}]`
- **Sheets get**: `[[cell, cell, ...], ...]`
## Rules
1. **Never send email or create/delete events without confirming with the user first.**
2. **Check auth before first use** — run `setup.py --check`.
3. **Use the Gmail search syntax reference** for complex queries.
4. **Calendar times must include timezone**ISO 8601 with offset or UTC.
5. **Respect rate limits** — avoid rapid-fire sequential API calls.
1. **Never send email or create/delete events without confirming with the user first.** Show the draft content and ask for approval.
2. **Check auth before first use** — run `setup.py --check`. If it fails, guide the user through setup.
3. **Use the Gmail search syntax reference** for complex queries — load it with `skill_view("google-workspace", file_path="references/gmail-search-syntax.md")`.
4. **Calendar times must include timezone**always use ISO 8601 with offset (e.g., `2026-03-01T10:00:00-06:00`) or UTC (`Z`).
5. **Respect rate limits** — avoid rapid-fire sequential API calls. Batch reads when possible.
## Troubleshooting
| Problem | Fix |
|---------|-----|
| `NOT_AUTHENTICATED` | Run setup Steps 2-5 |
| `REFRESH_FAILED` | Token revoked — redo Steps 3-5 |
| `gws: command not found` | Install: `npm install -g @googleworkspace/cli` |
| `HttpError 403` | Missing scope — `$GSETUP --revoke` then redo Steps 3-5 |
| `HttpError 403: Access Not Configured` | Enable API in Google Cloud Console |
| Advanced Protection blocks auth | Admin must allowlist the OAuth client ID |
| `NOT_AUTHENTICATED` | Run setup Steps 2-5 above |
| `REFRESH_FAILED` | Token revoked or expired — redo Steps 3-5 |
| `HttpError 403: Insufficient Permission` | Missing API scope — `$GSETUP --revoke` then redo Steps 3-5 |
| `HttpError 403: Access Not Configured` | API not enabled — user needs to enable it in Google Cloud Console |
| `ModuleNotFoundError` | Run `$GSETUP --install-deps` |
| Advanced Protection blocks auth | Workspace admin must allowlist the OAuth client ID |
## Revoking Access

View file

@ -1,17 +1,17 @@
#!/usr/bin/env python3
"""Google Workspace API CLI for Hermes Agent.
Thin wrapper that delegates to gws (googleworkspace/cli) via gws_bridge.py.
Maintains the same CLI interface for backward compatibility with Hermes skills.
Uses the Google Workspace CLI (`gws`) when available, but preserves the
existing Hermes-facing JSON contract and falls back to the Python client
libraries if `gws` is not installed.
Usage:
python google_api.py gmail search "is:unread" [--max 10]
python google_api.py gmail get MESSAGE_ID
python google_api.py gmail send --to user@example.com --subject "Hi" --body "Hello"
python google_api.py gmail reply MESSAGE_ID --body "Thanks"
python google_api.py calendar list [--start DATE] [--end DATE] [--calendar primary]
python google_api.py calendar list [--from DATE] [--to DATE] [--calendar primary]
python google_api.py calendar create --summary "Meeting" --start DATETIME --end DATETIME
python google_api.py calendar delete EVENT_ID
python google_api.py drive search "budget report" [--max 10]
python google_api.py contacts list [--max 20]
python google_api.py sheets get SHEET_ID RANGE
@ -21,47 +21,396 @@ Usage:
"""
import argparse
import base64
import json
import os
import shutil
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from pathlib import Path
# NOTE(review): BRIDGE and PYTHON appear to be leftovers from the earlier
# gws_bridge.py delegation design (the bridge-based gws() body was removed
# in this revision) — confirm nothing still uses them before removing.
BRIDGE = Path(__file__).parent / "gws_bridge.py"
PYTHON = sys.executable
# Hermes state directory; overridable via the HERMES_HOME env var.
HERMES_HOME = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
# OAuth2 token written by setup.py; refreshed in place by get_credentials().
TOKEN_PATH = HERMES_HOME / "google_token.json"
# OAuth client credentials downloaded from Google Cloud Console.
CLIENT_SECRET_PATH = HERMES_HOME / "google_client_secret.json"
# Default full scope set; the stored token may record a narrower subset
# (see _stored_token_scopes()).
SCOPES = [
    "https://www.googleapis.com/auth/gmail.readonly",
    "https://www.googleapis.com/auth/gmail.send",
    "https://www.googleapis.com/auth/gmail.modify",
    "https://www.googleapis.com/auth/calendar",
    "https://www.googleapis.com/auth/drive.readonly",
    "https://www.googleapis.com/auth/contacts.readonly",
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/documents.readonly",
]
def gws(*args: str) -> None:
    """Call gws via the bridge and exit with its return code."""
    # NOTE(review): merge residue — the original body (which ran the
    # gws_bridge subprocess and sys.exit'd with its return code) was
    # removed in this revision, leaving a no-op stub that returns None.
    # Any remaining gws(...) call sites are equally stale; confirm and
    # delete together with this stub.
def _ensure_authenticated():
    """Abort with a pointer to setup.py when no OAuth token file exists."""
    if TOKEN_PATH.exists():
        return
    setup_script = Path(__file__).parent / 'setup.py'
    print("Not authenticated. Run the setup script first:", file=sys.stderr)
    print(f" python {setup_script}", file=sys.stderr)
    sys.exit(1)
def _stored_token_scopes() -> list[str]:
    """Return the scope list recorded in the token file.

    Falls back to a copy of the default SCOPES when the token file is
    missing, unreadable, or carries no non-empty "scopes" list.
    """
    try:
        token_data = json.loads(TOKEN_PATH.read_text())
    except Exception:
        # Missing or corrupt token file — use the full default set.
        return list(SCOPES)
    recorded = token_data.get("scopes")
    if isinstance(recorded, list) and recorded:
        return recorded
    return list(SCOPES)
def _gws_binary() -> str | None:
    """Locate the gws executable, or None when not installed.

    A non-empty HERMES_GWS_BIN environment variable takes precedence over
    PATH lookup.
    """
    return os.getenv("HERMES_GWS_BIN") or shutil.which("gws")
def _gws_env() -> dict[str, str]:
    """Build a subprocess environment pointing gws at the Hermes token file."""
    env = dict(os.environ)
    env["GOOGLE_WORKSPACE_CLI_CREDENTIALS_FILE"] = str(TOKEN_PATH)
    return env
def _run_gws(parts: list[str], *, params: dict | None = None, body: dict | None = None):
    """Run a gws CLI command and return its parsed JSON output.

    Args:
        parts: gws subcommand path, e.g. ["gmail", "users", "messages", "list"].
        params: optional request parameters, serialized into --params.
        body: optional request body, serialized into --json.

    Returns an empty dict when gws produces no output. Exits the process
    (mirroring gws's return code) on a non-zero exit or non-JSON output.
    Raises RuntimeError when gws is not installed.
    """
    binary = _gws_binary()
    if not binary:
        raise RuntimeError("gws not installed")
    _ensure_authenticated()
    cmd = [binary, *parts]
    if params is not None:
        cmd.extend(["--params", json.dumps(params)])
    if body is not None:
        cmd.extend(["--json", json.dumps(body)])
    # Fix: removed interleaved leftovers of the old gws_bridge invocation
    # ([PYTHON, str(BRIDGE)] + args, and an unconditional sys.exit) that
    # corrupted this call from the superseded implementation.
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        env=_gws_env(),
    )
    if result.returncode != 0:
        err = result.stderr.strip() or result.stdout.strip() or "Unknown gws error"
        print(err, file=sys.stderr)
        sys.exit(result.returncode or 1)
    stdout = result.stdout.strip()
    if not stdout:
        return {}
    try:
        return json.loads(stdout)
    except json.JSONDecodeError:
        print("ERROR: Unexpected non-JSON output from gws:", file=sys.stderr)
        print(stdout, file=sys.stderr)
        sys.exit(1)
# -- Gmail --
def _headers_dict(msg: dict) -> dict[str, str]:
    """Map header name -> value from a Gmail API message's payload headers."""
    header_entries = msg.get("payload", {}).get("headers", [])
    return {entry["name"]: entry["value"] for entry in header_entries}
def _extract_message_body(msg: dict) -> str:
    """Best-effort body-text extraction from a Gmail API message.

    Preference order: the top-level payload body, then the first text/plain
    part carrying data, then the first text/html part. Returns "" when no
    decodable content is present.
    """
    def _decode(data: str) -> str:
        return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")

    payload = msg.get("payload", {})
    top_level = payload.get("body", {}).get("data")
    if top_level:
        return _decode(top_level)
    parts = payload.get("parts")
    if not parts:
        return ""
    for wanted_mime in ("text/plain", "text/html"):
        for part in parts:
            data = part.get("body", {}).get("data")
            if part.get("mimeType") == wanted_mime and data:
                return _decode(data)
    return ""
def _extract_doc_text(doc: dict) -> str:
    """Concatenate every paragraph text run from a Docs API document."""
    runs = [
        pe["textRun"]["content"]
        for element in doc.get("body", {}).get("content", [])
        for pe in element.get("paragraph", {}).get("elements", [])
        if pe.get("textRun", {}).get("content")
    ]
    return "".join(runs)
def _datetime_with_timezone(value: str) -> str:
    """Append 'Z' to a timezone-naive ISO-8601 datetime string.

    Empty strings, date-only values (no 'T'), values already ending in 'Z',
    and values whose time portion carries a '+'/'-' UTC offset are returned
    unchanged.
    """
    needs_utc_suffix = (
        value
        and "T" in value
        and not value.endswith("Z")
        # value[10:] skips the YYYY-MM-DD date part, so the date's own
        # hyphens don't count as an offset marker.
        and "+" not in value[10:]
        and "-" not in value[10:]
    )
    return value + "Z" if needs_utc_suffix else value
def get_credentials():
    """Load OAuth credentials from the token file, refreshing when expired.

    A successful refresh is persisted back to TOKEN_PATH. Exits with an
    error when the token is missing or cannot be made valid.
    """
    _ensure_authenticated()
    # Imported lazily so the gws-only path never needs the Google libraries.
    from google.auth.transport.requests import Request
    from google.oauth2.credentials import Credentials

    credentials = Credentials.from_authorized_user_file(
        str(TOKEN_PATH), _stored_token_scopes()
    )
    if credentials.expired and credentials.refresh_token:
        credentials.refresh(Request())
        # Persist the refreshed token so future invocations reuse it.
        TOKEN_PATH.write_text(credentials.to_json())
    if not credentials.valid:
        print("Token is invalid. Re-run setup.", file=sys.stderr)
        sys.exit(1)
    return credentials
def build_service(api, version):
    """Construct a googleapiclient service handle for api/version."""
    from googleapiclient.discovery import build

    creds = get_credentials()
    return build(api, version, credentials=creds)
# =========================================================================
# Gmail
# =========================================================================
def gmail_search(args):
    """Search Gmail and print a JSON array of message summaries.

    Prefers the gws CLI when installed, otherwise falls back to the Python
    Gmail API client. Each summary carries id, threadId, from, to, subject,
    date, snippet, and labels.
    """
    # Fix: removed leftover lines from the superseded implementation that
    # built a "+triage" cmd and invoked the now-gutted gws() wrapper.
    if _gws_binary():
        results = _run_gws(
            ["gmail", "users", "messages", "list"],
            params={"userId": "me", "q": args.query, "maxResults": args.max},
        )
        output = []
        for msg_meta in results.get("messages", []):
            msg = _run_gws(
                ["gmail", "users", "messages", "get"],
                params={
                    "userId": "me",
                    "id": msg_meta["id"],
                    "format": "metadata",
                    "metadataHeaders": ["From", "To", "Subject", "Date"],
                },
            )
            headers = _headers_dict(msg)
            output.append({
                "id": msg["id"],
                "threadId": msg["threadId"],
                "from": headers.get("From", ""),
                "to": headers.get("To", ""),
                "subject": headers.get("Subject", ""),
                "date": headers.get("Date", ""),
                "snippet": msg.get("snippet", ""),
                "labels": msg.get("labelIds", []),
            })
        print(json.dumps(output, indent=2, ensure_ascii=False))
        return
    service = build_service("gmail", "v1")
    results = service.users().messages().list(
        userId="me", q=args.query, maxResults=args.max
    ).execute()
    messages = results.get("messages", [])
    if not messages:
        print("No messages found.")
        return
    output = []
    for msg_meta in messages:
        msg = service.users().messages().get(
            userId="me", id=msg_meta["id"], format="metadata",
            metadataHeaders=["From", "To", "Subject", "Date"],
        ).execute()
        headers = _headers_dict(msg)
        output.append({
            "id": msg["id"],
            "threadId": msg["threadId"],
            "from": headers.get("From", ""),
            "to": headers.get("To", ""),
            "subject": headers.get("Subject", ""),
            "date": headers.get("Date", ""),
            "snippet": msg.get("snippet", ""),
            "labels": msg.get("labelIds", []),
        })
    print(json.dumps(output, indent=2, ensure_ascii=False))
def gmail_get(args):
    """Fetch one Gmail message and print a JSON object including body text.

    Uses the gws CLI when installed, otherwise the Python Gmail API client.
    """
    # Fix: removed a stale call to the gutted gws() "+read" wrapper left
    # over from the superseded implementation.
    if _gws_binary():
        msg = _run_gws(
            ["gmail", "users", "messages", "get"],
            params={"userId": "me", "id": args.message_id, "format": "full"},
        )
        headers = _headers_dict(msg)
        result = {
            "id": msg["id"],
            "threadId": msg["threadId"],
            "from": headers.get("From", ""),
            "to": headers.get("To", ""),
            "subject": headers.get("Subject", ""),
            "date": headers.get("Date", ""),
            "labels": msg.get("labelIds", []),
            "body": _extract_message_body(msg),
        }
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return
    service = build_service("gmail", "v1")
    msg = service.users().messages().get(
        userId="me", id=args.message_id, format="full"
    ).execute()
    headers = _headers_dict(msg)
    result = {
        "id": msg["id"],
        "threadId": msg["threadId"],
        "from": headers.get("From", ""),
        "to": headers.get("To", ""),
        "subject": headers.get("Subject", ""),
        "date": headers.get("Date", ""),
        "labels": msg.get("labelIds", []),
        "body": _extract_message_body(msg),
    }
    print(json.dumps(result, indent=2, ensure_ascii=False))
def gmail_send(args):
    """Send an email; prints {status, id, threadId} JSON on success.

    Builds a MIME message (plain or HTML per args.html, with optional cc,
    custom From header, and thread targeting) and sends it via the gws CLI
    when installed, otherwise via the Python Gmail API client.
    """
    # Fix: removed interleaved leftovers of the superseded implementation
    # (old "+send" cmd construction and gws(*cmd) call) that were tangled
    # into the fallback path.
    if _gws_binary():
        message = MIMEText(args.body, "html" if args.html else "plain")
        message["to"] = args.to
        message["subject"] = args.subject
        if args.cc:
            message["cc"] = args.cc
        if args.from_header:
            message["from"] = args.from_header
        raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
        body = {"raw": raw}
        if args.thread_id:
            body["threadId"] = args.thread_id
        result = _run_gws(
            ["gmail", "users", "messages", "send"],
            params={"userId": "me"},
            body=body,
        )
        print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
        return
    service = build_service("gmail", "v1")
    message = MIMEText(args.body, "html" if args.html else "plain")
    message["to"] = args.to
    message["subject"] = args.subject
    if args.cc:
        message["cc"] = args.cc
    if args.from_header:
        message["from"] = args.from_header
    raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
    body = {"raw": raw}
    if args.thread_id:
        body["threadId"] = args.thread_id
    result = service.users().messages().send(userId="me", body=body).execute()
    print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
def gmail_reply(args):
    """Reply to a message, threading correctly; prints {status, id, threadId}.

    Fetches the original's From/Subject/Message-ID, prefixes "Re:" when
    needed, sets In-Reply-To/References, and sends on the original thread
    via the gws CLI when installed, otherwise via the Python client.
    """
    # Fix: removed a stale call to the gutted gws() "+reply" wrapper left
    # over from the superseded implementation.
    if _gws_binary():
        original = _run_gws(
            ["gmail", "users", "messages", "get"],
            params={
                "userId": "me",
                "id": args.message_id,
                "format": "metadata",
                "metadataHeaders": ["From", "Subject", "Message-ID"],
            },
        )
        headers = _headers_dict(original)
        subject = headers.get("Subject", "")
        if not subject.startswith("Re:"):
            subject = f"Re: {subject}"
        message = MIMEText(args.body)
        message["to"] = headers.get("From", "")
        message["subject"] = subject
        if args.from_header:
            message["from"] = args.from_header
        if headers.get("Message-ID"):
            message["In-Reply-To"] = headers["Message-ID"]
            message["References"] = headers["Message-ID"]
        raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
        result = _run_gws(
            ["gmail", "users", "messages", "send"],
            params={"userId": "me"},
            body={"raw": raw, "threadId": original["threadId"]},
        )
        print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
        return
    service = build_service("gmail", "v1")
    original = service.users().messages().get(
        userId="me", id=args.message_id, format="metadata",
        metadataHeaders=["From", "Subject", "Message-ID"],
    ).execute()
    headers = _headers_dict(original)
    subject = headers.get("Subject", "")
    if not subject.startswith("Re:"):
        subject = f"Re: {subject}"
    message = MIMEText(args.body)
    message["to"] = headers.get("From", "")
    message["subject"] = subject
    if args.from_header:
        message["from"] = args.from_header
    if headers.get("Message-ID"):
        message["In-Reply-To"] = headers["Message-ID"]
        message["References"] = headers["Message-ID"]
    raw = base64.urlsafe_b64encode(message.as_bytes()).decode()
    body = {"raw": raw, "threadId": original["threadId"]}
    result = service.users().messages().send(userId="me", body=body).execute()
    print(json.dumps({"status": "sent", "id": result["id"], "threadId": result.get("threadId", "")}, indent=2))
def gmail_labels(args):
    """Print all Gmail labels as a JSON array of {id, name, type}."""
    # Fix: removed a stale call to the gutted gws() wrapper left over from
    # the superseded implementation.
    if _gws_binary():
        results = _run_gws(["gmail", "users", "labels", "list"], params={"userId": "me"})
        labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
        print(json.dumps(labels, indent=2))
        return
    service = build_service("gmail", "v1")
    results = service.users().labels().list(userId="me").execute()
    labels = [{"id": l["id"], "name": l["name"], "type": l.get("type", "")} for l in results.get("labels", [])]
    print(json.dumps(labels, indent=2))
def gmail_modify(args):
    """Add and/or remove labels on a message; prints {id, labels} JSON.

    args.add_labels / args.remove_labels are comma-separated label ID lists;
    either may be empty.
    """
    # Fix: removed an embedded diff hunk-header line and a stale call to the
    # gutted gws() wrapper left over from the superseded implementation.
    body = {}
    if args.add_labels:
        body["addLabelIds"] = args.add_labels.split(",")
    if args.remove_labels:
        body["removeLabelIds"] = args.remove_labels.split(",")
    if _gws_binary():
        result = _run_gws(
            ["gmail", "users", "messages", "modify"],
            params={"userId": "me", "id": args.message_id},
            body=body,
        )
        print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
        return
    service = build_service("gmail", "v1")
    result = service.users().messages().modify(userId="me", id=args.message_id, body=body).execute()
    print(json.dumps({"id": result["id"], "labels": result.get("labelIds", [])}, indent=2))
# -- Calendar --
# =========================================================================
# Calendar
# =========================================================================
def calendar_list(args):
    """List calendar events and print a JSON array of event summaries.

    Defaults to the next 7 days when --start/--end are omitted; naive ISO
    datetimes are normalized to UTC via _datetime_with_timezone. Uses the
    gws CLI when installed, otherwise the Python Calendar API client.
    """
    # Fix: removed interleaved leftovers of the superseded implementation
    # (the old start/end branch that delegated to the gutted gws() wrapper
    # and its "+agenda" else-branch). The event post-processing, identical
    # in both paths, is deduplicated below.
    now = datetime.now(timezone.utc)
    time_min = _datetime_with_timezone(args.start or now.isoformat())
    time_max = _datetime_with_timezone(args.end or (now + timedelta(days=7)).isoformat())
    if _gws_binary():
        results = _run_gws(
            ["calendar", "events", "list"],
            params={
                "calendarId": args.calendar,
                "timeMin": time_min,
                "timeMax": time_max,
                "maxResults": args.max,
                "singleEvents": True,
                "orderBy": "startTime",
            },
        )
    else:
        service = build_service("calendar", "v3")
        results = service.events().list(
            calendarId=args.calendar, timeMin=time_min, timeMax=time_max,
            maxResults=args.max, singleEvents=True, orderBy="startTime",
        ).execute()
    events = []
    for e in results.get("items", []):
        events.append({
            "id": e["id"],
            "summary": e.get("summary", "(no title)"),
            # All-day events carry "date" instead of "dateTime".
            "start": e.get("start", {}).get("dateTime", e.get("start", {}).get("date", "")),
            "end": e.get("end", {}).get("dateTime", e.get("end", {}).get("date", "")),
            "location": e.get("location", ""),
            "description": e.get("description", ""),
            "status": e.get("status", ""),
            "htmlLink": e.get("htmlLink", ""),
        })
    print(json.dumps(events, indent=2, ensure_ascii=False))
def calendar_create(args):
    """Create a calendar event; prints {status, id, summary, htmlLink} JSON.

    args.start/args.end are ISO-8601 dateTime strings; location, description,
    and comma-separated attendees are optional. Uses the gws CLI when
    installed, otherwise the Python Calendar API client.
    """
    # Fix: removed interleaved leftovers of the superseded implementation
    # (old "+insert" cmd construction and gws(*cmd) call).
    event = {
        "summary": args.summary,
        "start": {"dateTime": args.start},
        "end": {"dateTime": args.end},
    }
    if args.location:
        event["location"] = args.location
    if args.description:
        event["description"] = args.description
    if args.attendees:
        event["attendees"] = [{"email": e.strip()} for e in args.attendees.split(",") if e.strip()]
    if _gws_binary():
        result = _run_gws(
            ["calendar", "events", "insert"],
            params={"calendarId": args.calendar},
            body=event,
        )
        print(json.dumps({
            "status": "created",
            "id": result["id"],
            "summary": result.get("summary", ""),
            "htmlLink": result.get("htmlLink", ""),
        }, indent=2))
        return
    service = build_service("calendar", "v3")
    result = service.events().insert(calendarId=args.calendar, body=event).execute()
    print(json.dumps({
        "status": "created",
        "id": result["id"],
        "summary": result.get("summary", ""),
        "htmlLink": result.get("htmlLink", ""),
    }, indent=2))
def calendar_delete(args):
    """Delete a calendar event; prints {status, eventId} JSON."""
    # Fix: removed a stale call to the gutted gws() wrapper left over from
    # the superseded implementation.
    if _gws_binary():
        _run_gws(["calendar", "events", "delete"], params={"calendarId": args.calendar, "eventId": args.event_id})
        print(json.dumps({"status": "deleted", "eventId": args.event_id}))
        return
    service = build_service("calendar", "v3")
    service.events().delete(calendarId=args.calendar, eventId=args.event_id).execute()
    print(json.dumps({"status": "deleted", "eventId": args.event_id}))
# -- Drive --
# =========================================================================
# Drive
# =========================================================================
def drive_search(args):
    """Search Drive files; prints a JSON array of file metadata objects.

    args.query is wrapped in a fullText-contains clause unless
    args.raw_query is set, in which case it is passed through verbatim.
    """
    # Fix: removed a stale call to the gutted gws() wrapper left over from
    # the superseded implementation.
    query = args.query if args.raw_query else f"fullText contains '{args.query}'"
    if _gws_binary():
        results = _run_gws(
            ["drive", "files", "list"],
            params={
                "q": query,
                "pageSize": args.max,
                "fields": "files(id, name, mimeType, modifiedTime, webViewLink)",
            },
        )
        print(json.dumps(results.get("files", []), indent=2, ensure_ascii=False))
        return
    service = build_service("drive", "v3")
    results = service.files().list(
        q=query, pageSize=args.max, fields="files(id, name, mimeType, modifiedTime, webViewLink)",
    ).execute()
    files = results.get("files", [])
    print(json.dumps(files, indent=2, ensure_ascii=False))
# -- Contacts --
# =========================================================================
# Contacts
# =========================================================================
def _person_to_contact(person):
    """Flatten a People API person resource into {name, emails, phones}.

    Missing fields collapse to empty string / empty lists.
    """
    names = person.get("names", [{}])
    return {
        "name": names[0].get("displayName", "") if names else "",
        "emails": [e.get("value", "") for e in person.get("emailAddresses", [])],
        "phones": [p.get("value", "") for p in person.get("phoneNumbers", [])],
    }


def contacts_list(args):
    """List up to ``--max`` contacts of the authenticated user as JSON.

    Uses the ``gws`` binary when present, else the Google API client; the
    response-flattening logic is shared between both paths via
    ``_person_to_contact`` (it was previously duplicated).
    """
    if _gws_binary():
        results = _run_gws(
            ["people", "people", "connections", "list"],
            params={
                "resourceName": "people/me",
                "pageSize": args.max,
                "personFields": "names,emailAddresses,phoneNumbers",
            },
        )
    else:
        service = build_service("people", "v1")
        results = service.people().connections().list(
            resourceName="people/me",
            pageSize=args.max,
            personFields="names,emailAddresses,phoneNumbers",
        ).execute()
    contacts = [_person_to_contact(person) for person in results.get("connections", [])]
    print(json.dumps(contacts, indent=2, ensure_ascii=False))
# =========================================================================
# Sheets
# =========================================================================
def sheets_get(args):
    """Print the values of a spreadsheet range as a JSON array of rows.

    Uses the ``gws`` binary when present, else the Google API client.
    (The legacy ``gws("sheets", "+read", ...)`` call was removed — it used
    an incompatible CLI syntax and duplicated the read below.)
    """
    if _gws_binary():
        result = _run_gws(
            ["sheets", "spreadsheets", "values", "get"],
            params={"spreadsheetId": args.sheet_id, "range": args.range},
        )
        print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
        return
    service = build_service("sheets", "v4")
    result = service.spreadsheets().values().get(
        spreadsheetId=args.sheet_id, range=args.range,
    ).execute()
    print(json.dumps(result.get("values", []), indent=2, ensure_ascii=False))
def sheets_update(args):
    """Overwrite a spreadsheet range with the JSON-encoded rows in --values.

    ``--values`` must decode to a list of rows; values are interpreted as
    if typed by the user (USER_ENTERED). Prints updatedCells/updatedRange.
    """
    values = json.loads(args.values)
    body = {"values": values}
    if _gws_binary():
        result = _run_gws(
            ["sheets", "spreadsheets", "values", "update"],
            params={
                "spreadsheetId": args.sheet_id,
                "range": args.range,
                "valueInputOption": "USER_ENTERED",
            },
            body=body,
        )
        print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
        return
    service = build_service("sheets", "v4")
    result = service.spreadsheets().values().update(
        spreadsheetId=args.sheet_id, range=args.range,
        valueInputOption="USER_ENTERED", body=body,
    ).execute()
    print(json.dumps({"updatedCells": result.get("updatedCells", 0), "updatedRange": result.get("updatedRange", "")}, indent=2))
def sheets_append(args):
    """Append the JSON-encoded rows in --values after the given range.

    Rows are inserted (INSERT_ROWS) rather than overwriting; values are
    interpreted as if typed by the user (USER_ENTERED). Prints the number
    of cells written.
    """
    values = json.loads(args.values)
    body = {"values": values}
    if _gws_binary():
        result = _run_gws(
            ["sheets", "spreadsheets", "values", "append"],
            params={
                "spreadsheetId": args.sheet_id,
                "range": args.range,
                "valueInputOption": "USER_ENTERED",
                "insertDataOption": "INSERT_ROWS",
            },
            body=body,
        )
        print(json.dumps({"updatedCells": result.get("updates", {}).get("updatedCells", 0)}, indent=2))
        return
    service = build_service("sheets", "v4")
    result = service.spreadsheets().values().append(
        spreadsheetId=args.sheet_id, range=args.range,
        valueInputOption="USER_ENTERED", insertDataOption="INSERT_ROWS", body=body,
    ).execute()
    print(json.dumps({"updatedCells": result.get("updates", {}).get("updatedCells", 0)}, indent=2))
# =========================================================================
# Docs
# =========================================================================
def docs_get(args):
    """Fetch a Google Doc and print its title, id, and extracted body text.

    Uses the ``gws`` binary when present, else the Google API client; the
    summary dict is built once for both paths.
    """
    if _gws_binary():
        doc = _run_gws(["docs", "documents", "get"], params={"documentId": args.doc_id})
    else:
        service = build_service("docs", "v1")
        doc = service.documents().get(documentId=args.doc_id).execute()
    result = {
        "title": doc.get("title", ""),
        "documentId": doc.get("documentId", ""),
        # _extract_doc_text flattens the structured document body to text.
        "body": _extract_doc_text(doc),
    }
    print(json.dumps(result, indent=2, ensure_ascii=False))
# =========================================================================
# CLI parser (backward-compatible interface)
# =========================================================================
def main():
parser = argparse.ArgumentParser(description="Google Workspace API for Hermes Agent (gws backend)")
parser = argparse.ArgumentParser(description="Google Workspace API for Hermes Agent")
sub = parser.add_subparsers(dest="service", required=True)
# --- Gmail ---
@ -228,13 +742,15 @@ def main():
p.add_argument("--subject", required=True)
p.add_argument("--body", required=True)
p.add_argument("--cc", default="")
p.add_argument("--from", dest="from_header", default="", help="Custom From header (e.g. '\"Agent Name\" <user@example.com>')")
p.add_argument("--html", action="store_true", help="Send body as HTML")
p.add_argument("--thread-id", default="", help="Thread ID (unused with gws, kept for compat)")
p.add_argument("--thread-id", default="", help="Thread ID for threading")
p.set_defaults(func=gmail_send)
p = gmail_sub.add_parser("reply")
p.add_argument("message_id", help="Message ID to reply to")
p.add_argument("--body", required=True)
p.add_argument("--from", dest="from_header", default="", help="Custom From header (e.g. '\"Agent Name\" <user@example.com>')")
p.set_defaults(func=gmail_reply)
p = gmail_sub.add_parser("labels")

View file

@ -0,0 +1,95 @@
"""Tests for the auto-continue feature (#4493).
When the gateway restarts mid-agent-work, the session transcript ends on a
tool result that the agent never processed. The auto-continue logic detects
this and prepends a system note to the next user message so the model
finishes the interrupted work before addressing the new input.
"""
import pytest
def _simulate_auto_continue(agent_history: list, user_message: str) -> str:
    """Mirror of the auto-continue injection logic in gateway/run.py.

    If the transcript ends on a tool result the agent never processed, a
    system note is prepended to *user_message* instructing the model to
    finish the interrupted work first; otherwise the message is returned
    unchanged.
    """
    ends_on_tool_result = bool(agent_history) and agent_history[-1].get("role") == "tool"
    if not ends_on_tool_result:
        return user_message
    note = (
        "[System note: Your previous turn was interrupted before you could "
        "process the last tool result(s). The conversation history contains "
        "tool outputs you haven't responded to yet. Please finish processing "
        "those results and summarize what was accomplished, then address the "
        "user's new message below.]\n\n"
    )
    return note + user_message
class TestAutoDetection:
    """Detection of a transcript that ends on an unprocessed tool result."""

    def test_trailing_tool_result_triggers_note(self):
        # A completed tool call whose output the agent never processed.
        transcript = [
            {"role": "user", "content": "deploy the app"},
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "call_1", "function": {"name": "terminal", "arguments": "{}"}}
            ]},
            {"role": "tool", "tool_call_id": "call_1", "content": "deployed successfully"},
        ]
        out = _simulate_auto_continue(transcript, "what happened?")
        assert "[System note:" in out
        assert "interrupted" in out
        assert "what happened?" in out

    def test_trailing_assistant_message_no_note(self):
        # A normally completed turn passes the new message through untouched.
        transcript = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "Hi there!"},
        ]
        out = _simulate_auto_continue(transcript, "how are you?")
        assert "[System note:" not in out
        assert out == "how are you?"

    def test_empty_history_no_note(self):
        assert _simulate_auto_continue([], "hello") == "hello"

    def test_trailing_user_message_no_note(self):
        """Shouldn't happen in practice, but ensure no false positive."""
        transcript = [{"role": "user", "content": "hello"}]
        assert _simulate_auto_continue(transcript, "hello again") == "hello again"

    def test_multiple_tool_results_still_triggers(self):
        """Multiple tool calls in a row — last one is still role=tool."""
        transcript = [
            {"role": "user", "content": "search and read"},
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "call_1", "function": {"name": "search", "arguments": "{}"}},
                {"id": "call_2", "function": {"name": "read", "arguments": "{}"}},
            ]},
            {"role": "tool", "tool_call_id": "call_1", "content": "found it"},
            {"role": "tool", "tool_call_id": "call_2", "content": "file content here"},
        ]
        assert "[System note:" in _simulate_auto_continue(transcript, "continue")

    def test_original_message_preserved_after_note(self):
        """The user's actual message must appear after the system note."""
        transcript = [
            {"role": "assistant", "content": None, "tool_calls": [
                {"id": "c1", "function": {"name": "t", "arguments": "{}"}}
            ]},
            {"role": "tool", "tool_call_id": "c1", "content": "done"},
        ]
        out = _simulate_auto_continue(transcript, "now do X")
        # The note terminator must precede the user's text.
        assert out.index("]\n\n") < out.index("now do X")

View file

@ -19,10 +19,34 @@ def _ensure_discord_mock():
discord_mod.Thread = type("Thread", (), {})
discord_mod.ForumChannel = type("ForumChannel", (), {})
discord_mod.Interaction = object
# Lightweight mock for app_commands.Group and Command used by
# _register_skill_group.
class _FakeGroup:
    """Stand-in for ``app_commands.Group``: tracks children by name and,
    like the real class, attaches itself to its parent on construction."""

    def __init__(self, *, name, description, parent=None):
        self.name, self.description, self.parent = name, description, parent
        self._children: dict[str, object] = {}
        if parent is not None:
            parent.add_command(self)

    def add_command(self, cmd):
        # Children are looked up by their command name in the tests.
        self._children[cmd.name] = cmd
class _FakeCommand:
    """Leaf stand-in for ``app_commands.Command`` — records its wiring only."""

    def __init__(self, *, name, description, callback, parent=None):
        for attr, value in (
            ("name", name),
            ("description", description),
            ("callback", callback),
            ("parent", parent),
        ):
            setattr(self, attr, value)
discord_mod.app_commands = SimpleNamespace(
describe=lambda **kwargs: (lambda fn: fn),
choices=lambda **kwargs: (lambda fn: fn),
Choice=lambda **kwargs: SimpleNamespace(**kwargs),
Group=_FakeGroup,
Command=_FakeCommand,
)
ext_mod = MagicMock()
@ -51,6 +75,12 @@ class FakeTree:
return decorator
def add_command(self, cmd):
    # Mirrors CommandTree.add_command: commands are indexed by name.
    self.commands[cmd.name] = cmd
def get_commands(self):
    # Mirrors CommandTree.get_commands: return objects exposing a .name.
    return [SimpleNamespace(name=n) for n in self.commands]
@pytest.fixture
def adapter():
@ -87,6 +117,23 @@ async def test_registers_native_thread_slash_command(adapter):
adapter._handle_thread_create_slash.assert_awaited_once_with(interaction, "Planning", "", 1440)
@pytest.mark.asyncio
async def test_registers_native_restart_slash_command(adapter):
    """/restart must be registered natively and route via _run_simple_slash."""
    adapter._run_simple_slash = AsyncMock()
    adapter._register_slash_commands()
    assert "restart" in adapter._client.tree.commands
    interaction = SimpleNamespace()
    # Invoke the registered handler directly, as Discord would.
    await adapter._client.tree.commands["restart"](interaction)
    # The handler forwards the canonical command text plus the
    # user-facing acknowledgement message.
    adapter._run_simple_slash.assert_awaited_once_with(
        interaction,
        "/restart",
        "Restart requested~",
    )
# ------------------------------------------------------------------
# _handle_thread_create_slash — success, session dispatch, failure
# ------------------------------------------------------------------
@ -498,3 +545,79 @@ def test_discord_auto_thread_config_bridge(monkeypatch, tmp_path):
import os
assert os.getenv("DISCORD_AUTO_THREAD") == "true"
# ------------------------------------------------------------------
# /skill group registration
# ------------------------------------------------------------------
def test_register_skill_group_creates_group(adapter):
    """_register_skill_group should register a '/skill' Group on the tree."""
    # discord_skill_commands_by_category returns (categories, uncategorized,
    # hidden-count); fake two categories plus one root-level skill.
    mock_categories = {
        "creative": [
            ("ascii-art", "Generate ASCII art", "/ascii-art"),
            ("excalidraw", "Hand-drawn diagrams", "/excalidraw"),
        ],
        "media": [
            ("gif-search", "Search for GIFs", "/gif-search"),
        ],
    }
    mock_uncategorized = [
        ("dogfood", "Exploratory QA testing", "/dogfood"),
    ]
    with patch(
        "hermes_cli.commands.discord_skill_commands_by_category",
        return_value=(mock_categories, mock_uncategorized, 0),
    ):
        adapter._register_slash_commands()
    tree = adapter._client.tree
    assert "skill" in tree.commands, "Expected /skill group to be registered"
    skill_group = tree.commands["skill"]
    assert skill_group.name == "skill"
    # Should have 2 category subgroups + 1 uncategorized subcommand
    children = skill_group._children
    assert "creative" in children
    assert "media" in children
    assert "dogfood" in children
    # Category groups should have their skills
    assert "ascii-art" in children["creative"]._children
    assert "excalidraw" in children["creative"]._children
    assert "gif-search" in children["media"]._children
def test_register_skill_group_empty_skills_no_group(adapter):
    """No /skill group should be added when there are zero skills."""
    # Empty categories, no uncategorized skills, nothing hidden.
    with patch(
        "hermes_cli.commands.discord_skill_commands_by_category",
        return_value=({}, [], 0),
    ):
        adapter._register_slash_commands()
    tree = adapter._client.tree
    assert "skill" not in tree.commands
def test_register_skill_group_handler_dispatches_command(adapter):
    """Skill subcommand handlers should dispatch the correct /cmd-key text."""
    mock_categories = {
        "media": [
            ("gif-search", "Search for GIFs", "/gif-search"),
        ],
    }
    with patch(
        "hermes_cli.commands.discord_skill_commands_by_category",
        return_value=(mock_categories, [], 0),
    ):
        adapter._register_slash_commands()
    # Walk the registered group: /skill -> media -> gif-search.
    skill_group = adapter._client.tree.commands["skill"]
    media_group = skill_group._children["media"]
    gif_cmd = media_group._children["gif-search"]
    assert gif_cmd.callback is not None
    # The callback name should reflect the skill
    assert "gif_search" in gif_cmd.callback.__name__

View file

@ -193,7 +193,7 @@ async def test_shutdown_notification_says_restarting_when_restart_requested():
assert len(adapter.sent) == 1
assert "restarting" in adapter.sent[0]
assert "/retry" in adapter.sent[0]
assert "resume" in adapter.sent[0]
@pytest.mark.asyncio

View file

@ -0,0 +1,116 @@
"""Tests for stuck-session loop detection (#7536).
When a session is active across 3+ consecutive gateway restarts (the agent
gets stuck, gateway restarts, same session gets stuck again), the session
is auto-suspended on startup so the user gets a clean slate.
"""
import json
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from tests.gateway.restart_test_helpers import make_restart_runner
@pytest.fixture
def runner_with_home(tmp_path, monkeypatch):
    """Create a runner with a writable HERMES_HOME."""
    # Redirect the gateway's home resolver into the pytest sandbox so the
    # stuck-loop counter file is written under tmp_path.
    monkeypatch.setattr("gateway.run._hermes_home", tmp_path)
    # The adapter half of the helper's return value is unused here.
    runner, adapter = make_restart_runner()
    return runner, tmp_path
class TestStuckLoopDetection:
    """Counter-file bookkeeping and the 3-restart auto-suspend threshold."""

    def test_increment_creates_file(self, runner_with_home):
        """First increment writes the counter file with 1 per active session."""
        runner, home = runner_with_home
        runner._increment_restart_failure_counts({"session:a", "session:b"})
        path = home / runner._STUCK_LOOP_FILE
        assert path.exists()
        counts = json.loads(path.read_text())
        assert counts["session:a"] == 1
        assert counts["session:b"] == 1

    def test_increment_accumulates(self, runner_with_home):
        """Counts grow by one for each restart the session stays active."""
        runner, home = runner_with_home
        runner._increment_restart_failure_counts({"session:a"})
        runner._increment_restart_failure_counts({"session:a"})
        runner._increment_restart_failure_counts({"session:a"})
        counts = json.loads((home / runner._STUCK_LOOP_FILE).read_text())
        assert counts["session:a"] == 3

    def test_increment_drops_inactive_sessions(self, runner_with_home):
        """Sessions absent from the active set are pruned from the file."""
        runner, home = runner_with_home
        runner._increment_restart_failure_counts({"session:a", "session:b"})
        runner._increment_restart_failure_counts({"session:a"})  # b not active
        counts = json.loads((home / runner._STUCK_LOOP_FILE).read_text())
        assert "session:a" in counts
        assert "session:b" not in counts

    def test_suspend_at_threshold(self, runner_with_home):
        """Three consecutive active restarts auto-suspends the session."""
        runner, home = runner_with_home
        # Simulate 3 restarts with session:a active each time
        for _ in range(3):
            runner._increment_restart_failure_counts({"session:a"})
        # Create a mock session entry
        mock_entry = MagicMock()
        mock_entry.suspended = False
        runner.session_store._entries = {"session:a": mock_entry}
        runner.session_store._save = MagicMock()
        suspended = runner._suspend_stuck_loop_sessions()
        assert suspended == 1
        assert mock_entry.suspended is True

    def test_no_suspend_below_threshold(self, runner_with_home):
        """Two restarts is not enough to trip the suspension."""
        runner, home = runner_with_home
        runner._increment_restart_failure_counts({"session:a"})
        runner._increment_restart_failure_counts({"session:a"})
        # Only 2 restarts — below threshold of 3
        mock_entry = MagicMock()
        mock_entry.suspended = False
        runner.session_store._entries = {"session:a": mock_entry}
        suspended = runner._suspend_stuck_loop_sessions()
        assert suspended == 0
        assert mock_entry.suspended is False

    def test_clear_on_success(self, runner_with_home):
        """Clearing one session leaves the other sessions' counts intact."""
        runner, home = runner_with_home
        runner._increment_restart_failure_counts({"session:a", "session:b"})
        runner._clear_restart_failure_count("session:a")
        path = home / runner._STUCK_LOOP_FILE
        counts = json.loads(path.read_text())
        assert "session:a" not in counts
        assert "session:b" in counts

    def test_clear_removes_file_when_empty(self, runner_with_home):
        """The counter file is deleted once its last entry is cleared."""
        runner, home = runner_with_home
        runner._increment_restart_failure_counts({"session:a"})
        runner._clear_restart_failure_count("session:a")
        assert not (home / runner._STUCK_LOOP_FILE).exists()

    def test_suspend_clears_file(self, runner_with_home):
        """Suspending resets counter state so the next boot starts clean."""
        runner, home = runner_with_home
        for _ in range(3):
            runner._increment_restart_failure_counts({"session:a"})
        mock_entry = MagicMock()
        mock_entry.suspended = False
        runner.session_store._entries = {"session:a": mock_entry}
        runner.session_store._save = MagicMock()
        runner._suspend_stuck_loop_sessions()
        assert not (home / runner._STUCK_LOOP_FILE).exists()

    def test_no_file_no_crash(self, runner_with_home):
        """Both helpers must tolerate a missing counter file."""
        runner, home = runner_with_home
        # No file exists — should return 0 and not crash
        assert runner._suspend_stuck_loop_sessions() == 0
        # Clear on nonexistent file — should not crash
        runner._clear_restart_failure_count("nonexistent")

View file

@ -1031,3 +1031,154 @@ class TestDiscordSkillCommands:
assert len(name) <= _CMD_NAME_LIMIT, (
f"Name '{name}' is {len(name)} chars (limit {_CMD_NAME_LIMIT})"
)
# ---------------------------------------------------------------------------
# Discord skill commands grouped by category
# ---------------------------------------------------------------------------
from hermes_cli.commands import discord_skill_commands_by_category # noqa: E402
class TestDiscordSkillCommandsByCategory:
    """Tests for discord_skill_commands_by_category() — /skill group registration."""

    def test_groups_skills_by_category(self, tmp_path, monkeypatch):
        """Skills nested 2+ levels deep should be grouped by top-level category."""
        from unittest.mock import patch
        fake_skills_dir = str(tmp_path / "skills")
        # Create the directory structure so resolve() works
        for p in [
            "skills/creative/ascii-art",
            "skills/creative/excalidraw",
            "skills/media/gif-search",
        ]:
            (tmp_path / p).mkdir(parents=True, exist_ok=True)
            (tmp_path / p / "SKILL.md").write_text("---\nname: test\n---\n")
        # The command map keys are slash strings; each value records where
        # its SKILL.md lives, which drives the categorization.
        fake_cmds = {
            "/ascii-art": {
                "name": "ascii-art",
                "description": "Generate ASCII art",
                "skill_md_path": f"{fake_skills_dir}/creative/ascii-art/SKILL.md",
            },
            "/excalidraw": {
                "name": "excalidraw",
                "description": "Hand-drawn diagrams",
                "skill_md_path": f"{fake_skills_dir}/creative/excalidraw/SKILL.md",
            },
            "/gif-search": {
                "name": "gif-search",
                "description": "Search for GIFs",
                "skill_md_path": f"{fake_skills_dir}/media/gif-search/SKILL.md",
            },
        }
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        with (
            patch("agent.skill_commands.get_skill_commands", return_value=fake_cmds),
            patch("tools.skills_tool.SKILLS_DIR", tmp_path / "skills"),
        ):
            categories, uncategorized, hidden = discord_skill_commands_by_category(
                reserved_names=set(),
            )
        assert "creative" in categories
        assert "media" in categories
        assert len(categories["creative"]) == 2
        assert len(categories["media"]) == 1
        assert uncategorized == []
        assert hidden == 0

    def test_root_level_skills_are_uncategorized(self, tmp_path, monkeypatch):
        """Skills directly under SKILLS_DIR (only 1 path component) → uncategorized."""
        from unittest.mock import patch
        fake_skills_dir = str(tmp_path / "skills")
        (tmp_path / "skills" / "dogfood").mkdir(parents=True, exist_ok=True)
        (tmp_path / "skills" / "dogfood" / "SKILL.md").write_text("")
        fake_cmds = {
            "/dogfood": {
                "name": "dogfood",
                "description": "QA testing",
                "skill_md_path": f"{fake_skills_dir}/dogfood/SKILL.md",
            },
        }
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        with (
            patch("agent.skill_commands.get_skill_commands", return_value=fake_cmds),
            patch("tools.skills_tool.SKILLS_DIR", tmp_path / "skills"),
        ):
            categories, uncategorized, hidden = discord_skill_commands_by_category(
                reserved_names=set(),
            )
        assert categories == {}
        assert len(uncategorized) == 1
        # Uncategorized entries are (name, description, cmd-key) tuples.
        assert uncategorized[0][0] == "dogfood"

    def test_hub_skills_excluded(self, tmp_path, monkeypatch):
        """Skills under .hub should be excluded."""
        from unittest.mock import patch
        fake_skills_dir = str(tmp_path / "skills")
        (tmp_path / "skills" / ".hub" / "some-skill").mkdir(parents=True, exist_ok=True)
        (tmp_path / "skills" / ".hub" / "some-skill" / "SKILL.md").write_text("")
        fake_cmds = {
            "/some-skill": {
                "name": "some-skill",
                "description": "Hub skill",
                "skill_md_path": f"{fake_skills_dir}/.hub/some-skill/SKILL.md",
            },
        }
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        with (
            patch("agent.skill_commands.get_skill_commands", return_value=fake_cmds),
            patch("tools.skills_tool.SKILLS_DIR", tmp_path / "skills"),
        ):
            categories, uncategorized, hidden = discord_skill_commands_by_category(
                reserved_names=set(),
            )
        # .hub skills appear in neither bucket.
        assert categories == {}
        assert uncategorized == []

    def test_deep_nested_skills_use_top_category(self, tmp_path, monkeypatch):
        """Skills like mlops/training/axolotl should group under 'mlops'."""
        from unittest.mock import patch
        fake_skills_dir = str(tmp_path / "skills")
        (tmp_path / "skills" / "mlops" / "training" / "axolotl").mkdir(parents=True, exist_ok=True)
        (tmp_path / "skills" / "mlops" / "training" / "axolotl" / "SKILL.md").write_text("")
        (tmp_path / "skills" / "mlops" / "inference" / "vllm").mkdir(parents=True, exist_ok=True)
        (tmp_path / "skills" / "mlops" / "inference" / "vllm" / "SKILL.md").write_text("")
        fake_cmds = {
            "/axolotl": {
                "name": "axolotl",
                "description": "Fine-tuning with Axolotl",
                "skill_md_path": f"{fake_skills_dir}/mlops/training/axolotl/SKILL.md",
            },
            "/vllm": {
                "name": "vllm",
                "description": "vLLM inference",
                "skill_md_path": f"{fake_skills_dir}/mlops/inference/vllm/SKILL.md",
            },
        }
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        with (
            patch("agent.skill_commands.get_skill_commands", return_value=fake_cmds),
            patch("tools.skills_tool.SKILLS_DIR", tmp_path / "skills"),
        ):
            categories, uncategorized, hidden = discord_skill_commands_by_category(
                reserved_names=set(),
            )
        # Both should be under 'mlops' regardless of sub-category
        assert "mlops" in categories
        names = {n for n, _d, _k in categories["mlops"]}
        assert "axolotl" in names
        assert "vllm" in names
        assert len(uncategorized) == 0

View file

@ -452,7 +452,7 @@ class TestGatewayServiceDetection:
class TestGatewaySystemServiceRouting:
def test_systemd_restart_self_requests_graceful_restart_without_reload_or_restart(self, monkeypatch, capsys):
def test_systemd_restart_self_requests_graceful_restart_and_waits(self, monkeypatch, capsys):
calls = []
monkeypatch.setattr(gateway_cli, "_select_systemd_scope", lambda system=False: False)
@ -466,16 +466,37 @@ class TestGatewaySystemServiceRouting:
"_request_gateway_self_restart",
lambda pid: calls.append(("self", pid)) or True,
)
monkeypatch.setattr(
gateway_cli.subprocess,
"run",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("systemctl should not run")),
)
# Simulate: old process dies immediately, new process becomes active
kill_call_count = [0]
def fake_kill(pid, sig):
kill_call_count[0] += 1
if kill_call_count[0] >= 2: # first call checks, second = dead
raise ProcessLookupError()
monkeypatch.setattr(os, "kill", fake_kill)
# Simulate systemctl is-active returning "active" with a new PID
new_pid = [None]
def fake_subprocess_run(cmd, **kwargs):
if "is-active" in cmd:
result = SimpleNamespace(stdout="active\n", returncode=0)
new_pid[0] = 999 # new PID
return result
raise AssertionError(f"Unexpected systemctl call: {cmd}")
monkeypatch.setattr(gateway_cli.subprocess, "run", fake_subprocess_run)
# get_running_pid returns new PID after restart
pid_calls = [0]
def fake_get_pid():
pid_calls[0] += 1
return 999 if pid_calls[0] > 1 else 654
monkeypatch.setattr("gateway.status.get_running_pid", fake_get_pid)
gateway_cli.systemd_restart()
assert calls == [("refresh", False), ("self", 654)]
assert "restart requested" in capsys.readouterr().out.lower()
assert ("self", 654) in calls
out = capsys.readouterr().out.lower()
assert "restarted" in out
def test_gateway_install_passes_system_flags(self, monkeypatch):
monkeypatch.setattr(gateway_cli, "supports_systemd_services", lambda: True)

View file

@ -116,6 +116,22 @@ class TestValidateToolset:
def test_invalid(self):
    """Unknown toolset names must fail validation."""
    assert validate_toolset("nonexistent") is False
def test_mcp_alias_uses_live_registry(self, monkeypatch):
    """MCP server aliases resolve through the live (patched) registry."""
    reg = ToolRegistry()
    reg.register(
        name="mcp_dynserver_ping",
        toolset="mcp-dynserver",
        schema=_make_schema("mcp_dynserver_ping", "Ping"),
        handler=_dummy_handler,
    )
    # Bare server name "dynserver" aliases the canonical "mcp-dynserver".
    reg.register_toolset_alias("dynserver", "mcp-dynserver")
    monkeypatch.setattr("tools.registry.registry", reg)
    assert validate_toolset("dynserver") is True
    assert validate_toolset("mcp-dynserver") is True
    assert "mcp_dynserver_ping" in resolve_toolset("dynserver")
class TestGetToolsetInfo:
def test_leaf(self):
@ -150,6 +166,23 @@ class TestCreateCustomToolset:
del TOOLSETS["_test_custom"]
class TestRegistryOwnedToolsets:
    def test_registry_membership_is_live(self, monkeypatch):
        """Toolset lookups reflect the live registry, not a static table."""
        reg = ToolRegistry()
        reg.register(
            name="test_live_toolset_tool",
            toolset="test-live-toolset",
            schema=_make_schema("test_live_toolset_tool", "Live"),
            handler=_dummy_handler,
        )
        # Swap in the throwaway registry so lookups see only our tool.
        monkeypatch.setattr("tools.registry.registry", reg)
        assert validate_toolset("test-live-toolset") is True
        assert get_toolset("test-live-toolset")["tools"] == ["test_live_toolset_tool"]
        assert resolve_toolset("test-live-toolset") == ["test_live_toolset_tool"]
class TestToolsetConsistency:
"""Verify structural integrity of the built-in TOOLSETS dict."""

View file

@ -31,18 +31,25 @@ def _clear_browser_caches():
class TestSanePath:
    """Verify _SANE_PATH includes fallback directories used by browser_tool.

    All assertions split on os.pathsep and check exact path components —
    the weaker substring checks (diff residue) were removed so e.g.
    "/usr/bin" cannot be satisfied by "/usr/bin2".
    """

    def test_includes_termux_bin(self):
        assert "/data/data/com.termux/files/usr/bin" in _SANE_PATH.split(os.pathsep)

    def test_includes_termux_sbin(self):
        assert "/data/data/com.termux/files/usr/sbin" in _SANE_PATH.split(os.pathsep)

    def test_includes_homebrew_bin(self):
        assert "/opt/homebrew/bin" in _SANE_PATH.split(os.pathsep)

    def test_includes_homebrew_sbin(self):
        assert "/opt/homebrew/sbin" in _SANE_PATH.split(os.pathsep)

    def test_includes_standard_dirs(self):
        path_parts = _SANE_PATH.split(os.pathsep)
        assert "/usr/local/bin" in path_parts
        assert "/usr/bin" in path_parts
        assert "/bin" in path_parts
class TestDiscoverHomebrewNodeDirs:
@ -143,6 +150,44 @@ class TestFindAgentBrowser:
result = _find_agent_browser()
assert result == "npx agent-browser"
def test_finds_npx_in_termux_fallback_path(self):
    """Should find npx when only Termux fallback dirs are available."""
    def mock_which(cmd, path=None):
        # agent-browser itself is absent; npx resolves only when the
        # searched PATH contains the Termux bin dir.
        if cmd == "agent-browser":
            return None
        if cmd == "npx":
            if path and "/data/data/com.termux/files/usr/bin" in path:
                return "/data/data/com.termux/files/usr/bin/npx"
            return None
        return None
    original_path_exists = Path.exists
    def mock_path_exists(self):
        # Pretend no local node_modules install of agent-browser exists.
        if "node_modules" in str(self) and "agent-browser" in str(self):
            return False
        return original_path_exists(self)
    real_isdir = os.path.isdir
    def selective_isdir(path):
        # Only the Termux dirs "exist"; defer to the real check otherwise.
        if path in (
            "/data/data/com.termux/files/usr/bin",
            "/data/data/com.termux/files/usr/sbin",
        ):
            return True
        return real_isdir(path)
    with patch("shutil.which", side_effect=mock_which), \
         patch("os.path.isdir", side_effect=selective_isdir), \
         patch.object(Path, "exists", mock_path_exists), \
         patch(
             "tools.browser_tool._discover_homebrew_node_dirs",
             return_value=[],
         ):
        result = _find_agent_browser()
        assert result == "npx agent-browser"
def test_raises_when_not_found(self):
"""Should raise FileNotFoundError when nothing works."""
original_path_exists = Path.exists
@ -399,3 +444,51 @@ class TestRunBrowserCommandPathConstruction:
result_path = captured_env.get("PATH", "")
assert "/opt/homebrew/bin" in result_path
assert "/opt/homebrew/sbin" in result_path
def test_subprocess_path_includes_termux_fallback_dirs(self, tmp_path):
    """Termux fallback dirs should survive browser PATH rebuilding."""
    captured_env = {}
    mock_proc = MagicMock()
    mock_proc.returncode = 0
    mock_proc.wait.return_value = 0
    def capture_popen(cmd, **kwargs):
        # Record the environment the browser subprocess would receive.
        captured_env.update(kwargs.get("env", {}))
        return mock_proc
    fake_session = {
        "session_name": "test-session",
        "session_id": "test-id",
        "cdp_url": None,
    }
    fake_json = json.dumps({"success": True})
    real_isdir = os.path.isdir
    def selective_isdir(path):
        # Termux dirs (and the temp sandbox) "exist"; defer otherwise.
        if path in (
            "/data/data/com.termux/files/usr/bin",
            "/data/data/com.termux/files/usr/sbin",
        ):
            return True
        if path.startswith(str(tmp_path)):
            return True
        return real_isdir(path)
    with patch("tools.browser_tool._find_agent_browser", return_value="/usr/local/bin/agent-browser"), \
         patch("tools.browser_tool._get_session_info", return_value=fake_session), \
         patch("tools.browser_tool._socket_safe_tmpdir", return_value=str(tmp_path)), \
         patch("tools.browser_tool._discover_homebrew_node_dirs", return_value=[]), \
         patch("os.path.isdir", side_effect=selective_isdir), \
         patch("subprocess.Popen", side_effect=capture_popen), \
         patch("os.open", return_value=99), \
         patch("os.close"), \
         patch("tools.interrupt.is_interrupted", return_value=False), \
         patch.dict(os.environ, {"PATH": "/usr/bin:/bin", "HOME": "/home/test"}, clear=True):
        with patch("builtins.open", mock_open(read_data=fake_json)):
            _run_browser_command("test-task", "navigate", ["https://example.com"])
    result_path = captured_env.get("PATH", "")
    assert "/data/data/com.termux/files/usr/bin" in result_path
    assert "/data/data/com.termux/files/usr/sbin" in result_path

View file

@ -21,34 +21,19 @@ class TestRegisterServerTools:
def mock_registry(self):
return ToolRegistry()
def test_exposes_live_server_aliases(self, mock_registry):
    """Registered MCP tools are reachable via live raw-server aliases.

    Removes diff residue: the dead ``mock_toolsets`` fixture and the old
    ``test_injects_hermes_toolsets`` body that were interleaved with this
    test's replacement lines.
    """
    server = MCPServerTask("my_srv")
    server._tools = [_make_mcp_tool("my_tool", "desc")]
    server.session = MagicMock()
    from toolsets import resolve_toolset, validate_toolset
    with patch("tools.registry.registry", mock_registry):
        registered = _register_server_tools("my_srv", server, {})
    # Tool is registered under the canonical mcp_<server>_<tool> name …
    assert "mcp_my_srv_my_tool" in registered
    assert "mcp_my_srv_my_tool" in mock_registry.get_all_tool_names()
    # … and the bare server name acts as a live toolset alias.
    assert validate_toolset("my_srv") is True
    assert "mcp_my_srv_my_tool" in resolve_toolset("my_srv")
class TestRefreshTools:
@ -58,19 +43,13 @@ class TestRefreshTools:
def mock_registry(self):
return ToolRegistry()
@pytest.fixture
def mock_toolsets(self):
return {
"hermes-cli": {"tools": ["terminal"], "description": "CLI", "includes": []},
"hermes-telegram": {"tools": ["terminal"], "description": "TG", "includes": []},
}
@pytest.mark.asyncio
async def test_nuke_and_repave(self, mock_registry, mock_toolsets):
async def test_nuke_and_repave(self, mock_registry):
"""Old tools are removed and new tools registered on refresh."""
server = MCPServerTask("live_srv")
server._refresh_lock = asyncio.Lock()
server._config = {}
from toolsets import resolve_toolset
# Seed initial state: one old tool registered
mock_registry.register(
@ -79,7 +58,6 @@ class TestRefreshTools:
description="", emoji="",
)
server._registered_tool_names = ["mcp_live_srv_old_tool"]
mock_toolsets["hermes-cli"]["tools"].append("mcp_live_srv_old_tool")
# New tool list from server
new_tool = _make_mcp_tool("new_tool", "new behavior")
@ -89,20 +67,13 @@ class TestRefreshTools:
)
)
with patch("tools.registry.registry", mock_registry), \
patch("toolsets.create_custom_toolset"), \
patch.dict("toolsets.TOOLSETS", mock_toolsets, clear=True):
with patch("tools.registry.registry", mock_registry):
await server._refresh_tools()
# Old tool completely gone
assert "mcp_live_srv_old_tool" not in mock_registry.get_all_tool_names()
assert "mcp_live_srv_old_tool" not in mock_toolsets["hermes-cli"]["tools"]
# New tool registered
assert "mcp_live_srv_new_tool" in mock_registry.get_all_tool_names()
assert "mcp_live_srv_new_tool" in mock_toolsets["hermes-cli"]["tools"]
assert server._registered_tool_names == ["mcp_live_srv_new_tool"]
assert "mcp_live_srv_old_tool" not in mock_registry.get_all_tool_names()
assert "mcp_live_srv_old_tool" not in resolve_toolset("live_srv")
assert "mcp_live_srv_new_tool" in mock_registry.get_all_tool_names()
assert "mcp_live_srv_new_tool" in resolve_toolset("live_srv")
assert server._registered_tool_names == ["mcp_live_srv_new_tool"]
class TestMessageHandler:
@ -165,6 +136,25 @@ class TestDeregister:
# bar still in ts1, so check should remain
assert "ts1" in reg._toolset_checks
def test_removes_toolset_alias_when_last_tool_is_removed(self):
reg = ToolRegistry()
reg.register(name="foo", toolset="mcp-srv", schema={}, handler=lambda x: x)
reg.register_toolset_alias("srv", "mcp-srv")
reg.deregister("foo")
assert reg.get_toolset_alias_target("srv") is None
def test_preserves_toolset_alias_while_toolset_still_exists(self):
reg = ToolRegistry()
reg.register(name="foo", toolset="mcp-srv", schema={}, handler=lambda x: x)
reg.register(name="bar", toolset="mcp-srv", schema={}, handler=lambda x: x)
reg.register_toolset_alias("srv", "mcp-srv")
reg.deregister("foo")
assert reg.get_toolset_alias_target("srv") == "mcp-srv"
def test_noop_for_unknown_tool(self):
reg = ToolRegistry()
reg.deregister("nonexistent") # Should not raise

View file

@ -184,11 +184,7 @@ class TestToolHandler:
def _patch_mcp_loop(self, coro_side_effect=None):
"""Return a patch for _run_on_mcp_loop that runs the coroutine directly."""
def fake_run(coro, timeout=30):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
return asyncio.run(coro)
if coro_side_effect:
return patch("tools.mcp_tool._run_on_mcp_loop", side_effect=coro_side_effect)
return patch("tools.mcp_tool._run_on_mcp_loop", side_effect=fake_run)
@ -365,10 +361,13 @@ class TestDiscoverAndRegister:
_servers.pop("fs", None)
def test_toolset_created(self):
"""A custom toolset is created for the MCP server."""
def test_toolset_resolves_live_from_registry(self):
"""MCP toolsets resolve through the live registry without TOOLSETS mutation."""
from tools.registry import ToolRegistry
from tools.mcp_tool import _discover_and_register_server, _servers, MCPServerTask
from toolsets import resolve_toolset, validate_toolset
mock_registry = ToolRegistry()
mock_tools = [_make_mcp_tool("ping", "Ping")]
mock_session = MagicMock()
@ -378,16 +377,16 @@ class TestDiscoverAndRegister:
server._tools = mock_tools
return server
mock_create = MagicMock()
with patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("toolsets.create_custom_toolset", mock_create):
patch("tools.registry.registry", mock_registry):
asyncio.run(
_discover_and_register_server("myserver", {"command": "test"})
)
mock_create.assert_called_once()
call_kwargs = mock_create.call_args
assert call_kwargs[1]["name"] == "mcp-myserver" or call_kwargs[0][0] == "mcp-myserver"
assert validate_toolset("myserver") is True
assert validate_toolset("mcp-myserver") is True
assert "mcp_myserver_ping" in resolve_toolset("myserver")
assert "mcp_myserver_ping" in resolve_toolset("mcp-myserver")
_servers.pop("myserver", None)
@ -550,12 +549,15 @@ class TestMCPServerTask:
# ---------------------------------------------------------------------------
class TestToolsetInjection:
def test_mcp_tools_added_to_all_hermes_toolsets(self):
"""Discovered MCP tools are dynamically injected into all hermes-* toolsets."""
def test_mcp_tools_resolve_through_server_aliases(self):
"""Discovered MCP tools resolve through raw server-name aliases."""
from tools.mcp_tool import MCPServerTask
from tools.registry import ToolRegistry
from toolsets import resolve_toolset, validate_toolset
mock_tools = [_make_mcp_tool("list_files", "List files")]
mock_session = MagicMock()
mock_registry = ToolRegistry()
fresh_servers = {}
@ -565,43 +567,32 @@ class TestToolsetInjection:
server._tools = mock_tools
return server
fake_toolsets = {
"hermes-cli": {"tools": ["terminal"], "description": "CLI", "includes": []},
"hermes-telegram": {"tools": ["terminal"], "description": "TG", "includes": []},
"hermes-gateway": {"tools": [], "description": "GW", "includes": []},
"non-hermes": {"tools": [], "description": "other", "includes": []},
}
fake_config = {"fs": {"command": "npx", "args": []}}
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._servers", fresh_servers), \
patch("tools.mcp_tool._load_mcp_config", return_value=fake_config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("toolsets.TOOLSETS", fake_toolsets):
patch("tools.registry.registry", mock_registry):
from tools.mcp_tool import discover_mcp_tools
result = discover_mcp_tools()
assert "mcp_fs_list_files" in result
# All hermes-* toolsets get injection
assert "mcp_fs_list_files" in fake_toolsets["hermes-cli"]["tools"]
assert "mcp_fs_list_files" in fake_toolsets["hermes-telegram"]["tools"]
assert "mcp_fs_list_files" in fake_toolsets["hermes-gateway"]["tools"]
# Non-hermes toolset should NOT get injection
assert "mcp_fs_list_files" not in fake_toolsets["non-hermes"]["tools"]
# Original tools preserved
assert "terminal" in fake_toolsets["hermes-cli"]["tools"]
# Server name becomes a standalone toolset
assert "fs" in fake_toolsets
assert "mcp_fs_list_files" in fake_toolsets["fs"]["tools"]
assert fake_toolsets["fs"]["description"].startswith("MCP server '")
assert "mcp_fs_list_files" in result
assert validate_toolset("fs") is True
assert validate_toolset("mcp-fs") is True
assert "mcp_fs_list_files" in resolve_toolset("fs")
assert "mcp_fs_list_files" in resolve_toolset("mcp-fs")
def test_server_toolset_skips_builtin_collision(self):
"""MCP server named after a built-in toolset shouldn't overwrite it."""
"""MCP raw aliases never overwrite a built-in toolset name."""
from tools.mcp_tool import MCPServerTask
from tools.registry import ToolRegistry
from toolsets import resolve_toolset, validate_toolset
mock_tools = [_make_mcp_tool("run", "Run command")]
mock_session = MagicMock()
fresh_servers = {}
mock_registry = ToolRegistry()
async def fake_connect(name, config):
server = MCPServerTask(name)
@ -620,12 +611,15 @@ class TestToolsetInjection:
patch("tools.mcp_tool._servers", fresh_servers), \
patch("tools.mcp_tool._load_mcp_config", return_value=fake_config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.registry.registry", mock_registry), \
patch("toolsets.TOOLSETS", fake_toolsets):
from tools.mcp_tool import discover_mcp_tools
discover_mcp_tools()
# Built-in toolset preserved — description unchanged
assert fake_toolsets["terminal"]["description"] == "Terminal tools"
assert fake_toolsets["terminal"]["description"] == "Terminal tools"
assert "mcp_terminal_run" not in resolve_toolset("terminal")
assert validate_toolset("mcp-terminal") is True
assert "mcp_terminal_run" in resolve_toolset("mcp-terminal")
def test_server_connection_failure_skipped(self):
"""If one server fails to connect, others still proceed."""
@ -776,6 +770,42 @@ class TestShutdown:
assert len(_servers) == 0
mock_server.shutdown.assert_called_once()
def test_shutdown_deregisters_registered_tools(self):
"""shutdown_mcp_servers removes MCP tools and their raw alias."""
import tools.mcp_tool as mcp_mod
from tools.mcp_tool import MCPServerTask, shutdown_mcp_servers, _servers
from tools.registry import registry
from toolsets import resolve_toolset, validate_toolset
_servers.clear()
registry.register(
name="mcp_test_ping",
toolset="mcp-test",
schema={
"name": "mcp_test_ping",
"description": "Ping",
"parameters": {"type": "object", "properties": {}},
},
handler=lambda *_args, **_kwargs: "{}",
)
registry.register_toolset_alias("test", "mcp-test")
server = MCPServerTask("test")
server._registered_tool_names = ["mcp_test_ping"]
_servers["test"] = server
mcp_mod._ensure_mcp_loop()
try:
assert validate_toolset("test") is True
assert "mcp_test_ping" in resolve_toolset("test")
shutdown_mcp_servers()
finally:
mcp_mod._mcp_loop = None
mcp_mod._mcp_thread = None
assert "mcp_test_ping" not in registry.get_all_tool_names()
assert validate_toolset("test") is False
def test_shutdown_handles_errors(self):
"""shutdown_mcp_servers handles errors during close gracefully."""
import tools.mcp_tool as mcp_mod
@ -1179,7 +1209,11 @@ class TestConfigurableTimeouts:
try:
handler = _make_tool_handler("test_srv", "my_tool", 180)
with patch("tools.mcp_tool._run_on_mcp_loop") as mock_run:
mock_run.return_value = json.dumps({"result": "ok"})
def fake_run(coro, timeout=30):
coro.close()
return json.dumps({"result": "ok"})
mock_run.side_effect = fake_run
handler({})
# Verify timeout=180 was passed
call_kwargs = mock_run.call_args
@ -1279,11 +1313,7 @@ class TestUtilityHandlers:
def _patch_mcp_loop(self):
"""Return a patch for _run_on_mcp_loop that runs the coroutine directly."""
def fake_run(coro, timeout=30):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
return asyncio.run(coro)
return patch("tools.mcp_tool._run_on_mcp_loop", side_effect=fake_run)
# -- list_resources --
@ -3038,14 +3068,23 @@ class TestSanitizeMcpNameComponent:
assert "/" not in name
assert "." not in name
def test_slash_in_sync_mcp_toolsets(self):
"""_sync_mcp_toolsets uses sanitize consistently with _convert_mcp_schema."""
from tools.mcp_tool import sanitize_mcp_name_component
def test_slash_in_server_alias_resolution(self):
"""Server names with slashes resolve through their live MCP alias."""
from tools.registry import ToolRegistry
from toolsets import resolve_toolset, validate_toolset
# Verify the prefix generation matches what _convert_mcp_schema produces
server_name = "ai.exa/exa"
safe_prefix = f"mcp_{sanitize_mcp_name_component(server_name)}_"
assert safe_prefix == "mcp_ai_exa_exa_"
reg = ToolRegistry()
reg.register(
name="mcp_ai_exa_exa_search",
toolset="mcp-ai.exa/exa",
schema={"name": "mcp_ai_exa_exa_search", "description": "Search", "parameters": {"type": "object", "properties": {}}},
handler=lambda *_args, **_kwargs: "{}",
)
reg.register_toolset_alias("ai.exa/exa", "mcp-ai.exa/exa")
with patch("tools.registry.registry", reg):
assert validate_toolset("ai.exa/exa") is True
assert "mcp_ai_exa_exa_search" in resolve_toolset("ai.exa/exa")
# ---------------------------------------------------------------------------

View file

@ -94,11 +94,21 @@ except ImportError:
logger = logging.getLogger(__name__)
# Standard PATH entries for environments with minimal PATH (e.g. systemd services).
# Includes macOS Homebrew paths (/opt/homebrew/* for Apple Silicon).
_SANE_PATH = (
"/opt/homebrew/bin:/opt/homebrew/sbin:"
"/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
# Includes Android/Termux and macOS Homebrew locations needed for agent-browser,
# npx, node, and Android's glibc runner (grun).
_SANE_PATH_DIRS = (
"/data/data/com.termux/files/usr/bin",
"/data/data/com.termux/files/usr/sbin",
"/opt/homebrew/bin",
"/opt/homebrew/sbin",
"/usr/local/sbin",
"/usr/local/bin",
"/usr/sbin",
"/usr/bin",
"/sbin",
"/bin",
)
_SANE_PATH = os.pathsep.join(_SANE_PATH_DIRS)
@functools.lru_cache(maxsize=1)
@ -123,6 +133,28 @@ def _discover_homebrew_node_dirs() -> tuple[str, ...]:
pass
return tuple(dirs)
def _browser_candidate_path_dirs() -> list[str]:
    """Return ordered browser CLI PATH candidates shared by discovery and execution."""
    # Hermes-managed Node takes priority, then Homebrew node dirs, then the
    # standard fallback directories (including Termux locations).
    node_bin = str(get_hermes_home() / "node" / "bin")
    candidates: list[str] = [node_bin]
    candidates.extend(_discover_homebrew_node_dirs())
    candidates.extend(_SANE_PATH_DIRS)
    return candidates
def _merge_browser_path(existing_path: str = "") -> str:
    """Prepend browser-specific PATH fallbacks without reordering existing entries."""
    # Preserve the caller's PATH order; only new, existing directories are
    # prepended so discovery and subprocess execution agree on lookup order.
    current = [entry for entry in (existing_path or "").split(os.pathsep) if entry]
    seen = set(current)
    extras: list[str] = []
    for candidate in _browser_candidate_path_dirs():
        if not candidate or candidate in seen:
            continue
        if os.path.isdir(candidate):
            seen.add(candidate)
            extras.append(candidate)
    return os.pathsep.join(extras + current)
# Throttle screenshot cleanup to avoid repeated full directory scans.
_last_screenshot_cleanup_by_dir: dict[str, float] = {}
@ -895,21 +927,10 @@ def _find_agent_browser() -> str:
_agent_browser_resolved = True
return which_result
# Build an extended search PATH including Homebrew and Hermes-managed dirs.
# This covers macOS where the process PATH may not include Homebrew paths.
extra_dirs: list[str] = []
for d in ["/opt/homebrew/bin", "/usr/local/bin"]:
if os.path.isdir(d):
extra_dirs.append(d)
extra_dirs.extend(_discover_homebrew_node_dirs())
hermes_home = get_hermes_home()
hermes_node_bin = str(hermes_home / "node" / "bin")
if os.path.isdir(hermes_node_bin):
extra_dirs.append(hermes_node_bin)
if extra_dirs:
extended_path = os.pathsep.join(extra_dirs)
# Build an extended search PATH including Hermes-managed Node, macOS
# versioned Homebrew installs, and fallback system dirs like Termux.
extended_path = _merge_browser_path("")
if extended_path:
which_result = shutil.which("agent-browser", path=extended_path)
if which_result:
_cached_agent_browser = which_result
@ -924,10 +945,10 @@ def _find_agent_browser() -> str:
_agent_browser_resolved = True
return _cached_agent_browser
# Check common npx locations (also search extended dirs)
# Check common npx locations (also search the extended fallback PATH)
npx_path = shutil.which("npx")
if not npx_path and extra_dirs:
npx_path = shutil.which("npx", path=os.pathsep.join(extra_dirs))
if not npx_path and extended_path:
npx_path = shutil.which("npx", path=extended_path)
if npx_path:
_cached_agent_browser = "npx agent-browser"
_agent_browser_resolved = True
@ -1046,24 +1067,9 @@ def _run_browser_command(
browser_env = {**os.environ}
# Ensure PATH includes Hermes-managed Node first, Homebrew versioned
# node dirs (for macOS ``brew install node@24``), then standard system dirs.
hermes_home = get_hermes_home()
hermes_node_bin = str(hermes_home / "node" / "bin")
existing_path = browser_env.get("PATH", "")
path_parts = [p for p in existing_path.split(":") if p]
candidate_dirs = (
[hermes_node_bin]
+ list(_discover_homebrew_node_dirs())
+ [p for p in _SANE_PATH.split(":") if p]
)
for part in reversed(candidate_dirs):
if os.path.isdir(part) and part not in path_parts:
path_parts.insert(0, part)
browser_env["PATH"] = ":".join(path_parts)
# Ensure subprocesses inherit the same browser-specific PATH fallbacks
# used during CLI discovery.
browser_env["PATH"] = _merge_browser_path(browser_env.get("PATH", ""))
browser_env["AGENT_BROWSER_SOCKET_DIR"] = task_socket_dir
# Use temp files for stdout/stderr instead of pipes.

View file

@ -846,8 +846,7 @@ class MCPServerTask:
After the initial ``await`` (list_tools), all mutations are synchronous
atomic from the event loop's perspective.
"""
from tools.registry import registry, tool_error
from toolsets import TOOLSETS
from tools.registry import registry
async with self._refresh_lock:
# Capture old tool names for change diff
@ -857,16 +856,11 @@ class MCPServerTask:
tools_result = await self.session.list_tools()
new_mcp_tools = tools_result.tools if hasattr(tools_result, "tools") else []
# 2. Remove old tools from hermes-* umbrella toolsets
for ts_name, ts in TOOLSETS.items():
if ts_name.startswith("hermes-"):
ts["tools"] = [t for t in ts["tools"] if t not in self._registered_tool_names]
# 3. Deregister old tools from the central registry
# 2. Deregister old tools from the central registry
for prefixed_name in self._registered_tool_names:
registry.deregister(prefixed_name)
# 4. Re-register with fresh tool list
# 3. Re-register with fresh tool list
self._tools = new_mcp_tools
self._registered_tool_names = _register_server_tools(
self.name, self, self._config
@ -1144,6 +1138,8 @@ class MCPServerTask:
async def shutdown(self):
"""Signal the Task to exit and wait for clean resource teardown."""
from tools.registry import registry
self._shutdown_event.set()
if self._task and not self._task.done():
try:
@ -1158,6 +1154,9 @@ class MCPServerTask:
await self._task
except asyncio.CancelledError:
pass
for tool_name in list(getattr(self, "_registered_tool_names", [])):
registry.deregister(tool_name)
self._registered_tool_names = []
self.session = None
@ -1671,57 +1670,6 @@ def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
}
def _sync_mcp_toolsets(server_names: Optional[List[str]] = None) -> None:
"""Expose each MCP server as a standalone toolset and inject into hermes-* sets.
Creates a real toolset entry in TOOLSETS for each server name (e.g.
TOOLSETS["github"] = {"tools": ["mcp_github_list_files", ...]}). This
makes raw server names resolvable in platform_toolsets overrides.
Also injects all MCP tools into hermes-* umbrella toolsets for the
default behavior.
Skips server names that collide with built-in toolsets.
"""
from toolsets import TOOLSETS
if server_names is None:
server_names = list(_load_mcp_config().keys())
existing = _existing_tool_names()
all_mcp_tools: List[str] = []
for server_name in server_names:
safe_prefix = f"mcp_{sanitize_mcp_name_component(server_name)}_"
server_tools = sorted(
t for t in existing if t.startswith(safe_prefix)
)
all_mcp_tools.extend(server_tools)
# Don't overwrite a built-in toolset that happens to share the name.
existing_ts = TOOLSETS.get(server_name)
if existing_ts and not str(existing_ts.get("description", "")).startswith("MCP server '"):
logger.warning(
"Skipping MCP toolset alias '%s' — a built-in toolset already uses that name",
server_name,
)
continue
TOOLSETS[server_name] = {
"description": f"MCP server '{server_name}' tools",
"tools": server_tools,
"includes": [],
}
# Also inject into hermes-* umbrella toolsets for default behavior.
for ts_name, ts in TOOLSETS.items():
if not ts_name.startswith("hermes-"):
continue
for tool_name in all_mcp_tools:
if tool_name not in ts["tools"]:
ts["tools"].append(tool_name)
def _build_utility_schemas(server_name: str) -> List[dict]:
"""Build schemas for the MCP utility tools (resources & prompts).
@ -1874,16 +1822,16 @@ def _existing_tool_names() -> List[str]:
def _register_server_tools(name: str, server: MCPServerTask, config: dict) -> List[str]:
"""Register tools from an already-connected server into the registry.
Handles include/exclude filtering, utility tools, toolset creation,
and hermes-* umbrella toolset injection.
Handles include/exclude filtering and utility tools. Toolset resolution
for ``mcp-{server}`` and raw server-name aliases is derived from the live
registry, rather than mutating ``toolsets.TOOLSETS`` at runtime.
Used by both initial discovery and dynamic refresh (list_changed).
Returns:
List of registered prefixed tool names.
"""
from tools.registry import registry, tool_error
from toolsets import create_custom_toolset, TOOLSETS
from tools.registry import registry
registered_names: List[str] = []
toolset_name = f"mcp-{name}"
@ -1973,19 +1921,8 @@ def _register_server_tools(name: str, server: MCPServerTask, config: dict) -> Li
)
registered_names.append(util_name)
# Create a custom toolset so these tools are discoverable
if registered_names:
create_custom_toolset(
name=toolset_name,
description=f"MCP tools from {name} server",
tools=registered_names,
)
# Inject into hermes-* umbrella toolsets for default behavior
for ts_name, ts in TOOLSETS.items():
if ts_name.startswith("hermes-"):
for tool_name in registered_names:
if tool_name not in ts["tools"]:
ts["tools"].append(tool_name)
registry.register_toolset_alias(name, toolset_name)
return registered_names
@ -2049,7 +1986,6 @@ def register_mcp_servers(servers: Dict[str, dict]) -> List[str]:
}
if not new_servers:
_sync_mcp_toolsets(list(servers.keys()))
return _existing_tool_names()
# Start the background event loop for MCP connections
@ -2080,8 +2016,6 @@ def register_mcp_servers(servers: Dict[str, dict]) -> List[str]:
# The outer timeout is generous: 120s total for parallel discovery.
_run_on_mcp_loop(_discover_all(), timeout=120)
_sync_mcp_toolsets(list(servers.keys()))
# Log a summary so ACP callers get visibility into what was registered.
with _lock:
connected = [n for n in new_servers if n in _servers]

View file

@ -52,6 +52,7 @@ class ToolRegistry:
def __init__(self):
self._tools: Dict[str, ToolEntry] = {}
self._toolset_checks: Dict[str, Callable] = {}
self._toolset_aliases: Dict[str, str] = {}
# MCP dynamic refresh can mutate the registry while other threads are
# reading tool metadata, so keep mutations serialized and readers on
# stable snapshots.
@ -96,6 +97,27 @@ class ToolRegistry:
if entry.toolset == toolset
)
def register_toolset_alias(self, alias: str, toolset: str) -> None:
"""Register an explicit alias for a canonical toolset name."""
with self._lock:
existing = self._toolset_aliases.get(alias)
if existing and existing != toolset:
logger.warning(
"Toolset alias collision: '%s' (%s) overwritten by %s",
alias, existing, toolset,
)
self._toolset_aliases[alias] = toolset
def get_registered_toolset_aliases(self) -> Dict[str, str]:
"""Return a snapshot of ``{alias: canonical_toolset}`` mappings."""
with self._lock:
return dict(self._toolset_aliases)
def get_toolset_alias_target(self, alias: str) -> Optional[str]:
"""Return the canonical toolset name for an alias, or None."""
with self._lock:
return self._toolset_aliases.get(alias)
# ------------------------------------------------------------------
# Registration
# ------------------------------------------------------------------
@ -164,11 +186,18 @@ class ToolRegistry:
entry = self._tools.pop(name, None)
if entry is None:
return
# Drop the toolset check if this was the last tool in that toolset
if entry.toolset in self._toolset_checks and not any(
# Drop the toolset check and aliases if this was the last tool in
# that toolset.
toolset_still_exists = any(
e.toolset == entry.toolset for e in self._tools.values()
):
)
if not toolset_still_exists:
self._toolset_checks.pop(entry.toolset, None)
self._toolset_aliases = {
alias: target
for alias, target in self._toolset_aliases.items()
if target != entry.toolset
}
logger.debug("Deregistered tool: %s", name)
# ------------------------------------------------------------------

View file

@ -409,8 +409,39 @@ def get_toolset(name: str) -> Optional[Dict[str, Any]]:
Dict: Toolset definition with description, tools, and includes
None: If toolset not found
"""
# Return toolset definition
return TOOLSETS.get(name)
toolset = TOOLSETS.get(name)
if toolset:
return toolset
try:
from tools.registry import registry
except Exception:
return None
registry_toolset = name
description = f"Plugin toolset: {name}"
alias_target = registry.get_toolset_alias_target(name)
if name not in _get_plugin_toolset_names():
registry_toolset = alias_target
if not registry_toolset:
return None
description = f"MCP server '{name}' tools"
else:
reverse_aliases = {
canonical: alias
for alias, canonical in _get_registry_toolset_aliases().items()
if alias not in TOOLSETS
}
alias = reverse_aliases.get(name)
if alias:
description = f"MCP server '{alias}' tools"
return {
"description": description,
"tools": registry.get_tool_names_for_toolset(registry_toolset),
"includes": [],
}
def resolve_toolset(name: str, visited: Set[str] = None) -> List[str]:
@ -438,7 +469,7 @@ def resolve_toolset(name: str, visited: Set[str] = None) -> List[str]:
# Use a fresh visited set per branch to avoid cross-branch contamination
resolved = resolve_toolset(toolset_name, visited.copy())
all_tools.update(resolved)
return list(all_tools)
return sorted(all_tools)
# Check for cycles / already-resolved (diamond deps).
# Silently return [] — either this is a diamond (not a bug, tools already
@ -449,15 +480,8 @@ def resolve_toolset(name: str, visited: Set[str] = None) -> List[str]:
visited.add(name)
# Get toolset definition
toolset = TOOLSETS.get(name)
toolset = get_toolset(name)
if not toolset:
# Fall back to tool registry for plugin-provided toolsets
if name in _get_plugin_toolset_names():
try:
from tools.registry import registry
return registry.get_tool_names_for_toolset(name)
except Exception:
pass
return []
# Collect direct tools
@ -470,7 +494,7 @@ def resolve_toolset(name: str, visited: Set[str] = None) -> List[str]:
included_tools = resolve_toolset(included_name, visited)
tools.update(included_tools)
return list(tools)
return sorted(tools)
def resolve_multiple_toolsets(toolset_names: List[str]) -> List[str]:
@ -489,7 +513,7 @@ def resolve_multiple_toolsets(toolset_names: List[str]) -> List[str]:
tools = resolve_toolset(name)
all_tools.update(tools)
return list(all_tools)
return sorted(all_tools)
def _get_plugin_toolset_names() -> Set[str]:
@ -509,6 +533,15 @@ def _get_plugin_toolset_names() -> Set[str]:
return set()
def _get_registry_toolset_aliases() -> Dict[str, str]:
"""Return explicit toolset aliases registered in the live registry."""
try:
from tools.registry import registry
return registry.get_registered_toolset_aliases()
except Exception:
return {}
def get_all_toolsets() -> Dict[str, Dict[str, Any]]:
"""
Get all available toolsets with their definitions.
@ -518,19 +551,19 @@ def get_all_toolsets() -> Dict[str, Dict[str, Any]]:
Returns:
Dict: All toolset definitions
"""
result = TOOLSETS.copy()
# Add plugin-provided toolsets (synthetic entries)
result = dict(TOOLSETS)
aliases = _get_registry_toolset_aliases()
for ts_name in _get_plugin_toolset_names():
if ts_name not in result:
try:
from tools.registry import registry
tools = registry.get_tool_names_for_toolset(ts_name)
result[ts_name] = {
"description": f"Plugin toolset: {ts_name}",
"tools": tools,
}
except Exception:
pass
display_name = ts_name
for alias, canonical in aliases.items():
if canonical == ts_name and alias not in TOOLSETS:
display_name = alias
break
if display_name in result:
continue
toolset = get_toolset(display_name)
if toolset:
result[display_name] = toolset
return result
@ -544,7 +577,14 @@ def get_toolset_names() -> List[str]:
List[str]: List of toolset names
"""
names = set(TOOLSETS.keys())
names |= _get_plugin_toolset_names()
aliases = _get_registry_toolset_aliases()
for ts_name in _get_plugin_toolset_names():
for alias, canonical in aliases.items():
if canonical == ts_name and alias not in TOOLSETS:
names.add(alias)
break
else:
names.add(ts_name)
return sorted(names)
@ -565,8 +605,9 @@ def validate_toolset(name: str) -> bool:
return True
if name in TOOLSETS:
return True
# Check tool registry for plugin-provided toolsets
return name in _get_plugin_toolset_names()
if name in _get_plugin_toolset_names():
return True
return name in _get_registry_toolset_aliases()
def create_custom_toolset(

View file

@ -152,12 +152,15 @@ hermes setup
### Install optional Node dependencies manually
The tested Termux path skips Node/browser bootstrap on purpose. If you want to experiment later:
The tested Termux path skips Node/browser bootstrap on purpose. If you want to experiment with browser tooling later:
```bash
pkg install nodejs-lts
npm install
```
The browser tool automatically includes Termux directories (`/data/data/com.termux/files/usr/bin`) in its PATH search, so `agent-browser` and `npx` are discovered without any extra PATH configuration.
Treat browser / WhatsApp tooling on Android as experimental until documented otherwise.
---

View file

@ -0,0 +1,191 @@
---
sidebar_position: 2
sidebar_label: "Google Workspace"
title: "Google Workspace — Gmail, Calendar, Drive, Sheets & Docs"
description: "Send email, manage calendar events, search Drive, read/write Sheets, and access Docs — all through OAuth2-authenticated Google APIs"
---
# Google Workspace Skill
Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration for Hermes. Uses OAuth2 with automatic token refresh. Prefers the [Google Workspace CLI (`gws`)](https://github.com/nicholasgasior/gws) when available for broader coverage, and falls back to Google's Python client libraries otherwise.
**Skill path:** `skills/productivity/google-workspace/`
## Setup
The setup is fully agent-driven — ask Hermes to set up Google Workspace and it walks you through each step. The flow:
1. **Create a Google Cloud project** and enable the required APIs (Gmail, Calendar, Drive, Sheets, Docs, People)
2. **Create OAuth 2.0 credentials** (Desktop app type) and download the client secret JSON
3. **Authorize** — Hermes generates an auth URL, you approve in the browser, paste back the redirect URL
4. **Done** — token auto-refreshes from that point on
:::tip Email-only users
If you only need email (no Calendar/Drive/Sheets), use the **himalaya** skill instead — it works with a Gmail App Password and takes 2 minutes. No Google Cloud project needed.
:::
## Gmail
### Searching
```bash
$GAPI gmail search "is:unread" --max 10
$GAPI gmail search "from:boss@company.com newer_than:1d"
$GAPI gmail search "has:attachment filename:pdf newer_than:7d"
```
Returns JSON with `id`, `from`, `subject`, `date`, `snippet`, and `labels` for each message.
### Reading
```bash
$GAPI gmail get MESSAGE_ID
```
Returns the full message body as text (prefers plain text, falls back to HTML).
### Sending
```bash
# Basic send
$GAPI gmail send --to user@example.com --subject "Hello" --body "Message text"
# HTML email
$GAPI gmail send --to user@example.com --subject "Report" \
--body "<h1>Q4 Results</h1><p>Details here</p>" --html
# Custom From header (display name + email)
$GAPI gmail send --to user@example.com --subject "Hello" \
--from '"Research Agent" <user@example.com>' --body "Message text"
# With CC
$GAPI gmail send --to user@example.com --cc "team@example.com" \
--subject "Update" --body "FYI"
```
### Custom From Header
The `--from` flag lets you customize the sender display name on outgoing emails. This is useful when multiple agents share the same Gmail account but you want recipients to see different names:
```bash
# Agent 1
$GAPI gmail send --to client@co.com --subject "Research Summary" \
--from '"Research Agent" <shared@company.com>' --body "..."
# Agent 2
$GAPI gmail send --to client@co.com --subject "Code Review" \
--from '"Code Assistant" <shared@company.com>' --body "..."
```
**How it works:** The `--from` value is set as the RFC 5322 `From` header on the MIME message. Gmail allows customizing the display name on your own authenticated email address without any additional configuration. Recipients see the custom display name (e.g. "Research Agent") while the email address stays the same.
**Important:** If you use a *different email address* in `--from` (not the authenticated account), Gmail requires that address to be configured as a [Send As alias](https://support.google.com/mail/answer/22370) in Gmail Settings → Accounts → Send mail as.
The `--from` flag works on both `send` and `reply`:
```bash
$GAPI gmail reply MESSAGE_ID \
--from '"Support Bot" <shared@company.com>' --body "We're on it"
```
### Replying
```bash
$GAPI gmail reply MESSAGE_ID --body "Thanks, that works for me."
```
Automatically threads the reply (sets `In-Reply-To` and `References` headers) and uses the original message's thread ID.
### Labels
```bash
# List all labels
$GAPI gmail labels
# Add/remove labels
$GAPI gmail modify MESSAGE_ID --add-labels LABEL_ID
$GAPI gmail modify MESSAGE_ID --remove-labels UNREAD
```
## Calendar
```bash
# List events (defaults to next 7 days)
$GAPI calendar list
$GAPI calendar list --start 2026-03-01T00:00:00Z --end 2026-03-07T23:59:59Z
# Create event (timezone required)
$GAPI calendar create --summary "Team Standup" \
--start 2026-03-01T10:00:00-07:00 --end 2026-03-01T10:30:00-07:00
# With location and attendees
$GAPI calendar create --summary "Lunch" \
--start 2026-03-01T12:00:00Z --end 2026-03-01T13:00:00Z \
--location "Cafe" --attendees "alice@co.com,bob@co.com"
# Delete event
$GAPI calendar delete EVENT_ID
```
:::warning
Calendar times **must** include a timezone offset (e.g. `-07:00`) or use UTC (`Z`). Bare datetimes like `2026-03-01T10:00:00` are ambiguous and will be treated as UTC.
:::
## Drive
```bash
$GAPI drive search "quarterly report" --max 10
$GAPI drive search "mimeType='application/pdf'" --raw-query --max 5
```
## Sheets
```bash
# Read a range
$GAPI sheets get SHEET_ID "Sheet1!A1:D10"
# Write to a range
$GAPI sheets update SHEET_ID "Sheet1!A1:B2" --values '[["Name","Score"],["Alice","95"]]'
# Append rows
$GAPI sheets append SHEET_ID "Sheet1!A:C" --values '[["new","row","data"]]'
```
## Docs
```bash
$GAPI docs get DOC_ID
```
Returns the document title and full text content.
## Contacts
```bash
$GAPI contacts list --max 20
```
## Output Format
All commands return JSON. Key fields per service:
| Command | Fields |
|---------|--------|
| `gmail search` | `id`, `threadId`, `from`, `to`, `subject`, `date`, `snippet`, `labels` |
| `gmail get` | `id`, `threadId`, `from`, `to`, `subject`, `date`, `labels`, `body` |
| `gmail send/reply` | `status`, `id`, `threadId` |
| `calendar list` | `id`, `summary`, `start`, `end`, `location`, `description`, `htmlLink` |
| `calendar create` | `status`, `id`, `summary`, `htmlLink` |
| `drive search` | `id`, `name`, `mimeType`, `modifiedTime`, `webViewLink` |
| `contacts list` | `name`, `emails`, `phones` |
| `sheets get` | 2D array of cell values |
## Troubleshooting
| Problem | Fix |
|---------|-----|
| `NOT_AUTHENTICATED` | Run setup (ask Hermes to set up Google Workspace) |
| `REFRESH_FAILED` | Token revoked — re-run authorization steps |
| `HttpError 403: Insufficient Permission` | Missing scope — revoke and re-authorize with the right services |
| `HttpError 403: Access Not Configured` | API not enabled in Google Cloud Console |
| `ModuleNotFoundError` | Run setup script with `--install-deps` |

View file

@ -92,6 +92,7 @@ const sidebars: SidebarsConfig = {
label: 'Skills',
items: [
'user-guide/skills/godmode',
'user-guide/skills/google-workspace',
],
},
],
@ -118,7 +119,6 @@ const sidebars: SidebarsConfig = {
'user-guide/messaging/wecom-callback',
'user-guide/messaging/weixin',
'user-guide/messaging/bluebubbles',
'user-guide/messaging/qqbot',
'user-guide/messaging/open-webui',
'user-guide/messaging/webhooks',
],
@ -153,7 +153,6 @@ const sidebars: SidebarsConfig = {
'guides/use-voice-mode-with-hermes',
'guides/build-a-hermes-plugin',
'guides/automate-with-cron',
'guides/automation-templates',
'guides/cron-troubleshooting',
'guides/work-with-skills',
'guides/delegation-patterns',