Merge branch 'bb/gui' into bb/gui-glass

Brings in main (via bb/gui) plus the bb/gui-only changes since the
last sync, so a future bb/gui-glass → bb/gui merge is conflict-free.

Conflicts resolved:
- apps/desktop/src/app/chat/composer/focus.ts (add/add): keep the
  glass version. It is a strict superset of the bb/gui original —
  same focus API (`requestComposerFocus`, `onComposerFocusRequest`,
  `markActiveComposer`) plus the insert bus
  (`requestComposerInsert`, `onComposerInsertRequest`,
  `focusComposerInput`) that the glass composer / right-rail
  preview / use-composer-actions already depend on.
- apps/desktop/src/app/skills/index.tsx: keep the glass rewrite
  built on `PageSearchShell` + `Codicon` + `TextTab` — bb/gui's
  older `titlebarHeaderBaseClass` + ad-hoc `Input`/`Search`/`X`
  layout is the version this PR was meant to replace.

`npm run type-check` in apps/desktop passes against the merged tree.
This commit is contained in:
Brooklyn Nicholson 2026-05-16 21:57:56 -05:00
commit 4ce99508d6
13 changed files with 2650 additions and 3 deletions

230
gateway/memory_monitor.py Normal file
View file

@ -0,0 +1,230 @@
"""Periodic process memory usage logging for the gateway.
Ported from cline/cline#10343 (src/standalone/memory-monitor.ts).
The gateway is a long-lived process that accumulates memory as it caches
agent instances, session transcripts, tool schemas, memory providers, MCP
connections, etc. A slow leak in any of those subsystems is invisible
in a single log line you only see it by watching RSS climb over hours.
This module emits a single structured ``[MEMORY] ...`` line every N
minutes (default 5) so maintainers investigating a suspected leak can
grep ``agent.log`` / ``gateway.log`` for a time series of RSS + Python
GC stats. The timer runs in a background thread and shuts down cleanly
with the gateway.
Design notes (parity with the Cline port):
* Grep-friendly single-line format beginning ``[MEMORY]``.
* Final snapshot logged on shutdown so "last RSS before exit" is
always in the log.
* Baseline snapshot logged immediately on start.
* Daemon thread never blocks process exit.
* Uses ``resource`` (stdlib, Linux/macOS) first and falls back to
``psutil`` when ``resource`` isn't available (Windows). Both are
optional; when neither works we emit a single WARNING and disable
the monitor rather than crashing the gateway.
Config: ``logging.memory_monitor`` in ``config.yaml`` see
``hermes_cli/config.py`` for the defaults block.
"""
from __future__ import annotations
import gc
import logging
import os
import sys
import threading
import time
from typing import Optional
logger = logging.getLogger(__name__)
_BYTES_TO_MB = 1024 * 1024
_monitor_thread: Optional[threading.Thread] = None
_stop_event: Optional[threading.Event] = None
_start_time: Optional[float] = None
_interval_seconds: float = 300.0 # 5 minutes
_lock = threading.Lock()
def _get_rss_mb() -> Optional[int]:
"""Return current process resident set size in MB, or None if unavailable.
Tries ``resource.getrusage`` first (Linux/macOS, no extra deps), then
falls back to ``psutil`` which is an optional hermes-agent dep.
"""
# Linux / macOS — resource is stdlib. On Linux ru_maxrss is in KB,
# on macOS it is in bytes (yes, really). We use it as a cheap
# "current" RSS — ru_maxrss reports the high-water mark for the
# process, which is what you actually want for leak detection.
try:
import resource
maxrss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
if sys.platform == "darwin":
return int(maxrss / _BYTES_TO_MB)
# Linux / other unices: KB
return int(maxrss / 1024)
except Exception:
pass
# Fallback: psutil (Windows, or unusual unix without resource).
try:
import psutil # type: ignore
rss = psutil.Process(os.getpid()).memory_info().rss
return int(rss / _BYTES_TO_MB)
except Exception:
return None
def log_memory_usage(prefix: str = "") -> None:
"""Log current memory usage in a grep-friendly ``[MEMORY] ...`` line.
Safe to call on-demand from any thread at important lifecycle
moments (after shutdown, after context compression, etc.).
Parameters
----------
prefix
Optional extra tag inserted after ``[MEMORY]`` e.g.
``"baseline"``, ``"shutdown"``.
"""
rss = _get_rss_mb()
uptime = int(time.monotonic() - _start_time) if _start_time else 0
# gc.get_stats() returns per-generation collection counts; the sum
# is a cheap proxy for "how much garbage have we created".
try:
gc_counts = gc.get_count() # (gen0, gen1, gen2)
except Exception:
gc_counts = (0, 0, 0)
# Thread count is a handy correlate when diagnosing thread leaks.
try:
thread_count = threading.active_count()
except Exception:
thread_count = 0
tag = f"{prefix} " if prefix else ""
if rss is None:
logger.info(
"[MEMORY] %srss=unavailable gc=%s threads=%d uptime=%ds",
tag,
gc_counts,
thread_count,
uptime,
)
else:
logger.info(
"[MEMORY] %srss=%dMB gc=%s threads=%d uptime=%ds",
tag,
rss,
gc_counts,
thread_count,
uptime,
)
def _monitor_loop(stop_event: threading.Event, interval: float) -> None:
"""Background thread body — log every ``interval`` seconds until stopped."""
while not stop_event.wait(interval):
try:
log_memory_usage()
except Exception as e:
# Never let the monitor crash the gateway; just log and carry on.
logger.debug("Memory monitor iteration failed: %s", e)
def start_memory_monitoring(interval_seconds: float = 300.0) -> bool:
"""Start periodic memory usage logging in a daemon thread.
Logs immediately to capture a baseline, then every ``interval_seconds``.
Safe to call multiple times subsequent calls are no-ops while the
first monitor is still running.
Parameters
----------
interval_seconds
How often to log. Default 300s (5 minutes), matching the
upstream cline/cline implementation.
Returns
-------
bool
True if a fresh monitor thread was started, False if one was
already running or if memory introspection isn't available.
"""
global _monitor_thread, _stop_event, _start_time, _interval_seconds
with _lock:
if _monitor_thread is not None and _monitor_thread.is_alive():
return False
# Sanity-check that we can read RSS at all. If neither resource
# nor psutil works, no point spinning a thread that can only log
# "rss=unavailable" forever — warn once and bail.
if _get_rss_mb() is None:
logger.warning(
"[MEMORY] Memory monitoring unavailable: neither resource.getrusage "
"nor psutil could read process RSS — skipping periodic logging.",
)
return False
_start_time = time.monotonic()
_interval_seconds = float(interval_seconds)
_stop_event = threading.Event()
# Baseline snapshot before the loop starts.
log_memory_usage(prefix="baseline")
_monitor_thread = threading.Thread(
target=_monitor_loop,
args=(_stop_event, _interval_seconds),
name="gateway-memory-monitor",
daemon=True,
)
_monitor_thread.start()
logger.info(
"[MEMORY] Periodic memory monitoring started (interval: %ds)",
int(_interval_seconds),
)
return True
def stop_memory_monitoring(timeout: float = 2.0) -> None:
"""Stop the monitor thread and log a final snapshot.
Safe to call even if ``start_memory_monitoring()`` was never called.
"""
global _monitor_thread, _stop_event
with _lock:
if _stop_event is None or _monitor_thread is None:
return
# Final snapshot before teardown so "last RSS" is always in the log.
try:
log_memory_usage(prefix="shutdown")
except Exception:
pass
_stop_event.set()
thread = _monitor_thread
_monitor_thread = None
_stop_event = None
# Join outside the lock so a stuck log call can't deadlock shutdown.
try:
thread.join(timeout=timeout)
except Exception:
pass
logger.info("[MEMORY] Periodic memory monitoring stopped")
def is_running() -> bool:
"""True if the background monitor thread is alive."""
with _lock:
return _monitor_thread is not None and _monitor_thread.is_alive()

View file

@ -7256,7 +7256,18 @@ def _update_node_dependencies() -> None:
if not (path / "package.json").exists():
continue
extra_args = ["--silent", "--no-fund", "--no-audit", "--progress=false"]
# Stream npm output (no `--silent`, no `capture_output`) so any
# optional dependency postinstall scripts (e.g. `agent-browser`'s
# Chromium fetch on first install) print progress instead of
# appearing to hang silently for minutes (#18840). The
# `_UpdateOutputStream` wrapper installed by the updater mirrors
# streamed output to ``~/.hermes/logs/update.log`` so nothing is lost.
#
# The repo root install also passes `--workspaces=false` so npm
# does not recursively install every `apps/*` workspace (dashboard,
# desktop, shared) — those are installed/built on demand via
# `_build_web_ui()` and the desktop launchers.
extra_args = ["--no-fund", "--no-audit", "--progress=false"]
if path == PROJECT_ROOT:
extra_args.append("--workspaces=false")
@ -7264,13 +7275,14 @@ def _update_node_dependencies() -> None:
npm,
path,
extra_args=tuple(extra_args),
capture_output=False,
)
if result.returncode == 0:
print(f"{label}")
continue
print(f" ⚠ npm install failed in {label}")
stderr = (result.stderr or "").strip()
stderr = (result.stderr or "").strip() if result.stderr else ""
if stderr:
print(f" {stderr.splitlines()[-1]}")
@ -9652,7 +9664,8 @@ _BUILTIN_SUBCOMMANDS = frozenset(
"config", "cron", "curator", "dashboard", "debug", "doctor",
"dump", "fallback", "gateway", "hooks", "import", "insights",
"kanban", "login", "logout", "logs", "lsp", "mcp", "memory",
"model", "pairing", "plugins", "postinstall", "profile", "proxy", "sessions", "setup",
"model", "pairing", "plugins", "postinstall", "profile", "proxy", "send",
"sessions", "setup",
"skills", "slack", "status", "tools", "uninstall", "update",
"version", "webhook", "whatsapp", "chat",
# Help-ish invocations — plugin commands not being listed in
@ -10160,6 +10173,12 @@ def main():
)
slack_parser.set_defaults(func=cmd_slack)
# =========================================================================
# send command — pipe shell-script output to any configured platform
# =========================================================================
from hermes_cli.send_cmd import register_send_subparser
register_send_subparser(subparsers)
# =========================================================================
# login command
# =========================================================================

445
hermes_cli/send_cmd.py Normal file
View file

