"""Slash-command handlers for the interactive CLI (god-file decomposition Phase 4). This module hosts the ``_handle_*_command`` slash-command handlers lifted out of ``cli.py``'s ``HermesCLI`` class. ``HermesCLI`` inherits ``CLICommandsMixin`` so every ``self.`` call resolves unchanged via the MRO — behavior-neutral. Import discipline (mirrors gateway/slash_commands.py, PR #41886): * Neutral, non-cyclic deps are imported at module top-level below. * cli.py-internal symbols (the ``_cprint``/``_ACCENT``/``save_config_value``… module-level helpers and constants) are imported LAZILY inside each handler via ``from cli import ...`` — that resolves at call time when ``cli`` is fully loaded, so the mixin module never imports ``cli`` at top level (no cycle). """ from __future__ import annotations import json import os import sys import threading import time import uuid from datetime import datetime from urllib.parse import urlparse from rich import box as rich_box from rich.markup import escape as _escape from rich.panel import Panel from hermes_constants import display_hermes_home, is_termux as _is_termux_environment from hermes_cli.browser_connect import ( DEFAULT_BROWSER_CDP_URL, is_browser_debug_ready, manual_chrome_debug_command, ) class CLICommandsMixin: """Mixin holding the interactive-CLI slash-command handlers. All methods use only ``self`` state plus the imports above and per-method lazy ``from cli import ...`` lines, so they compose cleanly onto ``HermesCLI`` via the MRO. """ def _handle_rollback_command(self, command: str): """Handle /rollback — list, diff, or restore filesystem checkpoints. 
Syntax: /rollback — list checkpoints /rollback — restore checkpoint N (also undoes last chat turn) /rollback diff — preview changes since checkpoint N /rollback — restore a single file from checkpoint N """ from tools.checkpoint_manager import format_checkpoint_list if not hasattr(self, 'agent') or not self.agent: print(" No active agent session.") return mgr = self.agent._checkpoint_mgr if not mgr.enabled: print(" Checkpoints are not enabled.") print(" Enable with: hermes --checkpoints") print(" Or in config.yaml: checkpoints: { enabled: true }") return cwd = os.getenv("TERMINAL_CWD", os.getcwd()) parts = command.split() args = parts[1:] if len(parts) > 1 else [] if not args: # List checkpoints checkpoints = mgr.list_checkpoints(cwd) print(format_checkpoint_list(checkpoints, cwd)) return # Handle /rollback diff if args[0].lower() == "diff": if len(args) < 2: print(" Usage: /rollback diff ") return checkpoints = mgr.list_checkpoints(cwd) if not checkpoints: print(f" No checkpoints found for {cwd}") return target_hash = self._resolve_checkpoint_ref(args[1], checkpoints) if not target_hash: return result = mgr.diff(cwd, target_hash) if result["success"]: stat = result.get("stat", "") diff = result.get("diff", "") if not stat and not diff: print(" No changes since this checkpoint.") else: if stat: print(f"\n{stat}") if diff: # Limit diff output to avoid terminal flood diff_lines = diff.splitlines() if len(diff_lines) > 80: print("\n".join(diff_lines[:80])) print(f"\n ... 
def _handle_snapshot_command(self, command: str):
    """Handle /snapshot — lightweight state snapshots for Hermes config/state.

    Syntax:
        /snapshot                      — list recent snapshots
        /snapshot create [label]       — create a snapshot
        /snapshot restore <id|number>  — restore state from a snapshot
        /snapshot prune [N]            — prune to N snapshots (default 20)

    ``ls`` is accepted as an alias for ``list`` and ``rewind`` for
    ``restore``. A numeric restore argument is treated as a 1-based index
    into the snapshot list.

    Args:
        command: The raw slash command, e.g. ``"/snapshot create before-upgrade"``.
    """
    from hermes_cli.backup import (
        create_quick_snapshot,
        list_quick_snapshots,
        restore_quick_snapshot,
        prune_quick_snapshots,
    )
    from hermes_constants import display_hermes_home

    def _human_size(num_bytes):
        """Render a byte count as B / KB / MB for the listing table."""
        if num_bytes < 1024:
            return f"{num_bytes} B"
        if num_bytes < 1024 * 1024:
            return f"{num_bytes / 1024:.0f} KB"
        return f"{num_bytes / 1024 / 1024:.1f} MB"

    parts = command.split()
    subcmd = parts[1].lower() if len(parts) > 1 else "list"

    if subcmd in {"list", "ls"}:
        snaps = list_quick_snapshots()
        if not snaps:
            print(" No state snapshots yet.")
            print(" Create one: /snapshot create [label]")
            return
        print(f" State snapshots ({display_hermes_home()}/state-snapshots/):\n")
        print(f" {'#':>3} {'ID':<35} {'Files':>5} {'Size':>10} {'Label'}")
        print(f" {'─'*3} {'─'*35} {'─'*5} {'─'*10} {'─'*20}")
        for i, s in enumerate(snaps, 1):
            size_str = _human_size(s.get("total_size", 0))
            label = s.get("label") or ""
            print(f" {i:3} {s['id']:<35} {s.get('file_count', 0):>5} {size_str:>10} {label}")
        return

    if subcmd == "create":
        label = " ".join(parts[2:]) if len(parts) > 2 else None
        snap_id = create_quick_snapshot(label=label)
        if snap_id:
            print(f" Snapshot created: {snap_id}")
        else:
            print(" No state files found to snapshot.")
        return

    if subcmd in {"restore", "rewind"}:
        if len(parts) < 3:
            print(" Usage: /snapshot restore <id|number>")
            # Show hint with most recent snapshot
            snaps = list_quick_snapshots(limit=1)
            if snaps:
                print(f" Most recent: {snaps[0]['id']}")
            return

        snap_id = parts[2]
        # Allow restore by number (1-indexed). Only the int() parse sits in
        # the try so an unrelated ValueError from the listing is not hidden.
        try:
            idx = int(snap_id)
        except ValueError:
            idx = None
        if idx is not None:
            snaps = list_quick_snapshots()
            if not 1 <= idx <= len(snaps):
                print(f" Invalid snapshot number. Use 1-{len(snaps)}.")
                return
            snap_id = snaps[idx - 1]["id"]

        if restore_quick_snapshot(snap_id):
            print(f" Restored state from: {snap_id}")
            print(" Restart recommended for state.db changes to take effect.")
        else:
            print(f" Snapshot not found: {snap_id}")
        return

    if subcmd == "prune":
        keep = 20
        if len(parts) > 2:
            try:
                keep = int(parts[2])
            except ValueError:
                keep = -1
            # A negative keep-count is meaningless; reject it like any
            # other malformed value instead of forwarding it.
            if keep < 0:
                print(" Usage: /snapshot prune [keep-count]")
                return
        deleted = prune_quick_snapshots(keep=keep)
        print(f" Pruned {deleted} old snapshot(s) (keeping {keep}).")
        return

    print(f" Unknown subcommand: {subcmd}")
    print(" Usage: /snapshot [list|create [label]|restore <id|number>|prune [N]]")
def _handle_stop_command(self):
    """Handle /stop — terminate every running background process.

    This is separate from interrupting the current turn: it cleans up the
    background processes tracked by the process registry (an idea borrowed
    from OpenAI Codex, see openai/codex#14602).
    """
    from tools.process_registry import process_registry

    running_count = sum(
        1
        for session in process_registry.list_sessions()
        if session.get("status") == "running"
    )
    if running_count == 0:
        print(" No running background processes.")
        return

    print(f" Stopping {running_count} background process(es)...")
    stopped = process_registry.kill_all()
    print(f" ✅ Stopped {stopped} process(es).")


def _handle_agents_command(self):
    """Handle /agents — report background processes and whether the agent is busy."""
    from cli import _cprint
    from tools.process_registry import format_uptime_short, process_registry

    # Split the registry's sessions into still-running and everything else.
    active, done = [], []
    for session in process_registry.list_sessions():
        bucket = active if session.get("status") == "running" else done
        bucket.append(session)

    _cprint(f" Running processes: {len(active)}")
    for session in active:
        command_preview = session.get("command", "")[:80]
        uptime = format_uptime_short(session.get("uptime_seconds", 0))
        _cprint(f" {session.get('session_id', '?')} · {uptime} · {command_preview}")

    if done:
        _cprint(f" Recently finished: {len(done)}")

    state = "running" if getattr(self, "_agent_running", False) else "idle"
    _cprint(f" Agent: {state}")
def _handle_paste_command(self):
    """Handle /paste — explicitly look for an image on the clipboard.

    Fallback for terminals where BracketedPaste never fires for image-only
    clipboard content (e.g. the VSCode terminal, Windows Terminal with WSL2).
    On Termux the command only prints a hint, since clipboard images are not
    available there.
    """
    from cli import _DIM, _RST, _cprint, _termux_example_image_path

    if _is_termux_environment():
        example = _termux_example_image_path()
        _cprint(
            f" {_DIM}Clipboard image paste is not available on Termux — "
            "use /image or paste a local image path like "
            f"{example}{_RST}"
        )
        return

    from hermes_cli.clipboard import has_clipboard_image

    if not has_clipboard_image():
        _cprint(f" {_DIM}(._.) No image found in clipboard{_RST}")
        return

    if not self._try_attach_clipboard_image():
        _cprint(f" {_DIM}(>_<) Clipboard has an image but extraction failed{_RST}")
        return

    attached_total = len(self._attached_images)
    _cprint(f" 📎 Image #{attached_total} attached from clipboard")


