fix(cli): sanitize interactive command output

This commit is contained in:
helix4u 2026-04-17 13:51:14 -06:00 committed by kshitij
parent 175cf7e6bb
commit c94d26c69b
3 changed files with 94 additions and 52 deletions

111
cli.py
View file

@ -1810,7 +1810,7 @@ class HermesCLI:
mcp_names = set((CLI_CONFIG.get("mcp_servers") or {}).keys())
invalid = [t for t in toolsets if not validate_toolset(t) and t not in mcp_names]
if invalid:
self.console.print(f"[bold red]Warning: Unknown toolsets: {', '.join(invalid)}[/]")
self._console_print(f"[bold red]Warning: Unknown toolsets: {', '.join(invalid)}[/]")
# Filesystem checkpoints: CLI flag > config
cp_cfg = CLI_CONFIG.get("checkpoints", {})
@ -2261,7 +2261,7 @@ class HermesCLI:
normalized_model = normalize_model_for_provider(current_model, resolved_provider)
if normalized_model and normalized_model != current_model:
if not self._model_is_default:
self.console.print(
self._console_print(
f"[yellow]⚠️ Normalized model '{current_model}' to '{normalized_model}' for {resolved_provider}.[/]"
)
self.model = normalized_model
@ -2277,7 +2277,7 @@ class HermesCLI:
canonical = normalize_copilot_model_id(current_model, api_key=self.api_key)
if canonical and canonical != current_model:
if not self._model_is_default:
self.console.print(
self._console_print(
f"[yellow]⚠️ Normalized Copilot model '{current_model}' to '{canonical}'.[/]"
)
self.model = canonical
@ -2299,7 +2299,7 @@ class HermesCLI:
canonical = normalize_opencode_model_id(resolved_provider, current_model)
if canonical and canonical != current_model:
if not self._model_is_default:
self.console.print(
self._console_print(
f"[yellow]⚠️ Stripped provider prefix from '{current_model}'; using '{canonical}' for {resolved_provider}.[/]"
)
self.model = canonical
@ -2321,7 +2321,7 @@ class HermesCLI:
if "/" in current_model:
slug = current_model.split("/", 1)[1]
if not self._model_is_default:
self.console.print(
self._console_print(
f"[yellow]⚠️ Stripped provider prefix from '{current_model}'; "
f"using '{slug}' for OpenAI Codex.[/]"
)
@ -3070,7 +3070,7 @@ class HermesCLI:
use_compact = self.compact or term_width < 80
if use_compact:
self.console.print(_build_compact_banner())
self._console_print(_build_compact_banner())
self._show_status()
else:
# Get tools for display
@ -3095,25 +3095,25 @@ class HermesCLI:
# Warn about very low context lengths (common with local servers)
if ctx_len and ctx_len <= 8192:
self.console.print()
self.console.print(
self._console_print()
self._console_print(
f"[yellow]⚠️ Context length is only {ctx_len:,} tokens — "
f"this is likely too low for agent use with tools.[/]"
)
self.console.print(
self._console_print(
"[dim] Hermes needs 16k32k minimum. Tool schemas + system prompt alone use ~4k8k.[/]"
)
base_url = getattr(self, "base_url", "") or ""
if "11434" in base_url or "ollama" in base_url.lower():
self.console.print(
self._console_print(
"[dim] Ollama fix: OLLAMA_CONTEXT_LENGTH=32768 ollama serve[/]"
)
elif "1234" in base_url:
self.console.print(
self._console_print(
"[dim] LM Studio fix: Set context length in model settings → reload model[/]"
)
else:
self.console.print(
self._console_print(
"[dim] Fix: Set model.context_length in config.yaml, or increase your server's context setting[/]"
)
@ -3122,20 +3122,20 @@ class HermesCLI:
model_name = getattr(self, "model", "") or ""
if is_nous_hermes_non_agentic(model_name):
self.console.print()
self.console.print(
self._console_print()
self._console_print(
"[bold yellow]⚠ Nous Research Hermes 3 & 4 models are NOT agentic and are not "
"designed for use with Hermes Agent.[/]"
)
self.console.print(
self._console_print(
"[dim] They lack tool-calling capabilities required for agent workflows. "
"Consider using an agentic model (Claude, GPT, Gemini, DeepSeek, etc.).[/]"
)
self.console.print(
self._console_print(
"[dim] Switch with: /model sonnet or /model gpt5[/]"
)
self.console.print()
self._console_print()
def _preload_resumed_session(self) -> bool:
"""Load a resumed session's history from the DB early (before first chat).
@ -3153,10 +3153,10 @@ class HermesCLI:
session_meta = self._session_db.get_session(self.session_id)
if not session_meta:
self.console.print(
self._console_print(
f"[bold red]Session not found: {self.session_id}[/]"
)
self.console.print(
self._console_print(
"[dim]Use a session ID from a previous CLI run "
"(hermes sessions list).[/]"
)
@ -3171,7 +3171,7 @@ class HermesCLI:
if session_meta.get("title"):
title_part = f' "{session_meta["title"]}"'
accent_color = _accent_hex()
self.console.print(
self._console_print(
f"[{accent_color}]↻ Resumed session [bold]{self.session_id}[/bold]"
f"{title_part} "
f"({msg_count} user message{'s' if msg_count != 1 else ''}, "
@ -3179,7 +3179,7 @@ class HermesCLI:
)
else:
accent_color = _accent_hex()
self.console.print(
self._console_print(
f"[{accent_color}]Session {self.session_id} found but has no "
f"messages. Starting fresh.[/]"
)
@ -3354,7 +3354,7 @@ class HermesCLI:
padding=(0, 1),
style=_history_text_c,
)
self.console.print(panel)
self._console_print(panel)
def _try_attach_clipboard_image(self) -> bool:
"""Check clipboard for an image and attach it if found.
@ -3790,14 +3790,14 @@ class HermesCLI:
api_key_missing = [u for u in unavailable if u["missing_vars"]]
if api_key_missing:
self.console.print()
self.console.print("[yellow]⚠️ Some tools disabled (missing API keys):[/]")
self._console_print()
self._console_print("[yellow]⚠️ Some tools disabled (missing API keys):[/]")
for item in api_key_missing:
tools_str = ", ".join(item["tools"][:2]) # Show first 2 tools
if len(item["tools"]) > 2:
tools_str += f", +{len(item['tools'])-2} more"
self.console.print(f" [dim]• {item['name']}[/] [dim italic]({', '.join(item['missing_vars'])})[/]")
self.console.print("[dim] Run 'hermes setup' to configure[/]")
self._console_print(f" [dim]• {item['name']}[/] [dim italic]({', '.join(item['missing_vars'])})[/]")
self._console_print("[dim] Run 'hermes setup' to configure[/]")
except Exception:
pass # Don't crash on import errors
@ -3835,7 +3835,7 @@ class HermesCLI:
if self._provider_source:
provider_info += f" [dim {separator_color}]·[/] [dim]auth: {self._provider_source}[/]"
self.console.print(
self._console_print(
f" {api_indicator} [{accent_color}]{model_short}[/] "
f"[dim {separator_color}]·[/] [bold {label_color}]{tool_count} tools[/]"
f"{toolsets_info}{provider_info}"
@ -3892,7 +3892,7 @@ class HermesCLI:
f"Tokens: {total_tokens:,}",
f"Agent Running: {'Yes' if is_running else 'No'}",
])
self.console.print("\n".join(lines), highlight=False, markup=False)
self._console_print("\n".join(lines), highlight=False, markup=False)
def _fast_command_available(self) -> bool:
try:
@ -5090,8 +5090,15 @@ class HermesCLI:
print(" To change model or provider, use: hermes model")
def _output_console(self):
"""Use prompt_toolkit-safe Rich rendering once the TUI is live."""
if getattr(self, "_app", None):
return ChatConsole()
return self.console
def _console_print(self, *args, **kwargs):
"""Print through the active command-safe console."""
self._output_console().print(*args, **kwargs)
@staticmethod
def _resolve_personality_prompt(value) -> str:
@ -5111,14 +5118,14 @@ class HermesCLI:
from agent.google_oauth import get_valid_access_token, GoogleOAuthError, load_credentials
from agent.google_code_assist import retrieve_user_quota, CodeAssistError
except ImportError as exc:
self.console.print(f" [red]Gemini modules unavailable: {exc}[/]")
self._console_print(f" [red]Gemini modules unavailable: {exc}[/]")
return
try:
access_token = get_valid_access_token()
except GoogleOAuthError as exc:
self.console.print(f" [yellow]{exc}[/]")
self.console.print(" Run [bold]/model[/] and pick 'Google Gemini (OAuth)' to sign in.")
self._console_print(f" [yellow]{exc}[/]")
self._console_print(" Run [bold]/model[/] and pick 'Google Gemini (OAuth)' to sign in.")
return
creds = load_credentials()
@ -5127,18 +5134,18 @@ class HermesCLI:
try:
buckets = retrieve_user_quota(access_token, project_id=project_id)
except CodeAssistError as exc:
self.console.print(f" [red]Quota lookup failed:[/] {exc}")
self._console_print(f" [red]Quota lookup failed:[/] {exc}")
return
if not buckets:
self.console.print(" [dim]No quota buckets reported (account may be on legacy/unmetered tier).[/]")
self._console_print(" [dim]No quota buckets reported (account may be on legacy/unmetered tier).[/]")
return
# Sort for stable display, group by model
buckets.sort(key=lambda b: (b.model_id, b.token_type))
self.console.print()
self.console.print(f" [bold]Gemini Code Assist quota[/] (project: {project_id or '(auto / free-tier)'})")
self.console.print()
self._console_print()
self._console_print(f" [bold]Gemini Code Assist quota[/] (project: {project_id or '(auto / free-tier)'})")
self._console_print()
for b in buckets:
pct = max(0.0, min(1.0, b.remaining_fraction))
width = 20
@ -5148,8 +5155,8 @@ class HermesCLI:
header = b.model_id
if b.token_type:
header += f" [{b.token_type}]"
self.console.print(f" {header:40s} {bar} {pct_str}")
self.console.print()
self._console_print(f" {header:40s} {bar} {pct_str}")
self._console_print()
def _handle_personality_command(self, cmd: str):
"""Handle the /personality command to set predefined personalities."""
@ -5597,7 +5604,7 @@ class HermesCLI:
_tip_color = get_active_skin().get_color("banner_dim", "#B8860B")
except Exception:
_tip_color = "#B8860B"
self.console.print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]")
self._console_print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]")
except Exception:
pass
elif canonical == "history":
@ -5691,7 +5698,7 @@ class HermesCLI:
elif canonical == "statusbar":
self._status_bar_visible = not self._status_bar_visible
state = "visible" if self._status_bar_visible else "hidden"
self.console.print(f" Status bar {state}")
self._console_print(f" Status bar {state}")
elif canonical == "verbose":
self._toggle_verbose()
elif canonical == "yolo":
@ -5814,15 +5821,15 @@ class HermesCLI:
)
output = result.stdout.strip() or result.stderr.strip()
if output:
self.console.print(_rich_text_from_ansi(output))
self._console_print(_rich_text_from_ansi(output))
else:
self.console.print("[dim]Command returned no output[/]")
self._console_print("[dim]Command returned no output[/]")
except subprocess.TimeoutExpired:
self.console.print("[bold red]Quick command timed out (30s)[/]")
self._console_print("[bold red]Quick command timed out (30s)[/]")
except Exception as e:
self.console.print(f"[bold red]Quick command error: {e}[/]")
self._console_print(f"[bold red]Quick command error: {e}[/]")
else:
self.console.print(f"[bold red]Quick command '{base_cmd}' has no command defined[/]")
self._console_print(f"[bold red]Quick command '{base_cmd}' has no command defined[/]")
elif qcmd.get("type") == "alias":
target = qcmd.get("target", "").strip()
if target:
@ -5831,9 +5838,9 @@ class HermesCLI:
aliased_command = f"{target} {user_args}".strip()
return self.process_command(aliased_command)
else:
self.console.print(f"[bold red]Quick command '{base_cmd}' has no target defined[/]")
self._console_print(f"[bold red]Quick command '{base_cmd}' has no target defined[/]")
else:
self.console.print(f"[bold red]Quick command '{base_cmd}' has unsupported type (supported: 'exec', 'alias')[/]")
self._console_print(f"[bold red]Quick command '{base_cmd}' has unsupported type (supported: 'exec', 'alias')[/]")
# Check for plugin-registered slash commands
elif base_cmd.lstrip("/") in _get_plugin_cmd_handler_names():
from hermes_cli.plugins import get_plugin_command_handler
@ -8603,7 +8610,7 @@ class HermesCLI:
except Exception:
_welcome_text = "Welcome to Hermes Agent! Type your message or /help for commands."
_welcome_color = "#FFF8DC"
self.console.print(f"[{_welcome_color}]{_welcome_text}[/]")
self._console_print(f"[{_welcome_color}]{_welcome_text}[/]")
# Show a random tip to help users discover features
try:
from hermes_cli.tips import get_random_tip
@ -8612,16 +8619,16 @@ class HermesCLI:
_tip_color = _welcome_skin.get_color("banner_dim", "#B8860B")
except Exception:
_tip_color = "#B8860B"
self.console.print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]")
self._console_print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]")
except Exception:
pass # Tips are non-critical — never break startup
if self.preloaded_skills and not self._startup_skills_line_shown:
skills_label = ", ".join(self.preloaded_skills)
self.console.print(
self._console_print(
f"[bold {_accent_hex()}]Activated skills:[/] {skills_label}"
)
self._startup_skills_line_shown = True
self.console.print()
self._console_print()
# State for async operation
self._agent_running = False

View file

@ -0,0 +1,21 @@
from unittest.mock import MagicMock, patch
def test_gquota_uses_chat_console_when_tui_is_live():
from agent.google_oauth import GoogleOAuthError
from cli import HermesCLI
cli = HermesCLI.__new__(HermesCLI)
cli.console = MagicMock()
cli._app = object()
live_console = MagicMock()
with patch("cli.ChatConsole", return_value=live_console), \
patch("agent.google_oauth.get_valid_access_token", side_effect=GoogleOAuthError("No Google OAuth credentials found")), \
patch("agent.google_oauth.load_credentials", return_value=None), \
patch("agent.google_code_assist.retrieve_user_quota"):
cli._handle_gquota_command("/gquota")
assert live_console.print.call_count == 2
cli.console.print.assert_not_called()

View file

@ -33,6 +33,20 @@ class TestCLIQuickCommands:
printed = self._printed_plain(cli.console.print.call_args[0][0])
assert printed == "daily-note"
def test_exec_command_uses_chat_console_when_tui_is_live(self):
cli = self._make_cli({"dn": {"type": "exec", "command": "echo daily-note"}})
cli._app = object()
live_console = MagicMock()
with patch("cli.ChatConsole", return_value=live_console):
result = cli.process_command("/dn")
assert result is True
live_console.print.assert_called_once()
printed = self._printed_plain(live_console.print.call_args[0][0])
assert printed == "daily-note"
cli.console.print.assert_not_called()
def test_exec_command_stderr_shown_on_no_stdout(self):
cli = self._make_cli({"err": {"type": "exec", "command": "echo error >&2"}})
result = cli.process_command("/err")