Merge remote-tracking branch 'origin/main' into bb/pets

This commit is contained in:
Brooklyn Nicholson 2026-06-22 05:25:49 -05:00
commit 5342eccf12
823 changed files with 58322 additions and 13772 deletions

623
cli.py
View file

@ -452,6 +452,7 @@ def load_cli_config() -> Dict[str, Any]:
"resume_max_assistant_lines": 3,
"resume_skip_tool_only": True,
"show_reasoning": False,
"reasoning_full": False,
"streaming": True,
"busy_input_mode": "interrupt",
"persistent_output": True,
@ -562,6 +563,18 @@ def load_cli_config() -> Dict[str, Any]:
from hermes_cli.config import _expand_env_vars
defaults = _expand_env_vars(defaults)
# Managed scope: overlay administrator-pinned values LAST so they win over
# the user's config here too. cli.py builds its config independently of
# hermes_cli.config._load_config_impl (which has its own managed merge), so
# without this the entire interactive CLI/TUI surface — skin, display prefs,
# etc. read from CLI_CONFIG — would silently ignore managed scope while
# `hermes config`/`doctor`/guards (which use load_config) honor it. The
# shared helper mirrors _load_config_impl (env-only expansion, root-model
# normalization, leaf-merge) and is fail-open.
from hermes_cli import managed_scope
defaults = managed_scope.apply_managed_overlay(defaults)
# Apply terminal config to environment variables (so terminal_tool picks them up)
terminal_config = defaults.get("terminal", {})
@ -608,6 +621,7 @@ def load_cli_config() -> Dict[str, Any]:
"container_persistent": "TERMINAL_CONTAINER_PERSISTENT",
"docker_volumes": "TERMINAL_DOCKER_VOLUMES",
"docker_env": "TERMINAL_DOCKER_ENV",
"docker_extra_args": "TERMINAL_DOCKER_EXTRA_ARGS",
"docker_mount_cwd_to_workspace": "TERMINAL_DOCKER_MOUNT_CWD_TO_WORKSPACE",
"docker_run_as_host_user": "TERMINAL_DOCKER_RUN_AS_HOST_USER",
"docker_persist_across_processes": "TERMINAL_DOCKER_PERSIST_ACROSS_PROCESSES",
@ -1019,11 +1033,20 @@ def _run_cleanup(*, notify_session_finalize: bool = True):
# partially-initialised agents where the attribute is missing.
_session_msgs = getattr(_active_agent_ref, '_session_messages', None)
if isinstance(_session_msgs, list):
logger.info(
"CLI cleanup calling memory shutdown for session %s with %d message(s)",
getattr(_active_agent_ref, "session_id", None) or "<unknown>",
len(_session_msgs),
)
_active_agent_ref.shutdown_memory_provider(_session_msgs)
else:
logger.info(
"CLI cleanup calling memory shutdown for session %s without session message list",
getattr(_active_agent_ref, "session_id", None) or "<unknown>",
)
_active_agent_ref.shutdown_memory_provider()
except Exception:
pass
except Exception as e:
logger.warning("CLI cleanup memory shutdown failed: %s", e, exc_info=True)
def _should_emit_cleanup_session_finalize(session_id: str | None) -> bool:
@ -1224,11 +1247,91 @@ def _path_is_within_root(path: Path, root: Path) -> bool:
return False
def _setup_worktree(repo_root: str = None) -> Optional[Dict[str, str]]:
def _resolve_worktree_base(repo_root: str) -> tuple:
"""Resolve the freshest base ref to branch a new worktree from.
The standalone clone's ``HEAD`` can lag the remote by hundreds of commits
(the ``~/.hermes/hermes-agent`` clone is updated only by ``hermes update``,
not on every session). Branching a worktree from that stale ``HEAD`` roots
every new branch on an old base so the PR diff GitHub computes against
current ``main`` balloons with unrelated changes, and the agent has to
discover the staleness via the pre-push gate and rebase. Branching from the
freshly-fetched remote tip instead means the worktree starts current.
Strategy (each step falls back to the next on failure):
1. If the current branch tracks an upstream, fetch and use that upstream
ref so a deliberate feature-branch worktree tracks its own remote,
not the default branch.
2. Else fetch the remote's default branch (``origin/HEAD`` → e.g.
``origin/main``) and use it.
3. Else fall back to ``HEAD`` (offline, no remote, or detached) the
old behavior, never worse than before.
Returns ``(base_ref, label)`` where *base_ref* is a git revision suitable
for ``git worktree add ... <base_ref>`` and *label* is a short
human-readable description for the session banner.
"""
import subprocess
def _git(args, timeout=20):
return subprocess.run(
["git", *args],
capture_output=True, text=True, timeout=timeout, cwd=repo_root,
)
# 1. Current branch's upstream, if it tracks one.
try:
up = _git(["rev-parse", "--abbrev-ref", "--symbolic-full-name", "@{upstream}"])
if up.returncode == 0:
upstream = up.stdout.strip() # e.g. "origin/main"
if upstream and "/" in upstream:
remote = upstream.split("/", 1)[0]
# Fetch just that branch; fail-soft if offline.
_git(["fetch", remote, upstream.split("/", 1)[1]], timeout=30)
return upstream, f"{upstream} (fetched)"
except Exception as e:
logger.debug("worktree base: upstream resolution failed: %s", e)
# 2. Remote default branch (origin/HEAD).
try:
# Resolve the remote's default branch symref.
head_ref = _git(["symbolic-ref", "--quiet", "refs/remotes/origin/HEAD"])
default_ref = ""
if head_ref.returncode == 0:
default_ref = head_ref.stdout.strip().replace("refs/remotes/", "", 1)
if not default_ref:
# origin/HEAD not set locally; ask the remote.
show = _git(["remote", "show", "origin"], timeout=30)
for line in show.stdout.splitlines():
line = line.strip()
if line.startswith("HEAD branch:"):
_branch = line.split(":", 1)[1].strip()
# A remote with no default branch reports "(unknown)";
# don't construct a bogus "origin/(unknown)" ref from it.
if _branch and _branch != "(unknown)":
default_ref = "origin/" + _branch
break
if default_ref and "/" in default_ref:
remote, branch = default_ref.split("/", 1)
_git(["fetch", remote, branch], timeout=30)
return default_ref, f"{default_ref} (fetched)"
except Exception as e:
logger.debug("worktree base: default-branch resolution failed: %s", e)
# 3. Fall back to local HEAD (offline / no remote / detached).
return "HEAD", "HEAD (local — could not reach remote)"
def _setup_worktree(repo_root: str = None, sync_base: bool = True) -> Optional[Dict[str, str]]:
"""Create an isolated git worktree for this CLI session.
Returns a dict with worktree metadata on success, None on failure.
The dict contains: path, branch, repo_root.
When *sync_base* is True (default), the worktree branches from the
freshly-fetched remote tip rather than the (possibly stale) local ``HEAD``
see ``_resolve_worktree_base``. Set ``worktree_sync: false`` in config to
branch from local ``HEAD`` (the pre-#10760-followup behavior).
"""
import subprocess
@ -1260,15 +1363,37 @@ def _setup_worktree(repo_root: str = None) -> Optional[Dict[str, str]]:
except Exception as e:
logger.debug("Could not update .gitignore: %s", e)
# Resolve the base ref. By default branch from the freshly-fetched remote
# tip so the worktree starts current with the project, not from the
# (possibly stale) local HEAD of the standalone clone (#10760 follow-up).
if sync_base:
base_ref, base_label = _resolve_worktree_base(repo_root)
else:
base_ref, base_label = "HEAD", "HEAD (local — worktree_sync disabled)"
# Create the worktree
try:
result = subprocess.run(
["git", "worktree", "add", str(wt_path), "-b", branch_name, "HEAD"],
["git", "worktree", "add", str(wt_path), "-b", branch_name, base_ref],
capture_output=True, text=True, timeout=30, cwd=repo_root,
)
if result.returncode != 0:
print(f"\033[31m✗ Failed to create worktree: {result.stderr.strip()}\033[0m")
return None
# If branching from the resolved remote ref failed for any reason
# (e.g. a partial fetch left the ref unusable), retry from local
# HEAD so worktree creation never hard-fails on a sync hiccup.
if base_ref != "HEAD":
logger.warning(
"worktree add from %s failed (%s); retrying from local HEAD",
base_ref, result.stderr.strip(),
)
base_ref, base_label = "HEAD", "HEAD (fallback — remote base failed)"
result = subprocess.run(
["git", "worktree", "add", str(wt_path), "-b", branch_name, base_ref],
capture_output=True, text=True, timeout=30, cwd=repo_root,
)
if result.returncode != 0:
print(f"\033[31m✗ Failed to create worktree: {result.stderr.strip()}\033[0m")
return None
except Exception as e:
print(f"\033[31m✗ Failed to create worktree: {e}\033[0m")
return None
@ -1340,14 +1465,27 @@ def _setup_worktree(repo_root: str = None) -> Optional[Dict[str, str]]:
except Exception as e:
logger.debug("Error copying .worktreeinclude entries: %s", e)
# Lock the worktree so other processes (and `git worktree remove`) can see
# it is actively in use. Fail-soft: a lock failure never blocks the session.
try:
subprocess.run(
["git", "worktree", "lock", "--reason", f"hermes pid={os.getpid()}", str(wt_path)],
capture_output=True, text=True, timeout=10, cwd=repo_root,
)
logger.debug("Worktree locked: %s (pid=%s)", wt_path, os.getpid())
except Exception as e:
logger.debug("git worktree lock failed (non-fatal): %s", e)
info = {
"path": str(wt_path),
"branch": branch_name,
"repo_root": repo_root,
"base": base_ref,
}
print(f"\033[32m✓ Worktree created:\033[0m {wt_path}")
print(f" Branch: {branch_name}")
print(f" Base: {base_label}")
return info
@ -1415,6 +1553,16 @@ def _cleanup_worktree(info: Dict[str, str] = None) -> None:
# Remove worktree (even if working tree is dirty — uncommitted
# changes without unpushed commits are just artifacts)
# Unlock first so `git worktree remove` isn't blocked by the lock we
# placed at creation time. Fail-soft — never block cleanup.
try:
subprocess.run(
["git", "worktree", "unlock", wt_path],
capture_output=True, text=True, timeout=10, cwd=repo_root,
)
except Exception as e:
logger.debug("git worktree unlock failed (non-fatal): %s", e)
try:
subprocess.run(
["git", "worktree", "remove", wt_path, "--force"],
@ -3259,6 +3407,9 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
self.bell_on_complete = CLI_CONFIG["display"].get("bell_on_complete", False)
# show_reasoning: display model thinking/reasoning before the response
self.show_reasoning = CLI_CONFIG["display"].get("show_reasoning", False)
# reasoning_full: when reasoning display is on, print the post-response
# recap box uncollapsed instead of clamping to the first 10 lines.
self.reasoning_full = CLI_CONFIG["display"].get("reasoning_full", False)
_configure_output_history(
enabled=CLI_CONFIG["display"].get("persistent_output", True),
max_lines=CLI_CONFIG["display"].get("persistent_output_max_lines", 200),
@ -3503,11 +3654,36 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
self._last_turn_finished_at: Optional[float] = None # time.time() when the last agent loop finished
# Initialize SQLite session store early so /title works before first message
self._session_db = None
self._session_db_unavailable = False
try:
from hermes_state import SessionDB
self._session_db = SessionDB()
except Exception as e:
# #41386: a failed session store means the transcript is NOT
# persisted to state.db — the live chat looks healthy but resume
# later shows a truncated/empty session. A buried log line is not
# enough; surface it prominently so the user knows persistence is
# off for this run and can fix the store before relying on resume.
self._session_db_unavailable = True
logger.warning("Failed to initialize SessionDB — session will NOT be indexed for search: %s", e)
try:
# Console is imported at module scope; do NOT re-import it here.
# A function-local `import` would make `Console` a local name for
# the whole __init__ body and break the earlier `self.console =
# Console()` with UnboundLocalError.
Console(stderr=True).print(
"[bold yellow]⚠ Session store unavailable[/bold yellow] — "
"this conversation will [bold]NOT be saved[/bold] to disk and "
"cannot be resumed later. Searching past sessions is also disabled.\n"
f" Reason: {e}\n"
" Fix the state.db store (e.g. `hermes update` to rebuild the venv) to restore persistence."
)
except Exception:
# Never let the warning path itself break startup.
print(
"WARNING: Session store unavailable — this conversation will NOT be "
f"saved to disk and cannot be resumed later. Reason: {e}"
)
# Opportunistic state.db maintenance — runs at most once per
# min_interval_hours, tracked via state_meta in state.db itself so
@ -3637,6 +3813,15 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
self._resize_recovery_lock = threading.Lock()
self._resize_recovery_timer = None
self._resize_recovery_pending = False
# Debounced timer that clears the post-resize suppression once the
# terminal reflow settles, so the status bar returns during idle
# without waiting for the next submitted input.
self._status_bar_unsuppress_timer = None
# Last terminal width seen by the resize handler. Used to distinguish a
# width change (column reflow → possible ghost chrome, needs a viewport
# clear) from a rows-only change (no reflow). None until the first
# resize fires.
self._last_resize_width = None
# Background task tracking: {task_id: threading.Thread}
self._background_tasks: Dict[str, threading.Thread] = {}
@ -3787,15 +3972,112 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
origin and can leave stale prompt glyphs after a narrow resize.
We also flag ``_status_bar_suppressed_after_resize`` so the dynamic
status bar and input separator rules stay hidden until the next user
input. On column shrink the terminal reflows already-rendered status
bar rows into scrollback before prompt_toolkit can erase them; drawing
a fresh full-width bar immediately makes the old and new versions
look duplicated (#19280, #22976). Clearing the suppression on the
next prompt restores the bar cleanly.
status bar and input separator rules stay hidden while the terminal
reflow settles. On column shrink the terminal reflows already-rendered
status bar rows into scrollback before prompt_toolkit can erase them;
drawing a fresh full-width bar immediately makes the old and new
versions look duplicated (#19280, #22976).
Suppression alone is not enough on a WIDTH change. prompt_toolkit's
``renderer.erase()`` does ``cursor_up(_cursor_pos.y)`` + ``erase_down()``
using the ``_cursor_pos.y`` cached from the LAST render at the OLD
width (renderer.py). When the column count shrinks, the terminal
reflows each already-painted full-width chrome row into 2+ physical
rows, so the cached ``y`` undershoots: ``cursor_up`` does not climb
past the reflowed rows and ``erase_down`` leaves the stale bar stranded
ABOVE the live origin. The next paint then stacks a fresh bar below it
the duplicated-status-bar report (two bars, two elapsed readings).
Suppression hides the *new* bar but never erases the already-reflowed
*old* one, so the ghost survives the whole suppression window.
Fix: on a width change, wipe the visible viewport with ``erase_screen``
(CSI 2J) BEFORE delegating to prompt_toolkit's resize, then let its
repaint redraw from a clean origin. This is banner-safe: 2J clears
only the visible screen, NOT scrollback history (that is CSI 3J, which
we do not send here ``rebuild_scrollback=False``), so the startup
banner that scrolled into history is preserved and
``_replay_output_history`` is not needed. Row-count-only changes skip
the clear (no reflow, so no ghost) to avoid an unnecessary repaint.
The suppression is transient: a short follow-up timer clears it and
repaints once the reflow has settled, so the bar returns on its own
during idle. Previously the flag was only cleared on the next
*submitted* user input, so a resize/reflow (tmux pane change, SSH
window restore, font zoom) followed by idle left the status bar hidden
indefinitely even while the refresh clock kept ticking (the dynamic
chrome rendered at height 0 on every repaint). The next-submit clear
at the input loop remains as a fast path.
"""
self._status_bar_suppressed_after_resize = True
# On a WIDTH change the terminal has already reflowed the old full-width
# chrome into extra physical rows that prompt_toolkit's stale-cursor
# erase (cursor_up(_cursor_pos.y) cached at the OLD width) will not
# reach, leaving a duplicated status bar stranded above the live origin.
# Ctrl+L / /redraw clears it cleanly, so route the resize path through
# the SAME recovery: wipe the visible viewport (banner-safe — CSI 2J
# only, never CSI 3J) and replay the transcript so nothing is lost.
# Row-count-only changes skip this (no reflow → no ghost) to avoid an
# unnecessary full repaint.
try:
new_width = self._get_tui_terminal_width()
except Exception:
new_width = None
prev_width = getattr(self, "_last_resize_width", None)
# First resize of the session has no prior width to compare against;
# treat it as a change so an initial maximize/restore is covered too.
width_changed = new_width is not None and new_width != prev_width
if width_changed:
try:
self._clear_prompt_toolkit_screen(app, rebuild_scrollback=False)
_replay_output_history()
except Exception:
pass
if new_width is not None:
self._last_resize_width = new_width
original_on_resize()
self._schedule_status_bar_unsuppress(app)
def _schedule_status_bar_unsuppress(self, app, delay: float = 0.35) -> None:
"""Clear the post-resize status-bar suppression after the reflow settles.
Debounced: a fresh resize cancels the pending unsuppress and restarts
the timer, so a resize storm only repaints the bar once it stops.
"""
try:
old_timer = getattr(self, "_status_bar_unsuppress_timer", None)
if old_timer is not None:
try:
old_timer.cancel()
except Exception:
pass
def _clear():
self._status_bar_suppressed_after_resize = False
try:
app.invalidate()
except Exception:
pass
def _fire():
try:
loop = getattr(app, "loop", None)
except Exception:
loop = None
if loop is not None:
try:
loop.call_soon_threadsafe(_clear)
return
except Exception:
pass
_clear()
timer = threading.Timer(delay, _fire)
timer.daemon = True
self._status_bar_unsuppress_timer = timer
timer.start()
except Exception:
# Fail open: never leave the bar stuck hidden.
self._status_bar_suppressed_after_resize = False
def _schedule_resize_recovery(self, app, original_on_resize, delay: float = 0.12) -> None:
"""Debounce resize redraws so footer chrome is not stamped into scrollback."""
@ -5328,12 +5610,86 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
# Set skip flag (again) so the text-change event fired when the
# editor closes does not re-collapse the returned content.
self._skip_paste_collapse = True
target_buffer.open_in_editor(validate_and_handle=False)
# Open the editor, then submit the saved draft on a clean exit —
# matching the TUI's Ctrl+G (openEditor), which sends the buffer
# instead of requiring a second Enter. Submission in this CLI is
# driven by the custom `enter` keybinding, NOT the buffer's
# accept_handler, so validate_and_handle can't route through it;
# chain a done-callback on the returned Task that re-uses the
# real submit pipeline via _submit_editor_buffer().
task = target_buffer.open_in_editor(validate_and_handle=False)
if task is not None and hasattr(task, "add_done_callback"):
task.add_done_callback(
lambda _t, b=target_buffer: self._submit_editor_buffer(b)
)
return True
except Exception as exc:
_cprint(f"{_DIM}Failed to open external editor: {exc}{_RST}")
return False
def _submit_editor_buffer(self, buffer) -> None:
"""Submit the draft an external editor left in ``buffer``.
Invoked from the Ctrl+G done-callback so saving the editor sends the
prompt (TUI parity) instead of leaving it sitting in the input area.
Mirrors the idle/queue branches of the `enter` keybinding handler:
an empty save is ignored (never submits a blank turn), a slash command
is dispatched, otherwise the text is routed through the same input
queues the normal Enter path uses. Runs on the prompt_toolkit event
loop via the Task callback, so it must be cheap and non-blocking.
"""
try:
text = (getattr(buffer, "text", "") or "").strip()
except Exception:
return
if not text:
# Editor saved empty / was cleared — match the TUI, which drops
# an empty draft instead of submitting a blank turn.
return
app = getattr(self, "_app", None)
# Slash commands: dispatch directly, same as the Enter handler's
# _looks_like_slash_command branch.
if _looks_like_slash_command(text):
try:
if not self.process_command(text):
self._should_exit = True
if app is not None and app.is_running:
app.exit()
except Exception as exc:
_cprint(f" {_DIM}Command failed: {exc}{_RST}")
finally:
self._reset_input_buffer(buffer)
if app is not None:
app.invalidate()
return
# Regular prompt: route through the same queues the Enter handler uses.
if self._agent_running:
# Agent busy → honour the configured busy-input behaviour by
# queueing for the next turn (the safe default; interrupt/steer
# remain reachable via the normal Enter path).
self._interrupt_queue.put(text) if self.busy_input_mode == "interrupt" else self._pending_input.put(text)
preview = text[:80] + ("..." if len(text) > 80 else "")
_cprint(f" Queued for the next turn: {preview}")
else:
self._pending_input.put(text)
self._reset_input_buffer(buffer)
if app is not None:
app.invalidate()
def _reset_input_buffer(self, buffer) -> None:
"""Clear an input buffer after a programmatic submit (best-effort)."""
try:
buffer.reset(append_to_history=True)
except Exception:
try:
buffer.text = ""
except Exception:
pass
def _install_tool_callbacks(self) -> None:
@ -6091,6 +6447,22 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
preview_limit = 400
visible_index = 0
hidden_tool_messages = 0
show_ts = bool(getattr(self, "show_timestamps", False))
def _ts_suffix(message: dict) -> str:
# Messages restored from SessionDB carry a unix `timestamp`; live
# unsaved turns may not. Only annotate when both the toggle is on
# and the turn actually has a stored time — never fabricate one.
if not show_ts:
return ""
ts = message.get("timestamp")
if not ts:
return ""
try:
from datetime import datetime
return f" [{datetime.fromtimestamp(float(ts)).strftime('%H:%M')}]"
except (ValueError, OSError, TypeError):
return ""
def flush_tool_summary():
nonlocal hidden_tool_messages
@ -6124,13 +6496,13 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
content_text = "" if content is None else str(content)
if role == "user":
print(f"\n [You #{visible_index}]")
print(f"\n [You #{visible_index}]{_ts_suffix(msg)}")
print(
f" {content_text[:preview_limit]}{'...' if len(content_text) > preview_limit else ''}"
)
continue
print(f"\n [Hermes #{visible_index}]")
print(f"\n [Hermes #{visible_index}]{_ts_suffix(msg)}")
tool_calls = msg.get("tool_calls") or []
if content_text:
preview = content_text[:preview_limit]
@ -6994,7 +7366,35 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
_cprint(f"{result.error_message}")
return
if self.agent is not None:
try:
from hermes_cli.context_switch_guard import merge_preflight_compression_warning
merge_preflight_compression_warning(
result,
agent=self.agent,
messages=list(self.conversation_history or []),
config_context_length=getattr(self.agent, "_config_context_length", None),
)
except Exception as exc:
logger.debug("preflight-compression switch warning failed: %s", exc)
old_model = self.model
# Snapshot the CLI-level credential/runtime fields BEFORE mutating them
# so a failed in-place agent swap can roll the whole CLI back to the old
# working model. Otherwise the broken credentials staged below leak into
# the next turn's resolution even though the agent itself rolled back
# (#50163).
_cli_snapshot = {
"model": self.model,
"provider": self.provider,
"requested_provider": self.requested_provider,
"_explicit_api_key": getattr(self, "_explicit_api_key", None),
"_explicit_base_url": getattr(self, "_explicit_base_url", None),
"api_key": self.api_key,
"base_url": self.base_url,
"api_mode": self.api_mode,
}
self.model = result.new_model
self.provider = result.target_provider
self.requested_provider = result.target_provider
@ -7020,7 +7420,17 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
api_mode=result.api_mode,
)
except Exception as exc:
_cprint(f" ⚠ Agent swap failed ({exc}); change applied to next session.")
# The agent rolled itself back to the old working model/client.
# Roll the CLI's own staged fields back too and abort the rest
# of the commit (note + success print) so a failed switch is a
# no-op rather than a dead session (#50163).
for _k, _v in _cli_snapshot.items():
setattr(self, _k, _v)
_cprint(
f" ⚠ Model switch to {result.new_model} failed ({exc}); "
f"staying on {old_model}."
)
return
self._pending_model_switch_note = (
f"[Note: model was just switched from {old_model} to {result.new_model} "
@ -7144,24 +7554,43 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
self._close_model_picker()
def _handle_model_switch(self, cmd_original: str):
"""Handle /model command — switch model for this session.
"""Handle /model command — switch model.
Supports:
/model show current model + usage hints
/model <name> switch for this session only
/model <name> --global switch and persist to config.yaml
/model <name> switch model (persists by default)
/model <name> --session switch for this session only
/model <name> --global switch and persist (explicit)
/model <name> --provider <provider> switch provider + model
/model --provider <provider> switch to provider, auto-detect model
Persistence defaults to on (``model.persist_switch_by_default`` in
config.yaml, default True). Use ``--session`` for a one-off switch.
"""
from hermes_cli.model_switch import switch_model, parse_model_flags
from hermes_cli.model_switch import (
switch_model,
parse_model_flags,
resolve_persist_behavior,
)
from hermes_cli.providers import get_label
# Parse args from the original command
parts = cmd_original.split(None, 1) # split off '/model'
raw_args = parts[1].strip() if len(parts) > 1 else ""
# Parse --provider, --global, and --refresh flags
model_input, explicit_provider, persist_global, force_refresh = parse_model_flags(raw_args)
# Parse --provider, --global, --session, and --refresh flags
(
model_input,
explicit_provider,
is_global_flag,
force_refresh,
is_session,
) = parse_model_flags(raw_args)
# Resolve the effective persistence once: --session overrides the
# config-gated default, --global forces persist, otherwise defer to
# model.persist_switch_by_default (defaults to True so /model survives
# across sessions).
persist_global = resolve_persist_behavior(is_global_flag, is_session)
# --refresh: wipe the on-disk picker cache before building the
# provider list. Forces a live re-fetch of every authed provider's
@ -7209,7 +7638,8 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
if not providers:
_cprint(" No authenticated providers found.")
_cprint("")
_cprint(" /model <name> switch model")
_cprint(" /model <name> switch model (persists)")
_cprint(" /model <name> --session switch for this session only")
_cprint(" /model --provider <slug> switch provider")
_cprint(" /model --refresh re-fetch live model lists")
return
@ -7240,6 +7670,19 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
_cprint(f"{result.error_message}")
return
if self.agent is not None:
try:
from hermes_cli.context_switch_guard import merge_preflight_compression_warning
merge_preflight_compression_warning(
result,
agent=self.agent,
messages=list(self.conversation_history or []),
config_context_length=getattr(self.agent, "_config_context_length", None),
)
except Exception as exc:
logger.debug("preflight-compression switch warning failed: %s", exc)
if not self._confirm_expensive_model_switch(result):
_cprint(" Model switch cancelled.")
return
@ -7248,6 +7691,18 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
# Update requested_provider so _ensure_runtime_credentials() doesn't
# overwrite the switch on the next turn (it re-resolves from this).
old_model = self.model
# Snapshot CLI-level fields before mutation so a failed in-place swap
# rolls the whole CLI back to the old working model (#50163).
_cli_snapshot = {
"model": self.model,
"provider": self.provider,
"requested_provider": self.requested_provider,
"_explicit_api_key": getattr(self, "_explicit_api_key", None),
"_explicit_base_url": getattr(self, "_explicit_base_url", None),
"api_key": self.api_key,
"base_url": self.base_url,
"api_mode": self.api_mode,
}
self.model = result.new_model
self.provider = result.target_provider
self.requested_provider = result.target_provider
@ -7274,7 +7729,15 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
api_mode=result.api_mode,
)
except Exception as exc:
_cprint(f" ⚠ Agent swap failed ({exc}); change applied to next session.")
# Agent rolled itself back; roll the CLI back too and abort so a
# failed switch is a no-op rather than a dead session (#50163).
for _k, _v in _cli_snapshot.items():
setattr(self, _k, _v)
_cprint(
f" ⚠ Model switch to {result.new_model} failed ({exc}); "
f"staying on {old_model}."
)
return
# Store a note to prepend to the next user message so the model
# knows a switch occurred (avoids injecting system messages mid-history
@ -7329,7 +7792,7 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
save_config_value("model.default", result.new_model)
if result.provider_changed:
save_config_value("model.provider", result.target_provider)
_cprint(" Saved to config.yaml (--global)")
_cprint(" Saved to config.yaml")
else:
_cprint(" (session only — add --global to persist)")
@ -7700,8 +8163,6 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
self._handle_model_switch(cmd_original)
elif canonical == "codex-runtime":
self._handle_codex_runtime(cmd_original)
elif canonical == "gquota":
self._handle_gquota_command(cmd_original)
elif canonical == "personality":
# Use original case (handler lowercases the personality name itself)
@ -7713,6 +8174,8 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
if retry_msg and hasattr(self, '_pending_input'):
# Re-queue the message so process_loop sends it to the agent
self._pending_input.put(retry_msg)
elif canonical == "prompt":
self._handle_prompt_compose_command(cmd_original)
elif canonical == "undo":
# Parse optional turn count: "/undo" → 1, "/undo 3" → 3.
_undo_n = 1
@ -7764,6 +8227,8 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
self._status_bar_visible = not self._status_bar_visible
state = "visible" if self._status_bar_visible else "hidden"
self._console_print(f" Status bar {state}")
elif canonical == "timestamps":
self._handle_timestamps_command(cmd_original)
elif canonical == "verbose":
self._toggle_verbose()
elif canonical == "footer":
@ -9710,16 +10175,35 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
else:
print(f" 🔧 {len(new_tools)} tool(s) available from {len(connected_servers)} server(s)")
# Refresh the agent's tool list so the model can call new tools
# Refresh the agent's tool list so the model can call new tools.
# Route through the shared helper so this CLI /reload-mcp path stays
# in lockstep with the TUI RPC / gateway reload / late-binding paths
# (name-diff, thread-safe, and — critically — additive-preserving so
# memory-provider and context-engine tools survive the rebuild).
if self.agent is not None:
self.agent.tools = get_tool_definitions(
enabled_toolsets=self.agent.enabled_toolsets
if hasattr(self.agent, "enabled_toolsets") else None,
from tools.mcp_tool import refresh_agent_mcp_tools
# Explicit reload: pick up MCP servers the user ENABLED in config
# this session. self.enabled_toolsets was resolved once at
# startup; merge in any now-connected server names (unless the
# user pinned `all`/`*`, which already includes everything) so a
# freshly-added server isn't filtered out. Mirrors startup, where
# MCP server names are part of enabled_toolsets (see __init__).
enabled_override = None
et = self.enabled_toolsets
if et and "all" not in et and "*" not in et:
merged = list(et)
for _name in sorted(connected_servers):
if _name not in merged:
merged.append(_name)
enabled_override = merged
refresh_agent_mcp_tools(
self.agent,
enabled_override=enabled_override,
quiet_mode=True,
)
self.agent.valid_tool_names = {
tool["function"]["name"] for tool in self.agent.tools
} if self.agent.tools else set()
# Keep the CLI's own list in sync with what the agent now uses.
if enabled_override is not None:
self.enabled_toolsets = enabled_override
# Inject a message at the END of conversation history so the
# model knows tools changed. Appended after all existing
@ -11400,11 +11884,12 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
r_fill = w - 2 - len(r_label)
r_top = f"{_DIM}┌─{r_label}{'' * max(r_fill - 1, 0)}{_RST}"
r_bot = f"{_DIM}{'' * (w - 2)}{_RST}"
# Collapse long reasoning: show first 10 lines
# Collapse long reasoning to the first 10 lines unless the
# user opted into full display via /reasoning full.
lines = reasoning.strip().splitlines()
if len(lines) > 10:
if len(lines) > 10 and not getattr(self, "reasoning_full", False):
display_reasoning = "\n".join(lines[:10])
display_reasoning += f"\n{_DIM} ... ({len(lines) - 10} more lines){_RST}"
display_reasoning += f"\n{_DIM} ... ({len(lines) - 10} more lines — /reasoning full to show){_RST}"
else:
display_reasoning = reasoning.strip()
_cprint(f"\n{r_top}\n{_DIM}{display_reasoning}{_RST}\n{r_bot}")
@ -11554,6 +12039,36 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
except Exception:
pass
def _persist_active_session_before_close(self):
"""Best-effort SQLite/JSON flush before the CLI marks a session closed.
``run_conversation()`` normally persists at turn boundaries, but a
terminal close/SIGHUP/SIGTERM can unwind the prompt_toolkit app while
the agent thread still holds the current turn only in memory. Flush the
agent's live ``_session_messages`` before ``end_session()`` so resume,
session_search, and state.db do not lose the interrupted turn.
"""
agent = getattr(self, "agent", None)
if not agent or not hasattr(agent, "_persist_session"):
return
messages = getattr(agent, "_session_messages", None)
if not isinstance(messages, list):
messages = getattr(self, "conversation_history", None)
if not isinstance(messages, list) or not messages:
return
conversation_history = getattr(self, "conversation_history", None)
if not isinstance(conversation_history, list):
conversation_history = messages
try:
agent._persist_session(messages, conversation_history)
if getattr(agent, "session_id", None):
self.session_id = agent.session_id
except (Exception, KeyboardInterrupt) as e:
logger.debug("Could not persist active CLI session before close: %s", e)
def _print_exit_summary(self):
"""Print session resume info on exit, similar to Claude Code."""
# Clear the screen + scrollback before printing the summary so the
@ -12114,7 +12629,13 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
# --- /model picker modal ---
if self._model_picker_state:
try:
self._handle_model_picker_selection()
# Picker selections persist by default (same default as
# /model <name>); honour model.persist_switch_by_default.
from hermes_cli.model_switch import resolve_persist_behavior
self._handle_model_picker_selection(
persist_global=resolve_persist_behavior(False, False)
)
except Exception as _exc:
_cprint(f" ✗ Model selection failed: {_exc}")
self._close_model_picker()
@ -13734,13 +14255,13 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
style=style,
full_screen=False,
mouse_support=False,
# The status bar contains wall-clock read-outs (live prompt elapsed
# and idle-since-last-turn). Once a turn finishes there may be no
# further events to invalidate the app, so prompt_toolkit would keep
# rendering the first post-turn value (usually ``✓ 0s``) forever.
# A low-rate refresh keeps the clock honest without reintroducing a
# custom repaint thread or touching conversation state.
refresh_interval=1.0,
# Read from display.cli_refresh_interval (default 0 = disabled).
# When non-zero, prompt_toolkit redraws the UI on this cadence
# during idle, keeping wall-clock status-bar read-outs ticking.
# Set to 0 to suppress background redraws entirely — avoids
# fighting terminal auto-scroll in non-fullscreen mode (Xshell,
# iTerm2, Windows Terminal). See #48309.
refresh_interval=float(CLI_CONFIG.get("display", {}).get("cli_refresh_interval", 0)),
# Erase the live bottom chrome (status bar, input box, separator
# rules) on exit instead of freezing a final copy into scrollback.
# Without this, prompt_toolkit's render_as_done teardown repaints
@ -14262,6 +14783,12 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
set_sudo_password_callback(None)
set_approval_callback(None)
set_secret_capture_callback(None)
# Flush any in-memory turn transcript before marking the session
# closed. On SIGHUP/SIGTERM/window close the agent thread may not
# reach its normal run_conversation() persistence path before the
# daemon thread is reaped.
self._persist_active_session_before_close()
# Close session in SQLite
if hasattr(self, '_session_db') and self._session_db and self.agent:
try:
@ -14509,7 +15036,11 @@ def main(
_repo = _git_repo_root()
if _repo:
_prune_stale_worktrees(_repo)
wt_info = _setup_worktree()
# Branch the worktree from the freshly-fetched remote tip by
# default so it starts current with the project. Opt out with
# worktree_sync: false to branch from local HEAD instead.
_sync_base = CLI_CONFIG.get("worktree_sync", True)
wt_info = _setup_worktree(sync_base=_sync_base)
if wt_info:
_active_worktree = wt_info
os.environ["TERMINAL_CWD"] = wt_info["path"]