def _handle_copy_command(self, cmd_original: str) -> None:
    """Handle /copy [number] — copy assistant output to the clipboard.

    With a number, that assistant response (1-based) is copied. Without one,
    the most recent assistant response that yields copyable text is used.
    """
    from cli import _assistant_copy_text, _cprint

    pieces = cmd_original.split(maxsplit=1)
    selector = pieces[1].strip() if len(pieces) > 1 else ""

    replies = [msg for msg in self.conversation_history if msg.get("role") == "assistant"]
    if not replies:
        _cprint(" Nothing to copy yet.")
        return

    if selector:
        try:
            pos = int(selector) - 1
        except ValueError:
            _cprint(" Usage: /copy [number]")
            return
        if not 0 <= pos < len(replies):
            _cprint(f" Invalid response number. Use 1-{len(replies)}.")
            return
    else:
        # Walk backwards until a response with copyable text turns up.
        pos = len(replies) - 1
        while pos >= 0 and not _assistant_copy_text(replies[pos].get("content")):
            pos -= 1
        if pos < 0:
            _cprint(" Nothing to copy in assistant responses yet.")
            return

    payload = _assistant_copy_text(replies[pos].get("content"))
    if not payload:
        _cprint(" Nothing to copy in that assistant response.")
        return

    try:
        self._write_osc52_clipboard(payload)
        _cprint(f" Copied assistant response #{pos + 1} to clipboard")
    except Exception as e:
        _cprint(f" Clipboard copy failed: {e}")
def _handle_image_command(self, cmd_original: str):
    """Handle /image <path> — attach a local image file for the next prompt.

    The argument is split into a path token and an optional remainder by
    ``_split_path_input``; the path is resolved with
    ``_resolve_attachment_path`` and must have a suffix listed in
    ``_IMAGE_EXTENSIONS``. On success the resolved path is appended to
    ``self._attached_images``.

    Args:
        cmd_original: The raw slash command, e.g. ``"/image ~/shot.png"``.
    """
    from cli import _DIM, _IMAGE_EXTENSIONS, _RST, _cprint, _resolve_attachment_path, _split_path_input, _termux_example_image_path

    # Split first, then check the length: "/image " (trailing whitespace
    # only) yields a single part and must land on the usage hint rather
    # than raise IndexError.
    parts = cmd_original.split(None, 1)
    raw_args = parts[1].strip() if len(parts) > 1 else ""
    if not raw_args:
        hint = _termux_example_image_path() if _is_termux_environment() else "/path/to/image.png"
        _cprint(f" {_DIM}Usage: /image <path> e.g. /image {hint}{_RST}")
        return

    path_token, _remainder = _split_path_input(raw_args)
    image_path = _resolve_attachment_path(path_token)
    if image_path is None:
        _cprint(f" {_DIM}(>_<) File not found: {path_token}{_RST}")
        return
    if image_path.suffix.lower() not in _IMAGE_EXTENSIONS:
        _cprint(f" {_DIM}(._.) Not a supported image file: {image_path.name}{_RST}")
        return

    self._attached_images.append(image_path)
    _cprint(f" 📎 Attached image: {image_path.name}")
    if _remainder:
        _cprint(f" {_DIM}Now type your prompt (or use --image in single-query mode): {_remainder}{_RST}")
    elif _is_termux_environment():
        _cprint(f" {_DIM}Tip: type your next message, or run hermes chat -q --image {_termux_example_image_path(image_path.name)} \"What do you see?\"{_RST}")


def _handle_tools_command(self, cmd: str):
    """Handle /tools [list|disable|enable] slash commands.

    /tools (no args, or any unrecognised subcommand) shows the tool list via
    ``self.show_tools()``. /tools list shows enabled/disabled status per
    toolset. /tools disable/enable saves the change to config and resets the
    session so the new tool set takes effect cleanly (no prompt-cache
    breakage mid-conversation).

    Args:
        cmd: The raw slash command, e.g. ``"/tools disable web"``.
    """
    from cli import _ACCENT, _DIM, _RST, _cprint
    import shlex
    from argparse import Namespace
    from contextlib import redirect_stdout
    from io import StringIO
    from hermes_cli.tools_config import tools_disable_enable_command

    def _run_capture(ns: Namespace) -> None:
        """Run tools_disable_enable_command with TUI-safe output.

        Inside the interactive TUI its ANSI-colored print() output is
        routed through _cprint so escapes aren't mangled by patch_stdout's
        StdoutProxy into garbled '?[32m...?[0m' text. Outside the TUI
        (standalone mode, tests) it is called straight through so real
        stdout / pytest capture works as expected.
        """
        # Standalone/tests, run as usual
        if getattr(self, "_app", None) is None:
            tools_disable_enable_command(ns)
            return

        # Buffer reports isatty()=True so color() in hermes_cli/colors.py
        # still emits ANSI escapes. StringIO.isatty() is False, which
        # would otherwise strip all colors before we re-render them.
        class _TTYBuf(StringIO):
            def isatty(self) -> bool:
                return True

        buf = _TTYBuf()
        with redirect_stdout(buf):
            tools_disable_enable_command(ns)
        for line in buf.getvalue().splitlines():
            _cprint(line)

    # Fall back to a plain whitespace split when quoting is unbalanced.
    try:
        parts = shlex.split(cmd)
    except ValueError:
        parts = cmd.split()

    subcommand = parts[1] if len(parts) > 1 else ""
    if subcommand not in {"list", "disable", "enable"}:
        self.show_tools()
        return

    if subcommand == "list":
        _run_capture(Namespace(tools_action="list", platform="cli"))
        return

    names = parts[2:]
    if not names:
        print(f"(._.) Usage: /tools {subcommand} [name ...]")
        print(f" Built-in toolset: /tools {subcommand} web")
        print(f" MCP tool: /tools {subcommand} github:create_issue")
        return

    # Apply the change directly — the user typing the command is implicit
    # consent. Do NOT use input() here; it hangs inside prompt_toolkit's
    # TUI event loop (known pitfall).
    verb = "Disabling" if subcommand == "disable" else "Enabling"
    label = ", ".join(names)
    _cprint(f"{_ACCENT}{verb} {label}...{_RST}")
    _run_capture(Namespace(tools_action=subcommand, names=names, platform="cli"))

    # Reset session so the new tool config is picked up from a clean state
    from hermes_cli.tools_config import _get_platform_tools
    from hermes_cli.config import load_config

    self.enabled_toolsets = _get_platform_tools(load_config(), "cli")
    self.new_session()
    _cprint(f"{_DIM}Session reset. New tool configuration is active.{_RST}")
def _handle_profile_command(self):
    """Display the active profile name and its home directory."""
    from hermes_constants import display_hermes_home
    from hermes_cli.profiles import get_active_profile_name

    display = display_hermes_home()
    profile_name = get_active_profile_name()
    print()
    print(f" Profile: {profile_name}")
    print(f" Home: {display}")
    print()