@ -0,0 +1,445 @@
"""CLI subcommand: ``hermes send`` — pipe text from shell scripts to any
configured messaging platform (Telegram, Discord, Slack, Signal, SMS, etc.).
This is a thin wrapper around ``tools.send_message_tool.send_message_tool``
that exposes its functionality as a standalone CLI entry point so ops
scripts, cron jobs, CI hooks, and monitoring daemons can reuse the gateway's
already-configured credentials without having to reimplement each platform's
REST API client.
Design notes:
* No LLM, no agent loop the subcommand just resolves arguments, reads the
message body, calls the shared tool function, and prints/returns the
result. It is intentionally fast, cheap, and side-effect-only.
* For platforms that send via bot token (Telegram, Discord, Slack, Signal,
SMS, WhatsApp-CloudAPI, ) no running gateway is required. The tool
talks directly to each platform's REST endpoint. For platforms that rely
on a persistent adapter connection (plugin platforms, Matrix in some
modes, ) a live gateway is needed; the underlying tool surfaces that
error to the caller.
* Exit codes follow the classic Unix convention:
0 delivery (or list) succeeded
1 delivery failed at the platform level
2 usage / argument / config error (argparse already uses 2)
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Optional
_USAGE_EXIT = 2
_FAILURE_EXIT = 1
_SUCCESS_EXIT = 0
def _read_message_body(
positional: Optional[str],
file_path: Optional[str],
) -> Optional[str]:
"""Resolve the message body from (in order):
1. An explicit positional message argument.
2. ``--file PATH`` or ``--file -`` (where ``-`` means stdin).
3. Piped stdin when it is not attached to a TTY.
Returns ``None`` when nothing is available callers must treat that as
a usage error.
"""
if positional:
return positional
if file_path:
if file_path == "-":
return sys.stdin.read()
try:
return Path(file_path).read_text()
except OSError as exc:
print(f"hermes send: cannot read {file_path}: {exc}", file=sys.stderr)
sys.exit(_USAGE_EXIT)
# Piped input: only consume stdin when it is not a TTY. Reading from a
# TTY would block the user in a half-broken "type your message" state,
# which is a poor default for an ops CLI.
if not sys.stdin.isatty():
data = sys.stdin.read()
if data:
return data
return None
def _resolve_target(arg_to: Optional[str]) -> Optional[str]:
"""Return a cleaned ``--to`` value, or ``None`` when nothing is set."""
if arg_to and arg_to.strip():
return arg_to.strip()
return None
def _emit_result(
result_json: str,
*,
json_mode: bool,
quiet: bool,
) -> int:
"""Print the tool result in the requested format and return the exit code.
The underlying ``send_message_tool`` always returns a JSON string. We
parse it, decide success/failure, and format accordingly.
"""
try:
payload = json.loads(result_json) if result_json else {}
except json.JSONDecodeError:
# Shouldn't happen with the shared tool, but be defensive — pass the
# raw string through so the user can still see what went wrong.
payload = {"error": "invalid JSON from send_message_tool", "raw": result_json}
if json_mode:
print(json.dumps(payload, indent=2))
elif quiet:
pass
else:
if payload.get("error"):
print(f"hermes send: {payload['error']}", file=sys.stderr)
elif payload.get("success"):
note = payload.get("note")
if note:
print(note)
else:
print("sent")
else:
# Unknown shape — dump it so nothing is silently dropped.
print(json.dumps(payload, indent=2))
if payload.get("error"):
return _FAILURE_EXIT
if payload.get("skipped"):
return _SUCCESS_EXIT
if payload.get("success"):
return _SUCCESS_EXIT
# Unknown / unexpected — treat as failure so scripts notice.
return _FAILURE_EXIT
def _list_targets(platform_filter: Optional[str], *, json_mode: bool) -> int:
"""Print the channel directory (all configured targets across platforms).
Uses ``load_directory()`` for structured JSON output and
``format_directory_for_display()`` for the human-readable rendering that
the send_message tool itself shows to the model keeps the two surfaces
identical.
"""
try:
from gateway.channel_directory import (
format_directory_for_display,
load_directory,
)
except Exception as exc:
print(f"hermes send: failed to load channel directory: {exc}", file=sys.stderr)
return _FAILURE_EXIT
try:
raw = load_directory()
except Exception as exc:
print(f"hermes send: failed to read channel directory: {exc}", file=sys.stderr)
return _FAILURE_EXIT
platforms = dict(raw.get("platforms") or {})
if platform_filter:
key = platform_filter.strip().lower()
filtered = {k: v for k, v in platforms.items() if k.lower() == key}
if not filtered:
print(
f"hermes send: no targets found for platform '{platform_filter}'. "
f"Configured: {', '.join(sorted(platforms)) or '(none)'}",
file=sys.stderr,
)
return _FAILURE_EXIT
platforms = filtered
if json_mode:
print(json.dumps({"platforms": platforms}, indent=2, default=str))
return _SUCCESS_EXIT
if not any(platforms.values()):
print("No messaging platforms configured or no channels discovered yet.")
print("Set one up with `hermes gateway setup`, or run the gateway once so")
print("channel discovery can populate ~/.hermes/channel_directory.json.")
return _SUCCESS_EXIT
# Human display — when unfiltered, reuse the shared formatter the agent
# already sees. When filtered, build a minimal view ourselves.
if platform_filter is None:
print(format_directory_for_display())
return _SUCCESS_EXIT
for plat_name in sorted(platforms):
channels = platforms[plat_name]
print(f"{plat_name}:")
if not channels:
print(" (no channels discovered yet)")
continue
for ch in channels:
name = ch.get("name", "?")
chat_id = ch.get("id") or ch.get("chat_id") or ""
suffix = f" [{chat_id}]" if chat_id and chat_id != name else ""
print(f" {plat_name}:{name}{suffix}")
print()
return _SUCCESS_EXIT
def _load_hermes_env() -> None:
"""Populate ``os.environ`` from ``~/.hermes/.env`` AND bridge top-level
``config.yaml`` keys into the environment so the underlying gateway
config loader sees platform credentials and home channel IDs.
``send_message_tool`` reads tokens and home-channel IDs via
``os.getenv(...)`` on each call. The gateway process does two things at
startup that ``hermes send`` must replicate when invoked standalone:
1. ``load_dotenv(~/.hermes/.env)`` brings bot tokens into the env.
2. Bridge top-level simple values from ``~/.hermes/config.yaml`` into
``os.environ`` (without overriding existing env vars). This is where
``TELEGRAM_HOME_CHANNEL`` and friends live when the user saved them
via ``hermes config set``.
See ``gateway/run.py`` for the canonical version of this bridge we
intentionally reimplement the minimum needed here so ``hermes send``
doesn't pull in the full gateway module just to resolve a home channel.
"""
# Step 1: dotenv
try:
from dotenv import load_dotenv
except Exception:
load_dotenv = None # type: ignore[assignment]
try:
from hermes_cli.config import get_hermes_home
home = get_hermes_home()
except Exception:
return
env_path = home / ".env"
if load_dotenv and env_path.exists():
try:
load_dotenv(str(env_path), override=True, encoding="utf-8")
except UnicodeDecodeError:
try:
load_dotenv(str(env_path), override=True, encoding="latin-1")
except Exception:
pass
except Exception:
pass
# Step 2: bridge top-level config.yaml values into the environment so
# gateway.config.load_gateway_config() sees them. Scalars only; don't
# override values already in the env.
import os
config_path = home / "config.yaml"
if not config_path.exists():
return
try:
import yaml # type: ignore[import-not-found]
except Exception:
return
try:
with open(config_path, "r", encoding="utf-8") as fh:
raw = yaml.safe_load(fh) or {}
except Exception:
return
try:
from hermes_cli.config import _expand_env_vars
raw = _expand_env_vars(raw)
except Exception:
pass
if not isinstance(raw, dict):
return
for key, val in raw.items():
if not isinstance(val, (str, int, float, bool)):
continue
if key in os.environ:
continue
os.environ[key] = str(val)
def cmd_send(args: argparse.Namespace) -> None:
"""Entry point wired into the top-level argparse dispatcher."""
# Bridge ~/.hermes/.env and ~/.hermes/config.yaml into os.environ so the
# gateway config loader (invoked downstream by send_message_tool and by
# the channel directory) can see platform credentials and home channels.
_load_hermes_env()
# --list short-circuits everything else.
if getattr(args, "list_targets", False):
# When `--list telegram` is used, argparse stores "telegram" in the
# `message` positional (since list_targets takes no argument).
platform_filter = getattr(args, "message", None)
exit_code = _list_targets(platform_filter, json_mode=getattr(args, "json", False))
sys.exit(exit_code)
target = _resolve_target(getattr(args, "to", None))
if not target:
print(
"hermes send: --to PLATFORM[:channel[:thread]] is required\n"
"Examples:\n"
" hermes send --to telegram \"hello\"\n"
" hermes send --to discord:#ops --file report.md\n"
" hermes send --list # list available targets",
file=sys.stderr,
)
sys.exit(_USAGE_EXIT)
message = _read_message_body(
getattr(args, "message", None),
getattr(args, "file", None),
)
if message is None or not message.strip():
print(
"hermes send: no message provided. Pass text as a positional "
"argument, use --file PATH, or pipe data via stdin.",
file=sys.stderr,
)
sys.exit(_USAGE_EXIT)
# Optional: prepend a subject line. Useful for alerting scripts that
# want a consistent header without inlining it into every call.
subject = getattr(args, "subject", None)
if subject:
message = f"{subject}\n\n{message.lstrip()}"
# Import lazily so `hermes send --help` stays fast and does not pull in
# the full tool registry / gateway config stack.
from tools.send_message_tool import send_message_tool
# send_message_tool auto-loads gateway config + env and routes to the
# appropriate platform adapter (bot-token path for Telegram/Discord/Slack/
# Signal/SMS/WhatsApp; live-adapter path for plugin platforms).
#
# It expects the standard tool-call dict and returns a JSON string.
tool_args = {
"action": "send",
"target": target,
"message": message,
}
result = send_message_tool(tool_args)
exit_code = _emit_result(
result,
json_mode=getattr(args, "json", False),
quiet=getattr(args, "quiet", False),
)
sys.exit(exit_code)
def register_send_subparser(subparsers) -> argparse.ArgumentParser:
"""Create the ``send`` subparser and return it.
Kept as a standalone function so the top-level parser builder can wire
it in next to the other messaging subcommands without cluttering
``_parser.py`` or ``main.py``.
"""
parser = subparsers.add_parser(
"send",
help="Send a message to a configured platform (scripts, cron jobs, CI).",
description=(
"Pipe text from any shell script to any messaging platform Hermes "
"is already configured for. Reuses the gateway's platform "
"credentials (~/.hermes/.env + ~/.hermes/config.yaml) — no LLM, "
"no agent loop, no running gateway required for bot-token "
"platforms like Telegram/Discord/Slack/Signal."
),
epilog=(
"Examples:\n"
" hermes send --to telegram \"deploy finished\"\n"
" echo \"RAM 92%\" | hermes send --to telegram:-1001234567890\n"
" hermes send --to discord:#ops --file /tmp/report.md\n"
" hermes send --to slack:#eng --subject \"[CI]\" --file build.log\n"
" hermes send --list # all platforms\n"
" hermes send --list telegram # filter by platform\n"
"\n"
"Exit codes: 0 ok, 1 delivery/backend error, 2 usage error."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
"-t",
"--to",
metavar="TARGET",
default=None,
help=(
"Delivery target. Format: 'platform' (home channel), "
"'platform:chat_id', 'platform:chat_id:thread_id', or "
"'platform:#channel-name'. Examples: telegram, "
"telegram:-1001234567890:17585, discord:#ops, slack:C0123ABCD, "
"signal:+15551234567."
),
)
parser.add_argument(
"message",
nargs="?",
default=None,
help="Message text. If omitted, read from --file or stdin.",
)
# Legacy / convenience positional removed — use --to for clarity.
parser.add_argument(
"-f",
"--file",
metavar="PATH",
default=None,
help="Read message body from PATH. Use '-' to force stdin.",
)
parser.add_argument(
"-s",
"--subject",
metavar="LINE",
default=None,
help="Prepend a subject/header line before the message body.",
)
parser.add_argument(
"-l",
"--list",
dest="list_targets",
action="store_true",
default=False,
help="List available targets. Optional positional filter: `hermes send --list telegram`.",
)
parser.add_argument(
"-q",
"--quiet",
action="store_true",
default=False,
help="Suppress stdout on success (exit code only).",
)
parser.add_argument(
"--json",
action="store_true",
default=False,
help="Emit raw JSON result instead of human-readable output.",
)
parser.set_defaults(func=cmd_send)
return parser
__all__ = ["cmd_send", "register_send_subparser"]

316
hermes_cli/session_recap.py Normal file
View file

@ -0,0 +1,316 @@
"""Session recap — summarize what's happened in the current session.
Inspired by Claude Code's `/recap` command (v2.1.114, April 2026), which
shows a one-line summary of what happened while a terminal was unfocused
so users juggling multiple sessions can re-orient quickly.
Source: https://code.claude.com/docs/en/whats-new/2026-w17
Differences from Claude Code:
- Pure local computation from the in-memory conversation history. No
LLM call, no auxiliary model, no prompt-cache invalidation. A
recap should be instant and free.
- Works unchanged on CLI and every gateway platform (Telegram,
Discord, Slack, ) because both call into the same ``build_recap``
helper. Claude Code only shows this on the CLI.
- Tailored to hermes-agent's tool vocabulary (``terminal``, ``patch``,
``write_file``, ``delegate_task``, ``browser_*``, ``web_*``) the
recap surfaces which classes of work were most active.
"""
from __future__ import annotations
import os
from collections import Counter
from typing import Any, Iterable, List, Mapping, Optional, Sequence, Tuple
# How many recent user/assistant turns we consider "recent activity".
_RECENT_TURN_WINDOW = 20
# How many characters of the latest user prompt to show.
_PROMPT_PREVIEW_CHARS = 140
# How many characters of the latest assistant text to show.
_ASSISTANT_PREVIEW_CHARS = 200
# How many recently-touched files to list.
_MAX_FILES_LISTED = 5
# Tool names that identify a file-editing action and the argument key that
# holds the path.
_FILE_EDIT_TOOLS: Mapping[str, str] = {
"write_file": "path",
"patch": "path",
"read_file": "path",
"skill_manage": "file_path",
"skill_view": "file_path",
}
def _coerce_text(value: Any) -> str:
"""Flatten assistant/user ``content`` into a plain string.
Content can be a string or a list of content blocks (for multimodal
or reasoning models). We concatenate every text-like block and
ignore the rest.
"""
if value is None:
return ""
if isinstance(value, str):
return value
if isinstance(value, list):
parts: List[str] = []
for block in value:
if isinstance(block, str):
parts.append(block)
continue
if isinstance(block, Mapping):
text = block.get("text")
if isinstance(text, str) and text:
parts.append(text)
return "\n".join(parts)
return str(value)
def _tool_call_name_and_args(tool_call: Any) -> Tuple[str, Mapping[str, Any]]:
"""Extract ``(name, arguments_dict)`` from a tool_call entry.
``arguments`` may be a JSON string or a dict depending on provider.
Return an empty dict if it cannot be parsed.
"""
if not isinstance(tool_call, Mapping):
return "", {}
fn = tool_call.get("function") or {}
if not isinstance(fn, Mapping):
return "", {}
name = str(fn.get("name") or "") or ""
raw_args = fn.get("arguments")
if isinstance(raw_args, Mapping):
return name, raw_args
if isinstance(raw_args, str) and raw_args:
try:
import json
parsed = json.loads(raw_args)
if isinstance(parsed, Mapping):
return name, parsed
except Exception:
return name, {}
return name, {}
def _iter_assistant_tool_calls(
messages: Sequence[Mapping[str, Any]],
) -> Iterable[Tuple[str, Mapping[str, Any]]]:
for msg in messages:
if not isinstance(msg, Mapping):
continue
if msg.get("role") != "assistant":
continue
tool_calls = msg.get("tool_calls") or []
if not isinstance(tool_calls, list):
continue
for tc in tool_calls:
name, args = _tool_call_name_and_args(tc)
if name:
yield name, args
def _count_visible_turns(
messages: Sequence[Mapping[str, Any]],
) -> Tuple[int, int, int]:
"""Return ``(user_turn_count, assistant_turn_count, tool_message_count)``."""
users = assistants = tools = 0
for msg in messages:
if not isinstance(msg, Mapping):
continue
role = msg.get("role")
if role == "user":
users += 1
elif role == "assistant":
assistants += 1
elif role == "tool":
tools += 1
return users, assistants, tools
def _latest_user_prompt(
messages: Sequence[Mapping[str, Any]],
) -> Optional[str]:
for msg in reversed(messages):
if isinstance(msg, Mapping) and msg.get("role") == "user":
text = _coerce_text(msg.get("content")).strip()
if text:
return text
return None
def _latest_assistant_text(
messages: Sequence[Mapping[str, Any]],
) -> Optional[str]:
for msg in reversed(messages):
if not isinstance(msg, Mapping):
continue
if msg.get("role") != "assistant":
continue
text = _coerce_text(msg.get("content")).strip()
if text:
return text
return None
def _recent_window(
messages: Sequence[Mapping[str, Any]], window: int = _RECENT_TURN_WINDOW
) -> List[Mapping[str, Any]]:
"""Return the tail slice of ``messages`` covering at most ``window``
user+assistant turns (tool messages ride along inside the window).
Iterating from the end, we count user and assistant messages and
keep everything from the first message that falls within the window.
"""
count = 0
cut = 0
for i in range(len(messages) - 1, -1, -1):
msg = messages[i]
if isinstance(msg, Mapping) and msg.get("role") in ("user", "assistant"):
count += 1
if count >= window:
cut = i
break
else:
return list(messages)
return list(messages[cut:])
def _shortened_path(path: str) -> str:
"""Show a path relative to cwd when possible, otherwise with ~ expansion."""
if not path:
return path
try:
abs_path = os.path.abspath(os.path.expanduser(path))
cwd = os.getcwd()
if abs_path == cwd:
return "."
if abs_path.startswith(cwd + os.sep):
return abs_path[len(cwd) + 1 :]
home = os.path.expanduser("~")
if abs_path.startswith(home + os.sep):
return "~/" + abs_path[len(home) + 1 :]
return abs_path
except Exception:
return path
def _summarise_tool_activity(
tool_calls: Sequence[Tuple[str, Mapping[str, Any]]],
) -> Tuple[List[Tuple[str, int]], List[str]]:
"""Return ``(tool_counts_sorted, recently_edited_files)``.
``tool_counts_sorted`` is descending by count, keeping the full list
so callers can truncate for display. ``recently_edited_files`` lists
distinct paths (most recent first) from file-editing tools.
"""
counter: Counter[str] = Counter()
files_seen: List[str] = []
files_set: set[str] = set()
# Walk in reverse so "most recent first" drops out of order-preserved iteration.
for name, args in reversed(list(tool_calls)):
counter[name] += 1
arg_key = _FILE_EDIT_TOOLS.get(name)
if arg_key:
path = args.get(arg_key)
if isinstance(path, str) and path and path not in files_set:
files_set.add(path)
files_seen.append(_shortened_path(path))
# Restore "reverse of reverse" for correct counts; Counter ignores order
# so only files_seen needed the reversal. Fix ordering: currently
# files_seen is newest→oldest which is what we want for display.
tool_counts = sorted(counter.items(), key=lambda kv: (-kv[1], kv[0]))
return tool_counts, files_seen
def _truncate(text: str, limit: int) -> str:
text = " ".join(text.split()) # collapse newlines for a compact one-liner
if len(text) <= limit:
return text
return text[: limit - 1].rstrip() + ""
def build_recap(
messages: Sequence[Mapping[str, Any]],
*,
session_title: Optional[str] = None,
session_id: Optional[str] = None,
platform: Optional[str] = None,
) -> str:
"""Build a multi-line recap of recent activity.
Inputs:
messages: the full conversation history as a list of
chat-completion-style dicts (``role``, ``content``,
``tool_calls``, ).
session_title: optional human title (from SessionDB).
session_id: optional session id.
platform: optional hint (``"cli"``, ``"telegram"``, ). Does not
change behavior today but is accepted for forward compat.
The output is plain text designed to render well in both a terminal
(with 80-col wrapping) and a gateway message bubble.
"""
_ = platform # reserved for future use
lines: List[str] = []
header_bits: List[str] = ["Session recap"]
if session_title:
header_bits.append(f"{session_title}")
elif session_id:
header_bits.append(f"{session_id[:8]}")
lines.append(" ".join(header_bits))
if not messages:
lines.append(" (nothing to recap — no messages yet)")
return "\n".join(lines)
users, assistants, tool_msgs = _count_visible_turns(messages)
window = _recent_window(messages)
win_users, win_assistants, _ = _count_visible_turns(window)
scope = (
f"{win_users} user turn{'s' if win_users != 1 else ''} / "
f"{win_assistants} assistant repl{'ies' if win_assistants != 1 else 'y'}"
)
if (users, assistants) != (win_users, win_assistants):
scope += f" (of {users}/{assistants} total)"
lines.append(f" Recent: {scope}, {tool_msgs} tool result{'s' if tool_msgs != 1 else ''}")
tool_calls = list(_iter_assistant_tool_calls(window))
tool_counts, files = _summarise_tool_activity(tool_calls)
if tool_counts:
top = ", ".join(f"{name}×{count}" for name, count in tool_counts[:5])
extra = len(tool_counts) - 5
if extra > 0:
top += f" (+{extra} more)"
lines.append(f" Tools used: {top}")
if files:
shown = files[:_MAX_FILES_LISTED]
extra = len(files) - len(shown)
entry = ", ".join(shown)
if extra > 0:
entry += f" (+{extra} more)"
lines.append(f" Files touched: {entry}")
latest_user = _latest_user_prompt(window)
if latest_user:
lines.append(f" Last ask: {_truncate(latest_user, _PROMPT_PREVIEW_CHARS)}")
latest_reply = _latest_assistant_text(window)
if latest_reply:
lines.append(f" Last reply: {_truncate(latest_reply, _ASSISTANT_PREVIEW_CHARS)}")
if len(lines) == 2:
# Only the header + scope line — nothing substantive to show.
lines.append(" (no assistant activity yet in this window)")
return "\n".join(lines)
__all__ = ["build_recap"]

View file

@ -0,0 +1,266 @@
"""Tests for post-compression historical-media stripping.
Port of Kilo-Org/kilocode#9434 (adapted for OpenAI-style message lists).
Without this pass, tail messages keep their original multi-MB base-64 image
payloads after context compression, and every subsequent request re-ships
them sometimes breaching provider body-size limits and wedging the
session.
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
from agent.context_compressor import (
ContextCompressor,
_content_has_images,
_is_image_part,
_strip_historical_media,
_strip_images_from_content,
)
IMG_URL = {
"type": "image_url",
"image_url": {"url": "data:image/png;base64," + ("A" * 1024)},
}
INPUT_IMG = {
"type": "input_image",
"image_url": "data:image/png;base64," + ("B" * 1024),
}
ANTHROPIC_IMG = {
"type": "image",
"source": {"type": "base64", "media_type": "image/png", "data": "C" * 1024},
}
TEXT = {"type": "text", "text": "hi"}
INPUT_TEXT = {"type": "input_text", "text": "hi"}
class TestIsImagePart:
def test_openai_chat_shape(self):
assert _is_image_part(IMG_URL) is True
def test_openai_responses_shape(self):
assert _is_image_part(INPUT_IMG) is True
def test_anthropic_native_shape(self):
assert _is_image_part(ANTHROPIC_IMG) is True
def test_text_part_is_not_image(self):
assert _is_image_part(TEXT) is False
assert _is_image_part(INPUT_TEXT) is False
def test_non_dict_rejected(self):
assert _is_image_part("image") is False
assert _is_image_part(None) is False
assert _is_image_part(42) is False
class TestContentHasImages:
def test_string_content(self):
assert _content_has_images("a string") is False
def test_empty_list(self):
assert _content_has_images([]) is False
def test_text_only_list(self):
assert _content_has_images([TEXT, TEXT]) is False
def test_list_with_image(self):
assert _content_has_images([TEXT, IMG_URL]) is True
def test_none(self):
assert _content_has_images(None) is False
class TestStripImagesFromContent:
def test_string_passthrough(self):
assert _strip_images_from_content("hello") == "hello"
def test_none_passthrough(self):
assert _strip_images_from_content(None) is None
def test_text_only_passthrough(self):
parts = [TEXT, {"type": "text", "text": "world"}]
assert _strip_images_from_content(parts) == parts
def test_replaces_image_with_placeholder(self):
parts = [TEXT, IMG_URL]
out = _strip_images_from_content(parts)
assert len(out) == 2
assert out[0] == TEXT
assert out[1] == {
"type": "text",
"text": "[Attached image — stripped after compression]",
}
def test_does_not_mutate_input(self):
parts = [IMG_URL, TEXT]
_ = _strip_images_from_content(parts)
assert parts[0] is IMG_URL # original list untouched
assert parts[1] is TEXT
def test_handles_all_three_shapes(self):
parts = [IMG_URL, INPUT_IMG, ANTHROPIC_IMG, TEXT]
out = _strip_images_from_content(parts)
assert sum(1 for p in out if p.get("type") == "text") == 4
assert not any(_is_image_part(p) for p in out)
class TestStripHistoricalMedia:
def test_empty_passthrough(self):
assert _strip_historical_media([]) == []
def test_no_images_anywhere(self):
msgs = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hey"},
{"role": "user", "content": "bye"},
]
assert _strip_historical_media(msgs) is msgs # identity — no copy
def test_single_image_user_only_first_message(self):
# Only image-bearing user is the first message — nothing before it.
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]},
{"role": "assistant", "content": "ok"},
]
out = _strip_historical_media(msgs)
assert out is msgs # no-op
# Image still there.
assert _content_has_images(out[0]["content"])
def test_strips_older_user_image_keeps_newest(self):
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]}, # old — strip
{"role": "assistant", "content": "looked at it"},
{"role": "user", "content": [TEXT, INPUT_IMG]}, # newest — keep
]
out = _strip_historical_media(msgs)
assert out is not msgs # new list
# First message's image was replaced
assert not _content_has_images(out[0]["content"])
# Newest user still has its image
assert _content_has_images(out[2]["content"])
def test_strips_assistant_and_tool_images_before_anchor(self):
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]}, # old user
{"role": "assistant", "content": [TEXT, IMG_URL]}, # old assistant
{"role": "tool", "content": [TEXT, IMG_URL], "tool_call_id": "t1"},
{"role": "user", "content": [TEXT, IMG_URL]}, # newest user — keep
]
out = _strip_historical_media(msgs)
for i in range(3):
assert not _content_has_images(out[i]["content"]), f"msg {i} still has image"
assert _content_has_images(out[3]["content"])
def test_text_only_newest_user_still_strips_older_images(self):
# The anchor is "newest user WITH images". If the newest user is
# text-only, we fall back to the previous image-bearing user turn.
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]},
{"role": "assistant", "content": "ok"},
{"role": "user", "content": [TEXT, IMG_URL]}, # anchor
{"role": "assistant", "content": "done"},
{"role": "user", "content": "follow-up text only"},
]
out = _strip_historical_media(msgs)
# First image-bearing user (index 0) was stripped — it was before the
# newest image-bearing user (index 2).
assert not _content_has_images(out[0]["content"])
# Anchor (index 2) keeps its image.
assert _content_has_images(out[2]["content"])
def test_no_image_bearing_user_is_noop(self):
msgs = [
{"role": "user", "content": "first"},
{"role": "assistant", "content": [TEXT, IMG_URL]}, # assistant image only
{"role": "user", "content": "second"},
]
out = _strip_historical_media(msgs)
# No image-bearing user anchor → no stripping.
assert out is msgs
assert _content_has_images(out[1]["content"])
def test_does_not_mutate_input_messages(self):
msg0 = {"role": "user", "content": [TEXT, IMG_URL]}
msg1 = {"role": "user", "content": [TEXT, IMG_URL]}
msgs = [msg0, msg1]
_ = _strip_historical_media(msgs)
# Originals untouched
assert _content_has_images(msg0["content"])
assert _content_has_images(msg1["content"])
def test_idempotent(self):
msgs = [
{"role": "user", "content": [TEXT, IMG_URL]},
{"role": "assistant", "content": "k"},
{"role": "user", "content": [TEXT, IMG_URL]},
]
first = _strip_historical_media(msgs)
second = _strip_historical_media(first)
# Second pass is a no-op — no images left before the anchor.
assert second is first
def test_non_dict_messages_pass_through(self):
msgs = [
"not-a-dict", # shouldn't crash
{"role": "user", "content": [TEXT, IMG_URL]},
{"role": "assistant", "content": "ok"},
{"role": "user", "content": [TEXT, IMG_URL]},
]
out = _strip_historical_media(msgs)
assert out[0] == "not-a-dict"
# Image-bearing user at index 1 is before the anchor (index 3) → stripped.
assert not _content_has_images(out[1]["content"])
class TestCompressIntegration:
"""Verify the stripping runs inside ContextCompressor.compress()."""
@pytest.fixture
def compressor(self):
with patch("agent.context_compressor.get_model_context_length", return_value=100_000):
c = ContextCompressor(
model="test/model",
threshold_percent=0.50,
protect_first_n=1,
protect_last_n=2,
quiet_mode=True,
)
return c
def test_compress_strips_historical_images(self, compressor):
# Enough messages to trigger the summarize path. protect_first_n=1 +
# protect_last_n=2 + a middle window of at least 3 with a summary.
msgs = [
{"role": "system", "content": "sys"},
{"role": "user", "content": [TEXT, IMG_URL]}, # old image-bearing user
{"role": "assistant", "content": "looked at it"},
{"role": "user", "content": "follow-up"},
{"role": "assistant", "content": "ack"},
{"role": "user", "content": "more"},
{"role": "assistant", "content": "ok"},
{"role": "user", "content": [TEXT, IMG_URL]}, # newest image-bearing user (tail)
{"role": "assistant", "content": "done"},
]
# Bypass the real LLM summary — return a stub so compress() proceeds.
with patch.object(compressor, "_generate_summary", return_value="SUMMARY TEXT"):
out = compressor.compress(msgs, current_tokens=60_000)
# Newest user turn with image should still have it (it's in the tail).
user_imgs = [m for m in out if m.get("role") == "user" and _content_has_images(m.get("content"))]
assert len(user_imgs) == 1, (
"Expected exactly one user message with images after compression "
f"(the newest one); got {len(user_imgs)}"
)
# No assistant or tool messages should carry images either.
for m in out:
if m is user_imgs[0]:
continue
assert not _content_has_images(m.get("content")), (
f"Stale image in {m.get('role')!r} message after compression"
)

