mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
Lift the `_handle_*_command` cluster (2,077 LOC) out of HermesCLI into hermes_cli/cli_commands_mixin.py; HermesCLI now inherits CLICommandsMixin so every self.<handler> call resolves unchanged via the MRO. Behavior-neutral. Import discipline mirrors gateway/slash_commands.py (PR #41886): neutral deps imported at the mixin module top level; cli.py-internal helpers/constants (_cprint, _ACCENT, save_config_value, ...) imported lazily inside each handler via 'from cli import ...' so the mixin never imports cli at module scope. cli.py 16215 -> 14139 LOC. One test mock repointed (cli.is_browser_debug_ready -> hermes_cli.cli_commands_mixin.is_browser_debug_ready).
2175 lines
93 KiB
Python
2175 lines
93 KiB
Python
"""Slash-command handlers for the interactive CLI (god-file decomposition Phase 4).
|
|
|
|
This module hosts the ``_handle_*_command`` slash-command handlers lifted out of
|
|
``cli.py``'s ``HermesCLI`` class. ``HermesCLI`` inherits ``CLICommandsMixin`` so
|
|
every ``self.<handler>`` call resolves unchanged via the MRO — behavior-neutral.
|
|
|
|
Import discipline (mirrors gateway/slash_commands.py, PR #41886):
|
|
* Neutral, non-cyclic deps are imported at module top-level below.
|
|
* cli.py-internal symbols (the ``_cprint``/``_ACCENT``/``save_config_value``…
|
|
module-level helpers and constants) are imported LAZILY inside each handler
|
|
via ``from cli import ...`` — that resolves at call time when ``cli`` is fully
|
|
loaded, so the mixin module never imports ``cli`` at top level (no cycle).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from datetime import datetime
|
|
from urllib.parse import urlparse
|
|
|
|
from rich import box as rich_box
|
|
from rich.markup import escape as _escape
|
|
from rich.panel import Panel
|
|
|
|
from hermes_constants import display_hermes_home, is_termux as _is_termux_environment
|
|
from hermes_cli.browser_connect import (
|
|
DEFAULT_BROWSER_CDP_URL,
|
|
is_browser_debug_ready,
|
|
manual_chrome_debug_command,
|
|
)
|
|
|
|
|
|
class CLICommandsMixin:
|
|
"""Mixin holding the interactive-CLI slash-command handlers.
|
|
|
|
All methods use only ``self`` state plus the imports above and per-method
|
|
lazy ``from cli import ...`` lines, so they compose cleanly onto
|
|
``HermesCLI`` via the MRO.
|
|
"""
|
|
|
|
def _handle_rollback_command(self, command: str):
|
|
"""Handle /rollback — list, diff, or restore filesystem checkpoints.
|
|
|
|
Syntax:
|
|
/rollback — list checkpoints
|
|
/rollback <N> — restore checkpoint N (also undoes last chat turn)
|
|
/rollback diff <N> — preview changes since checkpoint N
|
|
/rollback <N> <file> — restore a single file from checkpoint N
|
|
"""
|
|
from tools.checkpoint_manager import format_checkpoint_list
|
|
|
|
if not hasattr(self, 'agent') or not self.agent:
|
|
print(" No active agent session.")
|
|
return
|
|
|
|
mgr = self.agent._checkpoint_mgr
|
|
if not mgr.enabled:
|
|
print(" Checkpoints are not enabled.")
|
|
print(" Enable with: hermes --checkpoints")
|
|
print(" Or in config.yaml: checkpoints: { enabled: true }")
|
|
return
|
|
|
|
cwd = os.getenv("TERMINAL_CWD", os.getcwd())
|
|
parts = command.split()
|
|
args = parts[1:] if len(parts) > 1 else []
|
|
|
|
if not args:
|
|
# List checkpoints
|
|
checkpoints = mgr.list_checkpoints(cwd)
|
|
print(format_checkpoint_list(checkpoints, cwd))
|
|
return
|
|
|
|
# Handle /rollback diff <N>
|
|
if args[0].lower() == "diff":
|
|
if len(args) < 2:
|
|
print(" Usage: /rollback diff <N>")
|
|
return
|
|
checkpoints = mgr.list_checkpoints(cwd)
|
|
if not checkpoints:
|
|
print(f" No checkpoints found for {cwd}")
|
|
return
|
|
target_hash = self._resolve_checkpoint_ref(args[1], checkpoints)
|
|
if not target_hash:
|
|
return
|
|
result = mgr.diff(cwd, target_hash)
|
|
if result["success"]:
|
|
stat = result.get("stat", "")
|
|
diff = result.get("diff", "")
|
|
if not stat and not diff:
|
|
print(" No changes since this checkpoint.")
|
|
else:
|
|
if stat:
|
|
print(f"\n{stat}")
|
|
if diff:
|
|
# Limit diff output to avoid terminal flood
|
|
diff_lines = diff.splitlines()
|
|
if len(diff_lines) > 80:
|
|
print("\n".join(diff_lines[:80]))
|
|
print(f"\n ... ({len(diff_lines) - 80} more lines, showing first 80)")
|
|
else:
|
|
print(f"\n{diff}")
|
|
else:
|
|
print(f" ❌ {result['error']}")
|
|
return
|
|
|
|
# Resolve checkpoint reference (number or hash)
|
|
checkpoints = mgr.list_checkpoints(cwd)
|
|
if not checkpoints:
|
|
print(f" No checkpoints found for {cwd}")
|
|
return
|
|
|
|
target_hash = self._resolve_checkpoint_ref(args[0], checkpoints)
|
|
if not target_hash:
|
|
return
|
|
|
|
# Check for file-level restore: /rollback <N> <file>
|
|
file_path = args[1] if len(args) > 1 else None
|
|
|
|
result = mgr.restore(cwd, target_hash, file_path=file_path)
|
|
if result["success"]:
|
|
if file_path:
|
|
print(f" ✅ Restored {file_path} from checkpoint {result['restored_to']}: {result['reason']}")
|
|
else:
|
|
print(f" ✅ Restored to checkpoint {result['restored_to']}: {result['reason']}")
|
|
print(" A pre-rollback snapshot was saved automatically.")
|
|
|
|
# Also undo the last conversation turn so the agent's context
|
|
# matches the restored filesystem state
|
|
if self.conversation_history:
|
|
self.undo_last(prefill=False)
|
|
print(" Chat turn undone to match restored file state.")
|
|
else:
|
|
print(f" ❌ {result['error']}")
|
|
|
|
def _handle_snapshot_command(self, command: str):
|
|
"""Handle /snapshot — lightweight state snapshots for Hermes config/state.
|
|
|
|
Syntax:
|
|
/snapshot — list recent snapshots
|
|
/snapshot create [label] — create a snapshot
|
|
/snapshot restore <id> — restore state from snapshot
|
|
/snapshot prune [N] — prune to N snapshots (default 20)
|
|
"""
|
|
from hermes_cli.backup import (
|
|
create_quick_snapshot, list_quick_snapshots,
|
|
restore_quick_snapshot, prune_quick_snapshots,
|
|
)
|
|
from hermes_constants import display_hermes_home
|
|
|
|
parts = command.split()
|
|
subcmd = parts[1].lower() if len(parts) > 1 else "list"
|
|
|
|
if subcmd in {"list", "ls"}:
|
|
snaps = list_quick_snapshots()
|
|
if not snaps:
|
|
print(" No state snapshots yet.")
|
|
print(" Create one: /snapshot create [label]")
|
|
return
|
|
print(f" State snapshots ({display_hermes_home()}/state-snapshots/):\n")
|
|
print(f" {'#':>3} {'ID':<35} {'Files':>5} {'Size':>10} {'Label'}")
|
|
print(f" {'─'*3} {'─'*35} {'─'*5} {'─'*10} {'─'*20}")
|
|
for i, s in enumerate(snaps, 1):
|
|
size = s.get("total_size", 0)
|
|
if size < 1024:
|
|
size_str = f"{size} B"
|
|
elif size < 1024 * 1024:
|
|
size_str = f"{size / 1024:.0f} KB"
|
|
else:
|
|
size_str = f"{size / 1024 / 1024:.1f} MB"
|
|
label = s.get("label") or ""
|
|
print(f" {i:3} {s['id']:<35} {s.get('file_count', 0):>5} {size_str:>10} {label}")
|
|
|
|
elif subcmd == "create":
|
|
label = " ".join(parts[2:]) if len(parts) > 2 else None
|
|
snap_id = create_quick_snapshot(label=label)
|
|
if snap_id:
|
|
print(f" Snapshot created: {snap_id}")
|
|
else:
|
|
print(" No state files found to snapshot.")
|
|
|
|
elif subcmd in {"restore", "rewind"}:
|
|
if len(parts) < 3:
|
|
print(" Usage: /snapshot restore <snapshot-id>")
|
|
# Show hint with most recent snapshot
|
|
snaps = list_quick_snapshots(limit=1)
|
|
if snaps:
|
|
print(f" Most recent: {snaps[0]['id']}")
|
|
return
|
|
snap_id = parts[2]
|
|
# Allow restore by number (1-indexed)
|
|
try:
|
|
idx = int(snap_id)
|
|
snaps = list_quick_snapshots()
|
|
if 1 <= idx <= len(snaps):
|
|
snap_id = snaps[idx - 1]["id"]
|
|
else:
|
|
print(f" Invalid snapshot number. Use 1-{len(snaps)}.")
|
|
return
|
|
except ValueError:
|
|
pass
|
|
if restore_quick_snapshot(snap_id):
|
|
print(f" Restored state from: {snap_id}")
|
|
print(" Restart recommended for state.db changes to take effect.")
|
|
else:
|
|
print(f" Snapshot not found: {snap_id}")
|
|
|
|
elif subcmd == "prune":
|
|
keep = 20
|
|
if len(parts) > 2:
|
|
try:
|
|
keep = int(parts[2])
|
|
except ValueError:
|
|
print(" Usage: /snapshot prune [keep-count]")
|
|
return
|
|
deleted = prune_quick_snapshots(keep=keep)
|
|
print(f" Pruned {deleted} old snapshot(s) (keeping {keep}).")
|
|
|
|
else:
|
|
print(f" Unknown subcommand: {subcmd}")
|
|
print(" Usage: /snapshot [list|create [label]|restore <id>|prune [N]]")
|
|
|
|
def _handle_stop_command(self):
|
|
"""Handle /stop — kill all running background processes.
|
|
|
|
Inspired by OpenAI Codex's separation of interrupt (stop current turn)
|
|
from /stop (clean up background processes). See openai/codex#14602.
|
|
"""
|
|
from tools.process_registry import process_registry
|
|
|
|
processes = process_registry.list_sessions()
|
|
running = [p for p in processes if p.get("status") == "running"]
|
|
|
|
if not running:
|
|
print(" No running background processes.")
|
|
return
|
|
|
|
print(f" Stopping {len(running)} background process(es)...")
|
|
killed = process_registry.kill_all()
|
|
print(f" ✅ Stopped {killed} process(es).")
|
|
|
|
def _handle_agents_command(self):
|
|
"""Handle /agents — show background processes and agent status."""
|
|
from cli import _cprint
|
|
from tools.process_registry import format_uptime_short, process_registry
|
|
|
|
processes = process_registry.list_sessions()
|
|
running = [p for p in processes if p.get("status") == "running"]
|
|
finished = [p for p in processes if p.get("status") != "running"]
|
|
|
|
_cprint(f" Running processes: {len(running)}")
|
|
for p in running:
|
|
cmd = p.get("command", "")[:80]
|
|
up = format_uptime_short(p.get("uptime_seconds", 0))
|
|
_cprint(f" {p.get('session_id', '?')} · {up} · {cmd}")
|
|
|
|
if finished:
|
|
_cprint(f" Recently finished: {len(finished)}")
|
|
|
|
agent_running = getattr(self, "_agent_running", False)
|
|
_cprint(f" Agent: {'running' if agent_running else 'idle'}")
|
|
|
|
def _handle_paste_command(self):
|
|
"""Handle /paste — explicitly check clipboard for an image.
|
|
|
|
This is the reliable fallback for terminals where BracketedPaste
|
|
doesn't fire for image-only clipboard content (e.g., VSCode terminal,
|
|
Windows Terminal with WSL2).
|
|
"""
|
|
from cli import _DIM, _RST, _cprint, _termux_example_image_path
|
|
if _is_termux_environment():
|
|
_cprint(
|
|
f" {_DIM}Clipboard image paste is not available on Termux — "
|
|
f"use /image <path> or paste a local image path like "
|
|
f"{_termux_example_image_path()}{_RST}"
|
|
)
|
|
return
|
|
|
|
from hermes_cli.clipboard import has_clipboard_image
|
|
if has_clipboard_image():
|
|
if self._try_attach_clipboard_image():
|
|
n = len(self._attached_images)
|
|
_cprint(f" 📎 Image #{n} attached from clipboard")
|
|
else:
|
|
_cprint(f" {_DIM}(>_<) Clipboard has an image but extraction failed{_RST}")
|
|
else:
|
|
_cprint(f" {_DIM}(._.) No image found in clipboard{_RST}")
|
|
|
|
def _handle_copy_command(self, cmd_original: str) -> None:
|
|
"""Handle /copy [number] — copy assistant output to clipboard."""
|
|
from cli import _assistant_copy_text, _cprint
|
|
parts = cmd_original.split(maxsplit=1)
|
|
arg = parts[1].strip() if len(parts) > 1 else ""
|
|
|
|
assistant = [m for m in self.conversation_history if m.get("role") == "assistant"]
|
|
if not assistant:
|
|
_cprint(" Nothing to copy yet.")
|
|
return
|
|
|
|
if arg:
|
|
try:
|
|
idx = int(arg) - 1
|
|
except ValueError:
|
|
_cprint(" Usage: /copy [number]")
|
|
return
|
|
if idx < 0 or idx >= len(assistant):
|
|
_cprint(f" Invalid response number. Use 1-{len(assistant)}.")
|
|
return
|
|
else:
|
|
idx = len(assistant) - 1
|
|
while idx >= 0 and not _assistant_copy_text(assistant[idx].get("content")):
|
|
idx -= 1
|
|
if idx < 0:
|
|
_cprint(" Nothing to copy in assistant responses yet.")
|
|
return
|
|
|
|
text = _assistant_copy_text(assistant[idx].get("content"))
|
|
if not text:
|
|
_cprint(" Nothing to copy in that assistant response.")
|
|
return
|
|
|
|
try:
|
|
self._write_osc52_clipboard(text)
|
|
_cprint(f" Copied assistant response #{idx + 1} to clipboard")
|
|
except Exception as e:
|
|
_cprint(f" Clipboard copy failed: {e}")
|
|
|
|
def _handle_image_command(self, cmd_original: str):
|
|
"""Handle /image <path> — attach a local image file for the next prompt."""
|
|
from cli import _DIM, _IMAGE_EXTENSIONS, _RST, _cprint, _resolve_attachment_path, _split_path_input, _termux_example_image_path
|
|
raw_args = (cmd_original.split(None, 1)[1].strip() if " " in cmd_original else "")
|
|
if not raw_args:
|
|
hint = _termux_example_image_path() if _is_termux_environment() else "/path/to/image.png"
|
|
_cprint(f" {_DIM}Usage: /image <path> e.g. /image {hint}{_RST}")
|
|
return
|
|
|
|
path_token, _remainder = _split_path_input(raw_args)
|
|
image_path = _resolve_attachment_path(path_token)
|
|
if image_path is None:
|
|
_cprint(f" {_DIM}(>_<) File not found: {path_token}{_RST}")
|
|
return
|
|
if image_path.suffix.lower() not in _IMAGE_EXTENSIONS:
|
|
_cprint(f" {_DIM}(._.) Not a supported image file: {image_path.name}{_RST}")
|
|
return
|
|
|
|
self._attached_images.append(image_path)
|
|
_cprint(f" 📎 Attached image: {image_path.name}")
|
|
if _remainder:
|
|
_cprint(f" {_DIM}Now type your prompt (or use --image in single-query mode): {_remainder}{_RST}")
|
|
elif _is_termux_environment():
|
|
_cprint(f" {_DIM}Tip: type your next message, or run hermes chat -q --image {_termux_example_image_path(image_path.name)} \"What do you see?\"{_RST}")
|
|
|
|
def _handle_tools_command(self, cmd: str):
|
|
"""Handle /tools [list|disable|enable] slash commands.
|
|
|
|
/tools (no args) shows the tool list.
|
|
/tools list shows enabled/disabled status per toolset.
|
|
/tools disable/enable saves the change to config and resets
|
|
the session so the new tool set takes effect cleanly (no
|
|
prompt-cache breakage mid-conversation).
|
|
"""
|
|
from cli import _ACCENT, _DIM, _RST, _cprint
|
|
import shlex
|
|
from argparse import Namespace
|
|
from contextlib import redirect_stdout
|
|
from io import StringIO
|
|
from hermes_cli.tools_config import tools_disable_enable_command
|
|
|
|
def _run_capture(ns: Namespace) -> None:
|
|
"""Run tools_disable_enable_command, routing its ANSI-colored
|
|
print() output through _cprint when inside the interactive TUI
|
|
so escapes aren't mangled by patch_stdout's StdoutProxy into
|
|
garbled '?[32m...?[0m' text.
|
|
|
|
Outside the TUI (standalone mode, tests), call straight through
|
|
so real stdout / pytest capture works as expected.
|
|
"""
|
|
# Standalone/tests, run as usual
|
|
if getattr(self, "_app", None) is None:
|
|
tools_disable_enable_command(ns)
|
|
return
|
|
|
|
# Buffer reports isatty()=True so color() in hermes_cli/colors.py
|
|
# still emits ANSI escapes. StringIO.isatty() is False, which
|
|
# would otherwise strip all colors before we re-render them.
|
|
class _TTYBuf(StringIO):
|
|
def isatty(self) -> bool:
|
|
return True
|
|
|
|
buf = _TTYBuf()
|
|
with redirect_stdout(buf):
|
|
tools_disable_enable_command(ns)
|
|
for line in buf.getvalue().splitlines():
|
|
_cprint(line)
|
|
|
|
try:
|
|
parts = shlex.split(cmd)
|
|
except ValueError:
|
|
parts = cmd.split()
|
|
|
|
subcommand = parts[1] if len(parts) > 1 else ""
|
|
if subcommand not in {"list", "disable", "enable"}:
|
|
self.show_tools()
|
|
return
|
|
|
|
if subcommand == "list":
|
|
_run_capture(Namespace(tools_action="list", platform="cli"))
|
|
return
|
|
|
|
names = parts[2:]
|
|
if not names:
|
|
print(f"(._.) Usage: /tools {subcommand} <name> [name ...]")
|
|
print(f" Built-in toolset: /tools {subcommand} web")
|
|
print(f" MCP tool: /tools {subcommand} github:create_issue")
|
|
return
|
|
|
|
# Apply the change directly — the user typing the command is implicit
|
|
# consent. Do NOT use input() here; it hangs inside prompt_toolkit's
|
|
# TUI event loop (known pitfall).
|
|
verb = "Disabling" if subcommand == "disable" else "Enabling"
|
|
label = ", ".join(names)
|
|
_cprint(f"{_ACCENT}{verb} {label}...{_RST}")
|
|
|
|
_run_capture(Namespace(tools_action=subcommand, names=names, platform="cli"))
|
|
|
|
# Reset session so the new tool config is picked up from a clean state
|
|
from hermes_cli.tools_config import _get_platform_tools
|
|
from hermes_cli.config import load_config
|
|
self.enabled_toolsets = _get_platform_tools(load_config(), "cli")
|
|
self.new_session()
|
|
_cprint(f"{_DIM}Session reset. New tool configuration is active.{_RST}")
|
|
|
|
def _handle_profile_command(self):
|
|
"""Display active profile name and home directory."""
|
|
from hermes_constants import display_hermes_home
|
|
from hermes_cli.profiles import get_active_profile_name
|
|
|
|
display = display_hermes_home()
|
|
profile_name = get_active_profile_name()
|
|
|
|
print()
|
|
print(f" Profile: {profile_name}")
|
|
print(f" Home: {display}")
|
|
print()
|
|
|
|
def _handle_handoff_command(self, cmd_original: str) -> bool:
|
|
"""Handle ``/handoff <platform>`` — transfer this CLI session to a gateway platform.
|
|
|
|
Flow:
|
|
1. Validate platform name + the gateway has a home channel for it.
|
|
2. Reject if the agent is currently running (the in-flight turn
|
|
would race with the gateway's switch_session).
|
|
3. Write ``handoff_state='pending'`` on this session row.
|
|
4. Block-poll ``state.db`` for terminal state (timeout 60s).
|
|
5. On ``completed`` → print resume hint and signal CLI exit by
|
|
returning False (the caller honors that like ``/quit``).
|
|
6. On ``failed`` / timeout → print error and return True so the
|
|
user keeps their CLI session.
|
|
|
|
Returns:
|
|
False to signal CLI exit, True to keep going.
|
|
"""
|
|
from cli import _cprint
|
|
from hermes_state import format_session_db_unavailable
|
|
|
|
parts = cmd_original.split(maxsplit=1)
|
|
if len(parts) < 2 or not parts[1].strip():
|
|
_cprint(" Usage: /handoff <platform>")
|
|
_cprint(" Hands the current session off to that platform's home channel.")
|
|
_cprint(" The CLI session ends here; resume it later with /resume.")
|
|
return True
|
|
|
|
platform_name = parts[1].strip().lower()
|
|
|
|
# Validate platform name + home channel via the live gateway config.
|
|
try:
|
|
from gateway.config import load_gateway_config, Platform
|
|
except Exception as exc: # pragma: no cover — gateway pkg always shipped
|
|
_cprint(f" Could not load gateway config: {exc}")
|
|
return True
|
|
|
|
try:
|
|
platform = Platform(platform_name)
|
|
except (ValueError, KeyError):
|
|
_cprint(f" Unknown platform '{platform_name}'.")
|
|
return True
|
|
|
|
try:
|
|
gw_config = load_gateway_config()
|
|
except Exception as exc:
|
|
_cprint(f" Could not load gateway config: {exc}")
|
|
return True
|
|
|
|
pcfg = gw_config.platforms.get(platform)
|
|
if not pcfg or not pcfg.enabled:
|
|
_cprint(f" Platform '{platform_name}' is not configured/enabled in the gateway.")
|
|
return True
|
|
|
|
home = gw_config.get_home_channel(platform)
|
|
if not home or not home.chat_id:
|
|
_cprint(f" No home channel configured for {platform_name}.")
|
|
_cprint(f" Set one with /sethome on the destination chat first.")
|
|
return True
|
|
|
|
# Refuse mid-turn: an in-flight agent run would race with the
|
|
# gateway's switch_session and the synthetic turn dispatch.
|
|
if getattr(self, "_agent_running", False):
|
|
_cprint(" Agent is busy. Wait for the current turn to finish, then retry /handoff.")
|
|
return True
|
|
|
|
# Make sure we have a SessionDB handle.
|
|
if not self._session_db:
|
|
try:
|
|
from hermes_state import SessionDB
|
|
self._session_db = SessionDB()
|
|
except Exception:
|
|
pass
|
|
if not self._session_db:
|
|
_cprint(f" {format_session_db_unavailable()}")
|
|
return True
|
|
|
|
# Make sure the session row exists in state.db. Most CLI sessions
|
|
# are written via _flush_messages_to_session_db on the first turn
|
|
# already, but if the user tries to hand off an empty session we
|
|
# still want a row to mark.
|
|
try:
|
|
row = self._session_db.get_session(self.session_id)
|
|
if not row:
|
|
# Nothing has flushed yet. Create a stub so the gateway has
|
|
# something to switch_session onto. Inserting via title-set
|
|
# is the simplest path because set_session_title's INSERT OR
|
|
# IGNORE creates the row.
|
|
placeholder_title = f"handoff-{self.session_id[:8]}"
|
|
self._session_db.set_session_title(self.session_id, placeholder_title)
|
|
except Exception as exc:
|
|
_cprint(f" Could not ensure session row in state.db: {exc}")
|
|
return True
|
|
|
|
# Display title for messaging.
|
|
session_title = ""
|
|
try:
|
|
row = self._session_db.get_session(self.session_id)
|
|
if row:
|
|
session_title = row.get("title") or ""
|
|
except Exception:
|
|
pass
|
|
if not session_title:
|
|
session_title = self.session_id[:8]
|
|
|
|
# Mark pending — gateway watcher will pick this up.
|
|
ok = self._session_db.request_handoff(self.session_id, platform_name)
|
|
if not ok:
|
|
_cprint(" Session is already in flight for handoff. Wait for it to settle, then retry.")
|
|
return True
|
|
|
|
_cprint(f" Queued handoff of '{session_title}' → {platform_name} (home: {home.name}).")
|
|
_cprint(f" Waiting for the gateway to pick it up...")
|
|
|
|
# Poll-block on terminal state. Tick every 0.5s; bail at ~60s.
|
|
import time as _time
|
|
deadline = _time.time() + 60.0
|
|
last_state = "pending"
|
|
while _time.time() < deadline:
|
|
try:
|
|
state_row = self._session_db.get_handoff_state(self.session_id)
|
|
except Exception:
|
|
state_row = None
|
|
current = (state_row or {}).get("state") or "pending"
|
|
if current != last_state:
|
|
if current == "running":
|
|
_cprint(" Gateway picked it up; transferring...")
|
|
last_state = current
|
|
if current == "completed":
|
|
_cprint("")
|
|
_cprint(f" ↻ Handoff complete. The session is now active on {platform_name}.")
|
|
_cprint(f" Resume it on this CLI later with: /resume {session_title}")
|
|
_cprint("")
|
|
# End the CLI cleanly — same exit semantics as /quit.
|
|
self._should_exit = True
|
|
return False
|
|
if current == "failed":
|
|
err = (state_row or {}).get("error") or "unknown error"
|
|
_cprint(f" Handoff failed: {err}")
|
|
_cprint(" Your CLI session is intact. Try /handoff again, or /resume on the platform manually.")
|
|
return True
|
|
_time.sleep(0.5)
|
|
|
|
# Timed out. Clear the pending flag so the user can retry.
|
|
try:
|
|
self._session_db.fail_handoff(self.session_id, "timed out waiting for gateway")
|
|
except Exception:
|
|
pass
|
|
_cprint(" Timed out waiting for the gateway. Is `hermes gateway` running?")
|
|
_cprint(" Your CLI session is intact.")
|
|
return True
|
|
|
|
def _handle_resume_command(self, cmd_original: str) -> None:
|
|
"""Handle /resume <session_id_or_title> — switch to a previous session mid-conversation."""
|
|
from cli import _cprint, _sync_process_session_id
|
|
parts = cmd_original.split(None, 1)
|
|
target = parts[1].strip() if len(parts) > 1 else ""
|
|
|
|
# Strip common outer brackets/quotes users may type literally from the
|
|
# usage hint (e.g. ``/resume <abc123>`` or ``/resume [abc123]``). The
|
|
# `/resume` help text shows angle brackets as a placeholder and a few
|
|
# users copy them through verbatim. Stripping them keeps the lookup
|
|
# working without changing the help string.
|
|
if len(target) >= 2 and (
|
|
(target[0] == "<" and target[-1] == ">")
|
|
or (target[0] == "[" and target[-1] == "]")
|
|
or (target[0] == '"' and target[-1] == '"')
|
|
or (target[0] == "'" and target[-1] == "'")
|
|
):
|
|
target = target[1:-1].strip()
|
|
|
|
if not target:
|
|
_cprint(" Usage: /resume <number|session_id_or_title>")
|
|
if self._show_recent_sessions(reason="resume"):
|
|
# Arm a one-shot pending-resume selection so the user can type
|
|
# just the number (`3`) on the next line instead of having to
|
|
# retype `/resume 3`. The list here must match the one shown by
|
|
# _show_recent_sessions and used for index resolution below —
|
|
# all three go through _list_recent_sessions(limit=10). See
|
|
# #34584.
|
|
self._pending_resume_sessions = self._list_recent_sessions(limit=10)
|
|
return
|
|
_cprint(" Tip: Use /history or `hermes sessions list` to find sessions.")
|
|
return
|
|
|
|
# Any explicit /resume <target> supersedes a previously-armed bare
|
|
# numbered prompt.
|
|
self._pending_resume_sessions = None
|
|
|
|
if not self._session_db:
|
|
from hermes_state import format_session_db_unavailable
|
|
_cprint(f" {format_session_db_unavailable()}")
|
|
return
|
|
|
|
# Resolve numbered selection, title, or ID
|
|
if target.isdigit():
|
|
sessions = self._list_recent_sessions(limit=10)
|
|
index = int(target)
|
|
if index < 1 or index > len(sessions):
|
|
_cprint(f" Resume index {index} is out of range.")
|
|
_cprint(" Use /resume with no arguments to see available sessions.")
|
|
return
|
|
selected = sessions[index - 1]
|
|
target_id = selected["id"]
|
|
else:
|
|
from hermes_cli.main import _resolve_session_by_name_or_id
|
|
resolved = _resolve_session_by_name_or_id(target)
|
|
target_id = resolved or target
|
|
|
|
session_meta = self._session_db.get_session(target_id)
|
|
if not session_meta:
|
|
_cprint(f" Session not found: {target}")
|
|
_cprint(" Use /history or `hermes sessions list` to see available sessions.")
|
|
return
|
|
|
|
# If the target is the empty head of a compression chain, redirect to
|
|
# the descendant that actually holds the transcript. See #15000.
|
|
try:
|
|
resolved_id = self._session_db.resolve_resume_session_id(target_id)
|
|
except Exception:
|
|
resolved_id = target_id
|
|
if resolved_id and resolved_id != target_id:
|
|
_cprint(
|
|
f" Session {target_id} was compressed into {resolved_id}; "
|
|
f"resuming the descendant with your transcript."
|
|
)
|
|
target_id = resolved_id
|
|
resolved_meta = self._session_db.get_session(target_id)
|
|
if resolved_meta:
|
|
session_meta = resolved_meta
|
|
|
|
if target_id == self.session_id:
|
|
_cprint(" Already on that session.")
|
|
return
|
|
|
|
old_session_id = self.session_id
|
|
# End current session
|
|
try:
|
|
self._session_db.end_session(self.session_id, "resumed_other")
|
|
except Exception:
|
|
pass
|
|
|
|
# Switch to the target session
|
|
self.session_id = target_id
|
|
self._resumed = True
|
|
self._pending_title = None
|
|
_sync_process_session_id(target_id)
|
|
|
|
# Load conversation history (strip transcript-only metadata entries)
|
|
restored = self._session_db.get_messages_as_conversation(target_id)
|
|
restored = [m for m in (restored or []) if m.get("role") != "session_meta"]
|
|
self.conversation_history = restored
|
|
|
|
# Re-open the target session so it's not marked as ended
|
|
try:
|
|
self._session_db.reopen_session(target_id)
|
|
except Exception:
|
|
pass
|
|
|
|
# Sync the agent if already initialised
|
|
if self.agent:
|
|
self.agent.session_id = target_id
|
|
self.agent.reset_session_state()
|
|
if hasattr(self.agent, "_last_flushed_db_idx"):
|
|
self.agent._last_flushed_db_idx = len(self.conversation_history)
|
|
if hasattr(self.agent, "_todo_store"):
|
|
try:
|
|
from tools.todo_tool import TodoStore
|
|
self.agent._todo_store = TodoStore()
|
|
except Exception:
|
|
pass
|
|
if hasattr(self.agent, "_invalidate_system_prompt"):
|
|
self.agent._invalidate_system_prompt()
|
|
|
|
# Notify memory providers that session_id rotated to a resumed
|
|
# session. reset=False — the provider's accumulated state is
|
|
# still valid; it just needs to target the new session_id for
|
|
# subsequent writes. See #6672.
|
|
try:
|
|
_mm = getattr(self.agent, "_memory_manager", None)
|
|
if _mm is not None:
|
|
_mm.on_session_switch(
|
|
target_id,
|
|
parent_session_id=old_session_id or "",
|
|
reset=False,
|
|
reason="resume",
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
title_part = f" \"{session_meta['title']}\"" if session_meta.get("title") else ""
|
|
msg_count = len([m for m in self.conversation_history if m.get("role") == "user"])
|
|
if self.conversation_history:
|
|
_cprint(
|
|
f" ↻ Resumed session {target_id}{title_part}"
|
|
f" ({msg_count} user message{'s' if msg_count != 1 else ''},"
|
|
f" {len(self.conversation_history)} total)"
|
|
)
|
|
self._display_resumed_history()
|
|
else:
|
|
_cprint(f" ↻ Resumed session {target_id}{title_part} — no messages, starting fresh.")
|
|
|
|
def _handle_sessions_command(self, cmd_original: str) -> None:
|
|
"""Handle /sessions [list|<id_or_title>] — browse or resume previous sessions.
|
|
|
|
Without arguments, prints the same recent-sessions table that /resume
|
|
shows when called without a target, and tells the user how to resume.
|
|
With an explicit subcommand or target, delegates to the resume flow so
|
|
``/sessions <id>`` and ``/resume <id>`` behave identically.
|
|
|
|
The TUI ships an interactive picker overlay for this command; the
|
|
classic CLI prints an inline list because there is no equivalent
|
|
overlay primitive here. Without this handler the canonical name
|
|
``sessions`` falls through ``process_command``'s elif chain and
|
|
prints ``Unknown command: sessions`` even though the command is
|
|
registered in the central COMMAND_REGISTRY.
|
|
"""
|
|
from cli import _cprint
|
|
parts = cmd_original.split(None, 1)
|
|
arg = parts[1].strip() if len(parts) > 1 else ""
|
|
sub = arg.lower()
|
|
|
|
# Bare /sessions or /sessions list — show recent sessions inline.
|
|
if not arg or sub in {"list", "ls", "browse"}:
|
|
if not self._session_db:
|
|
from hermes_state import format_session_db_unavailable
|
|
_cprint(f" {format_session_db_unavailable()}")
|
|
return
|
|
if not self._show_recent_sessions(reason="sessions"):
|
|
_cprint(" (._.) No previous sessions yet.")
|
|
return
|
|
|
|
# /sessions <id_or_title> behaves the same as /resume <id_or_title>.
|
|
self._handle_resume_command(f"/resume {arg}")
|
|
|
|
def _handle_branch_command(self, cmd_original: str) -> None:
|
|
"""Handle /branch [name] — fork the current session into a new independent copy.
|
|
|
|
Copies the full conversation history to a new session so the user can
|
|
explore a different approach without losing the original session state.
|
|
Inspired by Claude Code's /branch command.
|
|
"""
|
|
from cli import _cprint, _sync_process_session_id
|
|
if not self.conversation_history:
|
|
_cprint(" No conversation to branch — send a message first.")
|
|
return
|
|
|
|
if not self._session_db:
|
|
from hermes_state import format_session_db_unavailable
|
|
_cprint(f" {format_session_db_unavailable()}")
|
|
return
|
|
|
|
parts = cmd_original.split(None, 1)
|
|
branch_name = parts[1].strip() if len(parts) > 1 else ""
|
|
|
|
# Generate the new session ID
|
|
now = datetime.now()
|
|
timestamp_str = now.strftime("%Y%m%d_%H%M%S")
|
|
short_uuid = uuid.uuid4().hex[:6]
|
|
new_session_id = f"{timestamp_str}_{short_uuid}"
|
|
|
|
# Determine branch title
|
|
if branch_name:
|
|
branch_title = branch_name
|
|
else:
|
|
# Auto-generate from the current session title
|
|
current_title = None
|
|
if self._session_db:
|
|
current_title = self._session_db.get_session_title(self.session_id)
|
|
base = current_title or "branch"
|
|
branch_title = self._session_db.get_next_title_in_lineage(base)
|
|
|
|
# Save the current session's state before branching
|
|
parent_session_id = self.session_id
|
|
|
|
# End the old session
|
|
try:
|
|
self._session_db.end_session(self.session_id, "branched")
|
|
except Exception:
|
|
pass
|
|
|
|
# Create the new session with parent link.
|
|
# Persist a stable ``_branched_from`` marker in model_config so
|
|
# list_sessions_rich() can keep the branch visible in /resume and
|
|
# /sessions even after the parent is reopened and re-ended with a
|
|
# different end_reason (e.g. tui_shutdown overwriting 'branched').
|
|
try:
|
|
self._session_db.create_session(
|
|
session_id=new_session_id,
|
|
source=os.environ.get("HERMES_SESSION_SOURCE", "cli"),
|
|
model=self.model,
|
|
model_config={
|
|
"max_iterations": self.max_turns,
|
|
"reasoning_config": self.reasoning_config,
|
|
"_branched_from": parent_session_id,
|
|
},
|
|
parent_session_id=parent_session_id,
|
|
)
|
|
except Exception as e:
|
|
_cprint(f" Failed to create branch session: {e}")
|
|
return
|
|
|
|
# Copy conversation history to the new session
|
|
for msg in self.conversation_history:
|
|
try:
|
|
self._session_db.append_message(
|
|
session_id=new_session_id,
|
|
role=msg.get("role", "user"),
|
|
content=msg.get("content"),
|
|
tool_name=msg.get("tool_name") or msg.get("name"),
|
|
tool_calls=msg.get("tool_calls"),
|
|
tool_call_id=msg.get("tool_call_id"),
|
|
reasoning=msg.get("reasoning"),
|
|
)
|
|
except Exception:
|
|
pass # Best-effort copy
|
|
|
|
# Set title on the branch
|
|
try:
|
|
self._session_db.set_session_title(new_session_id, branch_title)
|
|
except Exception:
|
|
pass
|
|
|
|
# Switch to the new session
|
|
self._transfer_session_yolo(self.session_id, new_session_id)
|
|
self.session_id = new_session_id
|
|
self.session_start = now
|
|
self._pending_title = None
|
|
self._resumed = True # Prevents auto-title generation
|
|
_sync_process_session_id(new_session_id)
|
|
|
|
# Sync the agent
|
|
if self.agent:
|
|
self.agent.session_id = new_session_id
|
|
self.agent.session_start = now
|
|
self.agent.reset_session_state()
|
|
if hasattr(self.agent, "_last_flushed_db_idx"):
|
|
self.agent._last_flushed_db_idx = len(self.conversation_history)
|
|
if hasattr(self.agent, "_todo_store"):
|
|
try:
|
|
from tools.todo_tool import TodoStore
|
|
self.agent._todo_store = TodoStore()
|
|
except Exception:
|
|
pass
|
|
if hasattr(self.agent, "_invalidate_system_prompt"):
|
|
self.agent._invalidate_system_prompt()
|
|
|
|
# Notify memory providers that session_id forked to a new branch.
|
|
# reset=False — the branched session carries the transcript
|
|
# forward, so provider state tracks the lineage. parent_session_id
|
|
# links the branch back to the original. See #6672.
|
|
try:
|
|
_mm = getattr(self.agent, "_memory_manager", None)
|
|
if _mm is not None:
|
|
_mm.on_session_switch(
|
|
new_session_id,
|
|
parent_session_id=parent_session_id or "",
|
|
reset=False,
|
|
reason="branch",
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
msg_count = len([m for m in self.conversation_history if m.get("role") == "user"])
|
|
_cprint(
|
|
f" ⑂ Branched session \"{branch_title}\""
|
|
f" ({msg_count} user message{'s' if msg_count != 1 else ''})"
|
|
)
|
|
_cprint(f" Original session: {parent_session_id}")
|
|
_cprint(f" Branch session: {new_session_id}")
|
|
|
|
def _handle_gquota_command(self, cmd_original: str) -> None:
|
|
"""Show Google Gemini Code Assist quota usage for the current OAuth account."""
|
|
try:
|
|
from agent.google_oauth import get_valid_access_token, GoogleOAuthError, load_credentials
|
|
from agent.google_code_assist import retrieve_user_quota, CodeAssistError
|
|
except ImportError as exc:
|
|
self._console_print(f" [red]Gemini modules unavailable: {exc}[/]")
|
|
return
|
|
|
|
try:
|
|
access_token = get_valid_access_token()
|
|
except GoogleOAuthError as exc:
|
|
self._console_print(f" [yellow]{exc}[/]")
|
|
self._console_print(" Run [bold]/model[/] and pick 'Google Gemini (OAuth)' to sign in.")
|
|
return
|
|
|
|
creds = load_credentials()
|
|
project_id = (creds.project_id if creds else "") or ""
|
|
|
|
try:
|
|
buckets = retrieve_user_quota(access_token, project_id=project_id)
|
|
except CodeAssistError as exc:
|
|
self._console_print(f" [red]Quota lookup failed:[/] {exc}")
|
|
return
|
|
|
|
if not buckets:
|
|
self._console_print(" [dim]No quota buckets reported (account may be on legacy/unmetered tier).[/]")
|
|
return
|
|
|
|
# Sort for stable display, group by model
|
|
buckets.sort(key=lambda b: (b.model_id, b.token_type))
|
|
self._console_print()
|
|
self._console_print(f" [bold]Gemini Code Assist quota[/] (project: {project_id or '(auto / free-tier)'})")
|
|
self._console_print()
|
|
for b in buckets:
|
|
pct = max(0.0, min(1.0, b.remaining_fraction))
|
|
width = 20
|
|
filled = int(round(pct * width))
|
|
bar = "▓" * filled + "░" * (width - filled)
|
|
pct_str = f"{int(pct * 100):3d}%"
|
|
header = b.model_id
|
|
if b.token_type:
|
|
header += f" [{b.token_type}]"
|
|
self._console_print(f" {header:40s} {bar} {pct_str}")
|
|
self._console_print()
|
|
|
|
def _handle_personality_command(self, cmd: str):
|
|
"""Handle the /personality command to set predefined personalities."""
|
|
from cli import save_config_value
|
|
parts = cmd.split(maxsplit=1)
|
|
|
|
if len(parts) > 1:
|
|
# Set personality
|
|
personality_name = parts[1].strip().lower()
|
|
|
|
if personality_name in {"none", "default", "neutral"}:
|
|
self.system_prompt = ""
|
|
self.agent = None # Force re-init
|
|
if save_config_value("agent.system_prompt", ""):
|
|
print("(^_^)b Personality cleared (saved to config)")
|
|
else:
|
|
print("(^_^) Personality cleared (session only)")
|
|
print(" No personality overlay — using base agent behavior.")
|
|
elif personality_name in self.personalities:
|
|
self.system_prompt = self._resolve_personality_prompt(self.personalities[personality_name])
|
|
self.agent = None # Force re-init
|
|
if save_config_value("agent.system_prompt", self.system_prompt):
|
|
print(f"(^_^)b Personality set to '{personality_name}' (saved to config)")
|
|
else:
|
|
print(f"(^_^) Personality set to '{personality_name}' (session only)")
|
|
print(f" \"{self.system_prompt[:60]}{'...' if len(self.system_prompt) > 60 else ''}\"")
|
|
else:
|
|
print(f"(._.) Unknown personality: {personality_name}")
|
|
print(f" Available: none, {', '.join(self.personalities.keys())}")
|
|
else:
|
|
# Show available personalities
|
|
print()
|
|
print("+" + "-" * 50 + "+")
|
|
print("|" + " " * 12 + "(^o^)/ Personalities" + " " * 15 + "|")
|
|
print("+" + "-" * 50 + "+")
|
|
print()
|
|
print(f" {'none':<12} - (no personality overlay)")
|
|
for name, prompt in self.personalities.items():
|
|
if isinstance(prompt, dict):
|
|
preview = prompt.get("description") or prompt.get("system_prompt", "")[:50]
|
|
else:
|
|
preview = str(prompt)[:50]
|
|
print(f" {name:<12} - {preview}")
|
|
print()
|
|
print(" Usage: /personality <name>")
|
|
print()
|
|
|
|
def _handle_cron_command(self, cmd: str):
|
|
"""Handle the /cron command to manage scheduled tasks."""
|
|
from cli import get_job
|
|
import shlex
|
|
from tools.cronjob_tools import cronjob as cronjob_tool
|
|
|
|
def _cron_api(**kwargs):
|
|
return json.loads(cronjob_tool(**kwargs))
|
|
|
|
def _normalize_skills(values):
|
|
normalized = []
|
|
for value in values:
|
|
text = str(value or "").strip()
|
|
if text and text not in normalized:
|
|
normalized.append(text)
|
|
return normalized
|
|
|
|
def _parse_flags(tokens):
|
|
opts = {
|
|
"name": None,
|
|
"deliver": None,
|
|
"repeat": None,
|
|
"skills": [],
|
|
"add_skills": [],
|
|
"remove_skills": [],
|
|
"clear_skills": False,
|
|
"all": False,
|
|
"prompt": None,
|
|
"schedule": None,
|
|
"positionals": [],
|
|
}
|
|
i = 0
|
|
while i < len(tokens):
|
|
token = tokens[i]
|
|
if token == "--name" and i + 1 < len(tokens):
|
|
opts["name"] = tokens[i + 1]
|
|
i += 2
|
|
elif token == "--deliver" and i + 1 < len(tokens):
|
|
opts["deliver"] = tokens[i + 1]
|
|
i += 2
|
|
elif token == "--repeat" and i + 1 < len(tokens):
|
|
try:
|
|
opts["repeat"] = int(tokens[i + 1])
|
|
except ValueError:
|
|
print("(._.) --repeat must be an integer")
|
|
return None
|
|
i += 2
|
|
elif token == "--skill" and i + 1 < len(tokens):
|
|
opts["skills"].append(tokens[i + 1])
|
|
i += 2
|
|
elif token == "--add-skill" and i + 1 < len(tokens):
|
|
opts["add_skills"].append(tokens[i + 1])
|
|
i += 2
|
|
elif token == "--remove-skill" and i + 1 < len(tokens):
|
|
opts["remove_skills"].append(tokens[i + 1])
|
|
i += 2
|
|
elif token == "--clear-skills":
|
|
opts["clear_skills"] = True
|
|
i += 1
|
|
elif token == "--all":
|
|
opts["all"] = True
|
|
i += 1
|
|
elif token == "--prompt" and i + 1 < len(tokens):
|
|
opts["prompt"] = tokens[i + 1]
|
|
i += 2
|
|
elif token == "--schedule" and i + 1 < len(tokens):
|
|
opts["schedule"] = tokens[i + 1]
|
|
i += 2
|
|
else:
|
|
opts["positionals"].append(token)
|
|
i += 1
|
|
return opts
|
|
|
|
tokens = shlex.split(cmd)
|
|
|
|
if len(tokens) == 1:
|
|
print()
|
|
print("+" + "-" * 68 + "+")
|
|
print("|" + " " * 22 + "(^_^) Scheduled Tasks" + " " * 23 + "|")
|
|
print("+" + "-" * 68 + "+")
|
|
print()
|
|
print(" Commands:")
|
|
print(" /cron list")
|
|
print(' /cron add "every 2h" "Check server status" [--skill blogwatcher]')
|
|
print(' /cron edit <job_id> --schedule "every 4h" --prompt "New task"')
|
|
print(" /cron edit <job_id> --skill blogwatcher --skill maps")
|
|
print(" /cron edit <job_id> --remove-skill blogwatcher")
|
|
print(" /cron edit <job_id> --clear-skills")
|
|
print(" /cron pause <job_id>")
|
|
print(" /cron resume <job_id>")
|
|
print(" /cron run <job_id>")
|
|
print(" /cron remove <job_id>")
|
|
print()
|
|
result = _cron_api(action="list")
|
|
jobs = result.get("jobs", []) if result.get("success") else []
|
|
if jobs:
|
|
print(" Current Jobs:")
|
|
print(" " + "-" * 63)
|
|
for job in jobs:
|
|
repeat_str = job.get("repeat", "?")
|
|
print(f" {job['job_id'][:12]:<12} | {job['schedule']:<15} | {repeat_str:<8}")
|
|
if job.get("skills"):
|
|
print(f" Skills: {', '.join(job['skills'])}")
|
|
print(f" {job.get('prompt_preview', '')}")
|
|
if job.get("next_run_at"):
|
|
print(f" Next: {job['next_run_at']}")
|
|
print()
|
|
else:
|
|
print(" No scheduled jobs. Use '/cron add' to create one.")
|
|
print()
|
|
return
|
|
|
|
subcommand = tokens[1].lower()
|
|
opts = _parse_flags(tokens[2:])
|
|
if opts is None:
|
|
return
|
|
|
|
if subcommand == "list":
|
|
result = _cron_api(action="list", include_disabled=opts["all"])
|
|
jobs = result.get("jobs", []) if result.get("success") else []
|
|
if not jobs:
|
|
print("(._.) No scheduled jobs.")
|
|
return
|
|
|
|
print()
|
|
print("Scheduled Jobs:")
|
|
print("-" * 80)
|
|
for job in jobs:
|
|
print(f" ID: {job['job_id']}")
|
|
print(f" Name: {job['name']}")
|
|
print(f" State: {job.get('state', '?')}")
|
|
print(f" Schedule: {job['schedule']} ({job.get('repeat', '?')})")
|
|
print(f" Next run: {job.get('next_run_at', 'N/A')}")
|
|
if job.get("skills"):
|
|
print(f" Skills: {', '.join(job['skills'])}")
|
|
print(f" Prompt: {job.get('prompt_preview', '')}")
|
|
if job.get("last_run_at"):
|
|
print(f" Last run: {job['last_run_at']} ({job.get('last_status', '?')})")
|
|
print()
|
|
return
|
|
|
|
if subcommand in {"add", "create"}:
|
|
positionals = opts["positionals"]
|
|
if not positionals:
|
|
print("(._.) Usage: /cron add <schedule> <prompt>")
|
|
return
|
|
schedule = opts["schedule"] or positionals[0]
|
|
prompt = opts["prompt"] or " ".join(positionals[1:])
|
|
skills = _normalize_skills(opts["skills"])
|
|
if not prompt and not skills:
|
|
print("(._.) Please provide a prompt or at least one skill")
|
|
return
|
|
result = _cron_api(
|
|
action="create",
|
|
schedule=schedule,
|
|
prompt=prompt or None,
|
|
name=opts["name"],
|
|
deliver=opts["deliver"],
|
|
repeat=opts["repeat"],
|
|
skills=skills or None,
|
|
)
|
|
if result.get("success"):
|
|
print(f"(^_^)b Created job: {result['job_id']}")
|
|
print(f" Schedule: {result['schedule']}")
|
|
if result.get("skills"):
|
|
print(f" Skills: {', '.join(result['skills'])}")
|
|
print(f" Next run: {result['next_run_at']}")
|
|
else:
|
|
print(f"(x_x) Failed to create job: {result.get('error')}")
|
|
return
|
|
|
|
if subcommand == "edit":
|
|
positionals = opts["positionals"]
|
|
if not positionals:
|
|
print("(._.) Usage: /cron edit <job_id> [--schedule ...] [--prompt ...] [--skill ...]")
|
|
return
|
|
job_id = positionals[0]
|
|
existing = get_job(job_id)
|
|
if not existing:
|
|
print(f"(._.) Job not found: {job_id}")
|
|
return
|
|
|
|
final_skills = None
|
|
replacement_skills = _normalize_skills(opts["skills"])
|
|
add_skills = _normalize_skills(opts["add_skills"])
|
|
remove_skills = set(_normalize_skills(opts["remove_skills"]))
|
|
existing_skills = list(existing.get("skills") or ([] if not existing.get("skill") else [existing.get("skill")]))
|
|
if opts["clear_skills"]:
|
|
final_skills = []
|
|
elif replacement_skills:
|
|
final_skills = replacement_skills
|
|
elif add_skills or remove_skills:
|
|
final_skills = [skill for skill in existing_skills if skill not in remove_skills]
|
|
for skill in add_skills:
|
|
if skill not in final_skills:
|
|
final_skills.append(skill)
|
|
|
|
result = _cron_api(
|
|
action="update",
|
|
job_id=job_id,
|
|
schedule=opts["schedule"],
|
|
prompt=opts["prompt"],
|
|
name=opts["name"],
|
|
deliver=opts["deliver"],
|
|
repeat=opts["repeat"],
|
|
skills=final_skills,
|
|
)
|
|
if result.get("success"):
|
|
job = result["job"]
|
|
print(f"(^_^)b Updated job: {job['job_id']}")
|
|
print(f" Schedule: {job['schedule']}")
|
|
if job.get("skills"):
|
|
print(f" Skills: {', '.join(job['skills'])}")
|
|
else:
|
|
print(" Skills: none")
|
|
else:
|
|
print(f"(x_x) Failed to update job: {result.get('error')}")
|
|
return
|
|
|
|
if subcommand in {"pause", "resume", "run", "remove", "rm", "delete"}:
|
|
positionals = opts["positionals"]
|
|
if not positionals:
|
|
print(f"(._.) Usage: /cron {subcommand} <job_id>")
|
|
return
|
|
job_id = positionals[0]
|
|
action = "remove" if subcommand in {"remove", "rm", "delete"} else subcommand
|
|
result = _cron_api(action=action, job_id=job_id, reason="paused from /cron" if action == "pause" else None)
|
|
if not result.get("success"):
|
|
print(f"(x_x) Failed to {action} job: {result.get('error')}")
|
|
return
|
|
if action == "pause":
|
|
print(f"(^_^)b Paused job: {result['job']['name']} ({job_id})")
|
|
elif action == "resume":
|
|
print(f"(^_^)b Resumed job: {result['job']['name']} ({job_id})")
|
|
print(f" Next run: {result['job'].get('next_run_at')}")
|
|
elif action == "run":
|
|
print(f"(^_^)b Triggered job: {result['job']['name']} ({job_id})")
|
|
print(" It will run on the next scheduler tick.")
|
|
else:
|
|
removed = result.get("removed_job", {})
|
|
print(f"(^_^)b Removed job: {removed.get('name', job_id)} ({job_id})")
|
|
return
|
|
|
|
print(f"(._.) Unknown cron command: {subcommand}")
|
|
print(" Available: list, add, edit, pause, resume, run, remove")
|
|
|
|
def _handle_curator_command(self, cmd: str):
|
|
"""Handle /curator slash command.
|
|
|
|
Delegates to hermes_cli.curator so the CLI and the `hermes curator`
|
|
subcommand share the same handler set.
|
|
"""
|
|
import shlex
|
|
|
|
tokens = shlex.split(cmd)[1:] if cmd else []
|
|
if not tokens:
|
|
tokens = ["status"]
|
|
|
|
try:
|
|
from hermes_cli.curator import cli_main
|
|
cli_main(tokens)
|
|
except SystemExit:
|
|
# argparse calls sys.exit() on --help or errors; swallow so we
|
|
# don't kill the interactive session.
|
|
pass
|
|
except Exception as exc:
|
|
print(f"(._.) curator: {exc}")
|
|
|
|
def _handle_kanban_command(self, cmd: str):
|
|
"""Handle the /kanban command — delegate to the shared kanban CLI.
|
|
|
|
The string form passed here is the user's full ``/kanban ...``
|
|
including the leading slash; we strip it and hand the remainder
|
|
to ``kanban.run_slash`` which returns a single formatted string.
|
|
"""
|
|
from hermes_cli.kanban import run_slash
|
|
|
|
rest = cmd.strip()
|
|
if rest.startswith("/"):
|
|
rest = rest.lstrip("/")
|
|
if rest.startswith("kanban"):
|
|
rest = rest[len("kanban"):].lstrip()
|
|
try:
|
|
output = run_slash(rest)
|
|
except Exception as exc: # pragma: no cover - defensive
|
|
output = f"(._.) kanban error: {exc}"
|
|
if output:
|
|
print(output)
|
|
|
|
def _handle_skills_command(self, cmd: str):
|
|
"""Handle /skills slash command — delegates to hermes_cli.skills_hub."""
|
|
from cli import ChatConsole
|
|
from hermes_cli.skills_hub import handle_skills_slash
|
|
handle_skills_slash(cmd, ChatConsole())
|
|
|
|
def _handle_background_command(self, cmd: str):
|
|
"""Handle /background <prompt> — run a prompt in a separate background session.
|
|
|
|
Spawns a new AIAgent in a background thread with its own session.
|
|
When it completes, prints the result to the CLI without modifying
|
|
the active session's conversation history.
|
|
"""
|
|
from cli import AIAgent, ChatConsole, _accent_hex, _cprint, _maybe_remap_for_light_mode, _render_final_assistant_content, set_approval_callback, set_secret_capture_callback, set_sudo_password_callback
|
|
parts = cmd.strip().split(maxsplit=1)
|
|
if len(parts) < 2 or not parts[1].strip():
|
|
_cprint(" Usage: /background <prompt>")
|
|
_cprint(" Example: /background Summarize the top HN stories today")
|
|
_cprint(" The task runs in a separate session and results display here when done.")
|
|
return
|
|
|
|
prompt = parts[1].strip()
|
|
self._background_task_counter += 1
|
|
task_num = self._background_task_counter
|
|
task_id = f"bg_{datetime.now().strftime('%H%M%S')}_{uuid.uuid4().hex[:6]}"
|
|
|
|
# Make sure we have valid credentials
|
|
if not self._ensure_runtime_credentials():
|
|
_cprint(" (>_<) Cannot start background task: no valid credentials.")
|
|
return
|
|
|
|
_cprint(f" 🔄 Background task #{task_num} started: \"{prompt[:60]}{'...' if len(prompt) > 60 else ''}\"")
|
|
_cprint(f" Task ID: {task_id}")
|
|
_cprint(" You can continue chatting — results will appear when done.\n")
|
|
|
|
turn_route = self._resolve_turn_agent_config(prompt)
|
|
|
|
def run_background():
|
|
set_sudo_password_callback(self._sudo_password_callback)
|
|
set_approval_callback(self._approval_callback)
|
|
try:
|
|
set_secret_capture_callback(self._secret_capture_callback)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
bg_agent = AIAgent(
|
|
model=turn_route["model"],
|
|
api_key=turn_route["runtime"].get("api_key"),
|
|
base_url=turn_route["runtime"].get("base_url"),
|
|
provider=turn_route["runtime"].get("provider"),
|
|
api_mode=turn_route["runtime"].get("api_mode"),
|
|
acp_command=turn_route["runtime"].get("command"),
|
|
acp_args=turn_route["runtime"].get("args"),
|
|
max_tokens=turn_route["runtime"].get("max_tokens"),
|
|
max_iterations=self.max_turns,
|
|
enabled_toolsets=self.enabled_toolsets,
|
|
quiet_mode=True,
|
|
verbose_logging=False,
|
|
session_id=task_id,
|
|
platform="cli",
|
|
session_db=self._session_db,
|
|
reasoning_config=self.reasoning_config,
|
|
service_tier=self.service_tier,
|
|
request_overrides=turn_route.get("request_overrides"),
|
|
providers_allowed=self._providers_only,
|
|
providers_ignored=self._providers_ignore,
|
|
providers_order=self._providers_order,
|
|
provider_sort=self._provider_sort,
|
|
provider_require_parameters=self._provider_require_params,
|
|
provider_data_collection=self._provider_data_collection,
|
|
openrouter_min_coding_score=self._openrouter_min_coding_score,
|
|
fallback_model=self._fallback_model,
|
|
)
|
|
# Silence raw spinner; route thinking through TUI widget when no foreground agent is active.
|
|
bg_agent._print_fn = lambda *_a, **_kw: None
|
|
|
|
def _bg_thinking(text: str) -> None:
|
|
# Concurrent bg tasks may race on _spinner_text; acceptable for best-effort UI.
|
|
if not self._agent_running:
|
|
self._spinner_text = text
|
|
if self._app:
|
|
self._app.invalidate()
|
|
|
|
bg_agent.thinking_callback = _bg_thinking
|
|
|
|
result = bg_agent.run_conversation(
|
|
user_message=prompt,
|
|
task_id=task_id,
|
|
)
|
|
|
|
response = result.get("final_response", "") if result else ""
|
|
if not response and result and result.get("error"):
|
|
response = f"Error: {result['error']}"
|
|
|
|
# Display result in the CLI (thread-safe via patch_stdout).
|
|
# Force a TUI refresh first so spinner/status bar don't overlap
|
|
# with the output (fixes #2718).
|
|
if self._app:
|
|
self._app.invalidate()
|
|
time.sleep(0.05) # brief pause for refresh
|
|
print()
|
|
ChatConsole().print(f"[{_accent_hex()}]{'─' * 40}[/]")
|
|
_cprint(f" ✅ Background task #{task_num} complete")
|
|
_cprint(f" Prompt: \"{prompt[:60]}{'...' if len(prompt) > 60 else ''}\"")
|
|
ChatConsole().print(f"[{_accent_hex()}]{'─' * 40}[/]")
|
|
if response:
|
|
try:
|
|
from hermes_cli.skin_engine import get_active_skin
|
|
_skin = get_active_skin()
|
|
label = _skin.get_branding("response_label", "⚕ Hermes")
|
|
_resp_color = _maybe_remap_for_light_mode(_skin.get_color("response_border", "#CD7F32"))
|
|
_resp_text = _maybe_remap_for_light_mode(_skin.get_color("banner_text", "#FFF8DC"))
|
|
except Exception:
|
|
label = "⚕ Hermes"
|
|
_resp_color = "#CD7F32"
|
|
_resp_text = "#FFF8DC"
|
|
|
|
_chat_console = ChatConsole()
|
|
_chat_console.print(Panel(
|
|
_render_final_assistant_content(response, mode=self.final_response_markdown),
|
|
title=f"[{_resp_color} bold]{label} (background #{task_num})[/]",
|
|
title_align="left",
|
|
border_style=_resp_color,
|
|
style=_resp_text,
|
|
box=rich_box.HORIZONTALS,
|
|
padding=(1, 4),
|
|
width=self._scrollback_box_width(),
|
|
))
|
|
else:
|
|
_cprint(" (No response generated)")
|
|
|
|
# Play bell if enabled
|
|
if self.bell_on_complete:
|
|
sys.stdout.write("\a")
|
|
sys.stdout.flush()
|
|
|
|
except Exception as e:
|
|
# Same TUI refresh pattern as success path (#2718)
|
|
if self._app:
|
|
self._app.invalidate()
|
|
time.sleep(0.05)
|
|
print()
|
|
_cprint(f" ❌ Background task #{task_num} failed: {e}")
|
|
finally:
|
|
try:
|
|
set_sudo_password_callback(None)
|
|
set_approval_callback(None)
|
|
set_secret_capture_callback(None)
|
|
except Exception:
|
|
pass
|
|
self._background_tasks.pop(task_id, None)
|
|
# Clear spinner only if no foreground agent owns it
|
|
if not self._agent_running:
|
|
self._spinner_text = ""
|
|
if self._app:
|
|
self._invalidate(min_interval=0)
|
|
|
|
thread = threading.Thread(target=run_background, daemon=True, name=f"bg-task-{task_id}")
|
|
self._background_tasks[task_id] = thread
|
|
thread.start()
|
|
|
|
def _handle_bundles_command(self, cmd: str) -> None:
|
|
"""In-session ``/bundles`` — show installed skill bundles.
|
|
|
|
Mirrors ``hermes bundles list`` but renders inside the running
|
|
CLI so users can discover what's available without dropping out
|
|
of their session. Bundles are loaded via ``/<bundle-name>``.
|
|
"""
|
|
from cli import ChatConsole, _BOLD, _DIM, _RST, _accent_hex, _cprint
|
|
try:
|
|
from agent.skill_bundles import list_bundles, _bundles_dir
|
|
except Exception as exc:
|
|
_cprint(f"\033[1;31mBundle subsystem unavailable: {exc}{_RST}")
|
|
return
|
|
|
|
bundles = list_bundles()
|
|
if not bundles:
|
|
_cprint(" No skill bundles installed.")
|
|
_cprint(
|
|
f" {_DIM}Create one with: hermes bundles create "
|
|
f"<name> --skill <s1> --skill <s2>{_RST}"
|
|
)
|
|
_cprint(f" {_DIM}Directory: {_bundles_dir()}{_RST}")
|
|
return
|
|
|
|
_cprint(f"\n ▣ {_BOLD}Skill Bundles{_RST} ({len(bundles)} installed):")
|
|
for info in bundles:
|
|
skill_count = len(info.get("skills", []))
|
|
desc = info.get("description") or f"Load {skill_count} skills"
|
|
ChatConsole().print(
|
|
f" [bold {_accent_hex()}]/{info['slug']:<20}[/] "
|
|
f"[dim]-[/] {_escape(desc)} [dim]({skill_count} skills)[/]"
|
|
)
|
|
for s in info.get("skills", []):
|
|
ChatConsole().print(f" [dim]· {_escape(s)}[/]")
|
|
_cprint(
|
|
f"\n {_DIM}Invoke a bundle with /<slug>. "
|
|
f"Manage with `hermes bundles`.{_RST}"
|
|
)
|
|
|
|
def _handle_browser_command(self, cmd: str):
|
|
"""Handle /browser connect|disconnect|status — manage live Chromium-family CDP connection."""
|
|
import platform as _plat
|
|
|
|
parts = cmd.strip().split(None, 1)
|
|
sub = parts[1].lower().strip() if len(parts) > 1 else "status"
|
|
|
|
_DEFAULT_CDP = DEFAULT_BROWSER_CDP_URL
|
|
current = os.environ.get("BROWSER_CDP_URL", "").strip()
|
|
|
|
if sub.startswith("connect"):
|
|
# Optionally accept a custom CDP URL: /browser connect ws://host:port
|
|
connect_parts = cmd.strip().split(None, 2) # ["/browser", "connect", "ws://..."]
|
|
cdp_url = connect_parts[2].strip() if len(connect_parts) > 2 else _DEFAULT_CDP
|
|
parsed_cdp = urlparse(cdp_url if "://" in cdp_url else f"http://{cdp_url}")
|
|
if parsed_cdp.scheme not in {"http", "https", "ws", "wss"}:
|
|
print()
|
|
print(
|
|
f" ⚠ Unsupported browser url scheme: {parsed_cdp.scheme or '(missing)'} "
|
|
"(expected one of: http, https, ws, wss)"
|
|
)
|
|
print()
|
|
return
|
|
try:
|
|
_port = parsed_cdp.port or (443 if parsed_cdp.scheme in {"https", "wss"} else 80)
|
|
except ValueError:
|
|
print()
|
|
print(f" ⚠ Invalid port in browser url: {cdp_url}")
|
|
print()
|
|
return
|
|
if not parsed_cdp.hostname:
|
|
print()
|
|
print(f" ⚠ Missing host in browser url: {cdp_url}")
|
|
print()
|
|
return
|
|
_host = parsed_cdp.hostname
|
|
if parsed_cdp.path.startswith("/devtools/browser/"):
|
|
cdp_url = parsed_cdp.geturl()
|
|
else:
|
|
cdp_url = parsed_cdp._replace(
|
|
path="",
|
|
params="",
|
|
query="",
|
|
fragment="",
|
|
).geturl()
|
|
|
|
# Clear any existing browser sessions so the next tool call uses the new backend
|
|
try:
|
|
from tools.browser_tool import cleanup_all_browsers
|
|
cleanup_all_browsers()
|
|
except Exception:
|
|
pass
|
|
|
|
print()
|
|
|
|
# Check if a Chromium-family browser is already serving CDP on the debug port
|
|
_already_open = is_browser_debug_ready(cdp_url, timeout=1.0)
|
|
|
|
if _already_open:
|
|
print(f" ✓ Chromium-family browser is already listening on port {_port}")
|
|
elif cdp_url == _DEFAULT_CDP:
|
|
# Try to auto-launch a Chromium-family browser with remote debugging
|
|
print(" Chromium-family browser isn't running with remote debugging — attempting to launch...")
|
|
_launched = self._try_launch_chrome_debug(_port, _plat.system())
|
|
if _launched:
|
|
# Wait for the DevTools discovery endpoint to come up
|
|
for _wait in range(10):
|
|
if is_browser_debug_ready(cdp_url, timeout=1.0):
|
|
_already_open = True
|
|
break
|
|
time.sleep(0.5)
|
|
if _already_open:
|
|
print(f" ✓ Chromium-family browser launched and listening on port {_port}")
|
|
else:
|
|
print(f" ⚠ Browser launched but port {_port} isn't responding yet")
|
|
print(" Try again in a few seconds — the debug instance may still be starting")
|
|
else:
|
|
print(" ⚠ Could not auto-launch a Chromium-family browser")
|
|
sys_name = _plat.system()
|
|
chrome_cmd = manual_chrome_debug_command(_port, sys_name)
|
|
if chrome_cmd:
|
|
print(f" Launch a Chromium-family browser manually:")
|
|
print(f" {chrome_cmd}")
|
|
else:
|
|
print(" No supported Chromium-family browser executable found in this environment")
|
|
else:
|
|
print(f" ⚠ Port {_port} is not reachable at {cdp_url}")
|
|
|
|
if not _already_open:
|
|
print()
|
|
print("Browser not connected — start a Chromium-family browser with remote debugging and retry /browser connect")
|
|
print()
|
|
return
|
|
|
|
os.environ["BROWSER_CDP_URL"] = cdp_url
|
|
# Eagerly start the CDP supervisor so pending_dialogs + frame_tree
|
|
# show up in the next browser_snapshot. No-op if already started.
|
|
try:
|
|
from tools.browser_tool import _ensure_cdp_supervisor # type: ignore[import-not-found]
|
|
_ensure_cdp_supervisor("default")
|
|
except Exception:
|
|
pass
|
|
print()
|
|
print("🌐 Browser connected to live Chromium-family browser via CDP")
|
|
print(f" Endpoint: {cdp_url}")
|
|
print()
|
|
|
|
# Inject context message so the model knows this slash command
|
|
# intentionally makes the dev/debug CDP browser available for use.
|
|
if hasattr(self, '_pending_input'):
|
|
self._pending_input.put(
|
|
"[System note: The user invoked /browser connect and connected your browser tools to "
|
|
"a Chromium-family dev/debug browser via Chrome DevTools Protocol. "
|
|
"Your browser_navigate, browser_snapshot, browser_click, and other browser tools now "
|
|
"control that CDP browser. The command itself is a signal that using browser tools for "
|
|
"their current browser-related request is expected; do not wait for separate permission "
|
|
"just because CDP is connected. This is typically a Hermes-managed isolated debug "
|
|
"profile, not the user's main everyday browser. It is still user-visible and may contain "
|
|
"pages, logged-in sessions, or cookies in that debug profile, so avoid destructive actions, "
|
|
"closing tabs, or navigating away unless the user's task calls for it.]"
|
|
)
|
|
|
|
elif sub == "disconnect":
|
|
if current:
|
|
os.environ.pop("BROWSER_CDP_URL", None)
|
|
try:
|
|
from tools.browser_tool import cleanup_all_browsers, _stop_cdp_supervisor
|
|
_stop_cdp_supervisor("default")
|
|
cleanup_all_browsers()
|
|
except Exception:
|
|
pass
|
|
print()
|
|
print("🌐 Browser disconnected from live Chromium-family browser")
|
|
print(" Browser tools reverted to default mode (local headless or cloud provider)")
|
|
print()
|
|
|
|
if hasattr(self, '_pending_input'):
|
|
self._pending_input.put(
|
|
"[System note: The user has disconnected the browser tools from their live Chromium-family browser. "
|
|
"Browser tools are back to default mode (headless local browser or cloud provider).]"
|
|
)
|
|
else:
|
|
print()
|
|
print("Browser is not connected to a live Chromium-family browser (already using default mode)")
|
|
print()
|
|
|
|
elif sub == "status":
|
|
print()
|
|
if current:
|
|
print("🌐 Browser: connected to live Chromium-family browser via CDP")
|
|
print(f" Endpoint: {current}")
|
|
|
|
_port = 9222
|
|
try:
|
|
_port = int(current.rsplit(":", 1)[-1].split("/")[0])
|
|
except (ValueError, IndexError):
|
|
pass
|
|
try:
|
|
import socket
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
s.settimeout(1)
|
|
s.connect(("127.0.0.1", _port))
|
|
s.close()
|
|
print(" Status: ✓ reachable")
|
|
except (OSError, Exception):
|
|
print(" Status: ⚠ not reachable (browser may not be running)")
|
|
else:
|
|
try:
|
|
from tools.browser_tool import _get_cloud_provider
|
|
provider = _get_cloud_provider()
|
|
except Exception:
|
|
provider = None
|
|
|
|
if provider is not None:
|
|
print(f"🌐 Browser: {provider.provider_name()} (cloud)")
|
|
else:
|
|
# Show engine info for local mode
|
|
try:
|
|
from tools.browser_tool import _get_browser_engine
|
|
engine = _get_browser_engine()
|
|
except Exception:
|
|
engine = "auto"
|
|
if engine == "lightpanda":
|
|
print("🌐 Browser: local Lightpanda (agent-browser --engine lightpanda)")
|
|
print(" ⚡ Lightpanda: faster navigation, no screenshot support")
|
|
print(" Automatic Chromium fallback for screenshots and failed commands")
|
|
elif engine == "chrome":
|
|
print("🌐 Browser: local headless Chromium (agent-browser --engine chrome)")
|
|
else:
|
|
print("🌐 Browser: local headless Chromium (agent-browser)")
|
|
print()
|
|
print(" /browser connect — connect to your live Chromium-family browser")
|
|
print(" /browser disconnect — revert to default")
|
|
print()
|
|
|
|
else:
|
|
print()
|
|
print("Usage: /browser connect|disconnect|status")
|
|
print()
|
|
print(" connect Connect browser tools to your live Chromium-family browser session")
|
|
print(" disconnect Revert to default browser backend")
|
|
print(" status Show current browser mode")
|
|
print()
|
|
|
|
def _handle_goal_command(self, cmd: str) -> None:
|
|
"""Dispatch /goal subcommands: set / status / pause / resume / clear."""
|
|
from cli import _DIM, _RST, _cprint
|
|
parts = (cmd or "").strip().split(None, 1)
|
|
arg = parts[1].strip() if len(parts) > 1 else ""
|
|
|
|
mgr = self._get_goal_manager()
|
|
if mgr is None:
|
|
_cprint(f" {_DIM}Goals unavailable (no active session).{_RST}")
|
|
return
|
|
|
|
lower = arg.lower()
|
|
|
|
# Bare /goal or /goal status → show current state
|
|
if not arg or lower == "status":
|
|
_cprint(f" {mgr.status_line()}")
|
|
return
|
|
|
|
if lower == "pause":
|
|
state = mgr.pause(reason="user-paused")
|
|
if state is None:
|
|
_cprint(f" {_DIM}No goal set.{_RST}")
|
|
else:
|
|
_cprint(f" ⏸ Goal paused: {state.goal}")
|
|
return
|
|
|
|
if lower == "resume":
|
|
state = mgr.resume()
|
|
if state is None:
|
|
_cprint(f" {_DIM}No goal to resume.{_RST}")
|
|
else:
|
|
_cprint(f" ▶ Goal resumed: {state.goal}")
|
|
_cprint(
|
|
f" {_DIM}Send any message (or press Enter on an empty prompt "
|
|
f"is a no-op; type 'continue' to kick it off).{_RST}"
|
|
)
|
|
return
|
|
|
|
if lower in {"clear", "stop", "done"}:
|
|
had = mgr.has_goal()
|
|
mgr.clear()
|
|
if had:
|
|
_cprint(" ✓ Goal cleared.")
|
|
else:
|
|
_cprint(f" {_DIM}No active goal.{_RST}")
|
|
return
|
|
|
|
# Otherwise treat the arg as the goal text.
|
|
try:
|
|
state = mgr.set(arg)
|
|
except ValueError as exc:
|
|
_cprint(f" Invalid goal: {exc}")
|
|
return
|
|
|
|
_cprint(f" ⊙ Goal set ({state.max_turns}-turn budget): {state.goal}")
|
|
_cprint(
|
|
f" {_DIM}After each turn, a judge model will check if the goal is done. "
|
|
f"Hermes keeps working until it is, you pause/clear it, or the budget is "
|
|
f"exhausted. Use /goal status, /goal pause, /goal resume, /goal clear.{_RST}"
|
|
)
|
|
# Kick the loop off immediately so the user doesn't have to send a
|
|
# separate message after setting the goal.
|
|
try:
|
|
self._pending_input.put(state.goal)
|
|
except Exception:
|
|
pass
|
|
|
|
def _handle_subgoal_command(self, cmd: str) -> None:
|
|
"""Dispatch /subgoal subcommands.
|
|
|
|
Forms:
|
|
/subgoal show current subgoals
|
|
/subgoal <text> append a criterion
|
|
/subgoal remove <n> drop subgoal n (1-based)
|
|
/subgoal clear wipe all subgoals
|
|
|
|
Subgoals are extra criteria the user adds mid-loop. They get
|
|
appended to both the judge prompt (verdict must consider them)
|
|
and the continuation prompt (agent sees them) on the next turn
|
|
boundary. No special kick — the running turn finishes, the next
|
|
judge call includes them.
|
|
"""
|
|
from cli import _DIM, _RST, _cprint
|
|
parts = (cmd or "").strip().split(None, 2)
|
|
arg = " ".join(parts[1:]).strip() if len(parts) > 1 else ""
|
|
|
|
mgr = self._get_goal_manager()
|
|
if mgr is None:
|
|
_cprint(f" {_DIM}Goals unavailable (no active session).{_RST}")
|
|
return
|
|
|
|
if not mgr.has_goal():
|
|
_cprint(f" {_DIM}No active goal. Set one with /goal <text>.{_RST}")
|
|
return
|
|
|
|
# No args → list current subgoals.
|
|
if not arg:
|
|
_cprint(f" {mgr.status_line()}")
|
|
_cprint(f" {mgr.render_subgoals()}")
|
|
return
|
|
|
|
tokens = arg.split(None, 1)
|
|
verb = tokens[0].lower()
|
|
rest = tokens[1].strip() if len(tokens) > 1 else ""
|
|
|
|
if verb == "remove":
|
|
if not rest:
|
|
_cprint(" Usage: /subgoal remove <n>")
|
|
return
|
|
try:
|
|
idx = int(rest.split()[0])
|
|
except ValueError:
|
|
_cprint(" /subgoal remove: <n> must be an integer (1-based index).")
|
|
return
|
|
try:
|
|
removed = mgr.remove_subgoal(idx)
|
|
except (IndexError, RuntimeError) as exc:
|
|
_cprint(f" /subgoal remove: {exc}")
|
|
return
|
|
_cprint(f" ✓ Removed subgoal {idx}: {removed}")
|
|
return
|
|
|
|
if verb == "clear":
|
|
try:
|
|
prev = mgr.clear_subgoals()
|
|
except RuntimeError as exc:
|
|
_cprint(f" /subgoal clear: {exc}")
|
|
return
|
|
if prev:
|
|
_cprint(f" ✓ Cleared {prev} subgoal{'s' if prev != 1 else ''}.")
|
|
else:
|
|
_cprint(f" {_DIM}No subgoals to clear.{_RST}")
|
|
return
|
|
|
|
# Otherwise — append the whole arg as a new subgoal.
|
|
try:
|
|
text = mgr.add_subgoal(arg)
|
|
except (ValueError, RuntimeError) as exc:
|
|
_cprint(f" /subgoal: {exc}")
|
|
return
|
|
idx = len(mgr.state.subgoals) if mgr.state else 0
|
|
_cprint(f" ✓ Added subgoal {idx}: {text}")
|
|
|
|
def _handle_skin_command(self, cmd: str):
|
|
"""Handle /skin [name] — show or change the display skin."""
|
|
from cli import _ACCENT, save_config_value
|
|
try:
|
|
from hermes_cli.skin_engine import list_skins, set_active_skin, get_active_skin_name
|
|
except ImportError:
|
|
print("Skin engine not available.")
|
|
return
|
|
|
|
parts = cmd.strip().split(maxsplit=1)
|
|
if len(parts) < 2 or not parts[1].strip():
|
|
# Show current skin and list available
|
|
current = get_active_skin_name()
|
|
skins = list_skins()
|
|
print(f"\n Current skin: {current}")
|
|
print(" Available skins:")
|
|
for s in skins:
|
|
marker = " ●" if s["name"] == current else " "
|
|
source = f" ({s['source']})" if s["source"] == "user" else ""
|
|
print(f" {marker} {s['name']}{source} — {s['description']}")
|
|
print("\n Usage: /skin <name>")
|
|
print(f" Custom skins: drop a YAML file in {display_hermes_home()}/skins/\n")
|
|
return
|
|
|
|
new_skin = parts[1].strip().lower()
|
|
available = {s["name"] for s in list_skins()}
|
|
if new_skin not in available:
|
|
print(f" Unknown skin: {new_skin}")
|
|
print(f" Available: {', '.join(sorted(available))}")
|
|
return
|
|
|
|
set_active_skin(new_skin)
|
|
_ACCENT.reset() # Re-resolve ANSI color for the new skin
|
|
# _DIM is now a fixed dim+italic ANSI escape (terminal-default fg)
|
|
# so it doesn't need re-resolving on skin switch.
|
|
if save_config_value("display.skin", new_skin):
|
|
print(f" Skin set to: {new_skin} (saved)")
|
|
else:
|
|
print(f" Skin set to: {new_skin}")
|
|
print(" Note: banner colors will update on next session start.")
|
|
if self._apply_tui_skin_style():
|
|
print(" Prompt + TUI colors updated.")
|
|
|
|
def _handle_footer_command(self, cmd_original: str) -> None:
|
|
"""Toggle or inspect ``display.runtime_footer.enabled`` from the CLI.
|
|
|
|
Usage:
|
|
/footer → toggle
|
|
/footer on|off → explicit
|
|
/footer status → show current state
|
|
"""
|
|
from cli import _cprint, save_config_value
|
|
from hermes_cli.config import load_config
|
|
from hermes_cli.colors import Colors as _Colors
|
|
|
|
# Parse arg
|
|
arg = ""
|
|
try:
|
|
parts = (cmd_original or "").strip().split(None, 1)
|
|
if len(parts) > 1:
|
|
arg = parts[1].strip().lower()
|
|
except Exception:
|
|
arg = ""
|
|
|
|
cfg = load_config() or {}
|
|
footer_cfg = ((cfg.get("display") or {}).get("runtime_footer") or {})
|
|
current = bool(footer_cfg.get("enabled", False))
|
|
fields = footer_cfg.get("fields") or ["model", "context_pct", "cwd"]
|
|
|
|
if arg in {"status", "?"}:
|
|
state = "ON" if current else "OFF"
|
|
_cprint(
|
|
f" {_Colors.BOLD}Runtime footer:{_Colors.RESET} {state}\n"
|
|
f" Fields: {', '.join(fields)}"
|
|
)
|
|
return
|
|
|
|
if arg in {"on", "enable", "true", "1"}:
|
|
new_state = True
|
|
elif arg in {"off", "disable", "false", "0"}:
|
|
new_state = False
|
|
elif arg == "":
|
|
new_state = not current
|
|
else:
|
|
_cprint(" Usage: /footer [on|off|status]")
|
|
return
|
|
|
|
if save_config_value("display.runtime_footer.enabled", new_state):
|
|
state = (
|
|
f"{_Colors.GREEN}ON{_Colors.RESET}" if new_state
|
|
else f"{_Colors.DIM}OFF{_Colors.RESET}"
|
|
)
|
|
_cprint(f" Runtime footer: {state}")
|
|
else:
|
|
_cprint(" Failed to save runtime_footer setting to config.yaml")
|
|
|
|
def _handle_reasoning_command(self, cmd: str):
|
|
"""Handle /reasoning — manage effort level and display toggle.
|
|
|
|
Usage:
|
|
/reasoning Show current effort level and display state
|
|
/reasoning <level> Set reasoning effort (none, minimal, low, medium, high, xhigh)
|
|
/reasoning show|on Show model thinking/reasoning in output
|
|
/reasoning hide|off Hide model thinking/reasoning from output
|
|
"""
|
|
from cli import _ACCENT, _DIM, _RST, _cprint, _parse_reasoning_config, save_config_value
|
|
parts = cmd.strip().split(maxsplit=1)
|
|
|
|
if len(parts) < 2:
|
|
# Show current state
|
|
rc = self.reasoning_config
|
|
if rc is None:
|
|
level = "medium (default)"
|
|
elif rc.get("enabled") is False:
|
|
level = "none (disabled)"
|
|
else:
|
|
level = rc.get("effort", "medium")
|
|
display_state = "on ✓" if self.show_reasoning else "off"
|
|
_cprint(f" {_ACCENT}Reasoning effort: {level}{_RST}")
|
|
_cprint(f" {_ACCENT}Reasoning display: {display_state}{_RST}")
|
|
_cprint(f" {_DIM}Usage: /reasoning <none|minimal|low|medium|high|xhigh|show|hide>{_RST}")
|
|
return
|
|
|
|
arg = parts[1].strip().lower()
|
|
|
|
# Display toggle
|
|
if arg in {"show", "on"}:
|
|
self.show_reasoning = True
|
|
if self.agent:
|
|
self.agent.reasoning_callback = self._current_reasoning_callback()
|
|
save_config_value("display.show_reasoning", True)
|
|
_cprint(f" {_ACCENT}✓ Reasoning display: ON (saved){_RST}")
|
|
_cprint(f" {_DIM} Model thinking will be shown during and after each response.{_RST}")
|
|
return
|
|
if arg in {"hide", "off"}:
|
|
self.show_reasoning = False
|
|
if self.agent:
|
|
self.agent.reasoning_callback = self._current_reasoning_callback()
|
|
save_config_value("display.show_reasoning", False)
|
|
_cprint(f" {_ACCENT}✓ Reasoning display: OFF (saved){_RST}")
|
|
return
|
|
|
|
# Effort level change
|
|
parsed = _parse_reasoning_config(arg)
|
|
if parsed is None:
|
|
_cprint(f" {_DIM}(._.) Unknown argument: {arg}{_RST}")
|
|
_cprint(f" {_DIM}Valid levels: none, minimal, low, medium, high, xhigh{_RST}")
|
|
_cprint(f" {_DIM}Display: show, hide{_RST}")
|
|
return
|
|
|
|
self.reasoning_config = parsed
|
|
self.agent = None # Force agent re-init with new reasoning config
|
|
|
|
if save_config_value("agent.reasoning_effort", arg):
|
|
_cprint(f" {_ACCENT}✓ Reasoning effort set to '{arg}' (saved to config){_RST}")
|
|
else:
|
|
_cprint(f" {_ACCENT}✓ Reasoning effort set to '{arg}' (session only){_RST}")
|
|
|
|
def _handle_busy_command(self, cmd: str):
|
|
"""Handle /busy — control what Enter does while Hermes is working.
|
|
|
|
Usage:
|
|
/busy Show current busy input mode
|
|
/busy status Show current busy input mode
|
|
/busy queue Queue input for the next turn instead of interrupting
|
|
/busy steer Inject Enter mid-run via /steer (after next tool call)
|
|
/busy interrupt Interrupt the current run on Enter (default)
|
|
"""
|
|
from cli import _ACCENT, _DIM, _RST, _cprint, save_config_value
|
|
parts = cmd.strip().split(maxsplit=1)
|
|
if len(parts) < 2 or parts[1].strip().lower() == "status":
|
|
_cprint(f" {_ACCENT}Busy input mode: {self.busy_input_mode}{_RST}")
|
|
if self.busy_input_mode == "queue":
|
|
_behavior = "queues for next turn"
|
|
elif self.busy_input_mode == "steer":
|
|
_behavior = "steers into current run (after next tool call)"
|
|
else:
|
|
_behavior = "interrupts current run"
|
|
_cprint(f" {_DIM}Enter while busy: {_behavior}{_RST}")
|
|
_cprint(f" {_DIM}Usage: /busy [queue|steer|interrupt|status]{_RST}")
|
|
return
|
|
|
|
arg = parts[1].strip().lower()
|
|
if arg not in {"queue", "interrupt", "steer"}:
|
|
_cprint(f" {_DIM}(._.) Unknown argument: {arg}{_RST}")
|
|
_cprint(f" {_DIM}Usage: /busy [queue|steer|interrupt|status]{_RST}")
|
|
return
|
|
|
|
self.busy_input_mode = arg
|
|
if save_config_value("display.busy_input_mode", arg):
|
|
if arg == "queue":
|
|
behavior = "Enter will queue follow-up input while Hermes is busy."
|
|
elif arg == "steer":
|
|
behavior = "Enter will steer your message into the current run (after the next tool call)."
|
|
else:
|
|
behavior = "Enter will interrupt the current run while Hermes is busy."
|
|
_cprint(f" {_ACCENT}✓ Busy input mode set to '{arg}' (saved to config){_RST}")
|
|
_cprint(f" {_DIM}{behavior}{_RST}")
|
|
else:
|
|
_cprint(f" {_ACCENT}✓ Busy input mode set to '{arg}' (session only){_RST}")
|
|
|
|
def _handle_fast_command(self, cmd: str):
|
|
"""Handle /fast — toggle fast mode (OpenAI Priority Processing / Anthropic Fast Mode)."""
|
|
from cli import _ACCENT, _DIM, _RST, _cprint, save_config_value
|
|
if not self._fast_command_available():
|
|
_cprint(" (._.) /fast is only available for models that support fast mode (OpenAI Priority Processing or Anthropic Fast Mode).")
|
|
return
|
|
|
|
# Determine the branding for the current model
|
|
try:
|
|
from hermes_cli.models import _is_anthropic_fast_model
|
|
agent = getattr(self, "agent", None)
|
|
model = getattr(agent, "model", None) or getattr(self, "model", None)
|
|
feature_name = "Anthropic Fast Mode" if _is_anthropic_fast_model(model) else "Priority Processing"
|
|
except Exception:
|
|
feature_name = "Fast mode"
|
|
|
|
parts = cmd.strip().split(maxsplit=1)
|
|
if len(parts) < 2 or parts[1].strip().lower() == "status":
|
|
status = "fast" if self.service_tier == "priority" else "normal"
|
|
_cprint(f" {_ACCENT}{feature_name}: {status}{_RST}")
|
|
_cprint(f" {_DIM}Usage: /fast [normal|fast|status]{_RST}")
|
|
return
|
|
|
|
arg = parts[1].strip().lower()
|
|
|
|
if arg in {"fast", "on"}:
|
|
self.service_tier = "priority"
|
|
saved_value = "fast"
|
|
label = "FAST"
|
|
elif arg in {"normal", "off"}:
|
|
self.service_tier = None
|
|
saved_value = "normal"
|
|
label = "NORMAL"
|
|
else:
|
|
_cprint(f" {_DIM}(._.) Unknown argument: {arg}{_RST}")
|
|
_cprint(f" {_DIM}Usage: /fast [normal|fast|status]{_RST}")
|
|
return
|
|
|
|
self.agent = None # Force agent re-init with new service-tier config
|
|
if save_config_value("agent.service_tier", saved_value):
|
|
_cprint(f" {_ACCENT}✓ {feature_name} set to {label} (saved to config){_RST}")
|
|
else:
|
|
_cprint(f" {_ACCENT}✓ {feature_name} set to {label} (session only){_RST}")
|
|
|
|
def _handle_debug_command(self):
|
|
"""Handle /debug — upload debug report + logs and print paste URLs."""
|
|
from hermes_cli.debug import run_debug_share
|
|
from types import SimpleNamespace
|
|
|
|
args = SimpleNamespace(lines=200, expire=7, local=False)
|
|
run_debug_share(args)
|
|
|
|
def _handle_update_command(self) -> bool:
|
|
"""Handle /update — update Hermes Agent to the latest version.
|
|
|
|
In the classic CLI this exits the session and relaunches as
|
|
``hermes update`` so the user sees update output directly and gets
|
|
the new version on next launch.
|
|
|
|
Returns ``True`` when the update was confirmed (caller should trigger
|
|
app exit so the relaunch is deferred to the main thread after
|
|
prompt_toolkit cleans up terminal modes). Returns ``False`` / falsy
|
|
when cancelled.
|
|
"""
|
|
from hermes_cli.config import is_managed, format_managed_message
|
|
|
|
if is_managed():
|
|
print(f" ✗ {format_managed_message('update Hermes Agent')}")
|
|
return False
|
|
|
|
# Use the prompt_toolkit-native modal so the confirmation panel
|
|
# renders properly above the composer and avoids raw input() races
|
|
# with the prompt_toolkit event loop (same pattern as
|
|
# _confirm_destructive_slash).
|
|
choices = [
|
|
("once", "Update Now", "exit the current session and update Hermes Agent"),
|
|
("cancel", "Cancel", "keep the current session"),
|
|
]
|
|
raw = self._prompt_text_input_modal(
|
|
title="⚕ Update Hermes Agent",
|
|
detail="This will exit the current session and run `hermes update`.",
|
|
choices=choices,
|
|
)
|
|
if raw is None:
|
|
print(" 🟡 /update cancelled.")
|
|
return False
|
|
choice = self._normalize_slash_confirm_choice(raw, choices)
|
|
if choice != "once":
|
|
print(" 🟡 /update cancelled.")
|
|
return False
|
|
|
|
print()
|
|
print(" ⚕ Launching update...")
|
|
print()
|
|
|
|
# Store the relaunch args so run() can exec them from the main thread
|
|
# after prompt_toolkit exits and restores terminal modes. Calling
|
|
# relaunch() directly here (from the process_loop daemon thread) would
|
|
# skip terminal cleanup on POSIX (execvp replaces the process mid-TUI)
|
|
# and only exit the worker thread on Windows (subprocess.run +
|
|
# sys.exit inside a non-main thread does not exit the process).
|
|
self._pending_relaunch = ["update"]
|
|
return True
|
|
|
|
def _handle_voice_command(self, command: str):
|
|
"""Handle /voice [on|off|tts|status] command."""
|
|
from cli import _cprint
|
|
parts = command.strip().split(maxsplit=1)
|
|
subcommand = parts[1].lower().strip() if len(parts) > 1 else ""
|
|
|
|
if subcommand == "on":
|
|
self._enable_voice_mode()
|
|
elif subcommand == "off":
|
|
self._disable_voice_mode()
|
|
elif subcommand == "tts":
|
|
self._toggle_voice_tts()
|
|
elif subcommand == "status":
|
|
self._show_voice_status()
|
|
elif subcommand == "":
|
|
# Toggle
|
|
if self._voice_mode:
|
|
self._disable_voice_mode()
|
|
else:
|
|
self._enable_voice_mode()
|
|
else:
|
|
_cprint(f"Unknown voice subcommand: {subcommand}")
|
|
_cprint("Usage: /voice [on|off|tts|status]")
|