def _handle_handoff_command(self, cmd_original: str) -> bool:
    """Handle ``/handoff <platform>`` — transfer this CLI session to a gateway platform.

    Flow:
        1. Validate platform name + the gateway has a home channel for it.
        2. Reject if the agent is currently running (the in-flight turn
           would race with the gateway's switch_session).
        3. Write ``handoff_state='pending'`` on this session row.
        4. Block-poll ``state.db`` for terminal state (timeout 60s).
        5. On ``completed`` → print resume hint and signal CLI exit by
           returning False (the caller honors that like ``/quit``).
        6. On ``failed`` / timeout → print error and return True so the
           user keeps their CLI session.

    Args:
        cmd_original: The raw slash command, e.g. ``"/handoff telegram"``.

    Returns:
        False to signal CLI exit, True to keep going.
    """
    from cli import _cprint
    from hermes_state import format_session_db_unavailable

    poll_timeout_s = 60.0
    poll_interval_s = 0.5

    parts = cmd_original.split(maxsplit=1)
    if len(parts) < 2 or not parts[1].strip():
        _cprint(" Usage: /handoff <platform>")
        _cprint(" Hands the current session off to that platform's home channel.")
        _cprint(" The CLI session ends here; resume it later with /resume.")
        return True
    platform_name = parts[1].strip().lower()

    # Validate platform name + home channel via the live gateway config.
    try:
        from gateway.config import load_gateway_config, Platform
    except Exception as exc:  # pragma: no cover — gateway pkg always shipped
        _cprint(f" Could not load gateway config: {exc}")
        return True

    try:
        platform = Platform(platform_name)
    except (ValueError, KeyError):
        _cprint(f" Unknown platform '{platform_name}'.")
        return True

    try:
        gw_config = load_gateway_config()
    except Exception as exc:
        _cprint(f" Could not load gateway config: {exc}")
        return True

    pcfg = gw_config.platforms.get(platform)
    if not pcfg or not pcfg.enabled:
        _cprint(f" Platform '{platform_name}' is not configured/enabled in the gateway.")
        return True

    home = gw_config.get_home_channel(platform)
    if not home or not home.chat_id:
        _cprint(f" No home channel configured for {platform_name}.")
        _cprint(" Set one with /sethome on the destination chat first.")
        return True

    # Refuse mid-turn: an in-flight agent run would race with the
    # gateway's switch_session and the synthetic turn dispatch.
    if getattr(self, "_agent_running", False):
        _cprint(" Agent is busy. Wait for the current turn to finish, then retry /handoff.")
        return True

    # Make sure we have a SessionDB handle.
    if not self._session_db:
        try:
            from hermes_state import SessionDB

            self._session_db = SessionDB()
        except Exception:
            pass  # reported just below via the "unavailable" message
    if not self._session_db:
        _cprint(f" {format_session_db_unavailable()}")
        return True

    # Make sure the session row exists in state.db. Most CLI sessions
    # are written via _flush_messages_to_session_db on the first turn
    # already, but if the user tries to hand off an empty session we
    # still want a row to mark.
    try:
        row = self._session_db.get_session(self.session_id)
        if not row:
            # Nothing has flushed yet. Create a stub so the gateway has
            # something to switch_session onto. Inserting via title-set
            # is the simplest path because set_session_title's INSERT OR
            # IGNORE creates the row.
            placeholder_title = f"handoff-{self.session_id[:8]}"
            self._session_db.set_session_title(self.session_id, placeholder_title)
    except Exception as exc:
        _cprint(f" Could not ensure session row in state.db: {exc}")
        return True

    # Display title for messaging; fall back to a short id prefix.
    session_title = ""
    try:
        row = self._session_db.get_session(self.session_id)
        if row:
            session_title = row.get("title") or ""
    except Exception:
        pass
    if not session_title:
        session_title = self.session_id[:8]

    # Mark pending — gateway watcher will pick this up.
    ok = self._session_db.request_handoff(self.session_id, platform_name)
    if not ok:
        _cprint(" Session is already in flight for handoff. Wait for it to settle, then retry.")
        return True

    _cprint(f" Queued handoff of '{session_title}' → {platform_name} (home: {home.name}).")
    _cprint(" Waiting for the gateway to pick it up...")

    # Poll-block on terminal state. The deadline uses the monotonic clock
    # so a wall-clock adjustment (NTP step, suspend/resume) can neither
    # cut the wait short nor stretch it.
    deadline = time.monotonic() + poll_timeout_s
    last_state = "pending"
    while time.monotonic() < deadline:
        try:
            state_row = self._session_db.get_handoff_state(self.session_id)
        except Exception:
            # A transient read failure is treated as "still pending".
            state_row = None
        current = (state_row or {}).get("state") or "pending"

        if current != last_state:
            if current == "running":
                _cprint(" Gateway picked it up; transferring...")
            last_state = current

        if current == "completed":
            _cprint("")
            _cprint(f" ↻ Handoff complete. The session is now active on {platform_name}.")
            _cprint(f" Resume it on this CLI later with: /resume {session_title}")
            _cprint("")
            # End the CLI cleanly — same exit semantics as /quit.
            self._should_exit = True
            return False

        if current == "failed":
            err = (state_row or {}).get("error") or "unknown error"
            _cprint(f" Handoff failed: {err}")
            _cprint(" Your CLI session is intact. Try /handoff again, or /resume on the platform manually.")
            return True

        time.sleep(poll_interval_s)

    # Timed out. Clear the pending flag so the user can retry.
    try:
        self._session_db.fail_handoff(self.session_id, "timed out waiting for gateway")
    except Exception:
        pass
    _cprint(" Timed out waiting for the gateway. Is `hermes gateway` running?")
    _cprint(" Your CLI session is intact.")
    return True
def _handle_resume_command(self, cmd_original: str) -> None:
    """Handle /resume <number|title|id> — switch to a previous session mid-conversation.

    With no target, prints the recent-sessions list and arms
    ``self._pending_resume_sessions`` so a bare number on the next line
    selects from it. With a target, resolves it (list index, title, or
    session id), follows a compression chain to the descendant holding the
    transcript, then switches the CLI and the agent (when initialised) over
    to that session.

    Args:
        cmd_original: The raw slash command, e.g. ``"/resume 3"``.
    """
    from cli import _cprint, _sync_process_session_id

    parts = cmd_original.split(None, 1)
    target = parts[1].strip() if len(parts) > 1 else ""

    # Strip common outer brackets/quotes users may type literally from the
    # usage hint (e.g. ``/resume <abc123>`` or ``/resume [abc123]``). The
    # `/resume` help text shows angle brackets as a placeholder and a few
    # users copy them through verbatim.
    if len(target) >= 2 and target[0] + target[-1] in {"<>", "[]", '""', "''"}:
        target = target[1:-1].strip()

    if not target:
        _cprint(" Usage: /resume <number|title|id>")
        if self._show_recent_sessions(reason="resume"):
            # Arm a one-shot pending-resume selection so the user can type
            # just the number (`3`) on the next line instead of having to
            # retype `/resume 3`. The list here must match the one shown by
            # _show_recent_sessions and used for index resolution below —
            # all three go through _list_recent_sessions(limit=10). See
            # #34584.
            self._pending_resume_sessions = self._list_recent_sessions(limit=10)
            return
        _cprint(" Tip: Use /history or `hermes sessions list` to find sessions.")
        return

    # Any explicit /resume supersedes a previously-armed bare
    # numbered prompt.
    self._pending_resume_sessions = None

    if not self._session_db:
        from hermes_state import format_session_db_unavailable

        _cprint(f" {format_session_db_unavailable()}")
        return

    # Resolve numbered selection, title, or ID
    if target.isdigit():
        sessions = self._list_recent_sessions(limit=10)
        index = int(target)
        if index < 1 or index > len(sessions):
            _cprint(f" Resume index {index} is out of range.")
            _cprint(" Use /resume with no arguments to see available sessions.")
            return
        target_id = sessions[index - 1]["id"]
    else:
        from hermes_cli.main import _resolve_session_by_name_or_id

        target_id = _resolve_session_by_name_or_id(target) or target

    session_meta = self._session_db.get_session(target_id)
    if not session_meta:
        _cprint(f" Session not found: {target}")
        _cprint(" Use /history or `hermes sessions list` to see available sessions.")
        return

    # If the target is the empty head of a compression chain, redirect to
    # the descendant that actually holds the transcript. See #15000.
    try:
        resolved_id = self._session_db.resolve_resume_session_id(target_id)
    except Exception:
        resolved_id = target_id
    if resolved_id and resolved_id != target_id:
        _cprint(
            f" Session {target_id} was compressed into {resolved_id}; "
            f"resuming the descendant with your transcript."
        )
        target_id = resolved_id
        resolved_meta = self._session_db.get_session(target_id)
        if resolved_meta:
            session_meta = resolved_meta

    if target_id == self.session_id:
        _cprint(" Already on that session.")
        return

    # Load the transcript BEFORE mutating any state: if the DB read
    # raises, the current session is still open and nothing has been
    # switched. Transcript-only metadata entries are stripped.
    restored = self._session_db.get_messages_as_conversation(target_id)
    restored = [m for m in (restored or []) if m.get("role") != "session_meta"]

    old_session_id = self.session_id

    # End current session
    try:
        self._session_db.end_session(self.session_id, "resumed_other")
    except Exception:
        pass

    # Switch to the target session
    self.session_id = target_id
    self._resumed = True
    self._pending_title = None
    _sync_process_session_id(target_id)
    self.conversation_history = restored

    # Re-open the target session so it's not marked as ended
    try:
        self._session_db.reopen_session(target_id)
    except Exception:
        pass

    # Sync the agent if already initialised
    if self.agent:
        self.agent.session_id = target_id
        self.agent.reset_session_state()
        if hasattr(self.agent, "_last_flushed_db_idx"):
            # Everything just loaded is already persisted.
            self.agent._last_flushed_db_idx = len(self.conversation_history)
        if hasattr(self.agent, "_todo_store"):
            try:
                from tools.todo_tool import TodoStore

                self.agent._todo_store = TodoStore()
            except Exception:
                pass
        if hasattr(self.agent, "_invalidate_system_prompt"):
            self.agent._invalidate_system_prompt()

        # Notify memory providers that session_id rotated to a resumed
        # session. reset=False — the provider's accumulated state is
        # still valid; it just needs to target the new session_id for
        # subsequent writes. See #6672.
        try:
            _mm = getattr(self.agent, "_memory_manager", None)
            if _mm is not None:
                _mm.on_session_switch(
                    target_id,
                    parent_session_id=old_session_id or "",
                    reset=False,
                    reason="resume",
                )
        except Exception:
            pass

    title_part = f" \"{session_meta['title']}\"" if session_meta.get("title") else ""
    msg_count = sum(1 for m in self.conversation_history if m.get("role") == "user")

    if self.conversation_history:
        _cprint(
            f" ↻ Resumed session {target_id}{title_part}"
            f" ({msg_count} user message{'s' if msg_count != 1 else ''},"
            f" {len(self.conversation_history)} total)"
        )
        self._display_resumed_history()
    else:
        _cprint(f" ↻ Resumed session {target_id}{title_part} — no messages, starting fresh.")