View file

@ -0,0 +1,104 @@
"""Tests for the /background indicator in the CLI status bar.
The classic prompt_toolkit status bar shows ` N` when N tasks launched via
`/background` are still running. Source of truth is `self._background_tasks`
(a Dict[str, threading.Thread]); entries are removed in the task thread's
finally block, so len() reflects truly-running tasks.
"""
import threading
from datetime import datetime
from cli import HermesCLI
def _stub_thread() -> threading.Thread:
"""Return a Thread instance that's never started — pure dict-value stand-in."""
return threading.Thread(target=lambda: None)
def _make_cli():
"""Bare-metal HermesCLI for snapshot/build tests (no __init__ side effects)."""
cli_obj = HermesCLI.__new__(HermesCLI)
cli_obj.model = "anthropic/claude-opus-4.6"
cli_obj.agent = None
cli_obj._background_tasks = {}
# The snapshot reads session_start to compute duration; supply a stub.
cli_obj.session_start = datetime.now()
return cli_obj
def test_snapshot_reports_zero_when_no_background_tasks():
cli_obj = _make_cli()
snap = cli_obj._get_status_bar_snapshot()
assert snap["active_background_tasks"] == 0
def test_snapshot_counts_live_background_tasks():
cli_obj = _make_cli()
cli_obj._background_tasks = {"bg_a": _stub_thread(), "bg_b": _stub_thread()}
snap = cli_obj._get_status_bar_snapshot()
assert snap["active_background_tasks"] == 2
def test_snapshot_safe_when_background_tasks_attr_missing():
"""Older HermesCLI instances (tests with __new__, etc.) may lack the attr."""
cli_obj = HermesCLI.__new__(HermesCLI)
cli_obj.model = "x"
cli_obj.agent = None
cli_obj.session_start = datetime.now()
# No _background_tasks at all — must not raise.
snap = cli_obj._get_status_bar_snapshot()
assert snap["active_background_tasks"] == 0
def test_plain_text_status_omits_indicator_when_idle():
cli_obj = _make_cli()
text = cli_obj._build_status_bar_text(width=80)
assert "" not in text
def test_plain_text_status_shows_indicator_when_active():
cli_obj = _make_cli()
cli_obj._background_tasks = {"bg_a": _stub_thread()}
text = cli_obj._build_status_bar_text(width=80)
assert "▶ 1" in text
def test_plain_text_status_shows_higher_count():
cli_obj = _make_cli()
cli_obj._background_tasks = {
"a": _stub_thread(),
"b": _stub_thread(),
"c": _stub_thread(),
}
text = cli_obj._build_status_bar_text(width=80)
assert "▶ 3" in text
def test_narrow_width_omits_bg_indicator():
"""The narrow tier (<52) is already cramped — bg is secondary, drop it."""
cli_obj = _make_cli()
cli_obj._background_tasks = {"bg_a": _stub_thread()}
text = cli_obj._build_status_bar_text(width=40)
assert "" not in text
def test_fragments_include_bg_segment_when_active():
cli_obj = _make_cli()
cli_obj._background_tasks = {"a": _stub_thread(), "b": _stub_thread()}
cli_obj._status_bar_visible = True
# _get_status_bar_fragments asks _get_tui_terminal_width(); stub it wide.
cli_obj._get_tui_terminal_width = lambda: 120 # type: ignore[method-assign]
frags = cli_obj._get_status_bar_fragments()
rendered = "".join(text for _style, text in frags)
assert "▶ 2" in rendered
def test_fragments_omit_bg_segment_when_idle():
cli_obj = _make_cli()
cli_obj._status_bar_visible = True
cli_obj._get_tui_terminal_width = lambda: 120 # type: ignore[method-assign]
frags = cli_obj._get_status_bar_fragments()
rendered = "".join(text for _style, text in frags)
assert "" not in rendered

