mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
PR #9467 added a call to self._fuzzy_file_completions() inside _context_completions(), but the method was still decorated with @staticmethod and didn't receive self. Every @ mention in the input triggers 'name self is not defined' from prompt_toolkit's async completer, spamming the error on every keystroke. Fix: remove @staticmethod, add self parameter. The method already uses self._fuzzy_file_completions() and self._get_project_files() via that call chain, so it was never meant to stay static after the fuzzy search feature was added.
1217 lines
48 KiB
Python
1217 lines
48 KiB
Python
"""Slash command definitions and autocomplete for the Hermes CLI.
|
|
|
|
Central registry for all slash commands. Every consumer -- CLI help, gateway
|
|
dispatch, Telegram BotCommands, Slack subcommand mapping, autocomplete --
|
|
derives its data from ``COMMAND_REGISTRY``.
|
|
|
|
To add a command: add a ``CommandDef`` entry to ``COMMAND_REGISTRY``.
|
|
To add an alias: set ``aliases=("short",)`` on the existing ``CommandDef``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import time
|
|
from collections.abc import Callable, Mapping
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
# prompt_toolkit is an optional CLI dependency — only needed for
|
|
# SlashCommandCompleter and SlashCommandAutoSuggest. Gateway and test
|
|
# environments that lack it must still be able to import this module
|
|
# for resolve_command, gateway_help_lines, and COMMAND_REGISTRY.
|
|
try:
|
|
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
|
|
from prompt_toolkit.completion import Completer, Completion
|
|
except ImportError: # pragma: no cover
|
|
AutoSuggest = object # type: ignore[assignment,misc]
|
|
Completer = object # type: ignore[assignment,misc]
|
|
Suggestion = None # type: ignore[assignment]
|
|
Completion = None # type: ignore[assignment]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CommandDef dataclass
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass(frozen=True)
|
|
class CommandDef:
|
|
"""Definition of a single slash command."""
|
|
|
|
name: str # canonical name without slash: "background"
|
|
description: str # human-readable description
|
|
category: str # "Session", "Configuration", etc.
|
|
aliases: tuple[str, ...] = () # alternative names: ("bg",)
|
|
args_hint: str = "" # argument placeholder: "<prompt>", "[name]"
|
|
subcommands: tuple[str, ...] = () # tab-completable subcommands
|
|
cli_only: bool = False # only available in CLI
|
|
gateway_only: bool = False # only available in gateway/messaging
|
|
gateway_config_gate: str | None = None # config dotpath; when truthy, overrides cli_only for gateway
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Central registry -- single source of truth
|
|
# ---------------------------------------------------------------------------
|
|
|
|
COMMAND_REGISTRY: list[CommandDef] = [
|
|
# Session
|
|
CommandDef("new", "Start a new session (fresh session ID + history)", "Session",
|
|
aliases=("reset",)),
|
|
CommandDef("clear", "Clear screen and start a new session", "Session",
|
|
cli_only=True),
|
|
CommandDef("history", "Show conversation history", "Session",
|
|
cli_only=True),
|
|
CommandDef("save", "Save the current conversation", "Session",
|
|
cli_only=True),
|
|
CommandDef("retry", "Retry the last message (resend to agent)", "Session"),
|
|
CommandDef("undo", "Remove the last user/assistant exchange", "Session"),
|
|
CommandDef("title", "Set a title for the current session", "Session",
|
|
args_hint="[name]"),
|
|
CommandDef("branch", "Branch the current session (explore a different path)", "Session",
|
|
aliases=("fork",), args_hint="[name]"),
|
|
CommandDef("compress", "Manually compress conversation context", "Session",
|
|
args_hint="[focus topic]"),
|
|
CommandDef("rollback", "List or restore filesystem checkpoints", "Session",
|
|
args_hint="[number]"),
|
|
CommandDef("snapshot", "Create or restore state snapshots of Hermes config/state", "Session",
|
|
aliases=("snap",), args_hint="[create|restore <id>|prune]"),
|
|
CommandDef("stop", "Kill all running background processes", "Session"),
|
|
CommandDef("approve", "Approve a pending dangerous command", "Session",
|
|
gateway_only=True, args_hint="[session|always]"),
|
|
CommandDef("deny", "Deny a pending dangerous command", "Session",
|
|
gateway_only=True),
|
|
CommandDef("background", "Run a prompt in the background", "Session",
|
|
aliases=("bg",), args_hint="<prompt>"),
|
|
CommandDef("btw", "Ephemeral side question using session context (no tools, not persisted)", "Session",
|
|
args_hint="<question>"),
|
|
CommandDef("queue", "Queue a prompt for the next turn (doesn't interrupt)", "Session",
|
|
aliases=("q",), args_hint="<prompt>"),
|
|
CommandDef("status", "Show session info", "Session"),
|
|
CommandDef("profile", "Show active profile name and home directory", "Info"),
|
|
CommandDef("sethome", "Set this chat as the home channel", "Session",
|
|
gateway_only=True, aliases=("set-home",)),
|
|
CommandDef("resume", "Resume a previously-named session", "Session",
|
|
args_hint="[name]"),
|
|
|
|
# Configuration
|
|
CommandDef("config", "Show current configuration", "Configuration",
|
|
cli_only=True),
|
|
CommandDef("model", "Switch model for this session", "Configuration", args_hint="[model] [--global]"),
|
|
CommandDef("provider", "Show available providers and current provider",
|
|
"Configuration"),
|
|
|
|
CommandDef("personality", "Set a predefined personality", "Configuration",
|
|
args_hint="[name]"),
|
|
CommandDef("statusbar", "Toggle the context/model status bar", "Configuration",
|
|
cli_only=True, aliases=("sb",)),
|
|
CommandDef("verbose", "Cycle tool progress display: off -> new -> all -> verbose",
|
|
"Configuration", cli_only=True,
|
|
gateway_config_gate="display.tool_progress_command"),
|
|
CommandDef("yolo", "Toggle YOLO mode (skip all dangerous command approvals)",
|
|
"Configuration"),
|
|
CommandDef("reasoning", "Manage reasoning effort and display", "Configuration",
|
|
args_hint="[level|show|hide]",
|
|
subcommands=("none", "minimal", "low", "medium", "high", "xhigh", "show", "hide", "on", "off")),
|
|
CommandDef("fast", "Toggle fast mode — OpenAI Priority Processing / Anthropic Fast Mode (Normal/Fast)", "Configuration",
|
|
args_hint="[normal|fast|status]",
|
|
subcommands=("normal", "fast", "status", "on", "off")),
|
|
CommandDef("skin", "Show or change the display skin/theme", "Configuration",
|
|
cli_only=True, args_hint="[name]"),
|
|
CommandDef("voice", "Toggle voice mode", "Configuration",
|
|
args_hint="[on|off|tts|status]", subcommands=("on", "off", "tts", "status")),
|
|
|
|
# Tools & Skills
|
|
CommandDef("tools", "Manage tools: /tools [list|disable|enable] [name...]", "Tools & Skills",
|
|
args_hint="[list|disable|enable] [name...]", cli_only=True),
|
|
CommandDef("toolsets", "List available toolsets", "Tools & Skills",
|
|
cli_only=True),
|
|
CommandDef("skills", "Search, install, inspect, or manage skills",
|
|
"Tools & Skills", cli_only=True,
|
|
subcommands=("search", "browse", "inspect", "install")),
|
|
CommandDef("cron", "Manage scheduled tasks", "Tools & Skills",
|
|
cli_only=True, args_hint="[subcommand]",
|
|
subcommands=("list", "add", "create", "edit", "pause", "resume", "run", "remove")),
|
|
CommandDef("reload", "Reload .env variables into the running session", "Tools & Skills"),
|
|
CommandDef("reload-mcp", "Reload MCP servers from config", "Tools & Skills",
|
|
aliases=("reload_mcp",)),
|
|
CommandDef("browser", "Connect browser tools to your live Chrome via CDP", "Tools & Skills",
|
|
cli_only=True, args_hint="[connect|disconnect|status]",
|
|
subcommands=("connect", "disconnect", "status")),
|
|
CommandDef("plugins", "List installed plugins and their status",
|
|
"Tools & Skills", cli_only=True),
|
|
|
|
# Info
|
|
CommandDef("commands", "Browse all commands and skills (paginated)", "Info",
|
|
gateway_only=True, args_hint="[page]"),
|
|
CommandDef("help", "Show available commands", "Info"),
|
|
CommandDef("restart", "Gracefully restart the gateway after draining active runs", "Session",
|
|
gateway_only=True),
|
|
CommandDef("usage", "Show token usage and rate limits for the current session", "Info"),
|
|
CommandDef("insights", "Show usage insights and analytics", "Info",
|
|
args_hint="[days]"),
|
|
CommandDef("platforms", "Show gateway/messaging platform status", "Info",
|
|
cli_only=True, aliases=("gateway",)),
|
|
CommandDef("paste", "Check clipboard for an image and attach it", "Info",
|
|
cli_only=True),
|
|
CommandDef("image", "Attach a local image file for your next prompt", "Info",
|
|
cli_only=True, args_hint="<path>"),
|
|
CommandDef("update", "Update Hermes Agent to the latest version", "Info",
|
|
gateway_only=True),
|
|
CommandDef("debug", "Upload debug report (system info + logs) and get shareable links", "Info"),
|
|
|
|
# Exit
|
|
CommandDef("quit", "Exit the CLI", "Exit",
|
|
cli_only=True, aliases=("exit", "q")),
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Derived lookups -- rebuilt once at import time, refreshed by rebuild_lookups()
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_command_lookup() -> dict[str, CommandDef]:
|
|
"""Map every name and alias to its CommandDef."""
|
|
lookup: dict[str, CommandDef] = {}
|
|
for cmd in COMMAND_REGISTRY:
|
|
lookup[cmd.name] = cmd
|
|
for alias in cmd.aliases:
|
|
lookup[alias] = cmd
|
|
return lookup
|
|
|
|
|
|
_COMMAND_LOOKUP: dict[str, CommandDef] = _build_command_lookup()
|
|
|
|
|
|
def resolve_command(name: str) -> CommandDef | None:
|
|
"""Resolve a command name or alias to its CommandDef.
|
|
|
|
Accepts names with or without the leading slash.
|
|
"""
|
|
return _COMMAND_LOOKUP.get(name.lower().lstrip("/"))
|
|
|
|
|
|
def _build_description(cmd: CommandDef) -> str:
|
|
"""Build a CLI-facing description string including usage hint."""
|
|
if cmd.args_hint:
|
|
return f"{cmd.description} (usage: /{cmd.name} {cmd.args_hint})"
|
|
return cmd.description
|
|
|
|
|
|
# Backwards-compatible flat dict: "/command" -> description
|
|
COMMANDS: dict[str, str] = {}
|
|
for _cmd in COMMAND_REGISTRY:
|
|
if not _cmd.gateway_only:
|
|
COMMANDS[f"/{_cmd.name}"] = _build_description(_cmd)
|
|
for _alias in _cmd.aliases:
|
|
COMMANDS[f"/{_alias}"] = f"{_cmd.description} (alias for /{_cmd.name})"
|
|
|
|
# Backwards-compatible categorized dict
|
|
COMMANDS_BY_CATEGORY: dict[str, dict[str, str]] = {}
|
|
for _cmd in COMMAND_REGISTRY:
|
|
if not _cmd.gateway_only:
|
|
_cat = COMMANDS_BY_CATEGORY.setdefault(_cmd.category, {})
|
|
_cat[f"/{_cmd.name}"] = COMMANDS[f"/{_cmd.name}"]
|
|
for _alias in _cmd.aliases:
|
|
_cat[f"/{_alias}"] = COMMANDS[f"/{_alias}"]
|
|
|
|
|
|
# Subcommands lookup: "/cmd" -> ["sub1", "sub2", ...]
|
|
SUBCOMMANDS: dict[str, list[str]] = {}
|
|
for _cmd in COMMAND_REGISTRY:
|
|
if _cmd.subcommands:
|
|
SUBCOMMANDS[f"/{_cmd.name}"] = list(_cmd.subcommands)
|
|
|
|
# Also extract subcommands hinted in args_hint via pipe-separated patterns
|
|
# e.g. args_hint="[on|off|tts|status]" for commands that don't have explicit subcommands.
|
|
# NOTE: If a command already has explicit subcommands, this fallback is skipped.
|
|
# Use the `subcommands` field on CommandDef for intentional tab-completable args.
|
|
_PIPE_SUBS_RE = re.compile(r"[a-z]+(?:\|[a-z]+)+")
|
|
for _cmd in COMMAND_REGISTRY:
|
|
key = f"/{_cmd.name}"
|
|
if key in SUBCOMMANDS or not _cmd.args_hint:
|
|
continue
|
|
m = _PIPE_SUBS_RE.search(_cmd.args_hint)
|
|
if m:
|
|
SUBCOMMANDS[key] = m.group(0).split("|")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gateway helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Set of all command names + aliases recognized by the gateway.
|
|
# Includes config-gated commands so the gateway can dispatch them
|
|
# (the handler checks the config gate at runtime).
|
|
GATEWAY_KNOWN_COMMANDS: frozenset[str] = frozenset(
|
|
name
|
|
for cmd in COMMAND_REGISTRY
|
|
if not cmd.cli_only or cmd.gateway_config_gate
|
|
for name in (cmd.name, *cmd.aliases)
|
|
)
|
|
|
|
|
|
def _resolve_config_gates() -> set[str]:
|
|
"""Return canonical names of commands whose ``gateway_config_gate`` is truthy.
|
|
|
|
Reads ``config.yaml`` and walks the dot-separated key path for each
|
|
config-gated command. Returns an empty set on any error so callers
|
|
degrade gracefully.
|
|
"""
|
|
gated = [c for c in COMMAND_REGISTRY if c.gateway_config_gate]
|
|
if not gated:
|
|
return set()
|
|
try:
|
|
from hermes_cli.config import read_raw_config
|
|
cfg = read_raw_config()
|
|
except Exception:
|
|
return set()
|
|
result: set[str] = set()
|
|
for cmd in gated:
|
|
val: Any = cfg
|
|
for key in cmd.gateway_config_gate.split("."):
|
|
if isinstance(val, dict):
|
|
val = val.get(key)
|
|
else:
|
|
val = None
|
|
break
|
|
if val:
|
|
result.add(cmd.name)
|
|
return result
|
|
|
|
|
|
def _is_gateway_available(cmd: CommandDef, config_overrides: set[str] | None = None) -> bool:
|
|
"""Check if *cmd* should appear in gateway surfaces (help, menus, mappings).
|
|
|
|
Unconditionally available when ``cli_only`` is False. When ``cli_only``
|
|
is True but ``gateway_config_gate`` is set, the command is available only
|
|
when the config value is truthy. Pass *config_overrides* (from
|
|
``_resolve_config_gates()``) to avoid re-reading config for every command.
|
|
"""
|
|
if not cmd.cli_only:
|
|
return True
|
|
if cmd.gateway_config_gate:
|
|
overrides = config_overrides if config_overrides is not None else _resolve_config_gates()
|
|
return cmd.name in overrides
|
|
return False
|
|
|
|
|
|
def gateway_help_lines() -> list[str]:
|
|
"""Generate gateway help text lines from the registry."""
|
|
overrides = _resolve_config_gates()
|
|
lines: list[str] = []
|
|
for cmd in COMMAND_REGISTRY:
|
|
if not _is_gateway_available(cmd, overrides):
|
|
continue
|
|
args = f" {cmd.args_hint}" if cmd.args_hint else ""
|
|
alias_parts: list[str] = []
|
|
for a in cmd.aliases:
|
|
# Skip internal aliases like reload_mcp (underscore variant)
|
|
if a.replace("-", "_") == cmd.name.replace("-", "_") and a != cmd.name:
|
|
continue
|
|
alias_parts.append(f"`/{a}`")
|
|
alias_note = f" (alias: {', '.join(alias_parts)})" if alias_parts else ""
|
|
lines.append(f"`/{cmd.name}{args}` -- {cmd.description}{alias_note}")
|
|
return lines
|
|
|
|
|
|
def telegram_bot_commands() -> list[tuple[str, str]]:
|
|
"""Return (command_name, description) pairs for Telegram setMyCommands.
|
|
|
|
Telegram command names cannot contain hyphens, so they are replaced with
|
|
underscores. Aliases are skipped -- Telegram shows one menu entry per
|
|
canonical command.
|
|
"""
|
|
overrides = _resolve_config_gates()
|
|
result: list[tuple[str, str]] = []
|
|
for cmd in COMMAND_REGISTRY:
|
|
if not _is_gateway_available(cmd, overrides):
|
|
continue
|
|
tg_name = _sanitize_telegram_name(cmd.name)
|
|
if tg_name:
|
|
result.append((tg_name, cmd.description))
|
|
return result
|
|
|
|
|
|
_CMD_NAME_LIMIT = 32
|
|
"""Max command name length shared by Telegram and Discord."""
|
|
|
|
# Backward-compat alias — tests and external code may reference the old name.
|
|
_TG_NAME_LIMIT = _CMD_NAME_LIMIT
|
|
|
|
# Telegram Bot API allows only lowercase a-z, 0-9, and underscores in
|
|
# command names. This regex strips everything else after initial conversion.
|
|
_TG_INVALID_CHARS = re.compile(r"[^a-z0-9_]")
|
|
_TG_MULTI_UNDERSCORE = re.compile(r"_{2,}")
|
|
|
|
|
|
def _sanitize_telegram_name(raw: str) -> str:
|
|
"""Convert a command/skill/plugin name to a valid Telegram command name.
|
|
|
|
Telegram requires: 1-32 chars, lowercase a-z, digits 0-9, underscores only.
|
|
Steps: lowercase → replace hyphens with underscores → strip all other
|
|
invalid characters → collapse consecutive underscores → strip leading/
|
|
trailing underscores.
|
|
"""
|
|
name = raw.lower().replace("-", "_")
|
|
name = _TG_INVALID_CHARS.sub("", name)
|
|
name = _TG_MULTI_UNDERSCORE.sub("_", name)
|
|
return name.strip("_")
|
|
|
|
|
|
def _clamp_command_names(
|
|
entries: list[tuple[str, str]],
|
|
reserved: set[str],
|
|
) -> list[tuple[str, str]]:
|
|
"""Enforce 32-char command name limit with collision avoidance.
|
|
|
|
Both Telegram and Discord cap slash command names at 32 characters.
|
|
Names exceeding the limit are truncated. If truncation creates a duplicate
|
|
(against *reserved* names or earlier entries in the same batch), the name is
|
|
shortened to 31 chars and a digit ``0``-``9`` is appended to differentiate.
|
|
If all 10 digit slots are taken the entry is silently dropped.
|
|
"""
|
|
used: set[str] = set(reserved)
|
|
result: list[tuple[str, str]] = []
|
|
for name, desc in entries:
|
|
if len(name) > _CMD_NAME_LIMIT:
|
|
candidate = name[:_CMD_NAME_LIMIT]
|
|
if candidate in used:
|
|
prefix = name[:_CMD_NAME_LIMIT - 1]
|
|
for digit in range(10):
|
|
candidate = f"{prefix}{digit}"
|
|
if candidate not in used:
|
|
break
|
|
else:
|
|
# All 10 digit slots exhausted — skip entry
|
|
continue
|
|
name = candidate
|
|
if name in used:
|
|
continue
|
|
used.add(name)
|
|
result.append((name, desc))
|
|
return result
|
|
|
|
|
|
# Backward-compat alias.
|
|
_clamp_telegram_names = _clamp_command_names
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Shared skill/plugin collection for gateway platforms
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _collect_gateway_skill_entries(
|
|
platform: str,
|
|
max_slots: int,
|
|
reserved_names: set[str],
|
|
desc_limit: int = 100,
|
|
sanitize_name: "Callable[[str], str] | None" = None,
|
|
) -> tuple[list[tuple[str, str, str]], int]:
|
|
"""Collect plugin + skill entries for a gateway platform.
|
|
|
|
Priority order:
|
|
1. Plugin slash commands (take precedence over skills)
|
|
2. Built-in skill commands (fill remaining slots, alphabetical)
|
|
|
|
Only skills are trimmed when the cap is reached.
|
|
Hub-installed skills are excluded. Per-platform disabled skills are
|
|
excluded.
|
|
|
|
Args:
|
|
platform: Platform identifier for per-platform skill filtering
|
|
(``"telegram"``, ``"discord"``, etc.).
|
|
max_slots: Maximum number of entries to return (remaining slots after
|
|
built-in/core commands).
|
|
reserved_names: Names already taken by built-in commands. Mutated
|
|
in-place as new names are added.
|
|
desc_limit: Max description length (40 for Telegram, 100 for Discord).
|
|
sanitize_name: Optional name transform applied before clamping, e.g.
|
|
:func:`_sanitize_telegram_name` for Telegram. May return an
|
|
empty string to signal "skip this entry".
|
|
|
|
Returns:
|
|
``(entries, hidden_count)`` where *entries* is a list of
|
|
``(name, description, cmd_key)`` triples and *hidden_count* is the
|
|
number of skill entries dropped due to the cap. ``cmd_key`` is the
|
|
original ``/skill-name`` key from :func:`get_skill_commands`.
|
|
"""
|
|
all_entries: list[tuple[str, str, str]] = []
|
|
|
|
# --- Tier 1: Plugin slash commands (never trimmed) ---------------------
|
|
plugin_pairs: list[tuple[str, str]] = []
|
|
try:
|
|
from hermes_cli.plugins import get_plugin_manager
|
|
pm = get_plugin_manager()
|
|
plugin_cmds = getattr(pm, "_plugin_commands", {})
|
|
for cmd_name in sorted(plugin_cmds):
|
|
name = sanitize_name(cmd_name) if sanitize_name else cmd_name
|
|
if not name:
|
|
continue
|
|
desc = "Plugin command"
|
|
if len(desc) > desc_limit:
|
|
desc = desc[:desc_limit - 3] + "..."
|
|
plugin_pairs.append((name, desc))
|
|
except Exception:
|
|
pass
|
|
|
|
plugin_pairs = _clamp_command_names(plugin_pairs, reserved_names)
|
|
reserved_names.update(n for n, _ in plugin_pairs)
|
|
# Plugins have no cmd_key — use empty string as placeholder
|
|
for n, d in plugin_pairs:
|
|
all_entries.append((n, d, ""))
|
|
|
|
# --- Tier 2: Built-in skill commands (trimmed at cap) -----------------
|
|
_platform_disabled: set[str] = set()
|
|
try:
|
|
from agent.skill_utils import get_disabled_skill_names
|
|
_platform_disabled = get_disabled_skill_names(platform=platform)
|
|
except Exception:
|
|
pass
|
|
|
|
skill_triples: list[tuple[str, str, str]] = []
|
|
try:
|
|
from agent.skill_commands import get_skill_commands
|
|
from tools.skills_tool import SKILLS_DIR
|
|
_skills_dir = str(SKILLS_DIR.resolve())
|
|
_hub_dir = str((SKILLS_DIR / ".hub").resolve())
|
|
skill_cmds = get_skill_commands()
|
|
for cmd_key in sorted(skill_cmds):
|
|
info = skill_cmds[cmd_key]
|
|
skill_path = info.get("skill_md_path", "")
|
|
if not skill_path.startswith(_skills_dir):
|
|
continue
|
|
if skill_path.startswith(_hub_dir):
|
|
continue
|
|
skill_name = info.get("name", "")
|
|
if skill_name in _platform_disabled:
|
|
continue
|
|
raw_name = cmd_key.lstrip("/")
|
|
name = sanitize_name(raw_name) if sanitize_name else raw_name
|
|
if not name:
|
|
continue
|
|
desc = info.get("description", "")
|
|
if len(desc) > desc_limit:
|
|
desc = desc[:desc_limit - 3] + "..."
|
|
skill_triples.append((name, desc, cmd_key))
|
|
except Exception:
|
|
pass
|
|
|
|
# Clamp names; _clamp_command_names works on (name, desc) pairs so we
|
|
# need to zip/unzip.
|
|
skill_pairs = [(n, d) for n, d, _ in skill_triples]
|
|
key_by_pair = {(n, d): k for n, d, k in skill_triples}
|
|
skill_pairs = _clamp_command_names(skill_pairs, reserved_names)
|
|
|
|
# Skills fill remaining slots — only tier that gets trimmed
|
|
remaining = max(0, max_slots - len(all_entries))
|
|
hidden_count = max(0, len(skill_pairs) - remaining)
|
|
for n, d in skill_pairs[:remaining]:
|
|
all_entries.append((n, d, key_by_pair.get((n, d), "")))
|
|
|
|
return all_entries[:max_slots], hidden_count
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Platform-specific wrappers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def telegram_menu_commands(max_commands: int = 100) -> tuple[list[tuple[str, str]], int]:
|
|
"""Return Telegram menu commands capped to the Bot API limit.
|
|
|
|
Priority order (higher priority = never bumped by overflow):
|
|
1. Core CommandDef commands (always included)
|
|
2. Plugin slash commands (take precedence over skills)
|
|
3. Built-in skill commands (fill remaining slots, alphabetical)
|
|
|
|
Skills are the only tier that gets trimmed when the cap is hit.
|
|
User-installed hub skills are excluded — accessible via /skills.
|
|
Skills disabled for the ``"telegram"`` platform (via ``hermes skills
|
|
config``) are excluded from the menu entirely.
|
|
|
|
Returns:
|
|
(menu_commands, hidden_count) where hidden_count is the number of
|
|
skill commands omitted due to the cap.
|
|
"""
|
|
core_commands = list(telegram_bot_commands())
|
|
reserved_names = {n for n, _ in core_commands}
|
|
all_commands = list(core_commands)
|
|
|
|
remaining_slots = max(0, max_commands - len(all_commands))
|
|
entries, hidden_count = _collect_gateway_skill_entries(
|
|
platform="telegram",
|
|
max_slots=remaining_slots,
|
|
reserved_names=reserved_names,
|
|
desc_limit=40,
|
|
sanitize_name=_sanitize_telegram_name,
|
|
)
|
|
# Drop the cmd_key — Telegram only needs (name, desc) pairs.
|
|
all_commands.extend((n, d) for n, d, _k in entries)
|
|
return all_commands[:max_commands], hidden_count
|
|
|
|
|
|
def discord_skill_commands(
|
|
max_slots: int,
|
|
reserved_names: set[str],
|
|
) -> tuple[list[tuple[str, str, str]], int]:
|
|
"""Return skill entries for Discord slash command registration.
|
|
|
|
Same priority and filtering logic as :func:`telegram_menu_commands`
|
|
(plugins > skills, hub excluded, per-platform disabled excluded), but
|
|
adapted for Discord's constraints:
|
|
|
|
- Hyphens are allowed in names (no ``-`` → ``_`` sanitization)
|
|
- Descriptions capped at 100 chars (Discord's per-field max)
|
|
|
|
Args:
|
|
max_slots: Available command slots (100 minus existing built-in count).
|
|
reserved_names: Names of already-registered built-in commands.
|
|
|
|
Returns:
|
|
``(entries, hidden_count)`` where *entries* is a list of
|
|
``(discord_name, description, cmd_key)`` triples. ``cmd_key`` is
|
|
the original ``/skill-name`` key needed for the slash handler callback.
|
|
"""
|
|
return _collect_gateway_skill_entries(
|
|
platform="discord",
|
|
max_slots=max_slots,
|
|
reserved_names=set(reserved_names), # copy — don't mutate caller's set
|
|
desc_limit=100,
|
|
)
|
|
|
|
|
|
def discord_skill_commands_by_category(
|
|
reserved_names: set[str],
|
|
) -> tuple[dict[str, list[tuple[str, str, str]]], list[tuple[str, str, str]], int]:
|
|
"""Return skill entries organized by category for Discord ``/skill`` subcommand groups.
|
|
|
|
Skills whose directory is nested at least 2 levels under ``SKILLS_DIR``
|
|
(e.g. ``creative/ascii-art/SKILL.md``) are grouped by their top-level
|
|
category. Root-level skills (e.g. ``dogfood/SKILL.md``) are returned as
|
|
*uncategorized* — the caller should register them as direct subcommands
|
|
of the ``/skill`` group.
|
|
|
|
The same filtering as :func:`discord_skill_commands` is applied: hub
|
|
skills excluded, per-platform disabled excluded, names clamped.
|
|
|
|
Returns:
|
|
``(categories, uncategorized, hidden_count)``
|
|
|
|
- *categories*: ``{category_name: [(name, description, cmd_key), ...]}``
|
|
- *uncategorized*: ``[(name, description, cmd_key), ...]``
|
|
- *hidden_count*: skills dropped due to Discord group limits
|
|
(25 subcommand groups, 25 subcommands per group)
|
|
"""
|
|
from pathlib import Path as _P
|
|
|
|
_platform_disabled: set[str] = set()
|
|
try:
|
|
from agent.skill_utils import get_disabled_skill_names
|
|
_platform_disabled = get_disabled_skill_names(platform="discord")
|
|
except Exception:
|
|
pass
|
|
|
|
# Collect raw skill data --------------------------------------------------
|
|
categories: dict[str, list[tuple[str, str, str]]] = {}
|
|
uncategorized: list[tuple[str, str, str]] = []
|
|
_names_used: set[str] = set(reserved_names)
|
|
hidden = 0
|
|
|
|
try:
|
|
from agent.skill_commands import get_skill_commands
|
|
from tools.skills_tool import SKILLS_DIR
|
|
_skills_dir = SKILLS_DIR.resolve()
|
|
_hub_dir = (SKILLS_DIR / ".hub").resolve()
|
|
skill_cmds = get_skill_commands()
|
|
|
|
for cmd_key in sorted(skill_cmds):
|
|
info = skill_cmds[cmd_key]
|
|
skill_path = info.get("skill_md_path", "")
|
|
if not skill_path:
|
|
continue
|
|
sp = _P(skill_path).resolve()
|
|
# Skip skills outside SKILLS_DIR or from the hub
|
|
if not str(sp).startswith(str(_skills_dir)):
|
|
continue
|
|
if str(sp).startswith(str(_hub_dir)):
|
|
continue
|
|
|
|
skill_name = info.get("name", "")
|
|
if skill_name in _platform_disabled:
|
|
continue
|
|
|
|
raw_name = cmd_key.lstrip("/")
|
|
# Clamp to 32 chars (Discord limit)
|
|
discord_name = raw_name[:32]
|
|
if discord_name in _names_used:
|
|
continue
|
|
_names_used.add(discord_name)
|
|
|
|
desc = info.get("description", "")
|
|
if len(desc) > 100:
|
|
desc = desc[:97] + "..."
|
|
|
|
# Determine category from the relative path within SKILLS_DIR.
|
|
# e.g. creative/ascii-art/SKILL.md → parts = ("creative", "ascii-art")
|
|
try:
|
|
rel = sp.parent.relative_to(_skills_dir)
|
|
except ValueError:
|
|
continue
|
|
parts = rel.parts
|
|
if len(parts) >= 2:
|
|
cat = parts[0]
|
|
categories.setdefault(cat, []).append((discord_name, desc, cmd_key))
|
|
else:
|
|
uncategorized.append((discord_name, desc, cmd_key))
|
|
except Exception:
|
|
pass
|
|
|
|
# Enforce Discord limits: 25 subcommand groups, 25 subcommands each ------
|
|
_MAX_GROUPS = 25
|
|
_MAX_PER_GROUP = 25
|
|
|
|
trimmed_categories: dict[str, list[tuple[str, str, str]]] = {}
|
|
group_count = 0
|
|
for cat in sorted(categories):
|
|
if group_count >= _MAX_GROUPS:
|
|
hidden += len(categories[cat])
|
|
continue
|
|
entries = categories[cat][:_MAX_PER_GROUP]
|
|
hidden += max(0, len(categories[cat]) - _MAX_PER_GROUP)
|
|
trimmed_categories[cat] = entries
|
|
group_count += 1
|
|
|
|
# Uncategorized skills also count against the 25 top-level limit
|
|
remaining_slots = _MAX_GROUPS - group_count
|
|
if len(uncategorized) > remaining_slots:
|
|
hidden += len(uncategorized) - remaining_slots
|
|
uncategorized = uncategorized[:remaining_slots]
|
|
|
|
return trimmed_categories, uncategorized, hidden
|
|
|
|
|
|
def slack_subcommand_map() -> dict[str, str]:
|
|
"""Return subcommand -> /command mapping for Slack /hermes handler.
|
|
|
|
Maps both canonical names and aliases so /hermes bg do stuff works
|
|
the same as /hermes background do stuff.
|
|
"""
|
|
overrides = _resolve_config_gates()
|
|
mapping: dict[str, str] = {}
|
|
for cmd in COMMAND_REGISTRY:
|
|
if not _is_gateway_available(cmd, overrides):
|
|
continue
|
|
mapping[cmd.name] = f"/{cmd.name}"
|
|
for alias in cmd.aliases:
|
|
mapping[alias] = f"/{alias}"
|
|
return mapping
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Autocomplete
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class SlashCommandCompleter(Completer):
|
|
"""Autocomplete for built-in slash commands, subcommands, and skill commands."""
|
|
|
|
def __init__(
|
|
self,
|
|
skill_commands_provider: Callable[[], Mapping[str, dict[str, Any]]] | None = None,
|
|
command_filter: Callable[[str], bool] | None = None,
|
|
) -> None:
|
|
self._skill_commands_provider = skill_commands_provider
|
|
self._command_filter = command_filter
|
|
# Cached project file list for fuzzy @ completions
|
|
self._file_cache: list[str] = []
|
|
self._file_cache_time: float = 0.0
|
|
self._file_cache_cwd: str = ""
|
|
|
|
def _command_allowed(self, slash_command: str) -> bool:
|
|
if self._command_filter is None:
|
|
return True
|
|
try:
|
|
return bool(self._command_filter(slash_command))
|
|
except Exception:
|
|
return True
|
|
|
|
def _iter_skill_commands(self) -> Mapping[str, dict[str, Any]]:
|
|
if self._skill_commands_provider is None:
|
|
return {}
|
|
try:
|
|
return self._skill_commands_provider() or {}
|
|
except Exception:
|
|
return {}
|
|
|
|
@staticmethod
|
|
def _completion_text(cmd_name: str, word: str) -> str:
|
|
"""Return replacement text for a completion.
|
|
|
|
When the user has already typed the full command exactly (``/help``),
|
|
returning ``help`` would be a no-op and prompt_toolkit suppresses the
|
|
menu. Appending a trailing space keeps the dropdown visible and makes
|
|
backspacing retrigger it naturally.
|
|
"""
|
|
return f"{cmd_name} " if cmd_name == word else cmd_name
|
|
|
|
@staticmethod
|
|
def _extract_path_word(text: str) -> str | None:
|
|
"""Extract the current word if it looks like a file path.
|
|
|
|
Returns the path-like token under the cursor, or None if the
|
|
current word doesn't look like a path. A word is path-like when
|
|
it starts with ``./``, ``../``, ``~/``, ``/``, or contains a
|
|
``/`` separator (e.g. ``src/main.py``).
|
|
"""
|
|
if not text:
|
|
return None
|
|
# Walk backwards to find the start of the current "word".
|
|
# Words are delimited by spaces, but paths can contain almost anything.
|
|
i = len(text) - 1
|
|
while i >= 0 and text[i] != " ":
|
|
i -= 1
|
|
word = text[i + 1:]
|
|
if not word:
|
|
return None
|
|
# Only trigger path completion for path-like tokens
|
|
if word.startswith(("./", "../", "~/", "/")) or "/" in word:
|
|
return word
|
|
return None
|
|
|
|
@staticmethod
|
|
def _path_completions(word: str, limit: int = 30):
|
|
"""Yield Completion objects for file paths matching *word*."""
|
|
expanded = os.path.expanduser(word)
|
|
# Split into directory part and prefix to match inside it
|
|
if expanded.endswith("/"):
|
|
search_dir = expanded
|
|
prefix = ""
|
|
else:
|
|
search_dir = os.path.dirname(expanded) or "."
|
|
prefix = os.path.basename(expanded)
|
|
|
|
try:
|
|
entries = os.listdir(search_dir)
|
|
except OSError:
|
|
return
|
|
|
|
count = 0
|
|
prefix_lower = prefix.lower()
|
|
for entry in sorted(entries):
|
|
if prefix and not entry.lower().startswith(prefix_lower):
|
|
continue
|
|
if count >= limit:
|
|
break
|
|
|
|
full_path = os.path.join(search_dir, entry)
|
|
is_dir = os.path.isdir(full_path)
|
|
|
|
# Build the completion text (what replaces the typed word)
|
|
if word.startswith("~"):
|
|
display_path = "~/" + os.path.relpath(full_path, os.path.expanduser("~"))
|
|
elif os.path.isabs(word):
|
|
display_path = full_path
|
|
else:
|
|
# Keep relative
|
|
display_path = os.path.relpath(full_path)
|
|
|
|
if is_dir:
|
|
display_path += "/"
|
|
|
|
suffix = "/" if is_dir else ""
|
|
meta = "dir" if is_dir else _file_size_label(full_path)
|
|
|
|
yield Completion(
|
|
display_path,
|
|
start_position=-len(word),
|
|
display=entry + suffix,
|
|
display_meta=meta,
|
|
)
|
|
count += 1
|
|
|
|
@staticmethod
|
|
def _extract_context_word(text: str) -> str | None:
|
|
"""Extract a bare ``@`` token for context reference completions."""
|
|
if not text:
|
|
return None
|
|
# Walk backwards to find the start of the current word
|
|
i = len(text) - 1
|
|
while i >= 0 and text[i] != " ":
|
|
i -= 1
|
|
word = text[i + 1:]
|
|
if not word.startswith("@"):
|
|
return None
|
|
return word
|
|
|
|
def _context_completions(self, word: str, limit: int = 30):
|
|
"""Yield Claude Code-style @ context completions.
|
|
|
|
Bare ``@`` or ``@partial`` shows static references and matching
|
|
files/folders. ``@file:path`` and ``@folder:path`` are handled
|
|
by the existing path completion path.
|
|
"""
|
|
lowered = word.lower()
|
|
|
|
# Static context references
|
|
_STATIC_REFS = (
|
|
("@diff", "Git working tree diff"),
|
|
("@staged", "Git staged diff"),
|
|
("@file:", "Attach a file"),
|
|
("@folder:", "Attach a folder"),
|
|
("@git:", "Git log with diffs (e.g. @git:5)"),
|
|
("@url:", "Fetch web content"),
|
|
)
|
|
for candidate, meta in _STATIC_REFS:
|
|
if candidate.lower().startswith(lowered) and candidate.lower() != lowered:
|
|
yield Completion(
|
|
candidate,
|
|
start_position=-len(word),
|
|
display=candidate,
|
|
display_meta=meta,
|
|
)
|
|
|
|
# If the user typed @file: or @folder:, delegate to path completions
|
|
for prefix in ("@file:", "@folder:"):
|
|
if word.startswith(prefix):
|
|
path_part = word[len(prefix):] or "."
|
|
expanded = os.path.expanduser(path_part)
|
|
if expanded.endswith("/"):
|
|
search_dir, match_prefix = expanded, ""
|
|
else:
|
|
search_dir = os.path.dirname(expanded) or "."
|
|
match_prefix = os.path.basename(expanded)
|
|
|
|
try:
|
|
entries = os.listdir(search_dir)
|
|
except OSError:
|
|
return
|
|
|
|
count = 0
|
|
prefix_lower = match_prefix.lower()
|
|
for entry in sorted(entries):
|
|
if match_prefix and not entry.lower().startswith(prefix_lower):
|
|
continue
|
|
if count >= limit:
|
|
break
|
|
full_path = os.path.join(search_dir, entry)
|
|
is_dir = os.path.isdir(full_path)
|
|
display_path = os.path.relpath(full_path)
|
|
suffix = "/" if is_dir else ""
|
|
kind = "folder" if is_dir else "file"
|
|
meta = "dir" if is_dir else _file_size_label(full_path)
|
|
completion = f"@{kind}:{display_path}{suffix}"
|
|
yield Completion(
|
|
completion,
|
|
start_position=-len(word),
|
|
display=entry + suffix,
|
|
display_meta=meta,
|
|
)
|
|
count += 1
|
|
return
|
|
|
|
# Bare @ or @partial — fuzzy project-wide file search
|
|
query = word[1:] # strip the @
|
|
yield from self._fuzzy_file_completions(word, query, limit)
|
|
|
|
def _get_project_files(self) -> list[str]:
|
|
"""Return cached list of project files (refreshed every 5s)."""
|
|
cwd = os.getcwd()
|
|
now = time.monotonic()
|
|
if (
|
|
self._file_cache
|
|
and self._file_cache_cwd == cwd
|
|
and now - self._file_cache_time < 5.0
|
|
):
|
|
return self._file_cache
|
|
|
|
files: list[str] = []
|
|
# Try rg first (fast, respects .gitignore), then fd, then find.
|
|
for cmd in [
|
|
["rg", "--files", "--sortr=modified", cwd],
|
|
["rg", "--files", cwd],
|
|
["fd", "--type", "f", "--base-directory", cwd],
|
|
]:
|
|
tool = cmd[0]
|
|
if not shutil.which(tool):
|
|
continue
|
|
try:
|
|
proc = subprocess.run(
|
|
cmd, capture_output=True, text=True, timeout=2,
|
|
cwd=cwd,
|
|
)
|
|
if proc.returncode == 0 and proc.stdout.strip():
|
|
raw = proc.stdout.strip().split("\n")
|
|
# Store relative paths
|
|
for p in raw[:5000]:
|
|
rel = os.path.relpath(p, cwd) if os.path.isabs(p) else p
|
|
files.append(rel)
|
|
break
|
|
except (subprocess.TimeoutExpired, OSError):
|
|
continue
|
|
|
|
self._file_cache = files
|
|
self._file_cache_time = now
|
|
self._file_cache_cwd = cwd
|
|
return files
|
|
|
|
@staticmethod
|
|
def _score_path(filepath: str, query: str) -> int:
|
|
"""Score a file path against a fuzzy query. Higher = better match."""
|
|
if not query:
|
|
return 1 # show everything when query is empty
|
|
|
|
filename = os.path.basename(filepath)
|
|
lower_file = filename.lower()
|
|
lower_path = filepath.lower()
|
|
lower_q = query.lower()
|
|
|
|
# Exact filename match
|
|
if lower_file == lower_q:
|
|
return 100
|
|
# Filename starts with query
|
|
if lower_file.startswith(lower_q):
|
|
return 80
|
|
# Filename contains query as substring
|
|
if lower_q in lower_file:
|
|
return 60
|
|
# Full path contains query
|
|
if lower_q in lower_path:
|
|
return 40
|
|
# Initials / abbreviation match: e.g. "fo" matches "file_operations"
|
|
# Check if query chars appear in order in filename
|
|
qi = 0
|
|
for c in lower_file:
|
|
if qi < len(lower_q) and c == lower_q[qi]:
|
|
qi += 1
|
|
if qi == len(lower_q):
|
|
# Bonus if matches land on word boundaries (after _, -, /, .)
|
|
boundary_hits = 0
|
|
qi = 0
|
|
prev = "_" # treat start as boundary
|
|
for c in lower_file:
|
|
if qi < len(lower_q) and c == lower_q[qi]:
|
|
if prev in "_-./":
|
|
boundary_hits += 1
|
|
qi += 1
|
|
prev = c
|
|
if boundary_hits >= len(lower_q) * 0.5:
|
|
return 35
|
|
return 25
|
|
return 0
|
|
|
|
def _fuzzy_file_completions(self, word: str, query: str, limit: int = 20):
|
|
"""Yield fuzzy file completions for bare @query."""
|
|
files = self._get_project_files()
|
|
|
|
if not query:
|
|
# No query — show recently modified files (already sorted by mtime)
|
|
for fp in files[:limit]:
|
|
is_dir = fp.endswith("/")
|
|
filename = os.path.basename(fp)
|
|
kind = "folder" if is_dir else "file"
|
|
meta = "dir" if is_dir else _file_size_label(
|
|
os.path.join(os.getcwd(), fp)
|
|
)
|
|
yield Completion(
|
|
f"@{kind}:{fp}",
|
|
start_position=-len(word),
|
|
display=filename,
|
|
display_meta=meta,
|
|
)
|
|
return
|
|
|
|
# Score and rank
|
|
scored = []
|
|
for fp in files:
|
|
s = self._score_path(fp, query)
|
|
if s > 0:
|
|
scored.append((s, fp))
|
|
scored.sort(key=lambda x: (-x[0], x[1]))
|
|
|
|
for _, fp in scored[:limit]:
|
|
is_dir = fp.endswith("/")
|
|
filename = os.path.basename(fp)
|
|
kind = "folder" if is_dir else "file"
|
|
meta = "dir" if is_dir else _file_size_label(
|
|
os.path.join(os.getcwd(), fp)
|
|
)
|
|
yield Completion(
|
|
f"@{kind}:{fp}",
|
|
start_position=-len(word),
|
|
display=filename,
|
|
display_meta=f"{fp} {meta}" if meta else fp,
|
|
)
|
|
|
|
def _model_completions(self, sub_text: str, sub_lower: str):
|
|
"""Yield completions for /model from config aliases + built-in aliases."""
|
|
seen = set()
|
|
# Config-based direct aliases (preferred — include provider info)
|
|
try:
|
|
from hermes_cli.model_switch import (
|
|
_ensure_direct_aliases, DIRECT_ALIASES, MODEL_ALIASES,
|
|
)
|
|
_ensure_direct_aliases()
|
|
for name, da in DIRECT_ALIASES.items():
|
|
if name.startswith(sub_lower) and name != sub_lower:
|
|
seen.add(name)
|
|
yield Completion(
|
|
name,
|
|
start_position=-len(sub_text),
|
|
display=name,
|
|
display_meta=f"{da.model} ({da.provider})",
|
|
)
|
|
# Built-in catalog aliases not already covered
|
|
for name in sorted(MODEL_ALIASES.keys()):
|
|
if name in seen:
|
|
continue
|
|
if name.startswith(sub_lower) and name != sub_lower:
|
|
identity = MODEL_ALIASES[name]
|
|
yield Completion(
|
|
name,
|
|
start_position=-len(sub_text),
|
|
display=name,
|
|
display_meta=f"{identity.vendor}/{identity.family}",
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
def get_completions(self, document, complete_event):
|
|
text = document.text_before_cursor
|
|
if not text.startswith("/"):
|
|
# Try @ context completion (Claude Code-style)
|
|
ctx_word = self._extract_context_word(text)
|
|
if ctx_word is not None:
|
|
yield from self._context_completions(ctx_word)
|
|
return
|
|
# Try file path completion for non-slash input
|
|
path_word = self._extract_path_word(text)
|
|
if path_word is not None:
|
|
yield from self._path_completions(path_word)
|
|
return
|
|
|
|
# Check if we're completing a subcommand (base command already typed)
|
|
parts = text.split(maxsplit=1)
|
|
base_cmd = parts[0].lower()
|
|
if len(parts) > 1 or (len(parts) == 1 and text.endswith(" ")):
|
|
sub_text = parts[1] if len(parts) > 1 else ""
|
|
sub_lower = sub_text.lower()
|
|
|
|
# Dynamic model alias completions for /model
|
|
if " " not in sub_text and base_cmd == "/model":
|
|
yield from self._model_completions(sub_text, sub_lower)
|
|
return
|
|
|
|
# Static subcommand completions
|
|
if " " not in sub_text and base_cmd in SUBCOMMANDS and self._command_allowed(base_cmd):
|
|
for sub in SUBCOMMANDS[base_cmd]:
|
|
if sub.startswith(sub_lower) and sub != sub_lower:
|
|
yield Completion(
|
|
sub,
|
|
start_position=-len(sub_text),
|
|
display=sub,
|
|
)
|
|
return
|
|
|
|
word = text[1:]
|
|
|
|
for cmd, desc in COMMANDS.items():
|
|
if not self._command_allowed(cmd):
|
|
continue
|
|
cmd_name = cmd[1:]
|
|
if cmd_name.startswith(word):
|
|
yield Completion(
|
|
self._completion_text(cmd_name, word),
|
|
start_position=-len(word),
|
|
display=cmd,
|
|
display_meta=desc,
|
|
)
|
|
|
|
for cmd, info in self._iter_skill_commands().items():
|
|
cmd_name = cmd[1:]
|
|
if cmd_name.startswith(word):
|
|
description = str(info.get("description", "Skill command"))
|
|
short_desc = description[:50] + ("..." if len(description) > 50 else "")
|
|
yield Completion(
|
|
self._completion_text(cmd_name, word),
|
|
start_position=-len(word),
|
|
display=cmd,
|
|
display_meta=f"⚡ {short_desc}",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Inline auto-suggest (ghost text) for slash commands
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class SlashCommandAutoSuggest(AutoSuggest):
|
|
"""Inline ghost-text suggestions for slash commands and their subcommands.
|
|
|
|
Shows the rest of a command or subcommand in dim text as you type.
|
|
Falls back to history-based suggestions for non-slash input.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
history_suggest: AutoSuggest | None = None,
|
|
completer: SlashCommandCompleter | None = None,
|
|
) -> None:
|
|
self._history = history_suggest
|
|
self._completer = completer # Reuse its model cache
|
|
|
|
def get_suggestion(self, buffer, document):
|
|
text = document.text_before_cursor
|
|
|
|
# Only suggest for slash commands
|
|
if not text.startswith("/"):
|
|
# Fall back to history for regular text
|
|
if self._history:
|
|
return self._history.get_suggestion(buffer, document)
|
|
return None
|
|
|
|
parts = text.split(maxsplit=1)
|
|
base_cmd = parts[0].lower()
|
|
|
|
if len(parts) == 1 and not text.endswith(" "):
|
|
# Still typing the command name: /upd → suggest "ate"
|
|
word = text[1:].lower()
|
|
for cmd in COMMANDS:
|
|
if self._completer is not None and not self._completer._command_allowed(cmd):
|
|
continue
|
|
cmd_name = cmd[1:] # strip leading /
|
|
if cmd_name.startswith(word) and cmd_name != word:
|
|
return Suggestion(cmd_name[len(word):])
|
|
return None
|
|
|
|
# Command is complete — suggest subcommands or model names
|
|
sub_text = parts[1] if len(parts) > 1 else ""
|
|
sub_lower = sub_text.lower()
|
|
|
|
# Static subcommands
|
|
if self._completer is not None and not self._completer._command_allowed(base_cmd):
|
|
return None
|
|
if base_cmd in SUBCOMMANDS and SUBCOMMANDS[base_cmd]:
|
|
if " " not in sub_text:
|
|
for sub in SUBCOMMANDS[base_cmd]:
|
|
if sub.startswith(sub_lower) and sub != sub_lower:
|
|
return Suggestion(sub[len(sub_text):])
|
|
|
|
# Fall back to history
|
|
if self._history:
|
|
return self._history.get_suggestion(buffer, document)
|
|
return None
|
|
|
|
|
|
def _file_size_label(path: str) -> str:
|
|
"""Return a compact human-readable file size, or '' on error."""
|
|
try:
|
|
size = os.path.getsize(path)
|
|
except OSError:
|
|
return ""
|
|
if size < 1024:
|
|
return f"{size}B"
|
|
if size < 1024 * 1024:
|
|
return f"{size / 1024:.0f}K"
|
|
if size < 1024 * 1024 * 1024:
|
|
return f"{size / (1024 * 1024):.1f}M"
|
|
return f"{size / (1024 * 1024 * 1024):.1f}G"
|