def _handle_sessions_command(self, cmd_original: str) -> None:
    """Handle /sessions [list|<target>] — browse or resume previous sessions.

    A bare ``/sessions`` (or ``list`` / ``ls`` / ``browse``) prints the same
    recent-sessions table that /resume shows when called without a target.
    Any other argument is forwarded to the resume flow, so
    ``/sessions <target>`` and ``/resume <target>`` behave identically.

    The TUI ships an interactive picker overlay for this command; the
    classic CLI prints an inline list because there is no equivalent overlay
    primitive here. Without this handler the canonical name ``sessions``
    falls through ``process_command``'s elif chain and prints
    ``Unknown command: sessions`` even though the command is registered in
    the central COMMAND_REGISTRY.
    """
    from cli import _cprint

    pieces = cmd_original.split(None, 1)
    argument = pieces[1].strip() if len(pieces) > 1 else ""
    wants_listing = (not argument) or argument.lower() in {"list", "ls", "browse"}

    if not wants_listing:
        # Anything else is a resume target — hand it to the resume flow.
        self._handle_resume_command(f"/resume {argument}")
        return

    # Listing mode: show recent sessions inline.
    if not self._session_db:
        from hermes_state import format_session_db_unavailable

        _cprint(f" {format_session_db_unavailable()}")
        return

    shown = self._show_recent_sessions(reason="sessions")
    if not shown:
        _cprint(" (._.) No previous sessions yet.")
def _handle_branch_command(self, cmd_original: str) -> None:
    """Handle /branch [name] — fork the current session into a new independent copy.

    Copies the full conversation history to a new session so the user can
    explore a different approach without losing the original session state.
    Inspired by Claude Code's /branch command.

    If the branch session cannot be created, the parent session (already
    marked ended at that point) is re-opened on a best-effort basis and the
    CLI stays on it.

    Args:
        cmd_original: The raw slash command, e.g. ``"/branch try-sqlite"``.
    """
    from cli import _cprint, _sync_process_session_id

    if not self.conversation_history:
        _cprint(" No conversation to branch — send a message first.")
        return

    if not self._session_db:
        from hermes_state import format_session_db_unavailable

        _cprint(f" {format_session_db_unavailable()}")
        return

    parts = cmd_original.split(None, 1)
    branch_name = parts[1].strip() if len(parts) > 1 else ""

    # Generate the new session ID: timestamp plus a short random suffix.
    now = datetime.now()
    timestamp_str = now.strftime("%Y%m%d_%H%M%S")
    short_uuid = uuid.uuid4().hex[:6]
    new_session_id = f"{timestamp_str}_{short_uuid}"

    # Determine branch title
    if branch_name:
        branch_title = branch_name
    else:
        # Auto-generate from the current session title
        current_title = self._session_db.get_session_title(self.session_id)
        branch_title = self._session_db.get_next_title_in_lineage(current_title or "branch")

    # Remember the parent before any switching happens.
    parent_session_id = self.session_id

    # End the old session
    try:
        self._session_db.end_session(parent_session_id, "branched")
    except Exception:
        pass

    # Create the new session with parent link.
    # Persist a stable ``_branched_from`` marker in model_config so
    # list_sessions_rich() can keep the branch visible in /resume and
    # /sessions even after the parent is reopened and re-ended with a
    # different end_reason (e.g. tui_shutdown overwriting 'branched').
    try:
        self._session_db.create_session(
            session_id=new_session_id,
            source=os.environ.get("HERMES_SESSION_SOURCE", "cli"),
            model=self.model,
            model_config={
                "max_iterations": self.max_turns,
                "reasoning_config": self.reasoning_config,
                "_branched_from": parent_session_id,
            },
            parent_session_id=parent_session_id,
        )
    except Exception as e:
        # We are staying on the parent, which was just marked ended —
        # undo that so the live session is not left flagged as finished.
        try:
            self._session_db.reopen_session(parent_session_id)
        except Exception:
            pass
        _cprint(f" Failed to create branch session: {e}")
        return

    # Copy conversation history to the new session. Still best-effort,
    # but failures are counted so the user learns about a partial copy.
    failed_copies = 0
    for msg in self.conversation_history:
        try:
            self._session_db.append_message(
                session_id=new_session_id,
                role=msg.get("role", "user"),
                content=msg.get("content"),
                tool_name=msg.get("tool_name") or msg.get("name"),
                tool_calls=msg.get("tool_calls"),
                tool_call_id=msg.get("tool_call_id"),
                reasoning=msg.get("reasoning"),
            )
        except Exception:
            failed_copies += 1
    if failed_copies:
        _cprint(f" Warning: {failed_copies} message(s) could not be copied to the branch.")

    # Set title on the branch
    try:
        self._session_db.set_session_title(new_session_id, branch_title)
    except Exception:
        pass

    # Switch to the new session
    self._transfer_session_yolo(self.session_id, new_session_id)
    self.session_id = new_session_id
    self.session_start = now
    self._pending_title = None
    self._resumed = True  # Prevents auto-title generation
    _sync_process_session_id(new_session_id)

    # Sync the agent
    if self.agent:
        self.agent.session_id = new_session_id
        self.agent.session_start = now
        self.agent.reset_session_state()
        if hasattr(self.agent, "_last_flushed_db_idx"):
            self.agent._last_flushed_db_idx = len(self.conversation_history)
        if hasattr(self.agent, "_todo_store"):
            try:
                from tools.todo_tool import TodoStore

                self.agent._todo_store = TodoStore()
            except Exception:
                pass
        if hasattr(self.agent, "_invalidate_system_prompt"):
            self.agent._invalidate_system_prompt()

        # Notify memory providers that session_id forked to a new branch.
        # reset=False — the branched session carries the transcript
        # forward, so provider state tracks the lineage. parent_session_id
        # links the branch back to the original. See #6672.
        try:
            _mm = getattr(self.agent, "_memory_manager", None)
            if _mm is not None:
                _mm.on_session_switch(
                    new_session_id,
                    parent_session_id=parent_session_id or "",
                    reset=False,
                    reason="branch",
                )
        except Exception:
            pass

    msg_count = sum(1 for m in self.conversation_history if m.get("role") == "user")
    _cprint(
        f" ⑂ Branched session \"{branch_title}\""
        f" ({msg_count} user message{'s' if msg_count != 1 else ''})"
    )
    _cprint(f" Original session: {parent_session_id}")
    _cprint(f" Branch session: {new_session_id}")
try: _mm = getattr(self.agent, "_memory_manager", None) if _mm is not None: _mm.on_session_switch( new_session_id, parent_session_id=parent_session_id or "", reset=False, reason="branch", ) except Exception: pass msg_count = len([m for m in self.conversation_history if m.get("role") == "user"]) _cprint( f" ⑂ Branched session \"{branch_title}\"" f" ({msg_count} user message{'s' if msg_count != 1 else ''})" ) _cprint(f" Original session: {parent_session_id}") _cprint(f" Branch session: {new_session_id}") def _handle_gquota_command(self, cmd_original: str) -> None: """Show Google Gemini Code Assist quota usage for the current OAuth account.""" try: from agent.google_oauth import get_valid_access_token, GoogleOAuthError, load_credentials from agent.google_code_assist import retrieve_user_quota, CodeAssistError except ImportError as exc: self._console_print(f" [red]Gemini modules unavailable: {exc}[/]") return try: access_token = get_valid_access_token() except GoogleOAuthError as exc: self._console_print(f" [yellow]{exc}[/]") self._console_print(" Run [bold]/model[/] and pick 'Google Gemini (OAuth)' to sign in.") return creds = load_credentials() project_id = (creds.project_id if creds else "") or "" try: buckets = retrieve_user_quota(access_token, project_id=project_id) except CodeAssistError as exc: self._console_print(f" [red]Quota lookup failed:[/] {exc}") return if not buckets: self._console_print(" [dim]No quota buckets reported (account may be on legacy/unmetered tier).[/]") return # Sort for stable display, group by model buckets.sort(key=lambda b: (b.model_id, b.token_type)) self._console_print() self._console_print(f" [bold]Gemini Code Assist quota[/] (project: {project_id or '(auto / free-tier)'})") self._console_print() for b in buckets: pct = max(0.0, min(1.0, b.remaining_fraction)) width = 20 filled = int(round(pct * width)) bar = "▓" * filled + "░" * (width - filled) pct_str = f"{int(pct * 100):3d}%" header = b.model_id if b.token_type: header += f" 
def _handle_personality_command(self, cmd: str):
    """Handle /personality [name] — list or set a predefined personality.

    With no argument, prints the available personalities. With ``none``,
    ``default`` or ``neutral`` the personality overlay is cleared. With a
    name found in ``self.personalities`` the resolved prompt becomes
    ``self.system_prompt``. In both set/clear cases ``self.agent`` is reset
    to ``None`` to force re-initialisation, and the value is written to
    config via ``save_config_value`` (the message says whether it was saved
    or applies to this session only).

    Args:
        cmd: The raw slash command, e.g. ``"/personality pirate"``.
    """
    from cli import save_config_value

    parts = cmd.split(maxsplit=1)

    if len(parts) < 2:
        # No argument: show available personalities.
        print()
        print("+" + "-" * 50 + "+")
        print("|" + " " * 12 + "(^o^)/ Personalities" + " " * 15 + "|")
        print("+" + "-" * 50 + "+")
        print()
        print(f" {'none':<12} - (no personality overlay)")
        for name, prompt in self.personalities.items():
            if isinstance(prompt, dict):
                preview = prompt.get("description") or prompt.get("system_prompt", "")[:50]
            else:
                preview = str(prompt)[:50]
            print(f" {name:<12} - {preview}")
        print()
        print(" Usage: /personality <name>")
        print()
        return

    personality_name = parts[1].strip().lower()

    if personality_name in {"none", "default", "neutral"}:
        self.system_prompt = ""
        self.agent = None  # Force re-init
        if save_config_value("agent.system_prompt", ""):
            print("(^_^)b Personality cleared (saved to config)")
        else:
            print("(^_^) Personality cleared (session only)")
        print(" No personality overlay — using base agent behavior.")
        return

    if personality_name in self.personalities:
        self.system_prompt = self._resolve_personality_prompt(self.personalities[personality_name])
        self.agent = None  # Force re-init
        if save_config_value("agent.system_prompt", self.system_prompt):
            print(f"(^_^)b Personality set to '{personality_name}' (saved to config)")
        else:
            print(f"(^_^) Personality set to '{personality_name}' (session only)")
        # Echo the first 60 characters of the active prompt as confirmation.
        print(f" \"{self.system_prompt[:60]}{'...' if len(self.system_prompt) > 60 else ''}\"")
        return

    print(f"(._.) Unknown personality: {personality_name}")
    print(f" Available: none, {', '.join(self.personalities)}")
