mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
The in-session slash commands (/model, /reset, /usage, /compress, /voice, ...) — 42 _handle_*_command handlers, ~3,200 LOC — move out of gateway/run.py into a mixin GatewayRunner inherits. self._handle_*_command dispatch + all test references resolve unchanged via the MRO. Neutral deps (MessageEvent, EphemeralReply, Platform, t, cfg_get, atomic_*_write, account-usage helpers, stdlib) imported at the mixin top level. The ~10 run.py-internal helpers (_hermes_home, _load_gateway_config, _resolve_gateway_model, _AGENT_PENDING_SENTINEL, ...) imported lazily inside the handlers that need them to avoid an import cycle. gateway/run.py 19157 -> 15870 LOC; GatewayRunner direct methods 214 -> 172. Behavior-neutral: voice/update/model/compress command test suites pass; all 42 resolve to the mixin via MRO.
3393 lines
152 KiB
Python
3393 lines
152 KiB
Python
"""Gateway slash-command handlers for GatewayRunner.
|
|
|
|
Extracted from ``gateway/run.py`` (god-file decomposition Phase 3b). These are
|
|
the in-session slash commands (/model, /reset, /usage, /compress, ...) the
|
|
gateway dispatches from ``_handle_message``. There are 42 of them (~3,200 LOC);
|
|
lifting them into a mixin that ``GatewayRunner`` inherits keeps every
|
|
``self._handle_*_command`` dispatch + test reference working via the MRO, while
|
|
removing the bulk from run.py.
|
|
|
|
Module-level run.py helpers a handler needs (``_hermes_home``,
|
|
``_load_gateway_config``, ``_resolve_gateway_model``, etc.) are imported lazily
|
|
inside the handler body — a deferred ``from gateway.run import ...`` resolves at
|
|
call time (run.py fully loaded by then), avoiding an import cycle.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import dataclasses
|
|
import inspect
|
|
import logging
|
|
import os
|
|
import re
|
|
import shlex
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Optional, Union
|
|
|
|
from agent.account_usage import fetch_account_usage, render_account_usage_lines
|
|
from agent.i18n import t
|
|
from gateway.config import HomeChannel, Platform, PlatformConfig
|
|
from gateway.platforms.base import EphemeralReply, MessageEvent, MessageType
|
|
from gateway.session import build_session_key
|
|
from hermes_cli.config import cfg_get
|
|
from utils import (
|
|
atomic_json_write,
|
|
atomic_yaml_write,
|
|
base_url_host_matches,
|
|
is_truthy_value,
|
|
)
|
|
|
|
logger = logging.getLogger("gateway.run")
|
|
|
|
|
|
class GatewaySlashCommandsMixin:
|
|
"""In-session slash-command handlers for GatewayRunner."""
|
|
|
|
async def _handle_reset_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
    """Handle /new or /reset command.

    Ends the conversation bound to the event's source and starts a fresh
    session: invalidates the in-flight run generation, releases the
    running-agent slot, cleans up and evicts the cached agent, drops queued
    follow-ups, clears per-session overrides and security state, rotates
    the session in the session store, and fires the plugin and gateway
    hooks for the boundary.

    Text after the command (``/new <title>``) is used as the new session's
    title when a session DB is configured.

    Returns:
        An ``EphemeralReply`` containing the header, optional session info
        and an optional tip line.
    """
    source = event.source

    # Get existing session key
    session_key = self._session_key_for_source(source)
    self._invalidate_session_run_generation(session_key, reason="session_reset")
    # Evict the running-agent slot now that the generation is bumped.  The
    # in-flight run's own guarded release (run_generation=old) will return
    # False and leave its dead agent behind; clearing here keeps the slot
    # from becoming a zombie that silently drops all later messages (#28686).
    # Idempotent, so the run's finally calling it again is harmless.
    self._release_running_agent_state(session_key)

    # Snapshot the old entry so on_session_finalize can report the
    # expiring session id before reset_session() rotates it.
    old_entry = self.session_store._entries.get(session_key)

    # Close tool resources on the old agent (terminal sandboxes, browser
    # daemons, background processes) before evicting from cache.
    # Guard with getattr because test fixtures may skip __init__.
    _cache_lock = getattr(self, "_agent_cache_lock", None)
    if _cache_lock is not None:
        with _cache_lock:
            _cached = self._agent_cache.get(session_key)
            # Cache values may be a tuple whose first item is the agent, or
            # the agent itself; anything falsy means "nothing cached".
            _old_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
        if _old_agent is not None:
            self._cleanup_agent_resources(_old_agent)
    self._evict_cached_agent(session_key)

    # Discard any /queue overflow for this session — /new is a
    # conversation-boundary operation, queued follow-ups from the
    # previous conversation must not bleed into the new one.
    _qe = getattr(self, "_queued_events", None)
    if _qe is not None:
        _qe.pop(session_key, None)

    # Best-effort: failures to import or clear the passthrough state are
    # deliberately ignored so they cannot block the reset.
    try:
        from tools.env_passthrough import clear_env_passthrough
        clear_env_passthrough()
    except Exception:
        pass

    # Best-effort, same as above, for registered credential files.
    try:
        from tools.credential_files import clear_credential_files
        clear_credential_files()
    except Exception:
        pass

    # Reset the session
    new_entry = self.session_store.reset_session(session_key)

    # Clear any session-scoped model/reasoning overrides so the next agent
    # picks up configured defaults instead of previous session switches.
    self._session_model_overrides.pop(session_key, None)
    self._set_session_reasoning_override(session_key, None)
    if hasattr(self, "_pending_model_notes"):
        self._pending_model_notes.pop(session_key, None)

    # Clear session-scoped dangerous-command approvals and /yolo state.
    # /new is a conversation-boundary operation — approval state from the
    # previous conversation must not survive the reset.
    self._clear_session_boundary_security_state(session_key)

    _old_sid = old_entry.session_id if old_entry else None

    # Fire plugin on_session_finalize hook (session boundary)
    try:
        from hermes_cli.plugins import invoke_hook as _invoke_hook
        _invoke_hook(
            "on_session_finalize",
            session_id=_old_sid,
            platform=source.platform.value if source.platform else "",
            reason="new_session",
            old_session_id=_old_sid,
            new_session_id=new_entry.session_id if new_entry else None,
        )
    except Exception:
        pass

    # Emit session:end hook (session is ending)
    await self.hooks.emit("session:end", {
        "platform": source.platform.value if source.platform else "",
        "user_id": source.user_id,
        "session_key": session_key,
    })

    # Emit session:reset hook
    await self.hooks.emit("session:reset", {
        "platform": source.platform.value if source.platform else "",
        "user_id": source.user_id,
        "session_key": session_key,
    })

    # Resolve session config info to surface to the user
    try:
        session_info = self._format_session_info()
    except Exception:
        session_info = ""

    if new_entry:
        header = self._telegram_topic_new_header(source) or t("gateway.reset.header_default")
    else:
        # No existing session, just create one
        new_entry = self.session_store.get_or_create_session(source, force_new=True)
        header = self._telegram_topic_new_header(source) or t("gateway.reset.header_new")

    # Set session title if provided with /new <title>
    _title_arg = event.get_command_args().strip()
    _title_note = ""
    if _title_arg and self._session_db and new_entry:
        from hermes_state import SessionDB
        try:
            sanitized = SessionDB.sanitize_title(_title_arg)
        except ValueError as e:
            sanitized = None
            _title_note = t("gateway.reset.title_rejected", error=str(e))
        if sanitized:
            try:
                self._session_db.set_session_title(new_entry.session_id, sanitized)
                header = t("gateway.reset.header_titled", title=sanitized)
            except ValueError as e:
                _title_note = t("gateway.reset.title_error_untitled", error=str(e))
            except Exception:
                # Any other failure leaves the session untitled, silently.
                pass
        elif not _title_note:
            # sanitize_title returned empty (whitespace-only / unprintable)
            _title_note = t("gateway.reset.title_empty_untitled")
        header = header + _title_note

    # When /new runs inside a Telegram DM topic lane, rewrite the
    # (chat_id, thread_id) → session_id binding so the next message
    # uses the freshly-created session.  Without this, the binding
    # still points at the old session and the binding-lookup at the
    # top of _handle_message_with_agent would switch right back.
    if self._is_telegram_topic_lane(source) and new_entry is not None:
        try:
            self._record_telegram_topic_binding(source, new_entry)
        except Exception:
            logger.debug("Failed to rebind Telegram topic after /new", exc_info=True)

    # Fire plugin on_session_reset hook (new session guaranteed to exist)
    try:
        from hermes_cli.plugins import invoke_hook as _invoke_hook
        _new_sid = new_entry.session_id if new_entry else None
        _invoke_hook(
            "on_session_reset",
            session_id=_new_sid,
            platform=source.platform.value if source.platform else "",
            reason="new_session",
            old_session_id=_old_sid,
            new_session_id=_new_sid,
        )
    except Exception:
        pass

    # Append a random tip to the reset message
    try:
        from hermes_cli.tips import get_random_tip
        _tip_line = t("gateway.reset.tip", tip=get_random_tip())
    except Exception:
        _tip_line = ""

    if session_info:
        return EphemeralReply(f"{header}\n\n{session_info}{_tip_line}")
    return EphemeralReply(f"{header}{_tip_line}")
|
|
|
|
async def _handle_profile_command(self, event: MessageEvent) -> str:
    """Handle /profile — report the active profile and its home directory.

    Returns:
        Two newline-separated lines: the profile header followed by the
        display form of the Hermes home directory.
    """
    from hermes_constants import display_hermes_home
    from hermes_cli.profiles import get_active_profile_name

    home_display = display_hermes_home()
    active_profile = get_active_profile_name()

    header_line = t("gateway.profile.header", profile=active_profile)
    home_line = t("gateway.profile.home", home=home_display)
    return "\n".join((header_line, home_line))
|
|
|
|
async def _handle_whoami_command(self, event: MessageEvent) -> str:
    """Handle /whoami — describe the caller's slash-command access here.

    The reply names the platform, the scope (DM vs group/channel), the
    caller's user id and tier (unrestricted / admin / user).  For plain
    users it also lists the slash commands they may run on this scope.
    """
    from gateway.slash_access import policy_for_source as _policy_for_source

    source = event.source
    policy = _policy_for_source(self.config, source)

    platform = "?"
    if source and source.platform:
        platform = source.platform.value

    chat_type = (source.chat_type if source else "") or "dm"
    if chat_type.lower() in {"dm", "direct", "private", ""}:
        scope = "DM"
    else:
        scope = "group/channel"

    user_id = (source.user_id if source else None) or "?"

    # Every variant of the reply starts with the same two identity lines.
    identity = f"**You** — {platform} ({scope})\nUser ID: `{user_id}`\n"

    if not policy.enabled:
        return (
            identity
            + "Tier: unrestricted (no admin list configured for this scope)\n"
            + "Slash commands: all available"
        )

    if policy.is_admin(user_id):
        return identity + "Tier: **admin**\n" + "Slash commands: all available"

    # Non-admin user: the always-allowed floor comes first, then the
    # operator-configured additions, with duplicates dropped.
    floor = ["help", "whoami"]  # mirrors slash_access._ALWAYS_ALLOWED_FOR_USERS
    configured = sorted(policy.user_allowed_commands)
    runnable = list(dict.fromkeys(floor + configured))
    if runnable:
        runnable_str = ", ".join(f"/{name}" for name in runnable)
    else:
        runnable_str = "(none)"
    return identity + "Tier: user\n" + f"Slash commands you can run: {runnable_str}"
|
|
|
|
async def _handle_kanban_command(self, event: MessageEvent) -> str:
    """Handle /kanban — delegate to the shared kanban CLI.

    Run the potentially-blocking DB work in a thread pool so the
    gateway event loop stays responsive.  Read operations (list,
    show, context, tail) are permitted while an agent is running;
    mutations are allowed too because the board is profile-agnostic
    and does not touch the running agent's state.

    For ``/kanban create`` invocations we also auto-subscribe the
    originating gateway source (platform + chat + thread) to the new
    task's terminal events, so the user hears back when the worker
    completes / blocks / auto-blocks / crashes without having to poll.

    Returns:
        The CLI output (truncated to 3800 characters plus a suffix when
        longer), an error message if the CLI raised, or a "no output"
        message when the CLI produced nothing.
    """
    from hermes_cli.kanban import run_slash

    text = (event.text or "").strip()
    # Strip the leading "/kanban" (with or without slash), leaving args.
    if text.startswith("/"):
        text = text.lstrip("/")
    if text.startswith("kanban"):
        text = text[len("kanban"):].lstrip()

    # The tokens are only used to detect ``--board`` and the action.  An
    # unclosed quote makes shlex.split raise ValueError; fall back to a
    # plain whitespace split so the CLI still runs and can report the
    # problem itself instead of the handler crashing here.
    try:
        tokens = shlex.split(text) if text else []
    except ValueError:
        tokens = text.split()

    requested_board = None
    action = None
    i = 0
    while i < len(tokens):
        tok = tokens[i]
        if tok == "--board":
            if i + 1 >= len(tokens):
                # Dangling ``--board`` with no value: no action detected.
                break
            requested_board = tokens[i + 1]
            i += 2
            continue
        if tok.startswith("--board="):
            requested_board = tok.split("=", 1)[1]
            i += 1
            continue
        # First token that is not a --board option is the action.
        action = tok
        break

    is_create = action == "create"

    try:
        output = await asyncio.to_thread(run_slash, text)
    except Exception as exc:  # pragma: no cover - defensive
        return t("gateway.kanban.error_prefix", error=exc)

    # Normalise a None result so the length check below cannot raise.
    output = output or ""

    # Auto-subscribe on create.  Parse the task id from the CLI's standard
    # success line ("Created t_abcd (ready, assignee=...)").  If the user
    # passed --json we don't subscribe; they're clearly scripting and
    # can call /kanban notify-subscribe explicitly.
    if is_create and output:
        m = re.search(r"Created\s+(t_[0-9a-f]+)\b", output)
        if m:
            task_id = m.group(1)
            try:
                source = event.source
                platform = getattr(source, "platform", None)
                platform_str = (
                    platform.value if hasattr(platform, "value") else str(platform or "")
                ).lower()
                chat_id = str(getattr(source, "chat_id", "") or "")
                thread_id = str(getattr(source, "thread_id", "") or "")
                user_id = str(getattr(source, "user_id", "") or "") or None
                if platform_str and chat_id:
                    def _sub():
                        # Runs in a worker thread; the connection is always
                        # closed, even if the insert fails.
                        from hermes_cli import kanban_db as _kb
                        conn = _kb.connect(board=requested_board)
                        try:
                            _kb.add_notify_sub(
                                conn, task_id=task_id,
                                platform=platform_str, chat_id=chat_id,
                                thread_id=thread_id or None,
                                user_id=user_id,
                                notifier_profile=getattr(self, "_kanban_notifier_profile", None) or self._active_profile_name(),
                            )
                        finally:
                            conn.close()
                    await asyncio.to_thread(_sub)
                    output = (
                        output.rstrip()
                        + "\n"
                        + t("gateway.kanban.subscribed_suffix", task_id=task_id)
                    )
            except Exception as exc:
                # Subscription is a convenience; the create itself succeeded.
                logger.warning("kanban create auto-subscribe failed: %s", exc)

    # Gateway messages have practical length caps; truncate long
    # listings to keep the UX reasonable.
    if len(output) > 3800:
        output = output[:3800] + "\n" + t("gateway.kanban.truncated_suffix")
    return output or t("gateway.kanban.no_output")
|
|
|
|
async def _handle_status_command(self, event: MessageEvent) -> str:
    """Handle /status — summarise the current session and gateway state.

    The reply lists the session id, optional title, creation and
    last-activity timestamps, total tokens, whether an agent is running,
    the number of queued follow-ups (when non-zero) and the connected
    platforms.
    """
    source = event.source
    session_entry = self.session_store.get_or_create_session(source)

    connected_platforms = [p.value for p in self.adapters.keys()]

    # Is an agent currently registered for this session?
    session_key = session_entry.session_key
    is_running = session_key in self._running_agents

    # Pending /queue follow-ups (slot + overflow).
    adapter = self.adapters.get(source.platform) if source else None
    queue_depth = self._queue_depth(session_key, adapter=adapter)

    # Token totals are read from the SQLite session DB rather than the
    # in-memory SessionStore: per-turn token deltas are persisted there
    # (run_agent.py), not into SessionEntry, so session_entry.total_tokens
    # is always 0.  Reading the DB keeps /status accurate without writing
    # tokens into two stores.
    title = None
    db_total_tokens = 0
    if self._session_db:
        try:
            title = self._session_db.get_session_title(session_entry.session_id)
        except Exception:
            title = None
        try:
            row = self._session_db.get_session(session_entry.session_id)
            if row:
                token_columns = (
                    "input_tokens",
                    "output_tokens",
                    "cache_read_tokens",
                    "cache_write_tokens",
                    "reasoning_tokens",
                )
                db_total_tokens = sum((row.get(col) or 0) for col in token_columns)
        except Exception:
            db_total_tokens = 0

    lines = [
        t("gateway.status.header"),
        "",
        t("gateway.status.session_id", session_id=session_entry.session_id),
    ]
    if title:
        lines.append(t("gateway.status.title", title=title))

    created_stamp = session_entry.created_at.strftime('%Y-%m-%d %H:%M')
    lines.append(t("gateway.status.created", timestamp=created_stamp))
    updated_stamp = session_entry.updated_at.strftime('%Y-%m-%d %H:%M')
    lines.append(t("gateway.status.last_activity", timestamp=updated_stamp))
    lines.append(t("gateway.status.tokens", tokens=f"{db_total_tokens:,}"))
    if is_running:
        running_state = t("gateway.status.state_yes")
    else:
        running_state = t("gateway.status.state_no")
    lines.append(t("gateway.status.agent_running", state=running_state))

    if queue_depth:
        lines.append(t("gateway.status.queued", count=queue_depth))

    lines.append("")
    lines.append(t("gateway.status.platforms", platforms=', '.join(connected_platforms)))

    return "\n".join(lines)
|
|
|
|
async def _handle_agents_command(self, event: MessageEvent) -> str:
    """Handle /agents command - list active agents and running tasks.

    Reports three sections: running agents (longest-running first, at most
    12 shown), running processes from the process registry (at most 12
    shown) and the number of unfinished background tasks.
    """
    from gateway.run import _AGENT_PENDING_SENTINEL
    from tools.process_registry import format_uptime_short, process_registry

    now = time.time()
    current_session_key = self._session_key_for_source(event.source)

    running_agents: dict = getattr(self, "_running_agents", {}) or {}
    running_started: dict = getattr(self, "_running_agents_ts", {}) or {}

    def _describe(key, agent) -> dict:
        # A sentinel in the slot means the agent is still starting and has
        # no session id / model to report yet.
        started_at = float(running_started.get(key, now))
        seconds = max(0, int(now - started_at))
        pending = agent is _AGENT_PENDING_SENTINEL
        if pending:
            state_label = t("gateway.agents.state_starting")
        else:
            state_label = t("gateway.agents.state_running")
        return {
            "session_key": key,
            "elapsed": seconds,
            "state": state_label,
            "session_id": "" if pending else str(getattr(agent, "session_id", "") or ""),
            "model": "" if pending else str(getattr(agent, "model", "") or ""),
        }

    agent_rows: list[dict] = sorted(
        (_describe(key, agent) for key, agent in running_agents.items()),
        key=lambda entry: entry["elapsed"],
        reverse=True,
    )

    running_processes: list[dict] = []
    try:
        running_processes = [
            proc for proc in process_registry.list_sessions()
            if proc.get("status") == "running"
        ]
    except Exception:
        running_processes = []

    # The loop variable is deliberately not called ``t`` so it cannot be
    # confused with the translation helper used throughout this method.
    background_tasks = [
        task for task in (getattr(self, "_background_tasks", set()) or set())
        if hasattr(task, "done") and not task.done()
    ]

    lines = [
        t("gateway.agents.header"),
        "",
        t("gateway.agents.active_agents", count=len(agent_rows)),
    ]

    if agent_rows:
        for position, entry in enumerate(agent_rows[:12], 1):
            if entry["session_key"] == current_session_key:
                here_marker = t("gateway.agents.this_chat")
            else:
                here_marker = ""
            sid_part = f" · `{entry['session_id']}`" if entry["session_id"] else ""
            model_part = f" · `{entry['model']}`" if entry["model"] else ""
            uptime = format_uptime_short(entry['elapsed'])
            lines.append(
                f"{position}. `{entry['session_key']}` · {entry['state']} · "
                f"{uptime}{sid_part}{model_part}{here_marker}"
            )
        if len(agent_rows) > 12:
            lines.append(t("gateway.agents.more", count=len(agent_rows) - 12))

    lines.append("")
    lines.append(t("gateway.agents.running_processes", count=len(running_processes)))
    if running_processes:
        for proc in running_processes[:12]:
            # Collapse all whitespace runs so the command fits on one line.
            command = " ".join(str(proc.get("command", "")).split())
            if len(command) > 90:
                command = command[:87] + "..."
            lines.append(
                f"- `{proc.get('session_id', '?')}` · "
                f"{format_uptime_short(int(proc.get('uptime_seconds', 0)))} · `{command}`"
            )
        if len(running_processes) > 12:
            lines.append(t("gateway.agents.more", count=len(running_processes) - 12))

    lines.append("")
    lines.append(t("gateway.agents.async_jobs", count=len(background_tasks)))

    if not (agent_rows or running_processes or background_tasks):
        lines.append("")
        lines.append(t("gateway.agents.none"))

    return "\n".join(lines)
|
|
|
|
async def _handle_stop_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
    """Handle /stop command - interrupt a running agent.

    When an agent is truly hung (blocked thread that never checks
    _interrupt_requested), the early intercept in _handle_message()
    handles /stop before this method is reached.  This handler fires
    only through normal command dispatch (no running agent) or as a
    fallback.  Force-clean the session lock in all cases for safety.

    The session is preserved so the user can continue the conversation.
    """
    from gateway.run import _AGENT_PENDING_SENTINEL, _INTERRUPT_REASON_STOP

    source = event.source
    session_key = self.session_store.get_or_create_session(source).session_key

    async def _interrupt(key, why):
        # Every stop path interrupts with the same reason; only the
        # invalidation reason differs.
        await self._interrupt_and_clear_session(
            key,
            source,
            interrupt_reason=_INTERRUPT_REASON_STOP,
            invalidation_reason=why,
        )

    agent = self._running_agents.get(session_key)

    if agent is _AGENT_PENDING_SENTINEL:
        # Force-clean the sentinel so the session is unlocked.
        await _interrupt(session_key, "stop_command_pending")
        logger.info("STOP (pending) for session %s — sentinel cleared", session_key)
        return EphemeralReply(t("gateway.stop.stopped_pending"))

    if agent:
        # Force-clean the session lock so a truly hung agent doesn't
        # keep it locked forever.
        await _interrupt(session_key, "stop_command_handler")
        return EphemeralReply(t("gateway.stop.stopped"))

    # No run under the caller's own session key.  In a per-user thread
    # (thread_sessions_per_user=True) each participant is isolated even
    # inside one shared thread, so a run another user started lives under
    # a different key.  Authorized users should still be able to /stop it
    # (#bernard-thread-stop).  Fall back to interrupting any running
    # agent(s) that share this thread, gated on authorization.
    sibling_keys = self._sibling_thread_run_keys(source, session_key)
    if not (sibling_keys and self._is_user_authorized(source)):
        return t("gateway.stop.no_active")

    for sibling_key in sibling_keys:
        await _interrupt(sibling_key, "stop_command_thread_sibling")
    logger.info(
        "STOP (thread sibling) by %s — interrupted %d run(s) in thread: %s",
        session_key,
        len(sibling_keys),
        ", ".join(sibling_keys),
    )
    return EphemeralReply(t("gateway.stop.stopped"))
|
|
|
|
async def _handle_platform_command(self, event: MessageEvent) -> str:
|
|
"""Handle ``/platform list|pause|resume [name]`` — surface and
|
|
manually control failed/paused gateway adapters.
|
|
|
|
Examples:
|
|
``/platform list`` — show connected + failed/paused platforms
|
|
``/platform pause whatsapp`` — stop the reconnect watcher hammering whatsapp
|
|
``/platform resume whatsapp`` — re-queue a paused platform for retry
|
|
"""
|
|
text = (getattr(event, "content", "") or "").strip()
|
|
# Strip the leading "/platform" (or "/PLATFORM") token if present
|
|
parts = text.split(maxsplit=2)
|
|
if parts and parts[0].lower().lstrip("/").startswith("platform"):
|
|
parts = parts[1:]
|
|
action = (parts[0] if parts else "list").lower()
|
|
target = parts[1].lower() if len(parts) > 1 else ""
|
|
|
|
# Resolve platform name (case-insensitive, value match)
|
|
def _resolve_platform(name: str):
|
|
if not name:
|
|
return None
|
|
for p in Platform.__members__.values():
|
|
if p.value.lower() == name:
|
|
return p
|
|
return None
|
|
|
|
if action == "list":
|
|
lines = ["**Gateway platforms**"]
|
|
connected = sorted(p.value for p in self.adapters.keys())
|
|
if connected:
|
|
lines.append("Connected: " + ", ".join(connected))
|
|
else:
|
|
lines.append("Connected: (none)")
|
|
failed = getattr(self, "_failed_platforms", {}) or {}
|
|
if failed:
|
|
for p, info in failed.items():
|
|
if info.get("paused"):
|
|
reason = info.get("pause_reason") or "paused"
|
|
lines.append(
|
|
f" · {p.value} — PAUSED ({reason}). "
|
|
f"Resume with `/platform resume {p.value}`."
|
|
)
|
|
else:
|
|
attempts = info.get("attempts", 0)
|
|
lines.append(
|
|
f" · {p.value} — retrying (attempt {attempts})"
|
|
)
|
|
else:
|
|
lines.append("Failed/paused: (none)")
|
|
return "\n".join(lines)
|
|
|
|
if action in {"pause", "resume"}:
|
|
if not target:
|
|
return f"Usage: /platform {action} <name>"
|
|
platform = _resolve_platform(target)
|
|
if platform is None:
|
|
return f"Unknown platform: {target}"
|
|
failed = getattr(self, "_failed_platforms", {}) or {}
|
|
if action == "pause":
|
|
if platform not in failed:
|
|
return (
|
|
f"{platform.value} is not in the retry queue "
|
|
f"(it's either connected or not enabled)."
|
|
)
|
|
if failed[platform].get("paused"):
|
|
return f"{platform.value} is already paused."
|
|
self._pause_failed_platform(platform, reason="paused via /platform pause")
|
|
return (
|
|
f"✓ {platform.value} paused. "
|
|
f"Resume with `/platform resume {platform.value}` or "
|
|
f"`hermes gateway restart` to reset."
|
|
)
|
|
# action == "resume"
|
|
if platform not in failed:
|
|
return (
|
|
f"{platform.value} is not in the retry queue — "
|
|
f"nothing to resume."
|
|
)
|
|
if not failed[platform].get("paused"):
|
|
return (
|
|
f"{platform.value} is already retrying — "
|
|
f"no resume needed."
|
|
)
|
|
self._resume_paused_platform(platform)
|
|
return f"✓ {platform.value} resumed — retrying on next watcher tick."
|
|
|
|
return (
|
|
"Usage: /platform <list|pause|resume> [name]\n"
|
|
" /platform list — show platform status\n"
|
|
" /platform pause <name> — stop retrying a failing platform\n"
|
|
" /platform resume <name> — re-queue a paused platform"
|
|
)
|
|
|
|
async def _handle_restart_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
    """Handle /restart command - drain active work, then restart the gateway.

    Writes two marker files under the Hermes home directory before
    requesting the restart: ``.restart_notify.json`` (routing info for the
    requester) and ``.restart_last_processed.json`` (platform + update id
    used to detect a redelivered /restart).

    Returns:
        An empty string for a redelivered /restart that is ignored, a
        draining message when agents are still running, or an
        ``EphemeralReply`` saying the restart is in progress / starting.
    """
    from gateway.run import _hermes_home
    # Defensive idempotency check: if the previous gateway process
    # recorded this same /restart (same platform + update_id) and the new
    # process is seeing it *again*, this is a re-delivery caused by PTB's
    # graceful-shutdown `get_updates` ACK failing on the way out ("Error
    # while calling `get_updates` one more time to mark all fetched
    # updates. Suppressing error to ensure graceful shutdown. When
    # polling for updates is restarted, updates may be received twice."
    # in gateway.log).  Ignoring the stale redelivery prevents a
    # self-perpetuating restart loop where every fresh gateway
    # re-processes the same /restart command and immediately restarts
    # again.
    if self._is_stale_restart_redelivery(event):
        logger.info(
            "Ignoring redelivered /restart (platform=%s, update_id=%s) — "
            "already processed by a previous gateway instance.",
            event.source.platform.value if event.source and event.source.platform else "?",
            event.platform_update_id,
        )
        return ""

    # A restart is already underway: report progress instead of
    # requesting another one.
    if self._restart_requested or self._draining:
        count = self._running_agent_count()
        if count:
            return t("gateway.draining", count=count)
        return EphemeralReply(t("gateway.restart.in_progress"))

    # Save the requester's routing info so the new gateway process can
    # notify them once it comes back online.
    try:
        notify_data = {
            "platform": event.source.platform.value if event.source.platform else None,
            "chat_id": event.source.chat_id,
            "chat_type": event.source.chat_type,
        }
        if event.source.thread_id:
            notify_data["thread_id"] = event.source.thread_id
        if event.message_id:
            notify_data["message_id"] = event.message_id
        if event.source is not None:
            # Keep a copy of the source carrying the triggering message id;
            # fall back to the unmodified source if replace() fails.
            try:
                self._restart_command_source = dataclasses.replace(
                    event.source,
                    message_id=str(event.message_id)
                    if event.message_id is not None
                    else event.source.message_id,
                )
            except Exception:
                self._restart_command_source = event.source
        atomic_json_write(
            _hermes_home / ".restart_notify.json",
            notify_data,
            indent=None,
        )
    except Exception as e:
        # Best-effort: a failed write only costs the post-restart notice.
        logger.debug("Failed to write restart notify file: %s", e)

    # Record the triggering platform + update_id in a dedicated dedup
    # marker.  Unlike .restart_notify.json (which gets unlinked once the
    # new gateway sends the "gateway restarted" notification), this
    # marker persists so the new gateway can still detect a delayed
    # /restart redelivery from Telegram.  Overwritten on every /restart.
    try:
        dedup_data = {
            "platform": event.source.platform.value if event.source.platform else None,
            "requested_at": time.time(),
        }
        if event.platform_update_id is not None:
            dedup_data["update_id"] = event.platform_update_id
        atomic_json_write(
            _hermes_home / ".restart_last_processed.json",
            dedup_data,
            indent=None,
        )
    except Exception as e:
        logger.debug("Failed to write restart dedup marker: %s", e)

    # Sample the count before requesting the restart so the reply reflects
    # the work that was active when the command arrived.
    active_agents = self._running_agent_count()
    # When running under a service manager (systemd/launchd) or inside a
    # Docker/Podman container, use the service restart path: exit with
    # code 75 so the service manager / container restart policy restarts
    # us.  The detached subprocess approach (setsid + bash) doesn't work
    # under systemd (KillMode=mixed kills the cgroup) or Docker (tini
    # exits when the gateway dies, taking the detached helper with it).
    _under_service = bool(os.environ.get("INVOCATION_ID"))  # systemd sets this
    _in_container = os.path.exists("/.dockerenv") or os.path.exists("/run/.containerenv")
    if _under_service or _in_container:
        self.request_restart(detached=False, via_service=True)
    else:
        self.request_restart(detached=True, via_service=False)
    if active_agents:
        return t("gateway.draining", count=active_agents)
    return EphemeralReply(t("gateway.restart.restarting"))
|
|
|
|
async def _handle_version_command(self, event: MessageEvent) -> str:
    """Handle /version — reply with the running Hermes Agent version label."""
    from hermes_cli.banner import format_banner_version_label

    version_label = format_banner_version_label()
    return version_label
|
|
|
|
async def _handle_help_command(self, event: MessageEvent) -> str:
    """Handle /help command - list available commands.

    The reply contains the built-in gateway command lines followed by up
    to ten skill commands; when more exist, a pointer to /commands is
    appended.  Command mentions are post-processed for the event's
    platform before returning.
    """
    from gateway.run import _telegramize_command_mentions
    from hermes_cli.commands import gateway_help_lines

    lines = [
        t("gateway.help.header"),
        *gateway_help_lines(),
    ]
    try:
        from agent.skill_commands import get_skill_commands
        skill_cmds = get_skill_commands()
        if skill_cmds:
            lines.append(t("gateway.help.skill_header", count=len(skill_cmds)))
            # Show first 10, then point to /commands for the rest
            sorted_cmds = sorted(skill_cmds)
            for cmd in sorted_cmds[:10]:
                # A skill without a description must not abort the listing:
                # fall back to the default description that /commands uses.
                desc = (skill_cmds[cmd].get("description") or "").strip() or t(
                    "gateway.commands.default_desc"
                )
                lines.append(f"`{cmd}` — {desc}")
            if len(sorted_cmds) > 10:
                lines.append(t("gateway.help.more_use_commands", count=len(sorted_cmds) - 10))
    except Exception:
        # Skill discovery is optional; built-in help is still returned.
        pass
    return _telegramize_command_mentions(
        "\n".join(lines),
        getattr(getattr(event, "source", None), "platform", None),
    )
|
|
|
|
async def _handle_commands_command(self, event: MessageEvent) -> str:
    """Handle /commands [page] — paginated list of built-in and skill commands.

    The optional argument is a page number; a non-integer argument yields
    the usage text.  Out-of-range pages are clamped to the nearest valid
    page and a note is appended.  Pages hold 15 entries on Telegram and 20
    elsewhere.
    """
    from gateway.run import _telegramize_command_mentions
    from hermes_cli.commands import gateway_help_lines

    raw_args = event.get_command_args().strip()
    requested_page = 1
    if raw_args:
        try:
            requested_page = int(raw_args)
        except ValueError:
            return t("gateway.commands.usage")

    # Build combined entry list: built-in commands + skill commands
    entries = list(gateway_help_lines())
    try:
        from agent.skill_commands import get_skill_commands
        skill_cmds = get_skill_commands()
        if skill_cmds:
            entries.append("")
            entries.append(t("gateway.commands.skill_header"))
            for name in sorted(skill_cmds):
                description = skill_cmds[name].get("description", "").strip()
                if not description:
                    description = t("gateway.commands.default_desc")
                entries.append(f"`{name}` — {description}")
    except Exception:
        pass

    if not entries:
        return t("gateway.commands.none")

    from gateway.config import Platform
    if event.source.platform == Platform.TELEGRAM:
        page_size = 15
    else:
        page_size = 20
    entry_count = len(entries)
    total_pages = max(1, (entry_count + page_size - 1) // page_size)
    # Clamp the requested page into [1, total_pages].
    page = max(1, min(requested_page, total_pages))
    offset = (page - 1) * page_size

    lines = [
        t("gateway.commands.header", total=entry_count, page=page, total_pages=total_pages),
        "",
    ]
    lines.extend(entries[offset:offset + page_size])

    if total_pages > 1:
        nav_parts = []
        if page > 1:
            nav_parts.append(t("gateway.commands.nav_prev", page=page - 1))
        if page < total_pages:
            nav_parts.append(t("gateway.commands.nav_next", page=page + 1))
        lines.append("")
        lines.append(" | ".join(nav_parts))
    if page != requested_page:
        lines.append(t("gateway.commands.out_of_range", requested=requested_page, page=page))

    platform = getattr(getattr(event, "source", None), "platform", None)
    return _telegramize_command_mentions("\n".join(lines), platform)
|
|
|
|
async def _handle_model_command(self, event: MessageEvent) -> Optional[str]:
    """Handle /model command — switch model for this session.

    Supports:
        /model                              — interactive picker (Telegram/Discord) or text list
        /model <name>                       — switch for this session only
        /model <name> --global              — switch and persist to config.yaml
        /model <name> --provider <provider> — switch provider + model
        /model --provider <provider>        — switch to provider, auto-detect model
        --refresh (any form)                — clear the provider-models cache first

    Returns:
        The reply text, or ``None`` when an interactive picker was sent
        successfully (the adapter then delivers the confirmation produced
        by the selection callback).
    """
    from gateway.run import _hermes_home, _load_gateway_config
    import yaml
    from hermes_cli.model_switch import (
        switch_model as _switch_model, parse_model_flags,
        list_authenticated_providers,
        list_picker_providers,
    )
    from hermes_cli.providers import get_label

    raw_args = event.get_command_args().strip()

    # Parse --provider, --global, and --refresh flags
    model_input, explicit_provider, persist_global, force_refresh = parse_model_flags(raw_args)

    # --refresh: bust the disk cache so the picker shows live data.
    if force_refresh:
        try:
            from hermes_cli.models import clear_provider_models_cache
            clear_provider_models_cache()
        except Exception:
            # Best effort: a stale cache must not block the command.
            logger.debug("model: provider-models cache refresh failed", exc_info=True)

    # Read current model/provider from config
    current_model = ""
    current_provider = "openrouter"
    current_base_url = ""
    current_api_key = ""
    user_provs = None
    custom_provs = None
    config_path = _hermes_home / "config.yaml"
    try:
        cfg = _load_gateway_config()
        if cfg:
            model_cfg = cfg.get("model", {})
            if isinstance(model_cfg, dict):
                current_model = model_cfg.get("default", "")
                current_provider = model_cfg.get("provider", current_provider)
                current_base_url = model_cfg.get("base_url", "")
            user_provs = cfg.get("providers")
            try:
                from hermes_cli.config import get_compatible_custom_providers
                custom_provs = get_compatible_custom_providers(cfg)
            except Exception:
                custom_provs = cfg.get("custom_providers")
    except Exception:
        # Unreadable config: continue with the built-in defaults above.
        pass

    # Check for session override
    source = event.source
    # Normalize the source the same way a normal message turn does
    # (Telegram DM topic recovery) before deriving the override key, so
    # the override is stored under the key the next message turn reads
    # (#30479).
    source = self._normalize_source_for_session_key(source)
    session_key = self._session_key_for_source(source)
    override = self._session_model_overrides.get(session_key, {})
    if override:
        current_model = override.get("model", current_model)
        current_provider = override.get("provider", current_provider)
        current_base_url = override.get("base_url", current_base_url)
        current_api_key = override.get("api_key", current_api_key)

    def _configured_context_length():
        """Return ``model.context_length`` from the gateway config as an int, or None."""
        try:
            model_section = _load_gateway_config().get("model", {})
            if isinstance(model_section, dict):
                raw_ctx = model_section.get("context_length")
                if raw_ctx is not None:
                    return int(raw_ctx)
        except Exception:
            pass
        return None

    def _metadata_lines(switch_result):
        """Build the context / max-output / cost / capabilities reply lines."""
        from hermes_cli.model_switch import resolve_display_context_length

        # Context: always resolve via the provider-aware chain so Codex OAuth,
        # Copilot, and Nous-enforced caps win over the raw models.dev entry.
        mi = switch_result.model_info
        ctx = resolve_display_context_length(
            switch_result.new_model,
            switch_result.target_provider,
            base_url=switch_result.base_url or current_base_url or "",
            api_key=switch_result.api_key or current_api_key or "",
            model_info=mi,
            custom_providers=custom_provs,
            config_context_length=_configured_context_length(),
        )
        out = []
        if ctx:
            out.append(t("gateway.model.context_label", tokens=f"{ctx:,}"))
        if mi:
            if mi.max_output:
                out.append(t("gateway.model.max_output_label", tokens=f"{mi.max_output:,}"))
            if mi.has_cost_data():
                out.append(t("gateway.model.cost_label", cost=mi.format_cost()))
            out.append(t("gateway.model.capabilities_label", capabilities=mi.format_capabilities()))
        return out

    def _commit_switch(switch_result, failure_log):
        """Apply a successful switch to this session's runtime state.

        Shared by the picker callback and the direct ``/model <name>`` path.
        ``failure_log`` is the logging format string (one ``%s``) used when
        the in-place update of a cached agent raises.
        """
        # If there's a cached agent, update it in-place
        cached_entry = None
        cache_lock = getattr(self, "_agent_cache_lock", None)
        cache = getattr(self, "_agent_cache", None)
        if cache_lock and cache is not None:
            with cache_lock:
                cached_entry = cache.get(session_key)
        if cached_entry and cached_entry[0] is not None:
            try:
                cached_entry[0].switch_model(
                    new_model=switch_result.new_model,
                    new_provider=switch_result.target_provider,
                    api_key=switch_result.api_key,
                    base_url=switch_result.base_url,
                    api_mode=switch_result.api_mode,
                )
            except Exception as exc:
                logger.warning(failure_log, exc)

        # Persist the new model to the session DB so the dashboard
        # shows the updated model (#34850). The normalized ``source`` is
        # used on both paths so the lookup agrees with ``session_key``.
        sess_db = getattr(self, "_session_db", None)
        if sess_db is not None:
            try:
                sess_entry = self.session_store.get_or_create_session(source)
                sess_db.update_session_model(
                    sess_entry.session_id, switch_result.new_model
                )
            except Exception as exc:
                logger.debug(
                    "Failed to persist model switch to DB: %s", exc
                )

        # Store a note to prepend to the next user message so the model
        # knows about the switch (avoids system messages mid-history).
        if not hasattr(self, "_pending_model_notes"):
            self._pending_model_notes = {}
        self._pending_model_notes[session_key] = (
            f"[Note: model was just switched from {current_model} to {switch_result.new_model} "
            f"via {switch_result.provider_label or switch_result.target_provider}. "
            f"Adjust your self-identification accordingly.]"
        )

        # Store session override so next agent creation uses the new model
        self._session_model_overrides[session_key] = {
            "model": switch_result.new_model,
            "provider": switch_result.target_provider,
            "api_key": switch_result.api_key,
            "base_url": switch_result.base_url,
            "api_mode": switch_result.api_mode,
        }

        # Evict cached agent so the next turn creates a fresh agent from the
        # override rather than relying on cache signature mismatch detection.
        self._evict_cached_agent(session_key)

    # No args: show interactive picker (Telegram/Discord) or text list
    if not model_input and not explicit_provider:
        # Try interactive picker if the platform supports it
        adapter = self.adapters.get(source.platform)
        has_picker = (
            adapter is not None
            and getattr(type(adapter), "send_model_picker", None) is not None
        )

        if has_picker:
            try:
                providers = list_picker_providers(
                    current_provider=current_provider,
                    current_base_url=current_base_url,
                    current_model=current_model,
                    user_providers=user_provs,
                    custom_providers=custom_provs,
                    max_models=50,
                )
            except Exception:
                providers = []

            if providers:
                async def _on_model_selected(
                    _chat_id: str, model_id: str, provider_slug: str
                ) -> str:
                    """Perform the model switch and return confirmation text."""
                    picked = _switch_model(
                        raw_input=model_id,
                        current_provider=current_provider,
                        current_model=current_model,
                        current_base_url=current_base_url,
                        current_api_key=current_api_key,
                        is_global=False,
                        explicit_provider=provider_slug,
                        user_providers=user_provs,
                        custom_providers=custom_provs,
                    )
                    if not picked.success:
                        return t("gateway.model.error_prefix", error=picked.error_message)

                    _commit_switch(picked, "Picker model switch failed for cached agent: %s")

                    # Build confirmation text
                    plabel = picked.provider_label or picked.target_provider
                    reply = [t("gateway.model.switched", model=picked.new_model)]
                    reply.append(t("gateway.model.provider_label", provider=plabel))
                    reply.extend(_metadata_lines(picked))
                    reply.append(t("gateway.model.session_only_hint"))
                    return "\n".join(reply)

                metadata = self._thread_metadata_for_source(source, self._reply_anchor_for_event(event))
                result = await adapter.send_model_picker(
                    chat_id=source.chat_id,
                    providers=providers,
                    current_model=current_model,
                    current_provider=current_provider,
                    session_key=session_key,
                    on_model_selected=_on_model_selected,
                    metadata=metadata,
                )
                if result.success:
                    return None  # Picker sent — adapter handles the response

        # Fallback: text list (for platforms without picker or if picker failed)
        provider_label = get_label(current_provider)
        lines = [t("gateway.model.current_label", model=current_model or "unknown", provider=provider_label), ""]

        try:
            providers = list_authenticated_providers(
                current_provider=current_provider,
                current_base_url=current_base_url,
                current_model=current_model,
                user_providers=user_provs,
                custom_providers=custom_provs,
                max_models=5,
            )
            for p in providers:
                tag = t("gateway.model.current_tag") if p["is_current"] else ""
                lines.append(f"**{p['name']}** `--provider {p['slug']}`{tag}:")
                if p["models"]:
                    model_strs = ", ".join(f"`{m}`" for m in p["models"])
                    extra = t("gateway.model.more_models_suffix", count=p["total_models"] - len(p["models"])) if p["total_models"] > len(p["models"]) else ""
                    lines.append(f" {model_strs}{extra}")
                elif p.get("api_url"):
                    lines.append(f" `{p['api_url']}`")
                lines.append("")
        except Exception:
            # Best effort: the usage hints below are still useful alone.
            pass

        lines.append(t("gateway.model.usage_switch_model"))
        lines.append(t("gateway.model.usage_switch_provider"))
        lines.append(t("gateway.model.usage_persist"))
        return "\n".join(lines)

    # Perform the switch
    result = _switch_model(
        raw_input=model_input,
        current_provider=current_provider,
        current_model=current_model,
        current_base_url=current_base_url,
        current_api_key=current_api_key,
        is_global=persist_global,
        explicit_provider=explicit_provider,
        user_providers=user_provs,
        custom_providers=custom_provs,
    )

    if not result.success:
        return t("gateway.model.error_prefix", error=result.error_message)

    _commit_switch(result, "In-place model switch failed for cached agent: %s")

    # Persist to config if --global
    if persist_global:
        try:
            if config_path.exists():
                with open(config_path, encoding="utf-8") as f:
                    cfg = yaml.safe_load(f) or {}
            else:
                cfg = {}
            # Coerce scalar/None ``model:`` into a dict before mutation —
            # otherwise ``cfg.setdefault("model", {})`` returns the existing
            # scalar and the next assignment raises
            # ``TypeError: 'str' object does not support item assignment``.
            # Reproduces when ``config.yaml`` has ``model: <name>`` (flat
            # string) instead of the proper nested ``model: {default: ...}``.
            raw_model = cfg.get("model")
            if isinstance(raw_model, dict):
                model_cfg = raw_model
            elif isinstance(raw_model, str) and raw_model.strip():
                model_cfg = {"default": raw_model.strip()}
                cfg["model"] = model_cfg
            else:
                model_cfg = {}
                cfg["model"] = model_cfg
            model_cfg["default"] = result.new_model
            model_cfg["provider"] = result.target_provider
            if result.base_url:
                model_cfg["base_url"] = result.base_url
            from hermes_cli.config import save_config
            save_config(cfg)
        except Exception as e:
            logger.warning("Failed to persist model switch: %s", e)

    # Build confirmation message with full metadata
    provider_label = result.provider_label or result.target_provider
    lines = [t("gateway.model.switched", model=result.new_model)]
    lines.append(t("gateway.model.provider_label", provider=provider_label))
    lines.extend(_metadata_lines(result))

    # Cache notice
    cache_enabled = (
        (base_url_host_matches(result.base_url or "", "openrouter.ai") and "claude" in result.new_model.lower())
        or result.api_mode == "anthropic_messages"
    )
    if cache_enabled:
        lines.append(t("gateway.model.prompt_caching_enabled"))

    if result.warning_message:
        lines.append(t("gateway.model.warning_prefix", warning=result.warning_message))

    if persist_global:
        lines.append(t("gateway.model.saved_global"))
    else:
        lines.append(t("gateway.model.session_only_hint"))

    return "\n".join(lines)
|
|
|
|
async def _handle_codex_runtime_command(self, event: MessageEvent) -> str:
    """Handle /codex-runtime command in the gateway.

    Same surface as the CLI handler in cli.py:
        /codex-runtime                  — show current state
        /codex-runtime auto             — Hermes default runtime
        /codex-runtime codex_app_server — codex subprocess runtime
        /codex-runtime on / off         — synonyms

    On change, the cached agent for this session is evicted so the next
    message creates a fresh AIAgent with the new api_mode wired in
    (avoids prompt-cache invalidation mid-session).

    Returns:
        A one-line status prefixed with ``✓``/``✗``, or ``❌``-prefixed
        error lines when argument parsing or config loading fails.
    """
    from hermes_cli import codex_runtime_switch as crs

    raw_args = event.get_command_args().strip() if event else ""
    new_value, errors = crs.parse_args(raw_args)
    if errors:
        return "❌ " + "\n❌ ".join(errors)

    # Load + persist via the same helpers used for /model and /yolo.
    # Reading the config sits inside the guarded section as well, so an
    # unreadable config yields the error reply instead of an unhandled
    # exception escaping the handler.
    try:
        from hermes_cli.config import load_config, save_config
        cfg = load_config()
    except Exception as exc:
        return f"❌ Could not load config: {exc}"

    result = crs.apply(
        cfg,
        new_value,
        # Only persist when a new value was requested (not on a status query).
        persist_callback=(save_config if new_value is not None else None),
    )

    # On a real change, evict the cached agent so the new runtime takes
    # effect on the next message rather than waiting for cache TTL.
    if result.success and new_value is not None and result.requires_new_session:
        try:
            session_key = self._session_key_for_source(event.source)
            self._evict_cached_agent(session_key)
        except Exception:
            logger.debug("could not evict cached agent after codex-runtime change",
                         exc_info=True)

    prefix = "✓" if result.success else "✗"
    return f"{prefix} {result.message}"
|
|
|
|
async def _handle_personality_command(self, event: MessageEvent) -> str:
    """Handle /personality command - list or set a personality.

    Without arguments the configured personalities are listed. ``none`` /
    ``default`` / ``neutral`` clear the system prompt. Any other argument
    is looked up among the configured personalities (exact key first, then
    case-insensitively, because the argument is lower-cased on input). The
    chosen prompt is written to ``agent.system_prompt`` in config.yaml and
    mirrored into ``self._ephemeral_system_prompt``.
    """
    from gateway.run import _hermes_home, _load_gateway_config
    from hermes_constants import display_hermes_home

    args = event.get_command_args().strip().lower()
    config_path = _hermes_home / 'config.yaml'

    try:
        config = _load_gateway_config()
        personalities = cfg_get(config, "agent", "personalities", default={})
    except Exception:
        config = {}
        personalities = {}

    # A non-mapping value (malformed config) is treated as "nothing configured".
    if not personalities or not isinstance(personalities, dict):
        return t("gateway.personality.none_configured", path=display_hermes_home())

    def _preview(value):
        """Return a short (<= 50 chars + ellipsis) description of a personality."""
        if isinstance(value, dict):
            return value.get("description") or str(value.get("system_prompt") or "")[:50]
        text = str(value)
        return text[:50] + "..." if len(text) > 50 else text

    def _resolve_prompt(value):
        """Flatten a personality entry (str or dict) into system-prompt text."""
        if isinstance(value, dict):
            parts = [value.get("system_prompt", "")]
            if value.get("tone"):
                parts.append(f'Tone: {value["tone"]}')
            if value.get("style"):
                parts.append(f'Style: {value["style"]}')
            return "\n".join(p for p in parts if p)
        return str(value)

    def _find_personality(requested):
        """Return the configured key matching ``requested``, or None."""
        if requested in personalities:
            return requested
        # ``requested`` is already lower-cased, so compare against lower-cased
        # keys; otherwise a personality named with capitals is unselectable.
        for key in personalities:
            if str(key).lower() == requested:
                return key
        return None

    def _persist(prompt_text):
        """Write ``prompt_text`` to config.yaml; return an error reply or None."""
        # Same pattern as CLI save_config_value.
        try:
            if "agent" not in config or not isinstance(config.get("agent"), dict):
                config["agent"] = {}
            config["agent"]["system_prompt"] = prompt_text
            atomic_yaml_write(config_path, config)
        except Exception as e:
            return t("gateway.personality.save_failed", error=str(e))
        return None

    if not args:
        lines = [t("gateway.personality.header")]
        lines.append(t("gateway.personality.none_option"))
        for name, prompt in personalities.items():
            lines.append(t("gateway.personality.item", name=name, preview=_preview(prompt)))
        lines.append(t("gateway.personality.usage"))
        return "\n".join(lines)

    if args in {"none", "default", "neutral"}:
        failure = _persist("")
        if failure is not None:
            return failure
        self._ephemeral_system_prompt = ""
        return t("gateway.personality.cleared")

    matched = _find_personality(args)
    if matched is not None:
        new_prompt = _resolve_prompt(personalities[matched])
        failure = _persist(new_prompt)
        if failure is not None:
            return failure
        # Update in-memory so it takes effect on the very next message.
        self._ephemeral_system_prompt = new_prompt
        return t("gateway.personality.set_to", name=matched)

    available = "`none`, " + ", ".join(f"`{n}`" for n in personalities)
    return t("gateway.personality.unknown", name=args, available=available)
|
|
|
|
async def _handle_retry_command(self, event: MessageEvent) -> str:
    """Handle /retry: drop the last user turn and feed its text back in.

    The transcript is truncated to everything before the most recent
    ``role == "user"`` entry, persisted, and that entry's content is then
    re-dispatched through ``_handle_message`` as a fresh text event.
    """
    source = event.source
    entry = self.session_store.get_or_create_session(source)
    transcript = self.session_store.load_transcript(entry.session_id)

    # Walk backwards to the most recent user message.
    user_idx = next(
        (
            pos
            for pos in reversed(range(len(transcript)))
            if transcript[pos].get("role") == "user"
        ),
        None,
    )
    previous_text = None
    if user_idx is not None:
        previous_text = transcript[user_idx].get("content", "")

    if not previous_text:
        return t("gateway.retry.no_previous")

    # Persist the transcript without the retried turn and whatever followed it.
    self.session_store.rewrite_transcript(entry.session_id, transcript[:user_idx])
    # The stored token count no longer describes the shortened transcript.
    entry.last_prompt_tokens = 0

    # Synthesize a text event carrying the old message and let the normal
    # message handler process it.
    resend = MessageEvent(
        text=previous_text,
        message_type=MessageType.TEXT,
        source=source,
        raw_message=event.raw_message,
        channel_prompt=event.channel_prompt,
    )
    return await self._handle_message(resend)
|
|
|
|
async def _handle_goal_command(self, event: "MessageEvent") -> str:
    """Handle /goal for gateway platforms.

    Recognized subcommands: no argument or ``status`` (show status line),
    ``pause``, ``resume``, and ``clear`` / ``stop`` / ``done``. Any other
    text is stored as the new goal.

    When a new goal is set, its text is also queued as the next turn so
    the agent begins on it right away; the post-turn continuation hook
    carries on from there.
    """
    args = (event.get_command_args() or "").strip()
    lower = args.lower()

    mgr, _ = self._get_goal_manager_for_event(event)
    if mgr is None:
        return t("gateway.goal.unavailable")

    def _route():
        """Return ``(adapter, session_key)`` for the event, or ``(None, None)``."""
        src = event.source
        if not src:
            return None, None
        return self.adapters.get(src.platform), self._session_key_for_source(src)

    def _drop_pending(log_format):
        """Best-effort removal of queued goal continuations for this session."""
        try:
            target_adapter, target_key = _route()
            if target_adapter and target_key:
                self._clear_goal_pending_continuations(target_key, target_adapter)
        except Exception as exc:
            logger.debug(log_format, exc)

    if lower in ("", "status"):
        return mgr.status_line()

    if lower == "pause":
        state = mgr.pause(reason="user-paused")
        if state is None:
            return t("gateway.goal.no_goal_set")
        _drop_pending("goal pause: pending continuation cleanup failed: %s")
        return t("gateway.goal.paused", goal=state.goal)

    if lower == "resume":
        state = mgr.resume()
        if state is None:
            return t("gateway.goal.no_resume")
        return t("gateway.goal.resumed", goal=state.goal)

    if lower in {"clear", "stop", "done"}:
        # Remember whether anything was set before wiping it; this picks the reply.
        had = mgr.has_goal()
        mgr.clear()
        _drop_pending("goal clear: pending continuation cleanup failed: %s")
        if had:
            return t("gateway.goal_cleared")
        return t("gateway.no_active_goal")

    # Anything else is the text of a new goal.
    try:
        state = mgr.set(args)
    except ValueError as exc:
        return t("gateway.goal.invalid", error=str(exc))

    # Kick off immediately: enqueue the goal text as the first turn. The
    # post-turn hook takes over once that turn completes.
    adapter, _quick_key = _route()
    if adapter and _quick_key:
        try:
            kickoff_event = MessageEvent(
                text=state.goal,
                message_type=MessageType.TEXT,
                source=event.source,
                message_id=event.message_id,
                channel_prompt=event.channel_prompt,
            )
            self._enqueue_fifo(_quick_key, kickoff_event, adapter)
        except Exception as exc:
            logger.debug("goal kickoff enqueue failed: %s", exc)

    return t("gateway.goal.set", budget=state.max_turns, goal=state.goal)
|
|
|
|
async def _handle_subgoal_command(self, event: "MessageEvent") -> str:
    """Handle /subgoal for gateway platforms (mirror of CLI handler).

    Forms:
        /subgoal            — status line plus the current subgoals
        /subgoal remove <n> — remove subgoal ``n``
        /subgoal clear      — remove every subgoal
        /subgoal <text>     — append ``<text>`` as a new subgoal

    Subgoals are extra criteria attached to the active goal. They change
    state that is read at the next turn boundary, so the command is safe
    to run while the agent is working.
    """
    args = (event.get_command_args() or "").strip()
    mgr, _ = self._get_goal_manager_for_event(event)
    if mgr is None:
        return t("gateway.goal.unavailable")
    if not mgr.has_goal():
        return "No active goal. Set one with /goal <text>."

    # Bare command: show what is currently set.
    if not args:
        return f"{mgr.status_line()}\n{mgr.render_subgoals()}"

    pieces = args.split(None, 1)
    verb = pieces[0].lower()
    rest = pieces[1].strip() if len(pieces) > 1 else ""

    if verb == "remove":
        if not rest:
            return "Usage: /subgoal remove <n>"
        try:
            idx = int(rest.split()[0])
        except ValueError:
            return "/subgoal remove: <n> must be an integer (1-based index)."
        try:
            removed = mgr.remove_subgoal(idx)
        except (IndexError, RuntimeError) as exc:
            return f"/subgoal remove: {exc}"
        return f"✓ Removed subgoal {idx}: {removed}"

    if verb == "clear":
        try:
            prev = mgr.clear_subgoals()
        except RuntimeError as exc:
            return f"/subgoal clear: {exc}"
        if not prev:
            return "No subgoals to clear."
        plural = "" if prev == 1 else "s"
        return f"✓ Cleared {prev} subgoal{plural}."

    # Default: the whole argument string is the new subgoal text.
    try:
        text = mgr.add_subgoal(args)
    except (ValueError, RuntimeError) as exc:
        return f"/subgoal: {exc}"
    # Report the position of the subgoal that was just appended.
    idx = len(mgr.state.subgoals) if mgr.state else 0
    return f"✓ Added subgoal {idx}: {text}"
|
|
|
|
async def _handle_undo_command(self, event: MessageEvent) -> str:
    """Handle /undo [N] — back up N user turns (default 1), soft-deleting
    the truncated rows on disk and echoing the backed-up message text so
    the user can copy/edit and resend.

    Mirrors the CLI/TUI /undo: rewound rows stay in state.db (active=0)
    for audit and are hidden from re-prompts and search. The cached agent
    is evicted so the next message rebuilds context from the truncated
    (active-only) transcript — the gateway's equivalent of the CLI's
    in-place history surgery + memory-cache invalidation.

    Returns:
        A confirmation with the number of turns/rows rewound and a preview
        (at most 200 characters) of the backed-up message, the
        "nothing to undo" text when the store reports no rewind, or the
        invalid-count text when N is not an integer.
    """
    source = event.source

    # Parse optional turn count: "/undo" → 1, "/undo 3" → 3.
    n = 1
    raw_args = event.get_command_args().strip()
    if raw_args:
        # raw_args is non-empty after strip(), so split() has a first token.
        count_token = raw_args.split()[0]
        try:
            n = int(count_token)
        except ValueError:
            return t("gateway.undo.invalid_count", arg=count_token)
        if n < 1:
            n = 1

    session_entry = self.session_store.get_or_create_session(source)
    result = self.session_store.rewind_session(session_entry.session_id, n)

    if result is None:
        return t("gateway.undo.nothing")

    # Reset stored token count — transcript was truncated.
    session_entry.last_prompt_tokens = 0
    # Evict the cached agent so the next turn rebuilds from the active-only
    # transcript and memory providers refresh their per-session caches.
    # The key comes from the runner's own helper (as in the other handlers
    # that evict), so it matches the key the agent was cached under.
    try:
        session_key = self._session_key_for_source(source)
        self._evict_cached_agent(session_key)
    except Exception as e:
        logger.debug("undo: cached-agent eviction skipped: %s", e)

    # Guard against a missing/None text so the preview slicing cannot fail.
    target_text = result["target_text"] or ""
    preview = target_text[:200] + "..." if len(target_text) > 200 else target_text
    return t(
        "gateway.undo.removed",
        turns=result["turns_undone"],
        count=result["rewound_count"],
        preview=preview,
    )
|
|
|
|
async def _handle_set_home_command(self, event: MessageEvent) -> str:
    """Handle /sethome: make the current chat the platform's home channel.

    The chat id (and thread id, or an empty value when there is none) is
    written to the platform's env entries, and the in-memory gateway
    config is updated to match.
    """
    from gateway.run import _home_target_env_var, _home_thread_env_var

    source = event.source
    platform = source.platform
    platform_name = platform.value if platform else "unknown"
    chat_id = source.chat_id
    thread_id = source.thread_id
    chat_name = source.chat_name or chat_id

    env_key = _home_target_env_var(platform_name)
    thread_env_key = _home_thread_env_var(platform_name)

    # Persist to .env so the setting survives restarts.
    try:
        from hermes_cli.config import save_env_value

        save_env_value(env_key, str(chat_id))
        # Always write the thread entry: running /sethome from the parent
        # chat must clear a stale thread/topic value.
        save_env_value(thread_env_key, str(thread_id or ""))
    except Exception as e:
        return t("gateway.set_home.save_failed", error=e)

    # Mirror the change into the live config; the pre-restart notification
    # path reads self.config before the process reloads env.
    if platform:
        fallback_config = PlatformConfig(enabled=True)
        platform_config = self.config.platforms.setdefault(platform, fallback_config)
        home_thread = str(thread_id) if thread_id else None
        platform_config.home_channel = HomeChannel(
            platform=platform,
            chat_id=str(chat_id),
            name=chat_name,
            thread_id=home_thread,
        )

    return t("gateway.set_home.success", name=chat_name, chat_id=chat_id)
|
|
|
|
async def _handle_voice_command(self, event: MessageEvent) -> str:
    """Dispatch ``/voice [on|off|tts|channel|leave|status]``.

    * ``on`` / ``enable``   -> mode ``voice_only``
    * ``off`` / ``disable`` -> mode ``off``
    * ``tts``               -> mode ``all``
    * ``channel`` / ``join`` and ``leave`` delegate to the voice-channel handlers
    * ``status`` reports the mode plus voice-channel details when available
    * anything else (including no argument) toggles between ``off`` and
      ``voice_only`` and appends the help text

    Args:
        event: The incoming command event.

    Returns:
        The user-facing reply text.
    """
    subcommand = event.get_command_args().strip().lower()
    chat_id = event.source.chat_id
    platform = event.source.platform
    voice_key = self._voice_key(platform, chat_id)

    adapter = self.adapters.get(platform)

    def _switch_mode(mode: str) -> None:
        """Store *mode* for this chat, persist it, and sync the adapter flag."""
        self._voice_mode[voice_key] = mode
        self._save_voice_modes()
        if not adapter:
            return
        if mode == "off":
            self._set_adapter_auto_tts_disabled(adapter, chat_id, disabled=True)
        else:
            self._set_adapter_auto_tts_enabled(adapter, chat_id, enabled=True)

    if subcommand in {"on", "enable"}:
        _switch_mode("voice_only")
        return t("gateway.voice.enabled_voice_only")

    if subcommand in {"off", "disable"}:
        _switch_mode("off")
        return t("gateway.voice.disabled_text")

    if subcommand == "tts":
        _switch_mode("all")
        return t("gateway.voice.tts_enabled")

    if subcommand in {"channel", "join"}:
        return await self._handle_voice_channel_join(event)

    if subcommand == "leave":
        return await self._handle_voice_channel_leave(event)

    if subcommand == "status":
        mode = self._voice_mode.get(voice_key, "off")
        labels = {
            "off": t("gateway.voice.label_off"),
            "voice_only": t("gateway.voice.label_voice_only"),
            "all": t("gateway.voice.label_all"),
        }
        # Voice-channel details are only available when the adapter exposes
        # get_voice_channel_info and the event resolves to a guild.
        guild_id = self._get_guild_id(event)
        info = None
        if guild_id and hasattr(adapter, "get_voice_channel_info"):
            info = adapter.get_voice_channel_info(guild_id)

        report = [t("gateway.voice.status_mode", label=labels.get(mode, mode))]
        if info:
            report.append(t("gateway.voice.status_channel", channel=info['channel_name']))
            report.append(t("gateway.voice.status_participants", count=info['member_count']))
            for member in info["members"]:
                speaking = t("gateway.voice.speaking") if member.get("is_speaking") else ""
                report.append(
                    t("gateway.voice.status_member", name=member['display_name'], status=speaking)
                )
        return "\n".join(report)

    # Bare /voice (or an unrecognised argument) toggles: off -> voice_only,
    # anything else -> off.
    if self._voice_mode.get(voice_key, "off") == "off":
        _switch_mode("voice_only")
        toggle_line = t("gateway.voice.enabled_short")
    else:
        _switch_mode("off")
        toggle_line = t("gateway.voice.disabled_short")

    # Append an explainer so users discover the subcommands; the
    # voice-channel section is only shown when the adapter can join channels.
    # The toggle result goes first via the {toggle} placeholder.
    can_join_channels = adapter is not None and hasattr(adapter, "join_voice_channel")
    channels = t("gateway.voice.help_channels") if can_join_channels else ""
    return t("gateway.voice.help", toggle=toggle_line, channels=channels)
|
|
|
|
async def _handle_rollback_command(self, event: MessageEvent) -> str:
    """Handle ``/rollback`` — list or restore filesystem checkpoints.

    With no argument, the checkpoints for the working directory are listed.
    With an argument, it is treated as a 1-based index into that list when it
    parses as an integer, and otherwise passed through as a checkpoint hash.

    The ``checkpoints`` section of ``config.yaml`` may be a bool (shorthand
    for ``enabled``) or a mapping. Any other shape — including an empty
    ``checkpoints:`` key, which YAML loads as ``None`` — is treated as
    "not configured" rather than crashing the handler.

    Args:
        event: The incoming command event.

    Returns:
        The user-facing reply text.
    """
    from gateway.run import _hermes_home
    from tools.checkpoint_manager import CheckpointManager, format_checkpoint_list

    # Read checkpoint config from config.yaml (best-effort).
    cp_cfg: dict = {}
    try:
        import yaml as _y

        _cfg_path = _hermes_home / "config.yaml"
        if _cfg_path.exists():
            with open(_cfg_path, encoding="utf-8") as _f:
                _data = _y.safe_load(_f) or {}
            _raw = _data.get("checkpoints", {})
            if isinstance(_raw, bool):
                cp_cfg = {"enabled": _raw}
            elif isinstance(_raw, dict):
                cp_cfg = _raw
            # Anything else (None, str, list, ...) keeps the empty default
            # so the .get() calls below cannot raise AttributeError.
    except Exception as exc:
        # Unreadable or malformed config means "checkpoints disabled";
        # leave a trace instead of failing silently.
        logger.debug("rollback: could not read checkpoint config: %s", exc)

    if not cp_cfg.get("enabled", False):
        return t("gateway.rollback.not_enabled")

    mgr = CheckpointManager(
        enabled=True,
        max_snapshots=cp_cfg.get("max_snapshots", 50),
        max_total_size_mb=cp_cfg.get("max_total_size_mb", 500),
        max_file_size_mb=cp_cfg.get("max_file_size_mb", 10),
    )

    cwd = os.getenv("TERMINAL_CWD", str(Path.home()))
    arg = event.get_command_args().strip()

    checkpoints = mgr.list_checkpoints(cwd)
    if not arg:
        return format_checkpoint_list(checkpoints, cwd)

    # Restore by number or hash.
    if not checkpoints:
        return t("gateway.rollback.none_found", cwd=cwd)

    try:
        idx = int(arg) - 1
    except ValueError:
        # Not an integer: pass it through as a checkpoint hash.
        target_hash = arg
    else:
        if not 0 <= idx < len(checkpoints):
            return t("gateway.rollback.invalid_number", max=len(checkpoints))
        target_hash = checkpoints[idx]["hash"]

    result = mgr.restore(cwd, target_hash)
    if result["success"]:
        return t(
            "gateway.rollback.restored",
            hash=result["restored_to"],
            reason=result["reason"],
        )
    return t("gateway.rollback.restore_failed", error=result["error"])
|
|
|
|
async def _handle_background_command(self, event: MessageEvent) -> str:
    """Handle ``/background <prompt>`` by scheduling a detached background run.

    The prompt is handed to ``self._run_background_task`` as an asyncio task;
    this handler returns immediately with a confirmation and does not wait
    for the result.

    Args:
        event: The incoming command event; its command args are the prompt
            and its media attachments are forwarded to the task.

    Returns:
        The translated usage text when no prompt was given, otherwise the
        translated "started" confirmation containing a prompt preview and
        the generated task id.
    """
    prompt = event.get_command_args().strip()
    if not prompt:
        return t("gateway.background.usage")

    source = event.source
    stamp = datetime.now().strftime('%H%M%S')
    suffix = os.urandom(3).hex()
    task_id = f"bg_{stamp}_{suffix}"

    reply_anchor = self._reply_anchor_for_event(event)

    # Copy the attachment lists so the background agent sees the same
    # image/audio inputs as this message.
    attached_urls = list(event.media_urls or [])
    attached_types = list(event.media_types or [])

    # Fire-and-forget. The task is parked in self._background_tasks until
    # it finishes, at which point the done-callback removes it.
    job = asyncio.create_task(
        self._run_background_task(
            prompt,
            source,
            task_id,
            event_message_id=reply_anchor,
            media_urls=attached_urls,
            media_types=attached_types,
        )
    )
    self._background_tasks.add(job)
    job.add_done_callback(self._background_tasks.discard)

    preview = prompt if len(prompt) <= 60 else prompt[:60] + "..."
    return t("gateway.background.started", preview=preview, task_id=task_id)
|
|
|
|
async def _handle_reasoning_command(self, event: MessageEvent) -> str:
    """Handle ``/reasoning`` — reasoning effort and reasoning-display control.

    Usage:
        /reasoning                     Report effort level, scope and display state
        /reasoning <level>             Set effort for this session only
        /reasoning <level> --global    Persist effort to config.yaml
        /reasoning reset               Drop this session's effort override
        /reasoning show|on             Show model reasoning in responses
        /reasoning hide|off            Hide model reasoning from responses

    Accepted levels are ``none`` (disables reasoning), ``minimal``, ``low``,
    ``medium``, ``high`` and ``xhigh``.

    Args:
        event: The incoming command event.

    Returns:
        The user-facing reply text.
    """
    from gateway.run import _hermes_home, _platform_config_key
    import yaml

    raw_args = event.get_command_args().strip()
    args, persist_global = self._parse_reasoning_command_args(raw_args)
    config_path = _hermes_home / "config.yaml"
    # Derive the override key from the normalized source (Telegram DM topic
    # recovery) so storage matches the key the next message turn reads —
    # same fix as /model (#30479).
    normalized_source = self._normalize_source_for_session_key(event.source)
    session_key = self._session_key_for_source(normalized_source)
    self._show_reasoning = self._load_show_reasoning()
    self._reasoning_config = self._resolve_session_reasoning_config(
        source=event.source,
        session_key=session_key,
    )

    def _save_config_key(key_path: str, value):
        """Write *value* at the dot-separated *key_path* in config.yaml.

        Returns True on success; on any failure the error is logged and
        False is returned.
        """
        try:
            document = {}
            if config_path.exists():
                with open(config_path, encoding="utf-8") as fh:
                    document = yaml.safe_load(fh) or {}
            *parents, leaf = key_path.split(".")
            node = document
            for part in parents:
                # Create (or replace a non-dict with) an intermediate mapping.
                if not (part in node and isinstance(node[part], dict)):
                    node[part] = {}
                node = node[part]
            node[leaf] = value
            atomic_yaml_write(config_path, document)
            return True
        except Exception as e:
            logger.error("Failed to save config key %s: %s", key_path, e)
            return False

    if not raw_args:
        # No arguments: report the current state.
        current_cfg = self._reasoning_config
        if current_cfg is None:
            level = t("gateway.reasoning.level_default")
        elif current_cfg.get("enabled") is False:
            level = t("gateway.reasoning.level_disabled")
        else:
            level = current_cfg.get("effort", "medium")

        if self._show_reasoning:
            display_state = t("gateway.reasoning.display_on")
        else:
            display_state = t("gateway.reasoning.display_off")

        overrides = getattr(self, "_session_reasoning_overrides", {}) or {}
        if session_key in overrides:
            scope = t("gateway.reasoning.scope_session")
        else:
            scope = t("gateway.reasoning.scope_global")

        return t(
            "gateway.reasoning.status",
            level=level,
            scope=scope,
            display=display_state,
        )

    # Display toggle (stored per platform).
    platform_key = _platform_config_key(event.source.platform)
    wants_show = args in {"show", "on"}
    if wants_show or args in {"hide", "off"}:
        self._show_reasoning = wants_show
        _save_config_key(f"display.platforms.{platform_key}.show_reasoning", wants_show)
        if wants_show:
            return t("gateway.reasoning.display_set_on", platform=platform_key)
        return t("gateway.reasoning.display_set_off", platform=platform_key)

    # Effort level change.
    effort = args.strip()
    if effort == "reset":
        if persist_global:
            return t("gateway.reasoning.reset_global_unsupported")
        self._set_session_reasoning_override(session_key, None)
        self._reasoning_config = self._load_reasoning_config()
        self._evict_cached_agent(session_key)
        return t("gateway.reasoning.reset_done")

    if effort == "none":
        parsed = {"enabled": False}
    elif effort in {"minimal", "low", "medium", "high", "xhigh"}:
        parsed = {"enabled": True, "effort": effort}
    else:
        return t(
            "gateway.reasoning.unknown_arg",
            arg=effort or raw_args.lower(),
        )

    self._reasoning_config = parsed

    # A successful global save clears the session override; otherwise the
    # override carries the new setting (both for plain session changes and
    # when the global save failed).
    saved_globally = persist_global and _save_config_key("agent.reasoning_effort", effort)
    self._set_session_reasoning_override(session_key, None if saved_globally else parsed)
    self._evict_cached_agent(session_key)

    if not persist_global:
        return t("gateway.reasoning.set_session", effort=effort)
    if saved_globally:
        return t("gateway.reasoning.set_global", effort=effort)
    return t("gateway.reasoning.set_global_save_failed", effort=effort)
|
|
|
|
async def _handle_fast_command(self, event: MessageEvent) -> str:
    """Handle ``/fast`` — mirror the CLI Priority Processing toggle in gateway chats.

    ``/fast`` or ``/fast status`` reports the current mode; ``fast``/``on``
    selects the ``priority`` service tier and ``normal``/``off`` clears it.
    The choice is written to ``agent.service_tier`` in config.yaml.

    Args:
        event: The incoming command event.

    Returns:
        The user-facing reply text; a "not supported" message when the
        resolved gateway model does not support fast mode.
    """
    from gateway.run import _hermes_home, _load_gateway_config, _resolve_gateway_model
    import yaml
    from hermes_cli.models import model_supports_fast_mode

    args = event.get_command_args().strip().lower()
    config_path = _hermes_home / "config.yaml"
    self._service_tier = self._load_service_tier()

    active_model = _resolve_gateway_model(_load_gateway_config())
    if not model_supports_fast_mode(active_model):
        return t("gateway.fast.not_supported")

    def _save_config_key(key_path: str, value):
        """Write *value* at the dot-separated *key_path* in config.yaml.

        Returns True on success; on any failure the error is logged and
        False is returned.
        """
        try:
            document = {}
            if config_path.exists():
                with open(config_path, encoding="utf-8") as fh:
                    document = yaml.safe_load(fh) or {}
            *parents, leaf = key_path.split(".")
            node = document
            for part in parents:
                # Create (or replace a non-dict with) an intermediate mapping.
                if not (part in node and isinstance(node[part], dict)):
                    node[part] = {}
                node = node[part]
            node[leaf] = value
            atomic_yaml_write(config_path, document)
            return True
        except Exception as e:
            logger.error("Failed to save config key %s: %s", key_path, e)
            return False

    if not args or args == "status":
        if self._service_tier == "priority":
            status = t("gateway.fast.status_fast")
        else:
            status = t("gateway.fast.status_normal")
        return t("gateway.fast.status", mode=status)

    if args in {"fast", "on"}:
        tier, saved_value, label_key = "priority", "fast", "gateway.fast.label_fast"
    elif args in {"normal", "off"}:
        tier, saved_value, label_key = None, "normal", "gateway.fast.label_normal"
    else:
        return t("gateway.fast.unknown_arg", arg=args)

    self._service_tier = tier
    label = t(label_key)

    # If the config write fails the in-memory tier is still set, so the
    # reply says the change applies to this session only.
    if _save_config_key("agent.service_tier", saved_value):
        return t("gateway.fast.saved", label=label)
    return t("gateway.fast.session_only", label=label)
|
|
|
|
async def _handle_yolo_command(self, event: MessageEvent) -> Union[str, EphemeralReply]:
    """Handle ``/yolo`` — flip the dangerous-command approval bypass for this session.

    Args:
        event: The incoming command event; its source determines the session key.

    Returns:
        An ``EphemeralReply`` wrapping the translated "enabled" or
        "disabled" message, depending on the new state.
    """
    from tools.approval import (
        disable_session_yolo,
        enable_session_yolo,
        is_session_yolo_enabled,
    )

    session_key = self._session_key_for_source(event.source)

    # Currently on -> turn it off and report that.
    if is_session_yolo_enabled(session_key):
        disable_session_yolo(session_key)
        return EphemeralReply(t("gateway.yolo.disabled"))

    enable_session_yolo(session_key)
    return EphemeralReply(t("gateway.yolo.enabled"))
|
|
|
|
async def _handle_verbose_command(self, event: MessageEvent) -> str:
    """Handle ``/verbose`` — advance the tool-progress display mode.

    The command is gated by ``display.tool_progress_command`` in config.yaml;
    when that flag is not truthy (or the config cannot be loaded) a
    "not enabled" message is returned. Otherwise the mode for the *current
    platform* steps through off → new → all → verbose → off and is written
    to ``display.platforms.<platform>.tool_progress``.

    Args:
        event: The incoming command event.

    Returns:
        The description of the new mode followed by either a "saved" suffix
        or a save-failure notice.
    """
    from gateway.run import _hermes_home, _load_gateway_config, _platform_config_key

    config_path = _hermes_home / "config.yaml"
    platform_key = _platform_config_key(event.source.platform)

    # --- config gate ------------------------------------------------------
    try:
        user_config = _load_gateway_config()
        gate_enabled = is_truthy_value(
            cfg_get(user_config, "display", "tool_progress_command"),
            default=False,
        )
    except Exception:
        gate_enabled = False

    if not gate_enabled:
        return t("gateway.verbose.not_enabled")

    # --- pick the next mode (per-platform) --------------------------------
    cycle = ["off", "new", "all", "verbose"]
    descriptions = {
        "off": t("gateway.verbose.mode_off"),
        "new": t("gateway.verbose.mode_new"),
        "all": t("gateway.verbose.mode_all"),
        "verbose": t("gateway.verbose.mode_verbose"),
    }

    # The resolver yields the effective mode for this platform; unknown
    # values are treated as "all".
    from gateway.display_config import resolve_display_setting

    current = resolve_display_setting(user_config, platform_key, "tool_progress", "all")
    if current not in cycle:
        current = "all"
    new_mode = cycle[(cycle.index(current) + 1) % len(cycle)]
    mode_line = f"{descriptions[new_mode]}\n"

    # --- persist to display.platforms.<platform>.tool_progress ------------
    try:
        node = user_config
        for key in ("display", "platforms", platform_key):
            # Create (or replace a non-dict with) each intermediate mapping.
            if key not in node or not isinstance(node.get(key), dict):
                node[key] = {}
            node = node[key]
        node["tool_progress"] = new_mode
        atomic_yaml_write(config_path, user_config)
        return mode_line + t("gateway.verbose.saved_suffix", platform=platform_key)
    except Exception as e:
        logger.warning("Failed to save tool_progress mode: %s", e)
        return mode_line + t("gateway.verbose.save_failed", error=e)
|
|
|
|
async def _handle_footer_command(self, event: MessageEvent) -> str:
    """Handle ``/footer`` — toggle the runtime-metadata footer.

    Usage:
        /footer          → toggle on/off
        /footer on       → enable globally
        /footer off      → disable globally
        /footer status   → show current state + fields

    The flag is written to ``display.runtime_footer.enabled`` (global).
    Per-platform overrides under ``display.platforms.<platform>.runtime_footer``
    are taken into account when resolving the effective state but are never
    modified here — edit config.yaml directly for per-platform control.

    Args:
        event: The incoming command event.

    Returns:
        The user-facing reply text.
    """
    from gateway.run import _hermes_home, _load_gateway_config, _platform_config_key, _resolve_gateway_model
    from gateway.runtime_footer import resolve_footer_config

    config_path = _hermes_home / "config.yaml"
    platform_key = _platform_config_key(event.source.platform)

    # --- parse argument ---------------------------------------------------
    # NOTE(review): this reads ``event.message`` whereas the sibling handlers
    # use ``event.get_command_args()`` — confirm MessageEvent exposes
    # ``message``; if it does not, the argument is always empty here.
    arg = ""
    try:
        text = (getattr(event, "message", None) or "").strip()
        if text.startswith("/"):
            pieces = text.split(maxsplit=1)
            if len(pieces) == 2:
                arg = pieces[1].strip().lower()
    except Exception:
        arg = ""

    # --- load config ------------------------------------------------------
    try:
        user_config: dict = _load_gateway_config()
    except Exception as e:
        return t("gateway.config_read_failed", error=e)

    effective = resolve_footer_config(user_config, platform_key)

    if arg in {"status", "?"}:
        if effective["enabled"]:
            state = t("gateway.footer.state_on")
        else:
            state = t("gateway.footer.state_off")
        fields = ", ".join(effective.get("fields") or [])
        return t(
            "gateway.footer.status",
            state=state,
            fields=fields,
            platform=platform_key,
        )

    if arg == "":
        # Bare /footer flips the effective state.
        new_state = not effective["enabled"]
    elif arg in {"on", "enable", "true", "1"}:
        new_state = True
    elif arg in {"off", "disable", "false", "0"}:
        new_state = False
    else:
        return t("gateway.footer.usage")

    # --- write global flag ------------------------------------------------
    try:
        node = user_config
        for key in ("display", "runtime_footer"):
            # Create (or replace a non-dict with) each intermediate mapping.
            if not isinstance(node.get(key), dict):
                node[key] = {}
            node = node[key]
        node["enabled"] = new_state
        atomic_yaml_write(config_path, user_config)
    except Exception as e:
        logger.warning("Failed to save runtime_footer.enabled: %s", e)
        return t("gateway.config_save_failed", error=e)

    state = t("gateway.footer.state_on") if new_state else t("gateway.footer.state_off")
    example = ""
    if new_state:
        # Render a sample footer from the configured model so the user can
        # see what will be appended.
        from gateway.runtime_footer import format_runtime_footer

        preview = format_runtime_footer(
            model=_resolve_gateway_model(user_config) or None,
            context_tokens=0,
            context_length=None,
            fields=effective.get("fields") or ["model", "context_pct", "cwd"],
        )
        if preview:
            example = t("gateway.footer.example_line", preview=preview)
    return t("gateway.footer.saved", state=state, example=example)
|
|
|
|
async def _handle_compress_command(self, event: MessageEvent) -> str:
    """Handle ``/compress`` — manually compress the conversation context.

    ``/compress <focus>`` passes an optional focus topic to the summariser so
    it preserves information related to *focus* and is more aggressive about
    discarding everything else.

    ``/compress here [N]`` is the boundary-aware form: everything except the
    most recent ``N`` exchanges (default 2) is summarised and those exchanges
    are kept verbatim. Inspired by Claude Code's Rewind "Summarize up to
    here" action (v2.1.139, May 2026,
    https://code.claude.com/docs/en/whats-new/2026-w20).

    Args:
        event: The incoming command event.

    Returns:
        A multi-line summary of the compression, or a translated message
        when there is too little history, no provider credentials, nothing
        to compress, or the compression failed.
    """
    source = event.source
    session_entry = self.session_store.get_or_create_session(source)
    history = self.session_store.load_transcript(session_entry.session_id)

    if not history or len(history) < 4:
        return t("gateway.compress.not_enough")

    # Arguments are either a focus topic (full compress) or the
    # boundary-aware "here [N]" form (partial compress).
    from hermes_cli.partial_compress import (
        parse_partial_compress_args,
        rejoin_compressed_head_and_tail,
        split_history_for_partial_compress,
    )

    raw_args = (event.get_command_args() or "").strip()
    partial, keep_last, focus_topic = parse_partial_compress_args(raw_args)

    try:
        from run_agent import AIAgent
        from agent.manual_compression_feedback import summarize_manual_compression
        from agent.model_metadata import estimate_request_tokens_rough

        session_key = self._session_key_for_source(source)
        model, runtime_kwargs = self._resolve_session_agent_runtime(
            source=source,
            session_key=session_key,
        )
        if not runtime_kwargs.get("api_key"):
            return t("gateway.compress.no_provider")

        # Only user/assistant turns that actually carry content are compressed.
        dialogue = [
            {"role": entry.get("role"), "content": entry.get("content")}
            for entry in history
            if entry.get("role") in {"user", "assistant"} and entry.get("content")
        ]

        # Boundary-aware split: only the head is summarised; the most recent
        # `keep_last` exchanges are preserved verbatim. The split snaps the
        # tail to a user-turn start so the rejoined transcript keeps role
        # alternation valid.
        head, tail = dialogue, []
        if partial:
            split_head, split_tail = split_history_for_partial_compress(dialogue, keep_last)
            if split_tail:
                head, tail = split_head, split_tail
            else:
                # Degenerate split — fall back to full compression.
                partial = False

        scratch_agent = AIAgent(
            **runtime_kwargs,
            model=model,
            max_iterations=4,
            quiet_mode=True,
            skip_memory=True,
            enabled_toolsets=["memory"],
            session_id=session_entry.session_id,
        )
        try:
            scratch_agent._print_fn = lambda *a, **kw: None

            # Include system prompt + tool schemas in the estimate so it
            # reflects real request pressure, not a transcript-only
            # underestimate (#6217). Must happen after the agent is built so
            # _cached_system_prompt/tools are populated.
            sys_prompt = getattr(scratch_agent, "_cached_system_prompt", "") or ""
            tool_schemas = getattr(scratch_agent, "tools", None) or None
            approx_tokens = estimate_request_tokens_rough(
                dialogue, system_prompt=sys_prompt, tools=tool_schemas
            )

            compressor = scratch_agent.context_compressor
            if not compressor.has_content_to_compress(head):
                return t("gateway.compress.nothing_to_do")

            def _run_compression():
                """Synchronous compression call, executed off the event loop."""
                return scratch_agent._compress_context(
                    head, "", approx_tokens=approx_tokens, focus_topic=focus_topic, force=True
                )

            loop = asyncio.get_running_loop()
            compressed, _ = await loop.run_in_executor(None, _run_compression)

            # Re-append the verbatim tail after the compressed head,
            # guarding the seam against illegal role adjacency.
            if partial and tail:
                compressed = rejoin_compressed_head_and_tail(compressed, tail)

            # _compress_context already calls end_session() on the old session
            # (preserving its full transcript in SQLite) and creates a new
            # session_id for the continuation. Write the compressed messages
            # into the NEW session so the original history stays searchable.
            new_session_id = scratch_agent.session_id
            if new_session_id != session_entry.session_id:
                session_entry.session_id = new_session_id
                self.session_store._save()
                self._sync_telegram_topic_binding(
                    source, session_entry, reason="compress-command",
                )

            self.session_store.rewrite_transcript(new_session_id, compressed)
            # Reset stored token count — transcript changed, old value is stale.
            self.session_store.update_session(
                session_entry.session_key, last_prompt_tokens=0
            )
            new_tokens = estimate_request_tokens_rough(
                compressed, system_prompt=sys_prompt, tools=tool_schemas
            )
            summary = summarize_manual_compression(
                dialogue,
                compressed,
                approx_tokens,
                new_tokens,
            )
            # Detect summary-generation failure so a visible warning reaches
            # the user on the manual /compress path (otherwise the failure is
            # only logged). _last_compress_aborted means the aux LLM returned
            # no usable summary and the compressor preserved messages
            # unchanged (no drop, no placeholder). force=True was passed above
            # so any active cooldown is bypassed.
            summary_aborted = bool(getattr(compressor, "_last_compress_aborted", False))
            summary_error = getattr(compressor, "_last_summary_error", None)
            # Separately: did the user's CONFIGURED aux model fail and we
            # recovered via main? Surface that as an info note so they can
            # fix their config.
            aux_failed_model = getattr(compressor, "_last_aux_model_failure_model", None)
            aux_failed_error = getattr(compressor, "_last_aux_model_failure_error", None)
        finally:
            # Evict cached agent so next turn rebuilds system prompt from
            # current files (SOUL.md, memory, etc.).
            self._evict_cached_agent(session_key)
            self._cleanup_agent_resources(scratch_agent)

        report = [f"🗜️ {summary['headline']}"]
        if focus_topic:
            report.append(t("gateway.compress.focus_line", topic=focus_topic))
        report.append(summary["token_line"])
        if summary["note"]:
            report.append(summary["note"])
        if summary_aborted:
            report.append(
                t("gateway.compress.aborted", error=(summary_error or "unknown error"))
            )
        elif aux_failed_model:
            report.append(
                t(
                    "gateway.compress.aux_failed",
                    model=aux_failed_model,
                    error=(aux_failed_error or "unknown error"),
                )
            )
        return "\n".join(report)
    except Exception as e:
        logger.warning("Manual compress failed: %s", e)
        return t("gateway.compress.failed", error=e)
|
|
|
|
async def _handle_topic_command(self, event: MessageEvent, args: str = "") -> str:
    """Handle ``/topic`` for Telegram DM user-managed topic sessions.

    * ``/topic help`` (also ``?``, ``-h``, ``--help``) returns inline usage.
    * ``/topic off`` (also ``disable``, ``stop``) disables topic mode for the chat.
    * ``/topic <anything else>`` restores a session into the current topic
      and therefore requires being inside a topic thread.
    * Bare ``/topic`` enables topic mode and reports the current binding.

    Args:
        event: The incoming command event.
        args: Accepted for signature compatibility; the value is replaced by
            ``event.get_command_args()`` before use.
            NOTE(review): confirm no caller relies on passing ``args``.

    Returns:
        The user-facing reply text.
    """
    source = event.source
    if source.platform != Platform.TELEGRAM or source.chat_type != "dm":
        return t("gateway.topic.not_telegram_dm")
    if not self._session_db:
        from hermes_state import format_session_db_unavailable

        return format_session_db_unavailable(prefix=t("gateway.shared.session_db_unavailable_prefix"))

    # Authorization: /topic activates multi-session mode and mutates SQLite
    # side tables, so unauthorized senders (not in allowlist) must not reach
    # it. Gateway routes already authorize the message before this point;
    # this is defense in depth. An exception from the check is logged and
    # the command proceeds.
    auth_fn = getattr(self, "_is_user_authorized", None)
    if callable(auth_fn):
        try:
            if not auth_fn(source):
                return t("gateway.topic.unauthorized")
        except Exception:
            logger.debug("Topic auth check failed", exc_info=True)

    args = event.get_command_args().strip()
    keyword = args.lower()

    # /topic help — inline usage without leaving the bot.
    if keyword in {"help", "?", "-h", "--help"}:
        return self._telegram_topic_help_text()

    # /topic off — clean disable path so users don't have to edit the DB.
    if keyword in {"off", "disable", "stop"}:
        return self._disable_telegram_topic_mode_for_chat(source)

    if args:
        if not source.thread_id:
            return t("gateway.topic.restore_needs_topic")
        return await self._restore_telegram_topic_session(event, args)

    capabilities = await self._get_telegram_topic_capabilities(source)
    if capabilities.get("checked"):
        blockers = (
            ("has_topics_enabled", "gateway.topic.topics_disabled"),
            ("allows_users_to_create_topics", "gateway.topic.topics_user_disallowed"),
        )
        for capability, message_key in blockers:
            if capabilities.get(capability) is False:
                # Debounce the BotFather screenshot: don't re-send it on
                # every /topic while the capability is still off.
                if self._should_send_telegram_capability_hint(source):
                    await self._send_telegram_topic_setup_image(source)
                return t(message_key)

    try:
        self._session_db.enable_telegram_topic_mode(
            chat_id=str(source.chat_id),
            user_id=str(source.user_id),
            has_topics_enabled=capabilities.get("has_topics_enabled"),
            allows_users_to_create_topics=capabilities.get("allows_users_to_create_topics"),
        )
    except Exception as exc:
        logger.exception("Failed to enable Telegram topic mode")
        return t("gateway.topic.enable_failed", error=exc)

    if not source.thread_id:
        await self._ensure_telegram_system_topic(source)

    # thread_id is re-checked rather than handled in an else branch,
    # exactly as the original does after the call above.
    if source.thread_id:
        try:
            binding = self._session_db.get_telegram_topic_binding(
                chat_id=str(source.chat_id),
                thread_id=str(source.thread_id),
            )
        except Exception:
            logger.debug("Failed to read Telegram topic binding", exc_info=True)
            binding = None

        if not binding:
            return t("gateway.topic.thread_ready")

        session_id = str(binding.get("session_id") or "")
        try:
            title = self._session_db.get_session_title(session_id)
        except Exception:
            title = None
        return t(
            "gateway.topic.bound_status",
            label=title or t("gateway.topic.untitled_session"),
            session_id=session_id,
        )

    return self._telegram_topic_root_status_message(source)
|
|
|
|
async def _handle_title_command(self, event: MessageEvent) -> str:
    """Handle ``/title`` — set or show the current session's title.

    With an argument the title is sanitised and stored; without one the
    current title (if any) and session id are reported.

    Args:
        event: The incoming command event.

    Returns:
        The user-facing reply text; a "session DB unavailable" message when
        no session database is configured.
    """
    source = event.source
    session_id = self.session_store.get_or_create_session(source).session_id
    db = self._session_db

    if not db:
        from hermes_state import format_session_db_unavailable

        return format_session_db_unavailable(prefix=t("gateway.shared.session_db_unavailable_prefix"))

    # The session may so far exist only in session_store (first command of a
    # new session). A None title is taken as "no row yet", so try to create
    # one; failures are ignored because the row might already exist.
    if db.get_session_title(session_id) is None:
        try:
            db.create_session(
                session_id=session_id,
                source=source.platform.value if source.platform else "unknown",
                user_id=source.user_id,
            )
        except Exception:
            pass  # Session might already exist, ignore errors

    requested = event.get_command_args().strip()
    if not requested:
        # No argument: show the current title and session ID.
        current = db.get_session_title(session_id)
        if current:
            return t("gateway.title.current_with_title", session_id=session_id, title=current)
        return t("gateway.title.current_no_title", session_id=session_id)

    # Sanitize the title before setting it.
    try:
        sanitized = db.sanitize_title(requested)
    except ValueError as err:
        return t("gateway.shared.warn_passthrough", error=err)
    if not sanitized:
        return t("gateway.title.empty_after_clean")

    try:
        if db.set_session_title(session_id, sanitized):
            return t("gateway.title.set_to", title=sanitized)
        return t("gateway.title.not_found")
    except ValueError as err:
        return t("gateway.shared.warn_passthrough", error=err)
|
|
|
|
async def _handle_resume_command(self, event: MessageEvent) -> str:
    """Handle /resume command — list or switch to a previous session.

    With no argument, replies with a numbered list of recent titled sessions
    for the caller's platform. An all-decimal argument is treated as an index
    into that same list; any other argument is looked up as a session ID
    first and as a session title second. On success the session-store entry
    for this chat is switched to the target session and any running or cached
    agent bound to the session key is dropped.

    Args:
        event: Incoming message event carrying the command arguments and source.

    Returns:
        A localized, user-facing status message.
    """
    if not self._session_db:
        from hermes_state import format_session_db_unavailable
        return format_session_db_unavailable(prefix=t("gateway.shared.session_db_unavailable_prefix"))

    source = event.source
    session_key = self._session_key_for_source(source)
    name = event.get_command_args().strip()

    # Strip common outer brackets/quotes users may type literally from the
    # usage hint (e.g. ``/resume <abc123>``). Mirrors the CLI behavior.
    if len(name) >= 2 and (name[0], name[-1]) in (("<", ">"), ("[", "]"), ('"', '"'), ("'", "'")):
        name = name[1:-1].strip()

    def _list_titled_sessions() -> list[dict]:
        user_source = source.platform.value if source.platform else None
        sessions = self._session_db.list_sessions_rich(source=user_source, limit=10)
        return [s for s in sessions if s.get("title")][:10]

    if not name:
        # List recent titled sessions for this user/platform
        try:
            titled = _list_titled_sessions()
            if not titled:
                return t("gateway.resume.no_named_sessions")
            lines = [t("gateway.resume.list_header")]
            for idx, s in enumerate(titled, start=1):
                title = s["title"]
                # ``preview`` may be present but None; treat that like a
                # missing preview instead of failing the whole listing.
                preview = (s.get("preview") or "")[:40]
                preview_part = t("gateway.resume.list_preview_suffix", preview=preview) if preview else ""
                lines.append(t("gateway.resume.list_item_numbered", index=idx, title=title, preview_part=preview_part))
            lines.append(t("gateway.resume.list_footer_numbered"))
            return "\n".join(lines)
        except Exception as e:
            logger.debug("Failed to list titled sessions: %s", e)
            return t("gateway.resume.list_failed", error=e)

    # Resolve a numbered choice or a title to a session ID.
    # ``isdecimal`` (not ``isdigit``): characters such as "²" satisfy
    # ``isdigit`` but make ``int()`` raise ValueError.
    if name.isdecimal():
        try:
            titled = _list_titled_sessions()
        except Exception as e:
            logger.debug("Failed to list titled sessions for numeric resume: %s", e)
            return t("gateway.resume.list_failed", error=e)
        index = int(name)
        if not 1 <= index <= len(titled):
            return t("gateway.resume.out_of_range", index=index)
        target = titled[index - 1]
        target_id = target.get("id")
        name = target.get("title") or name
    else:
        # Try direct session ID lookup first (so `/resume <session_id>`
        # works in the gateway, not just `/resume <title>`).
        session = self._session_db.get_session(name)
        if session:
            target_id = session["id"]
        else:
            target_id = self._session_db.resolve_session_by_title(name)

    if not target_id:
        return t("gateway.resume.not_found", name=name)

    # Compression creates child continuations that hold the live transcript.
    # Follow that chain so gateway /resume matches CLI behavior (#15000).
    try:
        target_id = self._session_db.resolve_resume_session_id(target_id)
    except Exception as e:
        # Best-effort: fall back to the unresolved ID.
        logger.debug("Failed to resolve resume continuation for %s: %s", target_id, e)

    # Check if already on that session
    current_entry = self.session_store.get_or_create_session(source)
    if current_entry.session_id == target_id:
        return t("gateway.resume.already_on", name=name)

    # Clear any running agent for this session key
    self._release_running_agent_state(session_key)

    # Switch the session entry to point at the old session
    new_entry = self.session_store.switch_session(session_key, target_id)
    if not new_entry:
        return t("gateway.resume.switch_failed")
    self._clear_session_boundary_security_state(session_key)

    # Evict any cached agent for this session so the next message
    # rebuilds with the correct session_id end-to-end — mirrors
    # /branch and /reset. Without this, the cached AIAgent (and its
    # memory provider, which cached `_session_id` during initialize())
    # keeps writing into the wrong session's record. See #6672.
    self._evict_cached_agent(session_key)

    # Get the title for confirmation
    title = self._session_db.get_session_title(target_id) or name

    # Count user messages for context
    history = self.session_store.load_transcript(target_id)
    msg_count = sum(1 for m in history if m.get("role") == "user") if history else 0
    if not msg_count:
        return t("gateway.resume.resumed_no_count", title=title)
    if msg_count == 1:
        return t("gateway.resume.resumed_one", title=title, count=msg_count)
    return t("gateway.resume.resumed_many", title=title, count=msg_count)
|
|
|
|
async def _handle_branch_command(self, event: MessageEvent) -> str:
    """Handle /branch [name] — fork the current session into a new independent copy.

    Copies conversation history to a new session so the user can explore
    a different approach without losing the original.
    Inspired by Claude Code's /branch command.

    Args:
        event: Incoming message event; the command argument, if any, is used
            as the branch title.

    Returns:
        A localized, user-facing status message.
    """
    import uuid as _uuid
    from datetime import datetime as _dt

    if not self._session_db:
        from hermes_state import format_session_db_unavailable
        return format_session_db_unavailable(prefix=t("gateway.shared.session_db_unavailable_prefix"))

    source = event.source
    session_key = self._session_key_for_source(source)

    # Load the current session and its transcript
    current_entry = self.session_store.get_or_create_session(source)
    history = self.session_store.load_transcript(current_entry.session_id)
    if not history:
        return t("gateway.branch.no_conversation")

    branch_name = event.get_command_args().strip()

    # Generate the new session ID: <timestamp>_<6 hex chars>
    timestamp_str = _dt.now().strftime("%Y%m%d_%H%M%S")
    short_uuid = _uuid.uuid4().hex[:6]
    new_session_id = f"{timestamp_str}_{short_uuid}"

    # Determine branch title
    if branch_name:
        branch_title = branch_name
    else:
        current_title = self._session_db.get_session_title(current_entry.session_id)
        base = current_title or "branch"
        branch_title = self._session_db.get_next_title_in_lineage(base)

    parent_session_id = current_entry.session_id

    # ``model`` may be absent or a non-dict value; only a dict carries a
    # ``default`` entry. Guarding here keeps a malformed config from being
    # reported as a session-creation failure.
    model_cfg = self.config.get("model") if isinstance(self.config, dict) else None
    default_model = model_cfg.get("default") if isinstance(model_cfg, dict) else None

    # Create the new session with parent link.
    # Persist a stable ``_branched_from`` marker in model_config so
    # list_sessions_rich() keeps the branch visible in /resume and
    # /sessions even after the parent is reopened and re-ended with a
    # different end_reason (e.g. tui_shutdown overwriting 'branched').
    try:
        self._session_db.create_session(
            session_id=new_session_id,
            source=source.platform.value if source.platform else "gateway",
            model=default_model,
            model_config={"_branched_from": parent_session_id},
            parent_session_id=parent_session_id,
        )
    except Exception as e:
        logger.error("Failed to create branch session: %s", e)
        return t("gateway.branch.create_failed", error=e)

    # Copy conversation history to the new session. Still best-effort (one
    # bad row must not abort the branch), but failures are logged instead of
    # vanishing silently.
    copy_failures = 0
    for msg in history:
        try:
            self._session_db.append_message(
                session_id=new_session_id,
                role=msg.get("role", "user"),
                content=msg.get("content"),
                tool_name=msg.get("tool_name") or msg.get("name"),
                tool_calls=msg.get("tool_calls"),
                tool_call_id=msg.get("tool_call_id"),
                finish_reason=msg.get("finish_reason"),
                reasoning=msg.get("reasoning"),
                reasoning_content=msg.get("reasoning_content"),
                reasoning_details=msg.get("reasoning_details"),
                codex_reasoning_items=msg.get("codex_reasoning_items"),
                codex_message_items=msg.get("codex_message_items"),
            )
        except Exception as e:
            copy_failures += 1
            logger.debug("Failed to copy a message into branch %s: %s", new_session_id, e)
    if copy_failures:
        logger.warning(
            "Branch %s is missing %d of %d message(s) copied from %s",
            new_session_id, copy_failures, len(history), parent_session_id,
        )

    # Set title (best-effort; the branch remains usable without one)
    try:
        self._session_db.set_session_title(new_session_id, branch_title)
    except Exception as e:
        logger.warning("Failed to set title %r on branch session %s: %s", branch_title, new_session_id, e)

    # Switch the session store entry to the new session
    new_entry = self.session_store.switch_session(session_key, new_session_id)
    if not new_entry:
        return t("gateway.branch.switch_failed")
    self._clear_session_boundary_security_state(session_key)

    # Evict any cached agent for this session
    self._evict_cached_agent(session_key)

    msg_count = sum(1 for m in history if m.get("role") == "user")
    key = "gateway.branch.branched_one" if msg_count == 1 else "gateway.branch.branched_many"
    return t(key, title=branch_title, count=msg_count, parent=parent_session_id, new=new_session_id)
|
|
|
|
async def _handle_usage_command(self, event: MessageEvent) -> str:
    """Handle /usage command -- show token usage for the current session.

    Checks both _running_agents (mid-turn) and _agent_cache (between turns)
    so that rate limits, cost estimates, and detailed token breakdowns are
    available whenever the user asks, not only while the agent is running.

    Falls back to a rough estimate from the stored transcript when no agent
    with recorded API calls is available, and to account/credits info alone
    when there is no transcript either.

    Args:
        event: Incoming message event identifying the session.

    Returns:
        A newline-joined, user-facing usage report.
    """
    from gateway.run import _AGENT_PENDING_SENTINEL
    source = event.source
    session_key = self._session_key_for_source(source)

    # Try running agent first (mid-turn), then cached agent (between turns)
    agent = self._running_agents.get(session_key)
    if not agent or agent is _AGENT_PENDING_SENTINEL:
        _cache_lock = getattr(self, "_agent_cache_lock", None)
        _cache = getattr(self, "_agent_cache", None)
        if _cache_lock and _cache is not None:
            with _cache_lock:
                cached = _cache.get(session_key)
                if cached:
                    agent = cached[0]
    # The pending sentinel is a placeholder, not an agent: normalize it away
    # once so the checks below don't each have to repeat the test.
    if agent is _AGENT_PENDING_SENTINEL:
        agent = None

    # Resolve provider/base_url/api_key for the account-usage fetch.
    # Prefer the live agent; fall back to persisted billing data on the
    # SessionDB row so `/usage` still returns account info between turns
    # when no agent is resident.
    provider = getattr(agent, "provider", None) if agent else None
    base_url = getattr(agent, "base_url", None) if agent else None
    api_key = getattr(agent, "api_key", None) if agent else None
    if not provider and getattr(self, "_session_db", None) is not None:
        try:
            _entry_for_billing = self.session_store.get_or_create_session(source)
            persisted = self._session_db.get_session(_entry_for_billing.session_id) or {}
        except Exception:
            persisted = {}
        provider = provider or persisted.get("billing_provider")
        base_url = base_url or persisted.get("billing_base_url")

    # Fetch account usage off the event loop so slow provider APIs don't
    # block the gateway. Failures are non-fatal -- account_lines stays [].
    account_lines: list[str] = []
    credits_lines: list[str] = []
    if provider:
        try:
            account_snapshot = await asyncio.to_thread(
                fetch_account_usage,
                provider,
                base_url=base_url,
                api_key=api_key,
            )
        except Exception:
            account_snapshot = None
        if account_snapshot:
            account_lines = render_account_usage_lines(account_snapshot, markdown=True)

    # ── Nous credits magnitudes + monthly-grant % gauge ─────────────
    # Shared with the CLI / TUI /usage block via nous_credits_lines(): a single
    # auth-gate + portal-fetch + render path (which also honors the dev fixture).
    # Run off the event loop. The helper gates on "a Nous account is logged in"
    # — NOT the inference provider and NOT nested under `if provider:` — so a
    # Nous-credentialled user running inference elsewhere (or with none resident)
    # still sees their balance. NO recovery trigger: messaging binds no notice
    # consumer, so /usage only displays. Fail-open: never break /usage.
    try:
        from agent.account_usage import nous_credits_lines

        credits_lines = await asyncio.to_thread(nous_credits_lines, markdown=True)
    except Exception:
        credits_lines = []  # fail-open: never break /usage

    def _finish(lines: list[str]) -> str:
        # Append the account and credits sections, each preceded by a blank
        # divider when something is already above it, then join the report.
        for section in (account_lines, credits_lines):
            if section:
                if lines:
                    lines.append("")
                lines.extend(section)
        return "\n".join(lines)

    if agent and hasattr(agent, "session_total_tokens") and agent.session_api_calls > 0:
        lines = []

        # Rate limits (when available from provider headers)
        rl_state = agent.get_rate_limit_state()
        if rl_state and rl_state.has_data:
            from agent.rate_limit_tracker import format_rate_limit_compact
            lines.append(t("gateway.usage.rate_limits", state=format_rate_limit_compact(rl_state)))
            lines.append("")

        # Session token usage — detailed breakdown matching CLI
        input_tokens = getattr(agent, "session_input_tokens", 0) or 0
        output_tokens = getattr(agent, "session_output_tokens", 0) or 0
        cache_read = getattr(agent, "session_cache_read_tokens", 0) or 0
        cache_write = getattr(agent, "session_cache_write_tokens", 0) or 0

        lines.append(t("gateway.usage.header_session"))
        lines.append(t("gateway.usage.label_model", model=agent.model))
        lines.append(t("gateway.usage.label_input_tokens", count=f"{input_tokens:,}"))
        if cache_read:
            lines.append(t("gateway.usage.label_cache_read", count=f"{cache_read:,}"))
        if cache_write:
            lines.append(t("gateway.usage.label_cache_write", count=f"{cache_write:,}"))
        lines.append(t("gateway.usage.label_output_tokens", count=f"{output_tokens:,}"))
        lines.append(t("gateway.usage.label_total", count=f"{agent.session_total_tokens:,}"))
        lines.append(t("gateway.usage.label_api_calls", count=agent.session_api_calls))

        # Cost estimation (best-effort: pricing data may be unavailable)
        try:
            from agent.usage_pricing import CanonicalUsage, estimate_usage_cost
            cost_result = estimate_usage_cost(
                agent.model,
                CanonicalUsage(
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    cache_read_tokens=cache_read,
                    cache_write_tokens=cache_write,
                ),
                provider=getattr(agent, "provider", None),
                base_url=getattr(agent, "base_url", None),
            )
            if cost_result.amount_usd is not None:
                prefix = "~" if cost_result.status == "estimated" else ""
                lines.append(t("gateway.usage.label_cost", prefix=prefix, amount=f"{float(cost_result.amount_usd):.4f}"))
            elif cost_result.status == "included":
                lines.append(t("gateway.usage.label_cost_included"))
        except Exception:
            pass

        # Context window and compressions. Guarded so an agent without a
        # compressor still yields the token report instead of raising.
        ctx = getattr(agent, "context_compressor", None)
        if ctx is not None:
            if ctx.last_prompt_tokens:
                pct = min(100, ctx.last_prompt_tokens / ctx.context_length * 100) if ctx.context_length else 0
                lines.append(t("gateway.usage.label_context", used=f"{ctx.last_prompt_tokens:,}", total=f"{ctx.context_length:,}", pct=f"{pct:.0f}"))
            if ctx.compression_count:
                lines.append(t("gateway.usage.label_compressions", count=ctx.compression_count))

        return _finish(lines)

    # No agent at all -- check session history for a rough count
    session_entry = self.session_store.get_or_create_session(source)
    history = self.session_store.load_transcript(session_entry.session_id)
    if history:
        from agent.model_metadata import estimate_messages_tokens_rough
        msgs = [m for m in history if m.get("role") in {"user", "assistant"} and m.get("content")]
        approx = estimate_messages_tokens_rough(msgs)
        return _finish([
            t("gateway.usage.header_session_info"),
            t("gateway.usage.label_messages", count=len(msgs)),
            t("gateway.usage.label_estimated_context", count=f"{approx:,}"),
            t("gateway.usage.detailed_after_first"),
        ])

    if account_lines or credits_lines:
        # account-only, credits-only, or both — joined with a blank divider.
        return _finish([])
    return t("gateway.usage.no_data")
|
|
|
|
async def _handle_insights_command(self, event: MessageEvent) -> str:
    """Handle /insights command -- show usage insights and analytics.

    Accepted argument forms: ``/insights 7``, ``/insights --days 7`` and
    ``/insights --source <name>``. Unrecognized tokens are ignored. The
    report is generated in an executor thread against a fresh ``SessionDB``.

    Args:
        event: Incoming message event carrying the command arguments.

    Returns:
        The formatted insights report, or a localized error message.
    """
    args = event.get_command_args().strip()

    # Normalize Unicode dashes (Telegram/iOS auto-converts -- to em/en dash)
    args = re.sub(r'[\u2012\u2013\u2014\u2015](days|source)', r'--\1', args)

    days = 30
    source = None

    # Parse simple args: /insights 7 or /insights --days 7
    if args:
        parts = args.split()
        i = 0
        while i < len(parts):
            token = parts[i]
            has_value = i + 1 < len(parts)
            if token == "--days" and has_value:
                try:
                    days = int(parts[i + 1])
                except ValueError:
                    return t("gateway.insights.invalid_days", value=parts[i + 1])
                i += 2
            elif token == "--source" and has_value:
                source = parts[i + 1]
                i += 2
            elif token.isdecimal():
                # ``isdecimal`` (not ``isdigit``): characters such as "²"
                # satisfy ``isdigit`` but make ``int()`` raise ValueError.
                days = int(token)
                i += 1
            else:
                i += 1

    try:
        from hermes_state import SessionDB
        from agent.insights import InsightsEngine

        loop = asyncio.get_running_loop()

        def _run_insights():
            db = SessionDB()
            try:
                engine = InsightsEngine(db)
                report = engine.generate(days=days, source=source)
                return engine.format_gateway(report)
            finally:
                # Close even when report generation raises, so a failed
                # /insights doesn't leave the DB handle open.
                db.close()

        return await loop.run_in_executor(None, _run_insights)
    except Exception as e:
        logger.error("Insights command error: %s", e, exc_info=True)
        return t("gateway.insights.error", error=e)
|
|
|
|
async def _handle_reload_mcp_command(self, event: MessageEvent) -> Optional[str]:
    """Handle /reload-mcp — reconnect MCP servers and rebuild the cached agent.

    Reloading MCP tools invalidates the provider prompt cache for the
    active session (tool schemas are baked into the system prompt). The
    next message re-sends full input tokens, which is expensive on
    long-context or high-reasoning models.

    To surface that cost, the command routes through the slash-confirm
    primitive: users get an Approve Once / Always Approve / Cancel
    prompt before the reload actually runs. "Always Approve" persists
    ``approvals.mcp_reload_confirm: false`` so the prompt is silenced
    for subsequent reloads in any session.

    Users can also skip the confirm by flipping the config key directly.

    Args:
        event: Incoming message event that triggered the command.

    Returns:
        Whatever ``_execute_mcp_reload`` returns when confirmation is
        disabled, otherwise the result of ``_request_slash_confirm``.
    """
    source = event.source
    session_key = self._session_key_for_source(source)

    # Read the gate fresh from disk so a prior "always" click takes
    # effect on the next invocation without restarting the gateway.
    user_config = self._read_user_config()
    approvals = user_config.get("approvals") if isinstance(user_config, dict) else None
    confirm_required = True
    if isinstance(approvals, dict):
        confirm_required = bool(approvals.get("mcp_reload_confirm", True))

    if not confirm_required:
        return await self._execute_mcp_reload(event)

    # Route through slash-confirm. The primitive sends the prompt and
    # stores the resume handler; the button/text response triggers
    # ``_resolve_slash_confirm`` which invokes the handler with the
    # chosen outcome.
    async def _on_confirm(choice: str) -> Optional[str]:
        if choice == "cancel":
            return t("gateway.reload_mcp.cancelled")
        persisted = False
        if choice == "always":
            # Persist the opt-out and run the reload.
            try:
                from cli import save_config_value
                # NOTE(review): save_config_value presumably signals failure
                # by returning False — only an explicit False is treated as
                # a failure here; confirm against its definition.
                persisted = save_config_value("approvals.mcp_reload_confirm", False) is not False
            except Exception as exc:
                logger.warning("Failed to persist mcp_reload_confirm=false: %s", exc)
            else:
                if persisted:
                    logger.info(
                        "User opted out of /reload-mcp confirmation (session=%s)",
                        session_key,
                    )
                else:
                    logger.warning("Failed to persist mcp_reload_confirm=false")
        # once / always → run the reload
        result = await self._execute_mcp_reload(event)
        # Only claim the prompt is silenced when the opt-out was actually saved.
        if persisted:
            return f"{result}\n\n" + t("gateway.reload_mcp.always_followup")
        return result

    prompt_message = t("gateway.reload_mcp.confirm_prompt")
    return await self._request_slash_confirm(
        event=event,
        command="reload-mcp",
        title="/reload-mcp",
        message=prompt_message,
        handler=_on_confirm,
    )
|
|
|
|
async def _handle_reload_skills_command(self, event: MessageEvent) -> str:
    """Handle /reload-skills — rescan the skills dir and report what changed.

    Skills are invoked at runtime (``/skill-name``, ``skills_list``,
    ``skill_view``) rather than through the system prompt, so this handler
    does NOT clear the prompt cache — prefix caching stays intact.

    When skills were added or removed, a one-shot note describing the diff is
    stored on ``self._pending_skills_reload_notes[session_key]``. The
    consumer (documented as living in ``_run_agent_turn``) is expected to
    prepend it to the NEXT user message of this session and then clear it, so
    nothing is written to the transcript out-of-band and message alternation
    is preserved.

    Returns:
        A localized summary of the reload, or a localized failure message.
    """
    loop = asyncio.get_running_loop()
    try:
        from agent.skill_commands import reload_skills

        outcome = await loop.run_in_executor(None, reload_skills)
        added_items = outcome.get("added", [])  # [{"name", "description"}, ...]
        removed_items = outcome.get("removed", [])  # [{"name", "description"}, ...]
        skill_total = outcome.get("total", 0)

        # Give every connected adapter a chance to refresh platform-side
        # state that cached the skill list at startup (today: the Discord
        # /skill autocomplete, registered once per connect). Adapters
        # without a callable refresh_skill_group are skipped — the
        # in-process reload above is enough for them.
        for adapter in list(self.adapters.values()):
            refresher = getattr(adapter, "refresh_skill_group", None)
            if callable(refresher):
                try:
                    pending = refresher()
                    if inspect.isawaitable(pending):
                        await pending
                except Exception as exc:
                    logger.warning(
                        "Adapter %s refresh_skill_group raised: %s",
                        getattr(adapter, "name", adapter), exc,
                    )

        header = t("gateway.reload_skills.header")
        if not (added_items or removed_items):
            return "\n".join([
                header,
                t("gateway.reload_skills.no_new"),
                t("gateway.reload_skills.total", count=skill_total),
            ])

        def _describe(entry: dict) -> str:
            label = entry.get("name", "")
            blurb = entry.get("description", "")
            if not blurb:
                return t("gateway.reload_skills.item_no_desc", name=label)
            return t("gateway.reload_skills.item_with_desc", name=label, desc=blurb)

        # Build the user-facing reply and the model-facing note side by
        # side; both list the same items, added first and removed second.
        reply = [header]
        note_parts = ["[USER INITIATED SKILLS RELOAD:"]
        for items, reply_header_key, note_heading in (
            (added_items, "gateway.reload_skills.added_header", "Added Skills:"),
            (removed_items, "gateway.reload_skills.removed_header", "Removed Skills:"),
        ):
            if not items:
                continue
            rendered = [_describe(entry) for entry in items]
            reply.append(t(reply_header_key))
            reply.extend(rendered)
            note_parts.extend(["", note_heading])
            note_parts.extend(rendered)
        reply.append(t("gateway.reload_skills.total", count=skill_total))
        note_parts.extend(["", "Use skills_list to see the updated catalog.]"])
        note = "\n".join(note_parts)

        # Queue the one-shot note for the next user turn in this session.
        session_key = self._session_key_for_source(event.source)
        if not hasattr(self, "_pending_skills_reload_notes"):
            self._pending_skills_reload_notes = {}
        if session_key:
            self._pending_skills_reload_notes[session_key] = note

        return "\n".join(reply)

    except Exception as e:
        logger.warning("Skills reload failed: %s", e)
        return t("gateway.reload_skills.failed", error=e)
|
|
|
|
async def _handle_bundles_command(self, event: MessageEvent) -> str:
    """Handle /bundles — list installed skill bundles.

    Counterpart of the CLI ``/bundles`` handler. Produces one plain text
    message usable by any gateway adapter. This command only lists bundles;
    loading one is done through the bundle's own ``/<slug>`` command.

    Returns:
        The bundle listing, a hint on how to create a bundle when none are
        installed, or an "unavailable" notice if the bundles module cannot
        be imported.
    """
    try:
        from agent.skill_bundles import list_bundles, _bundles_dir
    except Exception as exc:
        logger.warning("Bundles command unavailable: %s", exc)
        return f"Bundles subsystem unavailable: {exc}"

    installed = list_bundles()
    if not installed:
        return "".join((
            "No skill bundles installed.\n",
            "Create one on the host with:\n",
            " `hermes bundles create <name> --skill <s1> --skill <s2>`\n",
            f"Directory: `{_bundles_dir()}`",
        ))

    out = [f"**Skill Bundles** ({len(installed)} installed):", ""]
    for bundle in installed:
        member_count = len(bundle.get("skills", []))
        summary = bundle.get("description") or f"Load {member_count} skills"
        out.append(f"• `/{bundle['slug']}` — {summary} _({member_count} skills)_")
        # One indented bullet per skill in the bundle.
        out.extend(f" · {member}" for member in bundle.get("skills", []))
    out += ["", "Invoke a bundle with `/<slug>` to load all its skills."]
    return "\n".join(out)
|
|
|
|
async def _handle_approve_command(self, event: MessageEvent) -> Optional[str]:
    """Handle /approve command — unblock waiting agent thread(s).

    Agent threads block inside tools/approval.py until the user answers.
    This handler resolves the pending approval(s) so the agent resumes and
    the terminal_tool runs the command inline — the same flow as the CLI's
    synchronous input() approval.

    Several approvals may be pending at once (parallel subagents,
    execute_code). ``/approve`` resolves the oldest pending command;
    ``/approve all`` resolves every pending command at once.

    Usage:
        /approve — approve oldest pending command once
        /approve all — approve ALL pending commands at once
        /approve session — approve oldest + remember for session
        /approve all session — approve all + remember for session
        /approve always — approve oldest + remember permanently
        /approve all always — approve all + remember permanently

    Returns:
        A localized confirmation, or a localized "nothing pending" /
        "approval expired" notice.
    """
    source = event.source
    session_key = self._session_key_for_source(source)

    from tools.approval import (
        resolve_gateway_approval, has_blocking_approval,
    )

    if not has_blocking_approval(session_key):
        if session_key not in self._pending_approvals:
            return t("gateway.approve.no_pending")
        # A pending record exists but nothing is blocked on it any more:
        # drop the stale record and tell the user it expired.
        self._pending_approvals.pop(session_key)
        return t("gateway.approval_expired")

    # Parse args: support "all", "all session", "all always", "session", "always"
    tokens = event.get_command_args().strip().lower().split()
    resolve_all = "all" in tokens
    modifiers = {tok for tok in tokens if tok != "all"}

    if modifiers & {"always", "permanent", "permanently"}:
        choice = "always"
    elif modifiers & {"session", "ses"}:
        choice = "session"
    else:
        choice = "once"

    count = resolve_gateway_approval(session_key, choice, resolve_all=resolve_all)
    if not count:
        return t("gateway.approve.no_pending")

    # Resume typing indicator — agent is about to continue processing.
    adapter = self.adapters.get(source.platform)
    if adapter:
        adapter.resume_typing_for_chat(source.chat_id)

    logger.info("User approved %d dangerous command(s) via /approve (%s)", count, choice)
    if count > 1:
        plural = "plural"
    else:
        plural = "singular"
    return t(f"gateway.approve.{choice}_{plural}", count=count)
|
|
|
|
async def _handle_deny_command(self, event: MessageEvent) -> str:
    """Handle /deny command — reject pending dangerous command(s).

    Signals blocked agent thread(s) with a 'deny' result so they receive
    a definitive BLOCKED message, same as the CLI deny flow.

    ``/deny`` denies the oldest; ``/deny all`` denies everything.

    Args:
        event: Incoming message event carrying the command arguments.

    Returns:
        A localized confirmation, or a localized "nothing pending" /
        "stale approval" notice.
    """
    source = event.source
    session_key = self._session_key_for_source(source)

    from tools.approval import (
        resolve_gateway_approval, has_blocking_approval,
    )

    if not has_blocking_approval(session_key):
        if session_key in self._pending_approvals:
            self._pending_approvals.pop(session_key)
            return t("gateway.deny.stale")
        return t("gateway.deny.no_pending")

    # Tokenize before testing for "all": on the raw string, ``in`` is a
    # substring test, so e.g. "/deny small" would deny every pending
    # command. Matches how /approve parses its arguments.
    tokens = event.get_command_args().strip().lower().split()
    resolve_all = "all" in tokens

    count = resolve_gateway_approval(session_key, "deny", resolve_all=resolve_all)
    if not count:
        return t("gateway.deny.no_pending")

    # Resume typing indicator — agent continues (with BLOCKED result).
    _adapter = self.adapters.get(source.platform)
    if _adapter:
        _adapter.resume_typing_for_chat(source.chat_id)

    logger.info("User denied %d dangerous command(s) via /deny", count)
    if count > 1:
        return t("gateway.deny.denied_plural", count=count)
    return t("gateway.deny.denied_singular")
|
|
|
|
async def _handle_debug_command(self, event: MessageEvent) -> str:
    """Handle /debug — upload a summary debug report and reply with its paste URL.

    Only the summary report (system info + log tails) is uploaded from the
    gateway, never full log files, to protect conversation privacy. Full log
    uploads are available through ``hermes debug share`` on the CLI.

    Returns:
        The privacy notice followed by the paste URL and usage hints, or a
        localized failure message when the upload fails.
    """
    import asyncio
    from hermes_cli.debug import (
        _capture_dump, collect_debug_report,
        upload_to_pastebin, _schedule_auto_delete,
        _GATEWAY_PRIVACY_NOTICE, _best_effort_sweep_expired_pastes,
    )

    # All of this is blocking I/O (dump capture, log reads, uploads), so it
    # runs in an executor thread rather than on the event loop.
    def _build_report_message():
        _best_effort_sweep_expired_pastes()
        report = collect_debug_report(log_lines=200, dump_text=_capture_dump())

        try:
            uploads = {"Report": upload_to_pastebin(report)}
        except Exception as exc:
            return t("gateway.debug.upload_failed", error=exc)

        # Schedule auto-deletion after 6 hours
        _schedule_auto_delete(list(uploads.values()))

        # Left-align labels to a common width.
        width = max(map(len, uploads))
        url_rows = [f"`{label:<{width}}` {url}" for label, url in uploads.items()]
        return "\n".join([
            _GATEWAY_PRIVACY_NOTICE,
            "",
            t("gateway.debug.header"),
            "",
            *url_rows,
            "",
            t("gateway.debug.auto_delete"),
            t("gateway.debug.full_logs_hint"),
            t("gateway.debug.share_hint"),
        ])

    return await asyncio.get_running_loop().run_in_executor(None, _build_report_message)
|
|
|
|
async def _handle_update_command(self, event: MessageEvent) -> str:
    """Handle /update command — update Hermes Agent to the latest version.

    Spawns ``hermes update --gateway`` in a detached session so it survives
    the gateway restart that ``hermes update`` may trigger. Three marker
    files under ``_hermes_home`` coordinate with the notification watcher:

    * ``.update_pending.json`` — who asked for the update (platform, chat,
      user, session key, timestamp, plus thread/message id when present).
    * ``.update_output.txt`` — combined stdout/stderr of the update run.
    * ``.update_exit_code`` — exit status, written when the run finishes.

    Args:
        event: The incoming message that carried the ``/update`` command.

    Returns:
        A localized status string: the "starting" message on success, or
        the reason the update was refused / could not be launched.
    """
    from gateway.run import _hermes_home, _resolve_hermes_bin
    import json
    import shutil
    import subprocess
    from datetime import datetime
    from hermes_cli.config import is_managed, format_managed_message

    # Block non-messaging platforms (API server, webhooks, ACP).
    # Plugin platforms with allow_update_command=True are also allowed.
    platform = event.source.platform
    if platform not in self._UPDATE_ALLOWED_PLATFORMS:
        try:
            from gateway.platform_registry import platform_registry
            entry = platform_registry.get(platform.value)
            if not entry or not entry.allow_update_command:
                return t("gateway.update.platform_not_messaging")
        except Exception:
            # Registry unavailable or lookup failed: refuse rather than guess.
            return t("gateway.update.platform_not_messaging")

    if is_managed():
        return f"✗ {format_managed_message('update Hermes Agent')}"

    project_root = Path(__file__).parent.parent.resolve()
    git_dir = project_root / '.git'
    if not git_dir.exists():
        return t("gateway.update.not_git_repo")

    hermes_cmd = _resolve_hermes_bin()
    if not hermes_cmd:
        return t("gateway.update.hermes_cmd_not_found")

    pending_path = _hermes_home / ".update_pending.json"
    output_path = _hermes_home / ".update_output.txt"
    exit_code_path = _hermes_home / ".update_exit_code"

    pending = {
        "platform": event.source.platform.value,
        "chat_id": event.source.chat_id,
        "chat_type": event.source.chat_type,
        "user_id": event.source.user_id,
        "session_key": self._session_key_for_source(event.source),
        "timestamp": datetime.now().isoformat(),
    }
    if event.source.thread_id:
        pending["thread_id"] = event.source.thread_id
    if event.message_id:
        pending["message_id"] = event.message_id

    # Write-then-rename so a reader never sees a half-written marker.
    _tmp_pending = pending_path.with_suffix(".tmp")
    _tmp_pending.write_text(json.dumps(pending))
    _tmp_pending.replace(pending_path)
    # Drop any exit code left over from a previous run.
    exit_code_path.unlink(missing_ok=True)

    # Spawn `hermes update --gateway` detached so it survives gateway restart.
    # --gateway enables file-based IPC for interactive prompts (stash
    # restore, config migration) so the gateway can forward them to the
    # user instead of silently skipping them.
    # PYTHONUNBUFFERED ensures output is flushed line-by-line so the
    # gateway can stream it to the messenger in near-real-time.
    try:
        if sys.platform == "win32":
            # Windows: no bash/setsid chain. Launch an inline Python helper
            # (via sys.executable) that runs the command, redirects its
            # stdout/stderr to the output file, and then records the exit
            # code.
            import textwrap
            from hermes_cli._subprocess_compat import windows_detach_popen_kwargs

            # The helper always writes the exit-code marker (try/finally),
            # matching the POSIX branch: a spawn failure records 1, and a
            # run exceeding the one-hour wait is killed and its resulting
            # status recorded, instead of leaving the marker missing.
            helper = textwrap.dedent(
                """
                import os, subprocess, sys
                output_path = sys.argv[1]
                exit_code_path = sys.argv[2]
                cmd = sys.argv[3:]
                env = dict(os.environ)
                env["PYTHONUNBUFFERED"] = "1"
                rc = 1
                try:
                    with open(output_path, "wb") as f:
                        proc = subprocess.Popen(cmd, stdout=f, stderr=subprocess.STDOUT, env=env)
                        try:
                            rc = proc.wait(timeout=3600)
                        except subprocess.TimeoutExpired:
                            proc.kill()
                            rc = proc.wait()
                finally:
                    with open(exit_code_path, "w") as f:
                        f.write(str(rc))
                """
            ).strip()

            # hermes_cmd is a list of argv parts we can pass directly
            # (no shell-quoting needed).
            subprocess.Popen(
                [
                    sys.executable, "-c", helper,
                    str(output_path), str(exit_code_path),
                    *hermes_cmd, "update", "--gateway",
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                **windows_detach_popen_kwargs(),
            )
        else:
            hermes_cmd_str = " ".join(shlex.quote(part) for part in hermes_cmd)
            update_cmd = (
                f"PYTHONUNBUFFERED=1 {hermes_cmd_str} update --gateway"
                f" > {shlex.quote(str(output_path))} 2>&1; "
                # Avoid `status=$?`: `status` is a read-only special parameter
                # in zsh, and this command string is copied/reused in macOS/zsh
                # operator wrappers. Keep the template zsh-safe even though this
                # specific subprocess currently runs under bash.
                f"rc=$?; printf '%s' \"$rc\" > {shlex.quote(str(exit_code_path))}"
            )
            # Use setsid for portable session detach (works under system
            # services where systemd-run --user fails due to missing D-Bus
            # session). When the binary is absent, start_new_session=True
            # alone still calls os.setsid() in the child.
            setsid_bin = shutil.which("setsid")
            launcher = [setsid_bin] if setsid_bin else []
            subprocess.Popen(
                [*launcher, "bash", "-c", update_cmd],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True,
            )
    except Exception as e:
        # Launch failed: remove the markers so no watcher waits on a run
        # that never started.
        pending_path.unlink(missing_ok=True)
        exit_code_path.unlink(missing_ok=True)
        return t("gateway.update.start_failed", error=e)

    self._schedule_update_notification_watch()
    return t("gateway.update.starting")
|