View file

@ -0,0 +1,119 @@
"""Tests for `/exit --delete` and `/quit --delete` session deletion.
Ports the behavior from google-gemini/gemini-cli#19332: running `/exit` or
`/quit` with the `--delete` flag arms a one-shot `_delete_session_on_exit`
flag that the CLI shutdown path uses to remove the current session from
SQLite + on-disk transcripts before exit.
"""
from unittest.mock import MagicMock
def _make_cli():
"""Bare HermesCLI suitable for process_command() tests.
Uses ``__new__`` to skip the heavy __init__; only sets the attributes
the /exit branch touches.
"""
from cli import HermesCLI
cli = HermesCLI.__new__(HermesCLI)
cli.config = {}
cli.console = MagicMock()
cli.agent = None
cli.conversation_history = []
cli.session_id = "test-session"
cli._delete_session_on_exit = False
return cli
class TestExitDeleteFlag:
def test_plain_exit_does_not_arm_delete(self):
cli = _make_cli()
result = cli.process_command("/exit")
assert result is False
assert cli._delete_session_on_exit is False
def test_plain_quit_does_not_arm_delete(self):
cli = _make_cli()
result = cli.process_command("/quit")
assert result is False
assert cli._delete_session_on_exit is False
def test_exit_delete_arms_flag(self):
cli = _make_cli()
result = cli.process_command("/exit --delete")
assert result is False
assert cli._delete_session_on_exit is True
def test_quit_delete_arms_flag(self):
cli = _make_cli()
result = cli.process_command("/quit --delete")
assert result is False
assert cli._delete_session_on_exit is True
def test_exit_delete_short_form(self):
"""`-d` is a convenience alias for `--delete`."""
cli = _make_cli()
result = cli.process_command("/exit -d")
assert result is False
assert cli._delete_session_on_exit is True
def test_quit_alias_q_is_not_quit(self):
"""`/q` is the alias for `/queue`, not `/quit`. This test documents
that /q --delete does NOT arm session deletion it would dispatch
to /queue instead."""
cli = _make_cli()
cli._pending_input = __import__("queue").Queue()
# /q with no args shows a usage error and keeps the CLI running.
result = cli.process_command("/q")
assert result is not False # queue command doesn't exit
assert cli._delete_session_on_exit is False
def test_delete_flag_is_case_insensitive(self):
cli = _make_cli()
result = cli.process_command("/exit --DELETE")
assert result is False
assert cli._delete_session_on_exit is True
def test_delete_flag_trims_whitespace(self):
cli = _make_cli()
result = cli.process_command("/exit --delete ")
assert result is False
assert cli._delete_session_on_exit is True
def test_unknown_exit_argument_does_not_exit(self):
"""Unrecognised args should NOT exit the CLI — they surface an
error message and stay in the session. This prevents accidental
session destruction from typos like `/exit -delete`."""
cli = _make_cli()
result = cli.process_command("/exit --delte")
# process_command returns True = keep running
assert result is True
assert cli._delete_session_on_exit is False
def test_unknown_exit_argument_prints_help(self):
cli = _make_cli()
# _cprint goes through module-level print, so capture via console.
# We can't patch _cprint directly without import juggling; the
# previous assertion already proves the unknown-arg branch is
# reached (result True + flag False).
result = cli.process_command("/exit garbage")
assert result is True
assert cli._delete_session_on_exit is False
class TestCommandRegistry:
def test_quit_command_advertises_delete_flag(self):
"""The CommandDef args_hint should surface `--delete` in /help and
CLI autocomplete."""
from hermes_cli.commands import resolve_command
cmd = resolve_command("quit")
assert cmd is not None
assert cmd.args_hint == "[--delete]"
def test_exit_alias_resolves_to_quit_with_hint(self):
from hermes_cli.commands import resolve_command
cmd = resolve_command("exit")
assert cmd is not None
assert cmd.name == "quit"
assert cmd.args_hint == "[--delete]"