def _handle_cron_command(self, cmd: str):
    """Handle the /cron command to manage scheduled tasks.

    Forms:
        /cron                          show help plus the current jobs
        /cron list [--all]             list jobs (``--all`` includes disabled)
        /cron add <schedule> <prompt>  create a job (alias: create)
        /cron edit <job_id> [flags]    update schedule/prompt/name/skills
        /cron pause|resume|run <job_id>
        /cron remove <job_id>          (aliases: rm, delete)

    The command line is tokenized with ``shlex`` so quoted schedules and
    prompts stay intact. All work is delegated to the ``cronjob`` tool,
    whose JSON string reply is decoded into a dict.

    Args:
        cmd: The full slash command text, including the leading ``/cron``.
    """
    from cli import get_job
    import shlex
    from tools.cronjob_tools import cronjob as cronjob_tool

    def _cron_api(**kwargs):
        # The tool returns a JSON document as a string.
        return json.loads(cronjob_tool(**kwargs))

    def _normalize_skills(values):
        # Strip blanks and drop duplicates while preserving first-seen order.
        normalized = []
        for value in values:
            text = str(value or "").strip()
            if text and text not in normalized:
                normalized.append(text)
        return normalized

    def _parse_flags(tokens):
        # Returns the option dict, or None after printing an error.
        # A value-taking flag with no following token falls through to
        # the positionals list.
        opts = {
            "name": None,
            "deliver": None,
            "repeat": None,
            "skills": [],
            "add_skills": [],
            "remove_skills": [],
            "clear_skills": False,
            "all": False,
            "prompt": None,
            "schedule": None,
            "positionals": [],
        }
        i = 0
        while i < len(tokens):
            token = tokens[i]
            if token == "--name" and i + 1 < len(tokens):
                opts["name"] = tokens[i + 1]
                i += 2
            elif token == "--deliver" and i + 1 < len(tokens):
                opts["deliver"] = tokens[i + 1]
                i += 2
            elif token == "--repeat" and i + 1 < len(tokens):
                try:
                    opts["repeat"] = int(tokens[i + 1])
                except ValueError:
                    print("(._.) --repeat must be an integer")
                    return None
                i += 2
            elif token == "--skill" and i + 1 < len(tokens):
                opts["skills"].append(tokens[i + 1])
                i += 2
            elif token == "--add-skill" and i + 1 < len(tokens):
                opts["add_skills"].append(tokens[i + 1])
                i += 2
            elif token == "--remove-skill" and i + 1 < len(tokens):
                opts["remove_skills"].append(tokens[i + 1])
                i += 2
            elif token == "--clear-skills":
                opts["clear_skills"] = True
                i += 1
            elif token == "--all":
                opts["all"] = True
                i += 1
            elif token == "--prompt" and i + 1 < len(tokens):
                opts["prompt"] = tokens[i + 1]
                i += 2
            elif token == "--schedule" and i + 1 < len(tokens):
                opts["schedule"] = tokens[i + 1]
                i += 2
            else:
                opts["positionals"].append(token)
                i += 1
        return opts

    # shlex raises ValueError on an unbalanced quote; report it instead of
    # letting the exception escape the handler.
    try:
        tokens = shlex.split(cmd)
    except ValueError as exc:
        print(f"(._.) Could not parse command: {exc}")
        return

    if len(tokens) <= 1:
        print()
        print("+" + "-" * 68 + "+")
        print("|" + " " * 22 + "(^_^) Scheduled Tasks" + " " * 23 + "|")
        print("+" + "-" * 68 + "+")
        print()
        print(" Commands:")
        print(" /cron list")
        print(' /cron add "every 2h" "Check server status" [--skill blogwatcher]')
        print(' /cron edit <job_id> --schedule "every 4h" --prompt "New task"')
        print(" /cron edit <job_id> --skill blogwatcher --skill maps")
        print(" /cron edit <job_id> --remove-skill blogwatcher")
        print(" /cron edit <job_id> --clear-skills")
        print(" /cron pause <job_id>")
        print(" /cron resume <job_id>")
        print(" /cron run <job_id>")
        print(" /cron remove <job_id>")
        print()
        result = _cron_api(action="list")
        jobs = result.get("jobs", []) if result.get("success") else []
        if jobs:
            print(" Current Jobs:")
            print(" " + "-" * 63)
            for job in jobs:
                # str() keeps the width format spec valid when the value is
                # None or another non-string.
                repeat_str = str(job.get("repeat", "?"))
                print(f" {job['job_id'][:12]:<12} | {job['schedule']:<15} | {repeat_str:<8}")
                if job.get("skills"):
                    print(f" Skills: {', '.join(job['skills'])}")
                print(f" {job.get('prompt_preview', '')}")
                if job.get("next_run_at"):
                    print(f" Next: {job['next_run_at']}")
                print()
        else:
            print(" No scheduled jobs. Use '/cron add' to create one.")
            print()
        return

    subcommand = tokens[1].lower()
    opts = _parse_flags(tokens[2:])
    if opts is None:
        return

    if subcommand == "list":
        result = _cron_api(action="list", include_disabled=opts["all"])
        jobs = result.get("jobs", []) if result.get("success") else []
        if not jobs:
            print("(._.) No scheduled jobs.")
            return
        print()
        print("Scheduled Jobs:")
        print("-" * 80)
        for job in jobs:
            print(f" ID: {job['job_id']}")
            print(f" Name: {job['name']}")
            print(f" State: {job.get('state', '?')}")
            print(f" Schedule: {job['schedule']} ({job.get('repeat', '?')})")
            print(f" Next run: {job.get('next_run_at', 'N/A')}")
            if job.get("skills"):
                print(f" Skills: {', '.join(job['skills'])}")
            print(f" Prompt: {job.get('prompt_preview', '')}")
            if job.get("last_run_at"):
                print(f" Last run: {job['last_run_at']} ({job.get('last_status', '?')})")
            print()
        return

    if subcommand in {"add", "create"}:
        positionals = opts["positionals"]
        if not positionals:
            print("(._.) Usage: /cron add <schedule> <prompt>")
            return
        schedule = opts["schedule"] or positionals[0]
        prompt = opts["prompt"] or " ".join(positionals[1:])
        skills = _normalize_skills(opts["skills"])
        if not prompt and not skills:
            print("(._.) Please provide a prompt or at least one skill")
            return
        result = _cron_api(
            action="create",
            schedule=schedule,
            prompt=prompt or None,
            name=opts["name"],
            deliver=opts["deliver"],
            repeat=opts["repeat"],
            skills=skills or None,
        )
        if result.get("success"):
            print(f"(^_^)b Created job: {result['job_id']}")
            print(f" Schedule: {result['schedule']}")
            if result.get("skills"):
                print(f" Skills: {', '.join(result['skills'])}")
            print(f" Next run: {result['next_run_at']}")
        else:
            print(f"(x_x) Failed to create job: {result.get('error')}")
        return

    if subcommand == "edit":
        positionals = opts["positionals"]
        if not positionals:
            print("(._.) Usage: /cron edit <job_id> [--schedule ...] [--prompt ...] [--skill ...]")
            return
        job_id = positionals[0]
        existing = get_job(job_id)
        if not existing:
            print(f"(._.) Job not found: {job_id}")
            return

        # Skill precedence: --clear-skills, then --skill (full replacement),
        # then --add-skill/--remove-skill applied to the existing list.
        # None means "leave the job's skills unchanged".
        final_skills = None
        replacement_skills = _normalize_skills(opts["skills"])
        add_skills = _normalize_skills(opts["add_skills"])
        remove_skills = set(_normalize_skills(opts["remove_skills"]))
        # Falls back to a single "skill" key when "skills" is absent/empty.
        existing_skills = list(existing.get("skills") or ([] if not existing.get("skill") else [existing.get("skill")]))
        if opts["clear_skills"]:
            final_skills = []
        elif replacement_skills:
            final_skills = replacement_skills
        elif add_skills or remove_skills:
            final_skills = [skill for skill in existing_skills if skill not in remove_skills]
            for skill in add_skills:
                if skill not in final_skills:
                    final_skills.append(skill)

        result = _cron_api(
            action="update",
            job_id=job_id,
            schedule=opts["schedule"],
            prompt=opts["prompt"],
            name=opts["name"],
            deliver=opts["deliver"],
            repeat=opts["repeat"],
            skills=final_skills,
        )
        if result.get("success"):
            job = result["job"]
            print(f"(^_^)b Updated job: {job['job_id']}")
            print(f" Schedule: {job['schedule']}")
            if job.get("skills"):
                print(f" Skills: {', '.join(job['skills'])}")
            else:
                print(" Skills: none")
        else:
            print(f"(x_x) Failed to update job: {result.get('error')}")
        return

    if subcommand in {"pause", "resume", "run", "remove", "rm", "delete"}:
        positionals = opts["positionals"]
        if not positionals:
            print(f"(._.) Usage: /cron {subcommand} <job_id>")
            return
        job_id = positionals[0]
        # rm/delete are aliases for the tool's "remove" action.
        action = "remove" if subcommand in {"remove", "rm", "delete"} else subcommand
        result = _cron_api(action=action, job_id=job_id, reason="paused from /cron" if action == "pause" else None)
        if not result.get("success"):
            print(f"(x_x) Failed to {action} job: {result.get('error')}")
            return
        if action == "pause":
            print(f"(^_^)b Paused job: {result['job']['name']} ({job_id})")
        elif action == "resume":
            print(f"(^_^)b Resumed job: {result['job']['name']} ({job_id})")
            print(f" Next run: {result['job'].get('next_run_at')}")
        elif action == "run":
            print(f"(^_^)b Triggered job: {result['job']['name']} ({job_id})")
            print(" It will run on the next scheduler tick.")
        else:
            removed = result.get("removed_job", {})
            print(f"(^_^)b Removed job: {removed.get('name', job_id)} ({job_id})")
        return

    print(f"(._.) Unknown cron command: {subcommand}")
    print(" Available: list, add, edit, pause, resume, run, remove")
def _handle_background_command(self, cmd: str) -> None:
    """Handle /background <prompt> — run a prompt in a separate background session.

    Spawns a new AIAgent in a background daemon thread with its own
    session id. When it completes, prints the result to the CLI without
    modifying the active session's conversation history.

    Args:
        cmd: The full slash command text; everything after the first
            whitespace run is used as the prompt.

    Side effects visible in this block: increments
    ``self._background_task_counter``, registers the thread in
    ``self._background_tasks`` under the generated task id (removed again
    when the thread finishes), and may write ``self._spinner_text``.
    """
    from cli import AIAgent, ChatConsole, _accent_hex, _cprint, _maybe_remap_for_light_mode, _render_final_assistant_content, set_approval_callback, set_secret_capture_callback, set_sudo_password_callback
    parts = cmd.strip().split(maxsplit=1)
    if len(parts) < 2 or not parts[1].strip():
        _cprint(" Usage: /background ")
        _cprint(" Example: /background Summarize the top HN stories today")
        _cprint(" The task runs in a separate session and results display here when done.")
        return
    prompt = parts[1].strip()
    # NOTE: the counter is bumped before the credential check below, so a
    # task that fails to start still consumes a task number.
    self._background_task_counter += 1
    task_num = self._background_task_counter
    # Task id doubles as the background agent's session id and thread name.
    task_id = f"bg_{datetime.now().strftime('%H%M%S')}_{uuid.uuid4().hex[:6]}"
    # Make sure we have valid credentials
    if not self._ensure_runtime_credentials():
        _cprint(" (>_<) Cannot start background task: no valid credentials.")
        return
    _cprint(f" 🔄 Background task #{task_num} started: \"{prompt[:60]}{'...' if len(prompt) > 60 else ''}\"")
    _cprint(f" Task ID: {task_id}")
    _cprint(" You can continue chatting — results will appear when done.\n")
    # Resolved on the calling thread, before the worker starts.
    turn_route = self._resolve_turn_agent_config(prompt)

    def run_background():
        # Install the interactive callbacks from inside the worker thread.
        # NOTE(review): presumably these setters are per-thread — confirm.
        set_sudo_password_callback(self._sudo_password_callback)
        set_approval_callback(self._approval_callback)
        try:
            set_secret_capture_callback(self._secret_capture_callback)
        except Exception:
            pass
        try:
            bg_agent = AIAgent(
                model=turn_route["model"],
                api_key=turn_route["runtime"].get("api_key"),
                base_url=turn_route["runtime"].get("base_url"),
                provider=turn_route["runtime"].get("provider"),
                api_mode=turn_route["runtime"].get("api_mode"),
                acp_command=turn_route["runtime"].get("command"),
                acp_args=turn_route["runtime"].get("args"),
                max_tokens=turn_route["runtime"].get("max_tokens"),
                max_iterations=self.max_turns,
                enabled_toolsets=self.enabled_toolsets,
                quiet_mode=True,
                verbose_logging=False,
                session_id=task_id,
                platform="cli",
                session_db=self._session_db,
                reasoning_config=self.reasoning_config,
                service_tier=self.service_tier,
                request_overrides=turn_route.get("request_overrides"),
                providers_allowed=self._providers_only,
                providers_ignored=self._providers_ignore,
                providers_order=self._providers_order,
                provider_sort=self._provider_sort,
                provider_require_parameters=self._provider_require_params,
                provider_data_collection=self._provider_data_collection,
                openrouter_min_coding_score=self._openrouter_min_coding_score,
                fallback_model=self._fallback_model,
            )
            # Silence raw spinner; route thinking through TUI widget when no foreground agent is active.
            bg_agent._print_fn = lambda *_a, **_kw: None

            def _bg_thinking(text: str) -> None:
                # Concurrent bg tasks may race on _spinner_text; acceptable for best-effort UI.
                if not self._agent_running:
                    self._spinner_text = text
                    if self._app:
                        self._app.invalidate()

            bg_agent.thinking_callback = _bg_thinking
            result = bg_agent.run_conversation(
                user_message=prompt,
                task_id=task_id,
            )
            response = result.get("final_response", "") if result else ""
            # Surface the agent's error text when there is no final response.
            if not response and result and result.get("error"):
                response = f"Error: {result['error']}"
            # Display result in the CLI (thread-safe via patch_stdout).
            # Force a TUI refresh first so spinner/status bar don't overlap
            # with the output (fixes #2718).
            if self._app:
                self._app.invalidate()
                time.sleep(0.05)  # brief pause for refresh
            print()
            ChatConsole().print(f"[{_accent_hex()}]{'─' * 40}[/]")
            _cprint(f" ✅ Background task #{task_num} complete")
            _cprint(f" Prompt: \"{prompt[:60]}{'...' if len(prompt) > 60 else ''}\"")
            ChatConsole().print(f"[{_accent_hex()}]{'─' * 40}[/]")
            if response:
                # Pull label/colors from the active skin; fall back to the
                # hard-coded defaults if the skin engine fails in any way.
                try:
                    from hermes_cli.skin_engine import get_active_skin
                    _skin = get_active_skin()
                    label = _skin.get_branding("response_label", "⚕ Hermes")
                    _resp_color = _maybe_remap_for_light_mode(_skin.get_color("response_border", "#CD7F32"))
                    _resp_text = _maybe_remap_for_light_mode(_skin.get_color("banner_text", "#FFF8DC"))
                except Exception:
                    label = "⚕ Hermes"
                    _resp_color = "#CD7F32"
                    _resp_text = "#FFF8DC"
                _chat_console = ChatConsole()
                _chat_console.print(Panel(
                    _render_final_assistant_content(response, mode=self.final_response_markdown),
                    title=f"[{_resp_color} bold]{label} (background #{task_num})[/]",
                    title_align="left",
                    border_style=_resp_color,
                    style=_resp_text,
                    box=rich_box.HORIZONTALS,
                    padding=(1, 4),
                    width=self._scrollback_box_width(),
                ))
            else:
                _cprint(" (No response generated)")
            # Play bell if enabled
            if self.bell_on_complete:
                sys.stdout.write("\a")
                sys.stdout.flush()
        except Exception as e:
            # Same TUI refresh pattern as success path (#2718)
            if self._app:
                self._app.invalidate()
                time.sleep(0.05)
            print()
            _cprint(f" ❌ Background task #{task_num} failed: {e}")
        finally:
            # Always unregister the callbacks and the task entry, whether
            # the run succeeded or failed.
            try:
                set_sudo_password_callback(None)
                set_approval_callback(None)
                set_secret_capture_callback(None)
            except Exception:
                pass
            self._background_tasks.pop(task_id, None)
            # Clear spinner only if no foreground agent owns it
            if not self._agent_running:
                self._spinner_text = ""
                if self._app:
                    self._invalidate(min_interval=0)

    # Daemon thread: it will not keep the process alive at exit.
    thread = threading.Thread(target=run_background, daemon=True, name=f"bg-task-{task_id}")
    # Register before start() so the worker's finally-block pop always
    # finds the entry.
    self._background_tasks[task_id] = thread
    thread.start()