View file

@ -0,0 +1,122 @@
"""Tests for gateway.memory_monitor — periodic process memory logging.
Ported from cline/cline#10343. The module logs a structured
``[MEMORY] rss=...MB ...`` line periodically so long-running gateway
leaks show up as a time series in agent.log / gateway.log.
"""
from __future__ import annotations
import logging
import time
import pytest
from gateway import memory_monitor as mm
@pytest.fixture(autouse=True)
def _ensure_monitor_stopped():
"""Every test starts from a clean state and leaves one behind."""
mm.stop_memory_monitoring(timeout=1.0)
yield
mm.stop_memory_monitoring(timeout=1.0)
def test_log_memory_usage_emits_memory_line(caplog):
caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
mm.log_memory_usage()
memory_lines = [r for r in caplog.records if "[MEMORY]" in r.getMessage()]
assert memory_lines, "expected at least one [MEMORY] log record"
def test_log_memory_usage_has_grep_friendly_format(caplog):
caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
mm.log_memory_usage()
msg = caplog.records[-1].getMessage()
# Grep-friendly contract: line starts with [MEMORY] and carries RSS
# (or 'unavailable'), GC counts, thread count, uptime.
assert msg.startswith("[MEMORY]"), msg
assert "rss=" in msg
assert "gc=" in msg
assert "threads=" in msg
assert "uptime=" in msg
def test_log_memory_usage_with_prefix(caplog):
caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
mm.log_memory_usage(prefix="baseline")
msg = caplog.records[-1].getMessage()
assert "[MEMORY] baseline " in msg
def test_start_logs_baseline_and_returns_true(caplog):
caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
# Large interval so the background timer never fires during the test —
# we're only checking the synchronous baseline behavior here.
started = mm.start_memory_monitoring(interval_seconds=3600.0)
assert started is True
assert mm.is_running() is True
messages = [r.getMessage() for r in caplog.records]
assert any("[MEMORY] baseline " in m for m in messages), messages
assert any("Periodic memory monitoring started" in m for m in messages), messages
def test_double_start_is_noop():
assert mm.start_memory_monitoring(interval_seconds=3600.0) is True
assert mm.start_memory_monitoring(interval_seconds=3600.0) is False
assert mm.is_running() is True
def test_stop_logs_shutdown_snapshot(caplog):
mm.start_memory_monitoring(interval_seconds=3600.0)
caplog.clear()
caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
mm.stop_memory_monitoring(timeout=1.0)
assert mm.is_running() is False
messages = [r.getMessage() for r in caplog.records]
assert any("[MEMORY] shutdown " in m for m in messages), messages
assert any("Periodic memory monitoring stopped" in m for m in messages), messages
def test_stop_without_start_is_noop():
# Must not raise, must not log shutdown snapshot.
mm.stop_memory_monitoring(timeout=0.5)
assert mm.is_running() is False
def test_periodic_timer_fires(caplog):
caplog.set_level(logging.INFO, logger="gateway.memory_monitor")
# Short interval so we can observe multiple ticks inside the test budget.
mm.start_memory_monitoring(interval_seconds=0.1)
time.sleep(0.45)
mm.stop_memory_monitoring(timeout=1.0)
periodic = [
r for r in caplog.records
if r.getMessage().startswith("[MEMORY] rss=") or r.getMessage().startswith("[MEMORY] rss=unavailable")
]
# baseline + at least 2 periodic + shutdown — but shutdown has the
# "shutdown " prefix so it won't match the strict "[MEMORY] rss=" start.
# We expect >= 3 bare "[MEMORY] rss=..." lines.
assert len(periodic) >= 3, [r.getMessage() for r in caplog.records]
def test_thread_is_daemon():
mm.start_memory_monitoring(interval_seconds=3600.0)
assert mm._monitor_thread is not None
assert mm._monitor_thread.daemon is True, (
"memory monitor thread must be daemon so it can never block process exit"
)
def test_unavailable_rss_warns_and_does_not_start(caplog, monkeypatch):
# Force both backends to claim unavailable; start should bail.
monkeypatch.setattr(mm, "_get_rss_mb", lambda: None)
caplog.set_level(logging.WARNING, logger="gateway.memory_monitor")
started = mm.start_memory_monitoring(interval_seconds=3600.0)
assert started is False
assert mm.is_running() is False
assert any("Memory monitoring unavailable" in r.getMessage() for r in caplog.records)