def _handle_browser_command(self, cmd: str):
    """Handle /browser connect|disconnect|status — manage live Chromium-family CDP connection.

    Subcommands (default is ``status`` when none is given):
        connect [url]  Validate the CDP URL (default
                       ``DEFAULT_BROWSER_CDP_URL``), try to auto-launch a
                       debug browser when the default endpoint is down,
                       then export ``BROWSER_CDP_URL`` and queue a system
                       note on ``self._pending_input`` if that exists.
        disconnect     Remove ``BROWSER_CDP_URL``, stop the CDP supervisor
                       and clean up browser sessions.
        status         Show the current backend. When connected, probe the
                       endpoint's own host and port with a 1-second TCP
                       connect.

    Args:
        cmd: The full slash command text, including the leading ``/browser``.
    """
    import platform as _plat
    import socket

    parts = cmd.strip().split(None, 1)
    sub = parts[1].lower().strip() if len(parts) > 1 else "status"
    current = os.environ.get("BROWSER_CDP_URL", "").strip()

    if sub.startswith("connect"):
        _DEFAULT_CDP = DEFAULT_BROWSER_CDP_URL
        # Optionally accept a custom CDP URL: /browser connect ws://host:port
        # Re-split the raw command because ``sub`` was lowercased.
        connect_parts = cmd.strip().split(None, 2)  # ["/browser", "connect", "ws://..."]
        cdp_url = connect_parts[2].strip() if len(connect_parts) > 2 else _DEFAULT_CDP
        # A bare host:port is parsed as if it were http://host:port.
        parsed_cdp = urlparse(cdp_url if "://" in cdp_url else f"http://{cdp_url}")
        if parsed_cdp.scheme not in {"http", "https", "ws", "wss"}:
            print()
            print(
                f" ⚠ Unsupported browser url scheme: {parsed_cdp.scheme or '(missing)'} "
                "(expected one of: http, https, ws, wss)"
            )
            print()
            return
        try:
            # Accessing .port raises ValueError for a non-numeric or
            # out-of-range port.
            _port = parsed_cdp.port or (443 if parsed_cdp.scheme in {"https", "wss"} else 80)
        except ValueError:
            print()
            print(f" ⚠ Invalid port in browser url: {cdp_url}")
            print()
            return
        if not parsed_cdp.hostname:
            print()
            print(f" ⚠ Missing host in browser url: {cdp_url}")
            print()
            return
        # Keep a full browser websocket path as-is; otherwise reduce the
        # URL to scheme://host[:port].
        if parsed_cdp.path.startswith("/devtools/browser/"):
            cdp_url = parsed_cdp.geturl()
        else:
            cdp_url = parsed_cdp._replace(
                path="",
                params="",
                query="",
                fragment="",
            ).geturl()

        # Clear any existing browser sessions so the next tool call uses the new backend
        try:
            from tools.browser_tool import cleanup_all_browsers
            cleanup_all_browsers()
        except Exception:
            pass

        print()
        # Check if a Chromium-family browser is already serving CDP on the debug port
        _already_open = is_browser_debug_ready(cdp_url, timeout=1.0)
        if _already_open:
            print(f" ✓ Chromium-family browser is already listening on port {_port}")
        elif cdp_url == _DEFAULT_CDP:
            # Try to auto-launch a Chromium-family browser with remote debugging
            print(" Chromium-family browser isn't running with remote debugging — attempting to launch...")
            _launched = self._try_launch_chrome_debug(_port, _plat.system())
            if _launched:
                # Wait for the DevTools discovery endpoint to come up
                for _wait in range(10):
                    if is_browser_debug_ready(cdp_url, timeout=1.0):
                        _already_open = True
                        break
                    time.sleep(0.5)
                if _already_open:
                    print(f" ✓ Chromium-family browser launched and listening on port {_port}")
                else:
                    print(f" ⚠ Browser launched but port {_port} isn't responding yet")
                    print(" Try again in a few seconds — the debug instance may still be starting")
            else:
                print(" ⚠ Could not auto-launch a Chromium-family browser")
                sys_name = _plat.system()
                chrome_cmd = manual_chrome_debug_command(_port, sys_name)
                if chrome_cmd:
                    print(" Launch a Chromium-family browser manually:")
                    print(f" {chrome_cmd}")
                else:
                    print(" No supported Chromium-family browser executable found in this environment")
        else:
            print(f" ⚠ Port {_port} is not reachable at {cdp_url}")

        if not _already_open:
            print()
            print("Browser not connected — start a Chromium-family browser with remote debugging and retry /browser connect")
            print()
            return

        os.environ["BROWSER_CDP_URL"] = cdp_url
        # Eagerly start the CDP supervisor so pending_dialogs + frame_tree
        # show up in the next browser_snapshot. No-op if already started.
        try:
            from tools.browser_tool import _ensure_cdp_supervisor  # type: ignore[import-not-found]
            _ensure_cdp_supervisor("default")
        except Exception:
            pass
        print()
        print("🌐 Browser connected to live Chromium-family browser via CDP")
        print(f" Endpoint: {cdp_url}")
        print()
        # Inject context message so the model knows this slash command
        # intentionally makes the dev/debug CDP browser available for use.
        if hasattr(self, '_pending_input'):
            self._pending_input.put(
                "[System note: The user invoked /browser connect and connected your browser tools to "
                "a Chromium-family dev/debug browser via Chrome DevTools Protocol. "
                "Your browser_navigate, browser_snapshot, browser_click, and other browser tools now "
                "control that CDP browser. The command itself is a signal that using browser tools for "
                "their current browser-related request is expected; do not wait for separate permission "
                "just because CDP is connected. This is typically a Hermes-managed isolated debug "
                "profile, not the user's main everyday browser. It is still user-visible and may contain "
                "pages, logged-in sessions, or cookies in that debug profile, so avoid destructive actions, "
                "closing tabs, or navigating away unless the user's task calls for it.]"
            )

    elif sub == "disconnect":
        if current:
            os.environ.pop("BROWSER_CDP_URL", None)
            try:
                from tools.browser_tool import cleanup_all_browsers, _stop_cdp_supervisor
                _stop_cdp_supervisor("default")
                cleanup_all_browsers()
            except Exception:
                pass
            print()
            print("🌐 Browser disconnected from live Chromium-family browser")
            print(" Browser tools reverted to default mode (local headless or cloud provider)")
            print()
            if hasattr(self, '_pending_input'):
                self._pending_input.put(
                    "[System note: The user has disconnected the browser tools from their live Chromium-family browser. "
                    "Browser tools are back to default mode (headless local browser or cloud provider).]"
                )
        else:
            print()
            print("Browser is not connected to a live Chromium-family browser (already using default mode)")
            print()

    elif sub == "status":
        print()
        if current:
            print("🌐 Browser: connected to live Chromium-family browser via CDP")
            print(f" Endpoint: {current}")
            # Probe the endpoint's own host and port, parsed the same way
            # as in the connect branch, instead of assuming localhost.
            parsed = urlparse(current if "://" in current else f"http://{current}")
            _host = parsed.hostname or "127.0.0.1"
            try:
                _port = parsed.port or (443 if parsed.scheme in {"https", "wss"} else 80)
            except ValueError:
                _port = 9222  # malformed port: fall back to the usual debug port
            try:
                # The context manager closes the socket on every path.
                with socket.create_connection((_host, _port), timeout=1):
                    pass
            except OSError:
                print(" Status: ⚠ not reachable (browser may not be running)")
            else:
                print(" Status: ✓ reachable")
        else:
            try:
                from tools.browser_tool import _get_cloud_provider
                provider = _get_cloud_provider()
            except Exception:
                provider = None
            if provider is not None:
                print(f"🌐 Browser: {provider.provider_name()} (cloud)")
            else:
                # Show engine info for local mode
                try:
                    from tools.browser_tool import _get_browser_engine
                    engine = _get_browser_engine()
                except Exception:
                    engine = "auto"
                if engine == "lightpanda":
                    print("🌐 Browser: local Lightpanda (agent-browser --engine lightpanda)")
                    print(" ⚡ Lightpanda: faster navigation, no screenshot support")
                    print(" Automatic Chromium fallback for screenshots and failed commands")
                elif engine == "chrome":
                    print("🌐 Browser: local headless Chromium (agent-browser --engine chrome)")
                else:
                    print("🌐 Browser: local headless Chromium (agent-browser)")
            print()
            print(" /browser connect — connect to your live Chromium-family browser")
            print(" /browser disconnect — revert to default")
        print()

    else:
        print()
        print("Usage: /browser connect|disconnect|status")
        print()
        print(" connect Connect browser tools to your live Chromium-family browser session")
        print(" disconnect Revert to default browser backend")
        print(" status Show current browser mode")
        print()
def _handle_subgoal_command(self, cmd: str) -> None:
    """Dispatch /subgoal subcommands.

    Forms:
        /subgoal               show current subgoals
        /subgoal <text>        append a criterion
        /subgoal remove <n>    drop subgoal n (1-based)
        /subgoal clear         wipe all subgoals

    Subgoals are extra criteria the user adds mid-loop. They get appended
    to both the judge prompt (verdict must consider them) and the
    continuation prompt (agent sees them) on the next turn boundary. No
    special kick — the running turn finishes, the next judge call
    includes them.

    ``clear`` is only treated as the wipe verb when nothing follows it;
    ``/subgoal clear the cache first`` adds that text as a subgoal rather
    than wiping the list.

    Args:
        cmd: The full slash command text, including the leading ``/subgoal``.
    """
    from cli import _DIM, _RST, _cprint

    parts = (cmd or "").strip().split(None, 2)
    arg = " ".join(parts[1:]).strip() if len(parts) > 1 else ""

    mgr = self._get_goal_manager()
    if mgr is None:
        _cprint(f" {_DIM}Goals unavailable (no active session).{_RST}")
        return
    if not mgr.has_goal():
        _cprint(f" {_DIM}No active goal. Set one with /goal <text>.{_RST}")
        return

    # No args → list current subgoals.
    if not arg:
        _cprint(f" {mgr.status_line()}")
        _cprint(f" {mgr.render_subgoals()}")
        return

    tokens = arg.split(None, 1)
    verb = tokens[0].lower()
    rest = tokens[1].strip() if len(tokens) > 1 else ""

    if verb == "remove":
        if not rest:
            _cprint(" Usage: /subgoal remove <n>")
            return
        try:
            idx = int(rest.split()[0])
        except ValueError:
            _cprint(" /subgoal remove: <n> must be an integer (1-based index).")
            return
        try:
            removed = mgr.remove_subgoal(idx)
        except (IndexError, RuntimeError) as exc:
            _cprint(f" /subgoal remove: {exc}")
            return
        _cprint(f" ✓ Removed subgoal {idx}: {removed}")
        return

    # Only a bare "clear" wipes; anything longer is subgoal text.
    if verb == "clear" and not rest:
        try:
            prev = mgr.clear_subgoals()
        except RuntimeError as exc:
            _cprint(f" /subgoal clear: {exc}")
            return
        if prev:
            _cprint(f" ✓ Cleared {prev} subgoal{'s' if prev != 1 else ''}.")
        else:
            _cprint(f" {_DIM}No subgoals to clear.{_RST}")
        return

    # Otherwise — append the whole arg as a new subgoal.
    try:
        text = mgr.add_subgoal(arg)
    except (ValueError, RuntimeError) as exc:
        _cprint(f" /subgoal: {exc}")
        return
    # The new subgoal is the last one, so its 1-based index is the count.
    idx = len(mgr.state.subgoals) if mgr.state else 0
    _cprint(f" ✓ Added subgoal {idx}: {text}")
def _handle_reasoning_command(self, cmd: str):
    """Handle /reasoning — manage effort level and display toggle.

    Usage:
        /reasoning            Show current effort level and display state
        /reasoning <level>    Set reasoning effort (none, minimal, low,
                              medium, high, xhigh)
        /reasoning show|on    Show model thinking/reasoning in output
        /reasoning hide|off   Hide model thinking/reasoning from output

    Both the display toggle and the effort change report whether the
    value was persisted ("saved") or only applies to this session, based
    on the return value of ``save_config_value``.

    Args:
        cmd: The full slash command text, including the leading ``/reasoning``.
    """
    from cli import _ACCENT, _DIM, _RST, _cprint, _parse_reasoning_config, save_config_value

    parts = cmd.strip().split(maxsplit=1)
    if len(parts) < 2:
        # Show current state
        rc = self.reasoning_config
        if rc is None:
            level = "medium (default)"
        elif rc.get("enabled") is False:
            level = "none (disabled)"
        else:
            level = rc.get("effort", "medium")
        display_state = "on ✓" if self.show_reasoning else "off"
        _cprint(f" {_ACCENT}Reasoning effort: {level}{_RST}")
        _cprint(f" {_ACCENT}Reasoning display: {display_state}{_RST}")
        _cprint(f" {_DIM}Usage: /reasoning <level|show|hide>{_RST}")
        return

    arg = parts[1].strip().lower()

    # Display toggle
    if arg in {"show", "on", "hide", "off"}:
        show = arg in {"show", "on"}
        self.show_reasoning = show
        if self.agent:
            # Re-point the live agent's callback at the new display state.
            self.agent.reasoning_callback = self._current_reasoning_callback()
        # Report honestly whether the setting was written to config.
        persisted = "saved" if save_config_value("display.show_reasoning", show) else "session only"
        _cprint(f" {_ACCENT}✓ Reasoning display: {'ON' if show else 'OFF'} ({persisted}){_RST}")
        if show:
            _cprint(f" {_DIM} Model thinking will be shown during and after each response.{_RST}")
        return

    # Effort level change
    parsed = _parse_reasoning_config(arg)
    if parsed is None:
        _cprint(f" {_DIM}(._.) Unknown argument: {arg}{_RST}")
        _cprint(f" {_DIM}Valid levels: none, minimal, low, medium, high, xhigh{_RST}")
        _cprint(f" {_DIM}Display: show, hide{_RST}")
        return

    self.reasoning_config = parsed
    self.agent = None  # Force agent re-init with new reasoning config
    if save_config_value("agent.reasoning_effort", arg):
        _cprint(f" {_ACCENT}✓ Reasoning effort set to '{arg}' (saved to config){_RST}")
    else:
        _cprint(f" {_ACCENT}✓ Reasoning effort set to '{arg}' (session only){_RST}")
else: behavior = "Enter will interrupt the current run while Hermes is busy." _cprint(f" {_ACCENT}✓ Busy input mode set to '{arg}' (saved to config){_RST}") _cprint(f" {_DIM}{behavior}{_RST}") else: _cprint(f" {_ACCENT}✓ Busy input mode set to '{arg}' (session only){_RST}") def _handle_fast_command(self, cmd: str): """Handle /fast — toggle fast mode (OpenAI Priority Processing / Anthropic Fast Mode).""" from cli import _ACCENT, _DIM, _RST, _cprint, save_config_value if not self._fast_command_available(): _cprint(" (._.) /fast is only available for models that support fast mode (OpenAI Priority Processing or Anthropic Fast Mode).") return # Determine the branding for the current model try: from hermes_cli.models import _is_anthropic_fast_model agent = getattr(self, "agent", None) model = getattr(agent, "model", None) or getattr(self, "model", None) feature_name = "Anthropic Fast Mode" if _is_anthropic_fast_model(model) else "Priority Processing" except Exception: feature_name = "Fast mode" parts = cmd.strip().split(maxsplit=1) if len(parts) < 2 or parts[1].strip().lower() == "status": status = "fast" if self.service_tier == "priority" else "normal" _cprint(f" {_ACCENT}{feature_name}: {status}{_RST}") _cprint(f" {_DIM}Usage: /fast [normal|fast|status]{_RST}") return arg = parts[1].strip().lower() if arg in {"fast", "on"}: self.service_tier = "priority" saved_value = "fast" label = "FAST" elif arg in {"normal", "off"}: self.service_tier = None saved_value = "normal" label = "NORMAL" else: _cprint(f" {_DIM}(._.) 
Unknown argument: {arg}{_RST}") _cprint(f" {_DIM}Usage: /fast [normal|fast|status]{_RST}") return self.agent = None # Force agent re-init with new service-tier config if save_config_value("agent.service_tier", saved_value): _cprint(f" {_ACCENT}✓ {feature_name} set to {label} (saved to config){_RST}") else: _cprint(f" {_ACCENT}✓ {feature_name} set to {label} (session only){_RST}") def _handle_debug_command(self): """Handle /debug — upload debug report + logs and print paste URLs.""" from hermes_cli.debug import run_debug_share from types import SimpleNamespace args = SimpleNamespace(lines=200, expire=7, local=False) run_debug_share(args) def _handle_update_command(self) -> bool: """Handle /update — update Hermes Agent to the latest version. In the classic CLI this exits the session and relaunches as ``hermes update`` so the user sees update output directly and gets the new version on next launch. Returns ``True`` when the update was confirmed (caller should trigger app exit so the relaunch is deferred to the main thread after prompt_toolkit cleans up terminal modes). Returns ``False`` / falsy when cancelled. """ from hermes_cli.config import is_managed, format_managed_message if is_managed(): print(f" ✗ {format_managed_message('update Hermes Agent')}") return False # Use the prompt_toolkit-native modal so the confirmation panel # renders properly above the composer and avoids raw input() races # with the prompt_toolkit event loop (same pattern as # _confirm_destructive_slash). 
choices = [ ("once", "Update Now", "exit the current session and update Hermes Agent"), ("cancel", "Cancel", "keep the current session"), ] raw = self._prompt_text_input_modal( title="⚕ Update Hermes Agent", detail="This will exit the current session and run `hermes update`.", choices=choices, ) if raw is None: print(" 🟡 /update cancelled.") return False choice = self._normalize_slash_confirm_choice(raw, choices) if choice != "once": print(" 🟡 /update cancelled.") return False print() print(" ⚕ Launching update...") print() # Store the relaunch args so run() can exec them from the main thread # after prompt_toolkit exits and restores terminal modes. Calling # relaunch() directly here (from the process_loop daemon thread) would # skip terminal cleanup on POSIX (execvp replaces the process mid-TUI) # and only exit the worker thread on Windows (subprocess.run + # sys.exit inside a non-main thread does not exit the process). self._pending_relaunch = ["update"] return True def _handle_voice_command(self, command: str): """Handle /voice [on|off|tts|status] command.""" from cli import _cprint parts = command.strip().split(maxsplit=1) subcommand = parts[1].lower().strip() if len(parts) > 1 else "" if subcommand == "on": self._enable_voice_mode() elif subcommand == "off": self._disable_voice_mode() elif subcommand == "tts": self._toggle_voice_tts() elif subcommand == "status": self._show_voice_status() elif subcommand == "": # Toggle if self._voice_mode: self._disable_voice_mode() else: self._enable_voice_mode() else: _cprint(f"Unknown voice subcommand: {subcommand}") _cprint("Usage: /voice [on|off|tts|status]")