View file

@ -0,0 +1,387 @@
"""Tests for the ``hermes send`` CLI subcommand.
Covers the argument parsing / stdin / file / list behavior of
``hermes_cli.send_cmd``. The underlying ``send_message_tool`` is stubbed so
no network I/O or gateway is required.
"""
from __future__ import annotations
import io
import json
from pathlib import Path
import pytest
from hermes_cli import send_cmd
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse(argv):
"""Build the top-level parser and return the parsed args for ``argv``."""
import argparse
parser = argparse.ArgumentParser(prog="hermes")
subparsers = parser.add_subparsers(dest="command")
send_cmd.register_send_subparser(subparsers)
return parser.parse_args(["send", *argv])
class _FakeTool:
"""Replacement for ``tools.send_message_tool.send_message_tool``."""
def __init__(self, payload):
self.payload = payload
self.calls = []
def __call__(self, args, **_kw):
self.calls.append(dict(args))
return json.dumps(self.payload)
@pytest.fixture
def fake_tool(monkeypatch):
"""Install a fake send_message_tool and return the stub for inspection."""
import sys
import types
fake = _FakeTool({"success": True, "message_id": "m123"})
mod = types.ModuleType("tools.send_message_tool")
mod.send_message_tool = fake
# Register the stub so ``from tools.send_message_tool import ...`` inside
# cmd_send resolves to our fake. Also patch the parent ``tools`` package
# entry so attribute lookup works.
monkeypatch.setitem(sys.modules, "tools.send_message_tool", mod)
return fake
# ---------------------------------------------------------------------------
# Happy path
# ---------------------------------------------------------------------------
def test_positional_message_success(fake_tool, capsys):
args = _parse(["--to", "telegram", "hello world"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
assert fake_tool.calls == [
{"action": "send", "target": "telegram", "message": "hello world"}
]
out = capsys.readouterr()
assert "sent" in out.out or out.out == "" # "sent" is the default success banner
def test_stdin_message(fake_tool, monkeypatch, capsys):
# Piped stdin (not a tty) should be consumed as the message body.
monkeypatch.setattr("sys.stdin", io.StringIO("piped body\n"))
# Force isatty to return False so the CLI reads from stdin.
monkeypatch.setattr("sys.stdin.isatty", lambda: False)
args = _parse(["--to", "discord:#ops"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
assert fake_tool.calls[0]["message"] == "piped body\n"
assert fake_tool.calls[0]["target"] == "discord:#ops"
def test_file_message(fake_tool, tmp_path):
body = tmp_path / "msg.txt"
body.write_text("from a file\n")
args = _parse(["--to", "slack:#eng", "--file", str(body)])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
assert fake_tool.calls[0]["message"] == "from a file\n"
def test_file_dash_means_stdin(fake_tool, monkeypatch):
monkeypatch.setattr("sys.stdin", io.StringIO("dash body"))
args = _parse(["--to", "telegram", "--file", "-"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
assert fake_tool.calls[0]["message"] == "dash body"
def test_subject_prepends_header(fake_tool):
args = _parse(["--to", "telegram", "--subject", "[CI]", "body text"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
assert fake_tool.calls[0]["message"] == "[CI]\n\nbody text"
def test_json_mode_emits_payload(fake_tool, capsys):
args = _parse(["--to", "telegram", "--json", "hi"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
out = capsys.readouterr().out
payload = json.loads(out)
assert payload.get("success") is True
assert payload.get("message_id") == "m123"
def test_quiet_suppresses_stdout(fake_tool, capsys):
args = _parse(["--to", "telegram", "--quiet", "shh"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
out = capsys.readouterr()
assert out.out == ""
# ---------------------------------------------------------------------------
# Error paths
# ---------------------------------------------------------------------------
def test_missing_target(fake_tool, capsys, monkeypatch):
# Ensure stdin is a tty so the CLI does not try to consume it as a body.
monkeypatch.setattr("sys.stdin.isatty", lambda: True)
args = _parse(["hello"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 2
err = capsys.readouterr().err
assert "--to" in err
def test_missing_message(fake_tool, capsys, monkeypatch):
monkeypatch.setattr("sys.stdin.isatty", lambda: True)
args = _parse(["--to", "telegram"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 2
err = capsys.readouterr().err
assert "no message" in err.lower()
def test_file_not_found_is_usage_error(fake_tool, capsys, monkeypatch):
monkeypatch.setattr("sys.stdin.isatty", lambda: True)
args = _parse(["--to", "telegram", "--file", "/nonexistent/does-not-exist.txt"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 2
err = capsys.readouterr().err
assert "cannot read" in err.lower()
def test_tool_error_returns_failure_exit(monkeypatch, capsys):
import sys as _sys
import types as _types
fake_mod = _types.ModuleType("tools.send_message_tool")
def _bad_tool(args, **_kw):
return json.dumps({"error": "platform blew up"})
fake_mod.send_message_tool = _bad_tool
monkeypatch.setitem(_sys.modules, "tools.send_message_tool", fake_mod)
args = _parse(["--to", "telegram", "nope"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 1
err = capsys.readouterr().err
assert "platform blew up" in err
def test_skipped_result_is_success(monkeypatch):
import sys as _sys
import types as _types
fake_mod = _types.ModuleType("tools.send_message_tool")
fake_mod.send_message_tool = lambda args, **_kw: json.dumps(
{"success": True, "skipped": True, "reason": "duplicate"}
)
monkeypatch.setitem(_sys.modules, "tools.send_message_tool", fake_mod)
args = _parse(["--to", "telegram", "dup"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
# ---------------------------------------------------------------------------
# --list
# ---------------------------------------------------------------------------
def test_list_human_output(monkeypatch, capsys):
import sys as _sys
import types as _types
fake_dir = _types.ModuleType("gateway.channel_directory")
fake_dir.format_directory_for_display = lambda: "Available messaging targets:\n\nTelegram:\n telegram:-100123\n"
fake_dir.load_directory = lambda: {
"platforms": {"telegram": [{"id": "-100123", "name": "Test Group"}]}
}
monkeypatch.setitem(_sys.modules, "gateway.channel_directory", fake_dir)
args = _parse(["--list"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
out = capsys.readouterr().out
assert "Telegram" in out
def test_list_json(monkeypatch, capsys):
import sys as _sys
import types as _types
fake_dir = _types.ModuleType("gateway.channel_directory")
fake_dir.format_directory_for_display = lambda: "(ignored in json mode)"
fake_dir.load_directory = lambda: {
"platforms": {"telegram": [{"id": "-100123", "name": "Test Group"}]}
}
monkeypatch.setitem(_sys.modules, "gateway.channel_directory", fake_dir)
args = _parse(["--list", "--json"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
out = capsys.readouterr().out
payload = json.loads(out)
assert payload["platforms"]["telegram"][0]["name"] == "Test Group"
def test_list_filter_platform(monkeypatch, capsys):
import sys as _sys
import types as _types
fake_dir = _types.ModuleType("gateway.channel_directory")
fake_dir.format_directory_for_display = lambda: "(should not be called when filter set)"
fake_dir.load_directory = lambda: {
"platforms": {
"telegram": [{"id": "-100123", "name": "TG Chat"}],
"discord": [{"id": "555", "name": "bot-home"}],
}
}
monkeypatch.setitem(_sys.modules, "gateway.channel_directory", fake_dir)
# When --list is set, argparse puts the optional bareword in the
# `message` positional slot (where the send-mode body would go).
args = _parse(["--list", "telegram"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 0
out = capsys.readouterr().out
assert "telegram" in out.lower()
assert "discord" not in out.lower()
def test_list_unknown_platform_fails(monkeypatch, capsys):
import sys as _sys
import types as _types
fake_dir = _types.ModuleType("gateway.channel_directory")
fake_dir.format_directory_for_display = lambda: ""
fake_dir.load_directory = lambda: {"platforms": {"telegram": []}}
monkeypatch.setitem(_sys.modules, "gateway.channel_directory", fake_dir)
args = _parse(["--list", "pigeon-post"])
with pytest.raises(SystemExit) as exc:
send_cmd.cmd_send(args)
assert exc.value.code == 1
err = capsys.readouterr().err
assert "pigeon-post" in err
# ---------------------------------------------------------------------------
# Parser registration contract
# ---------------------------------------------------------------------------
def test_register_send_subparser_is_reusable():
"""Sanity check: the registrar returns a parser and wires ``cmd_send``."""
import argparse
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest="command")
send_parser = send_cmd.register_send_subparser(subparsers)
assert send_parser is not None
args = parser.parse_args(["send", "--to", "telegram", "hi"])
assert args.func is send_cmd.cmd_send
assert args.to == "telegram"
assert args.message == "hi"
# ---------------------------------------------------------------------------
# Env loader
# ---------------------------------------------------------------------------
def test_load_hermes_env_bridges_config_yaml_scalars(tmp_path, monkeypatch):
"""Top-level config.yaml scalars should be bridged into os.environ.
This mirrors the gateway/run.py bootstrap behavior: without this, running
``hermes send`` from a fresh shell cannot resolve the home channel
because ``TELEGRAM_HOME_CHANNEL`` (saved by ``hermes config set``) lives
in config.yaml, not in .env and the gateway's config loader reads via
``os.getenv(...)``.
"""
import os
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / ".env").write_text("SOME_TOKEN=abc123\n")
(hermes_home / "config.yaml").write_text(
"TELEGRAM_HOME_CHANNEL: '5550001111'\nnested:\n ignored: true\n"
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.delenv("TELEGRAM_HOME_CHANNEL", raising=False)
monkeypatch.delenv("SOME_TOKEN", raising=False)
# Force get_hermes_home() to re-resolve under the patched env.
from importlib import reload
import hermes_cli.config as _hc_config
reload(_hc_config)
send_cmd._load_hermes_env()
assert os.environ.get("SOME_TOKEN") == "abc123"
assert os.environ.get("TELEGRAM_HOME_CHANNEL") == "5550001111"
def test_load_hermes_env_does_not_override_existing(tmp_path, monkeypatch):
"""Existing env vars must not be clobbered by config.yaml values."""
import os
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("TELEGRAM_HOME_CHANNEL: yaml_value\n")
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("TELEGRAM_HOME_CHANNEL", "env_value")
from importlib import reload
import hermes_cli.config as _hc_config
reload(_hc_config)
send_cmd._load_hermes_env()
assert os.environ.get("TELEGRAM_HOME_CHANNEL") == "env_value"
def test_load_hermes_env_handles_missing_files(tmp_path, monkeypatch):
"""No .env or config.yaml should be a silent no-op, not an exception."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from importlib import reload
import hermes_cli.config as _hc_config
reload(_hc_config)
# Should not raise.
send_cmd._load_hermes_env()

View file

@ -0,0 +1,180 @@
"""Unit tests for hermes_cli.session_recap."""
from __future__ import annotations
import json
import pytest
from hermes_cli.session_recap import build_recap
def _user(text):
return {"role": "user", "content": text}
def _assistant(text=None, tool_calls=None):
msg = {"role": "assistant", "content": text}
if tool_calls:
msg["tool_calls"] = tool_calls
return msg
def _tool_call(name, args):
return {
"id": f"call_{name}",
"type": "function",
"function": {"name": name, "arguments": json.dumps(args)},
}
def _tool_result(content="ok"):
return {"role": "tool", "content": content}
def test_empty_history():
out = build_recap([])
assert "Session recap" in out
assert "nothing to recap" in out
def test_header_shows_title_when_provided():
out = build_recap([_user("hello")], session_title="Refactor the adapter")
assert "Refactor the adapter" in out.splitlines()[0]
def test_header_shows_short_id_when_no_title():
out = build_recap([_user("hello")], session_id="abcdef1234567890")
assert "abcdef12" in out.splitlines()[0]
def test_counts_recent_turns():
msgs = [
_user("one"),
_assistant("first reply"),
_user("two"),
_assistant("second reply"),
]
out = build_recap(msgs)
assert "2 user turn" in out
assert "assistant repl" in out
def test_last_ask_and_reply_are_surfaced():
msgs = [
_user("old question"),
_assistant("old answer"),
_user("summarise the docs"),
_assistant("here is the summary of the docs you asked for"),
]
out = build_recap(msgs)
assert "summarise the docs" in out
assert "summary of the docs" in out
def test_tool_counts_and_files():
msgs = [
_user("edit the readme and run tests"),
_assistant(
tool_calls=[
_tool_call("read_file", {"path": "README.md"}),
_tool_call("patch", {"path": "README.md"}),
]
),
_tool_result(),
_tool_result(),
_assistant(
tool_calls=[
_tool_call("terminal", {"command": "pytest"}),
]
),
_tool_result("tests ok"),
_assistant("All green."),
]
out = build_recap(msgs)
assert "patch×1" in out
assert "terminal×1" in out
assert "read_file×1" in out
# README.md should appear (may include cwd-relative prefix stripping).
assert "README.md" in out
def test_tool_preview_length_truncates_long_user_prompt():
long = "x " * 500
out = build_recap([_user(long)])
ask_line = [l for l in out.splitlines() if "Last ask" in l][0]
assert len(ask_line) < 300 # truncated with ellipsis
assert "" in ask_line
def test_respects_recent_window():
# 30 turns of user+assistant; only the most recent 20 should be summarised.
msgs = []
for i in range(30):
msgs.append(_user(f"question {i}"))
msgs.append(_assistant(f"answer {i}"))
out = build_recap(msgs)
# We scoped to the 20-turn window but show "of 30/30 total".
assert "of 30/30 total" in out
def test_multimodal_content_blocks_flattened():
msgs = [
{
"role": "user",
"content": [
{"type": "text", "text": "check this file"},
{"type": "image_url", "image_url": {"url": "..."}},
],
},
_assistant("Looked at your image."),
]
out = build_recap(msgs)
assert "check this file" in out
assert "Looked at your image" in out
def test_handles_arguments_as_dict_not_string():
# Some providers return arguments already as a dict.
msgs = [
_user("go"),
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"type": "function",
"function": {
"name": "patch",
"arguments": {"path": "foo.py"},
},
}
],
},
]
out = build_recap(msgs)
assert "patch×1" in out
assert "foo.py" in out
def test_no_assistant_activity_hint():
out = build_recap([_user("just sent my first message")])
assert "no assistant activity" in out or "Last ask" in out
def test_tool_message_count_reported():
msgs = [
_user("go"),
_assistant(tool_calls=[_tool_call("read_file", {"path": "a"})]),
_tool_result(),
_tool_result(),
_assistant("done"),
]
out = build_recap(msgs)
assert "2 tool result" in out
def test_ignores_non_mapping_entries_gracefully():
msgs = [None, "stray", _user("hi"), _assistant("hello")]
# Should not raise.
out = build_recap(msgs)
assert "Session recap" in out

View file

@ -0,0 +1,85 @@
"""Invariants for what is eager vs lazy in the root ``package.json``.
The root ``package.json`` is installed by ``hermes update`` on every user,
including users who never opted into a given browser backend. Anything
listed in ``dependencies`` therefore runs its npm postinstall script for
everyone including binary-fetching backends, on every update.
The contract:
* ``agent-browser`` IS eager. It is the default Chromium-driving backend
used whenever the agent makes a browser call without a cloud provider
configured, so it must already be installed before any session starts.
Its postinstall is also small.
* ``@askjo/camofox-browser`` is NOT eager. It is an explicit opt-in
alternative browser backend, selected by the user via
``hermes tools`` Browser Automation Camofox, and only used at
runtime when ``CAMOFOX_URL`` is set. Its postinstall fetches a ~300MB
Firefox-fork binary, which silently blocked ``hermes update`` for
multi-minute stretches on slow / network-restricted connections
(notably users in China running through a VPN). The package is
installed on demand by ``tools_config.py`` ``post_setup_key ==
"camofox"`` when the user actually selects Camofox.
If a future PR re-adds Camofox (or any other binary-postinstall package)
to root ``dependencies``, this test fails read the lazy-install
guidance in the ``hermes-agent-dev`` skill before changing the
expectations.
"""
from __future__ import annotations
import json
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
def _root_package_json() -> dict:
with (REPO_ROOT / "package.json").open("r", encoding="utf-8") as fh:
return json.load(fh)
def test_camofox_is_not_in_root_dependencies() -> None:
"""Camofox must be opt-in, installed lazily by its post_setup handler."""
deps = _root_package_json().get("dependencies", {})
assert "@askjo/camofox-browser" not in deps, (
"Camofox is a ~300MB binary-postinstall backend that must stay "
"out of root package.json dependencies. It belongs in the "
"Camofox post_setup handler in hermes_cli/tools_config.py so it "
"only installs when the user explicitly selects Camofox via "
"`hermes tools` → Browser Automation → Camofox."
)
def test_agent_browser_stays_eager() -> None:
"""agent-browser is the default backend; it must remain eager."""
deps = _root_package_json().get("dependencies", {})
assert "agent-browser" in deps, (
"agent-browser is the default browser-tool backend used by every "
"session that doesn't have a cloud browser provider configured. "
"It must stay in root package.json dependencies so it is present "
"after `hermes setup` / `hermes update` without an explicit "
"post_setup step."
)
def test_root_lockfile_has_no_camofox_entries() -> None:
"""Regenerated lockfiles should not contain Camofox tree entries."""
lock_path = REPO_ROOT / "package-lock.json"
if not lock_path.exists():
# Some CI matrix shards skip lockfile materialization.
return
text = lock_path.read_text(encoding="utf-8")
assert "@askjo/camofox-browser" not in text, (
"package-lock.json still references @askjo/camofox-browser. "
"Regenerate the lockfile after removing the dep: "
"`rm package-lock.json && npm install --package-lock-only "
"--ignore-scripts --no-fund --no-audit`."
)
assert "camoufox-js" not in text, (
"package-lock.json still references camoufox-js (transitive of "
"@askjo/camofox-browser). Regenerate the lockfile."
)

View file

@ -0,0 +1,125 @@
"""Tests for the MCP remote-URL validator.
Ported from anomalyco/opencode#25019 (``fix: handle invalid mcp urls``).
Previously, a typo in ``config.yaml`` (missing scheme, wrong scheme, empty
string, dict where a URL was expected) caused the MCP server startup code
to enter httpx's URL-parsing path and crash inside the transport layer.
The reconnect-backoff loop would then retry
``_MAX_INITIAL_CONNECT_RETRIES`` times with doubling backoff a minute or
more of pointless retries plus a confusing opaque error message before
eventually giving up.
The fix validates the URL once, up front, and fails fast with a specific
error message identifying the offending server.
"""
from __future__ import annotations
import pytest
from tools.mcp_tool import (
InvalidMcpUrlError,
_validate_remote_mcp_url,
)
class TestValidUrlsAccepted:
"""Every valid http(s) URL must pass through untouched (stripped of whitespace)."""
@pytest.mark.parametrize(
"url",
[
"http://localhost:3000/mcp",
"https://example.com/mcp",
"https://context7.liam.com/mcp",
"http://127.0.0.1:8080",
"https://api.example.com:443/v1/mcp?session=abc",
"http://[::1]:9000/mcp", # IPv6
"https://host.example.com", # no port, no path
],
)
def test_accepts_valid_http_url(self, url):
assert _validate_remote_mcp_url("test", url) == url
def test_strips_surrounding_whitespace(self):
assert (
_validate_remote_mcp_url("test", " https://example.com/mcp ")
== "https://example.com/mcp"
)
class TestInvalidUrlsRejected:
"""Every broken shape must raise ``InvalidMcpUrlError`` with a clear message."""
def test_none_rejected(self):
with pytest.raises(InvalidMcpUrlError, match="context7.*expected a string"):
_validate_remote_mcp_url("context7", None)
def test_dict_rejected(self):
with pytest.raises(InvalidMcpUrlError, match="expected a string, got dict"):
_validate_remote_mcp_url("ctx", {"url": "nested"})
def test_int_rejected(self):
with pytest.raises(InvalidMcpUrlError, match="expected a string, got int"):
_validate_remote_mcp_url("ctx", 8080)
def test_empty_string_rejected(self):
with pytest.raises(InvalidMcpUrlError, match="empty url"):
_validate_remote_mcp_url("ctx", "")
def test_whitespace_only_rejected(self):
with pytest.raises(InvalidMcpUrlError, match="empty url"):
_validate_remote_mcp_url("ctx", " \t\n")
def test_missing_scheme_rejected(self):
# The most common typo — users copy a host from a web page.
with pytest.raises(
InvalidMcpUrlError, match="scheme must be http or https"
):
_validate_remote_mcp_url("ctx", "example.com/mcp")
def test_file_scheme_rejected(self):
with pytest.raises(
InvalidMcpUrlError, match="scheme must be http or https"
):
_validate_remote_mcp_url("ctx", "file:///etc/passwd")
def test_ws_scheme_rejected(self):
# WebSocket is not MCP's remote transport.
with pytest.raises(
InvalidMcpUrlError, match="scheme must be http or https"
):
_validate_remote_mcp_url("ctx", "ws://example.com/mcp")
def test_stdio_scheme_rejected(self):
# stdio servers use the ``command`` key, not ``url``.
with pytest.raises(
InvalidMcpUrlError, match="scheme must be http or https"
):
_validate_remote_mcp_url("ctx", "stdio:///node server.js")
def test_empty_host_rejected(self):
with pytest.raises(InvalidMcpUrlError, match="missing host"):
_validate_remote_mcp_url("ctx", "http:///")
def test_empty_host_with_path_rejected(self):
with pytest.raises(InvalidMcpUrlError, match="missing host"):
_validate_remote_mcp_url("ctx", "https:///path/only")
def test_error_mentions_server_name(self):
# So users can find the bad entry when there are multiple configured.
with pytest.raises(InvalidMcpUrlError, match="my-weird-server"):
_validate_remote_mcp_url("my-weird-server", "not a url at all")
class TestErrorIsValueError:
"""InvalidMcpUrlError must be a ValueError for broad downstream catch blocks."""
def test_is_value_error(self):
try:
_validate_remote_mcp_url("ctx", "garbage")
except ValueError:
pass # expected
else:
pytest.fail("expected ValueError")

View file

@ -0,0 +1,249 @@
---
sidebar_position: 12
title: "Pipe Script Output to Messaging Platforms"
description: "Send text from any shell script, cron job, CI hook, or monitoring daemon to Telegram, Discord, Slack, Signal, and other platforms using `hermes send`."
---
# Pipe Script Output to Messaging Platforms
`hermes send` is a small, scriptable CLI that pushes a message to any
messaging platform Hermes is already configured for. Think of it as a
cross-platform `curl` for notifications — you don't need a running
gateway, you don't need an LLM, and you don't need to re-paste bot tokens
into each of your scripts.
Use it for:
- System monitoring (memory, disk, GPU temp, long-running job finished)
- CI/CD notifications (deploy done, test failure)
- Cron scripts that need to ping you with results
- Quick one-shot messages from a terminal
- Piping any tool's output anywhere (`make | hermes send --to slack:#builds`)
The command reuses the same credentials and platform adapters that `hermes
gateway` already uses, so there's no second configuration surface to
maintain.
---
## Quick Start
```bash
# Plain text to the home channel for a platform
hermes send --to telegram "deploy finished"
# Pipe in stdout from anything
echo "RAM 92%" | hermes send --to telegram:-1001234567890
# Send a file
hermes send --to discord:#ops --file /tmp/report.md
# Attach a subject/header line
hermes send --to slack:#eng --subject "[CI] build.log" --file build.log
# Thread target (Telegram topic, Discord thread)
hermes send --to telegram:-1001234567890:17585 "threaded reply"
# List every configured target
hermes send --list
# Filter by platform
hermes send --list telegram
```
---
## Argument Reference
| Flag | Description |
|------|-------------|
| `-t, --to TARGET` | Destination. See [target formats](#target-formats). |
| `message` (positional) | Message text. Omit to read from `--file` or stdin. |
| `-f, --file PATH` | Read the body from a file. `--file -` forces stdin. |
| `-s, --subject LINE` | Prepend a header/subject line before the body. |
| `-l, --list` | List available targets. Optional positional platform filter. |
| `-q, --quiet` | No stdout on success (exit code only — ideal for scripts). |
| `--json` | Emit the raw JSON result of the send. |
| `-h, --help` | Show the built-in help text. |
### Target Formats
| Format | Example | Meaning |
|--------|---------|---------|
| `platform` | `telegram` | Send to the platform's configured home channel |
| `platform:chat_id` | `telegram:-1001234567890` | Specific numeric chat / group / user |
| `platform:chat_id:thread_id` | `telegram:-1001234567890:17585` | Specific thread or Telegram forum topic |
| `platform:#channel` | `discord:#ops` | Human-friendly channel name (resolved against the channel directory) |
| `platform:+E164` | `signal:+15551234567` | Phone-addressed platforms: Signal, SMS, WhatsApp |
Any platform Hermes ships adapters for works as a target:
`telegram`, `discord`, `slack`, `signal`, `sms`, `whatsapp`, `matrix`,
`mattermost`, `feishu`, `dingtalk`, `wecom`, `weixin`, `email`, and
others.
### Exit Codes
| Code | Meaning |
|------|---------|
| `0` | Send (or list) succeeded |
| `1` | Delivery failed at the platform level (auth, permissions, network) |
| `2` | Usage / argument / config error |
Exit codes follow the standard Unix convention so your scripts can
branch on them the same way they would on `curl` or `grep`.
---
## Message Body Resolution
`hermes send` resolves the message body in this order:
1. **Positional argument**`hermes send --to telegram "hi"`
2. **`--file PATH`** — `hermes send --to telegram --file msg.txt`
3. **Piped stdin**`echo hi | hermes send --to telegram`
When stdin is a TTY (no pipe), Hermes does **not** wait for input — you'll
get a clear usage error instead. This keeps scripts from hanging if they
accidentally omit the body.
---
## Real-World Examples
### Monitoring: Memory / Disk Alerts
Replace ad-hoc `curl https://api.telegram.org/...` calls in your watchdogs
with a single portable line:
```bash
#!/usr/bin/env bash
ram_pct=$(free | awk '/^Mem:/ {printf "%d", $3 * 100 / $2}')
if [ "$ram_pct" -ge 85 ]; then
hermes send --to telegram --subject "⚠ MEMORY WARNING" \
"RAM ${ram_pct}% on $(hostname)"
fi
```
Because `hermes send` reuses your Hermes config, the same script works on
any host where Hermes is installed — no need to export bot tokens into
each machine's environment manually.
:::tip Don't alert the gateway about itself
For watchdogs that might fire when the gateway itself is struggling (OOM
alerts, disk-full alerts), keep using a minimal `curl` call instead of
`hermes send`. If the Python interpreter can't load because the box is
thrashing, you still want that alert to go out.
:::
### CI / CD: Build and Test Results
```bash
# In .github/workflows/deploy.yml or any CI script
if ./scripts/deploy.sh; then
hermes send --to slack:#deploys "✅ ${CI_COMMIT_SHA:0:7} deployed"
else
tail -n 100 deploy.log | hermes send \
--to slack:#deploys --subject "❌ deploy failed"
exit 1
fi
```
### Cron: Daily Report
```bash
# Crontab entry
0 9 * * * /usr/local/bin/generate-metrics.sh \
| /home/me/.hermes/bin/hermes send \
--to telegram --subject "Daily metrics $(date +%Y-%m-%d)"
```
### Long-Running Tasks: Ping When Done
```bash
./train.py --epochs 200 && \
hermes send --to telegram "training done" || \
hermes send --to telegram "training failed (exit $?)"
```
### Scripting with `--json` and `--quiet`
```bash
# Hard-fail a script if delivery fails; don't clutter logs on success
hermes send --to telegram --quiet "keepalive" || {
echo "Telegram delivery failed" >&2
exit 1
}
# Capture the message ID for later editing / threading
msg_id=$(hermes send --to discord:#ops --json "build started" \
| jq -r .message_id)
```
---
## Does `hermes send` Need the Gateway Running?
**Usually no.** For any bot-token platform — Telegram, Discord, Slack,
Signal, SMS, WhatsApp Cloud API, and most others — `hermes send` calls
the platform's REST endpoint directly using credentials from
`~/.hermes/.env` and `~/.hermes/config.yaml`. It's a standalone subprocess
that exits as soon as the message is delivered.
A live gateway is only required for **plugin platforms** that rely on a
persistent adapter connection (for example, a custom plugin that keeps
a long-lived WebSocket open). In that case you'll get a clear error
pointing at the gateway; start it with `hermes gateway start` and retry.
---
## Listing and Discovering Targets
Before sending to a specific channel, you can inspect what's available:
```bash
# Every target across every configured platform
hermes send --list
# Just Telegram targets
hermes send --list telegram
# Machine-readable
hermes send --list --json
```
The listing is built from `~/.hermes/channel_directory.json`, which the
gateway refreshes every few minutes while it's running. If you see
"no channels discovered yet", start the gateway once (`hermes gateway
start`) so it can populate the cache.
Human-friendly names (`discord:#ops`, `slack:#engineering`) are resolved
against this cache at send time, so you don't need to memorize numeric
IDs.
---
## Comparison with Other Approaches
| Approach | Multi-platform | Reuses Hermes creds | Needs gateway | Best for |
|----------|----------------|---------------------|---------------|----------|
| `hermes send` | ✅ | ✅ | No (bot-token) | Everything below |
| Raw `curl` to each platform | Each scripted separately | Manual | No | Critical watchdogs |
| `cron` job with `--deliver` | ✅ | ✅ | No | Scheduled agent tasks |
| `send_message` agent tool | ✅ | ✅ | No | Inside an agent loop |
`hermes send` is intentionally the simplest possible surface. If you need
an agent to decide what to say, use the `send_message` tool from within a
chat or cron job. If you need a scheduled run with LLM-generated content,
use `cronjob(action='create', prompt=...)` with `deliver='telegram:...'`.
If you just need to pipe a raw string, reach for `hermes send`.
---
## Related
- [Automate Anything with Cron](/docs/guides/automate-with-cron) —
scheduled jobs whose output auto-delivers to any platform.
- [Gateway Internals](/docs/developer-guide/gateway-internals) —
the delivery router that `hermes send` shares with cron delivery.
- [Messaging Platform Setup](/docs/user-guide/messaging/) —
one-time configuration for each platform.