diff --git a/cli.py b/cli.py index c9ce95e9f..e814e35b1 100644 --- a/cli.py +++ b/cli.py @@ -1810,7 +1810,7 @@ class HermesCLI: mcp_names = set((CLI_CONFIG.get("mcp_servers") or {}).keys()) invalid = [t for t in toolsets if not validate_toolset(t) and t not in mcp_names] if invalid: - self.console.print(f"[bold red]Warning: Unknown toolsets: {', '.join(invalid)}[/]") + self._console_print(f"[bold red]Warning: Unknown toolsets: {', '.join(invalid)}[/]") # Filesystem checkpoints: CLI flag > config cp_cfg = CLI_CONFIG.get("checkpoints", {}) @@ -2261,7 +2261,7 @@ class HermesCLI: normalized_model = normalize_model_for_provider(current_model, resolved_provider) if normalized_model and normalized_model != current_model: if not self._model_is_default: - self.console.print( + self._console_print( f"[yellow]⚠️ Normalized model '{current_model}' to '{normalized_model}' for {resolved_provider}.[/]" ) self.model = normalized_model @@ -2277,7 +2277,7 @@ class HermesCLI: canonical = normalize_copilot_model_id(current_model, api_key=self.api_key) if canonical and canonical != current_model: if not self._model_is_default: - self.console.print( + self._console_print( f"[yellow]⚠️ Normalized Copilot model '{current_model}' to '{canonical}'.[/]" ) self.model = canonical @@ -2299,7 +2299,7 @@ class HermesCLI: canonical = normalize_opencode_model_id(resolved_provider, current_model) if canonical and canonical != current_model: if not self._model_is_default: - self.console.print( + self._console_print( f"[yellow]⚠️ Stripped provider prefix from '{current_model}'; using '{canonical}' for {resolved_provider}.[/]" ) self.model = canonical @@ -2321,7 +2321,7 @@ class HermesCLI: if "/" in current_model: slug = current_model.split("/", 1)[1] if not self._model_is_default: - self.console.print( + self._console_print( f"[yellow]⚠️ Stripped provider prefix from '{current_model}'; " f"using '{slug}' for OpenAI Codex.[/]" ) @@ -3070,7 +3070,7 @@ class HermesCLI: use_compact = self.compact or term_width < 80 if use_compact: - self.console.print(_build_compact_banner()) + self._console_print(_build_compact_banner()) self._show_status() else: # Get tools for display @@ -3095,25 +3095,25 @@ class HermesCLI: # Warn about very low context lengths (common with local servers) if ctx_len and ctx_len <= 8192: - self.console.print() - self.console.print( + self._console_print() + self._console_print( f"[yellow]⚠️ Context length is only {ctx_len:,} tokens — " f"this is likely too low for agent use with tools.[/]" ) - self.console.print( + self._console_print( "[dim] Hermes needs 16k–32k minimum. Tool schemas + system prompt alone use ~4k–8k.[/]" ) base_url = getattr(self, "base_url", "") or "" if "11434" in base_url or "ollama" in base_url.lower(): - self.console.print( + self._console_print( "[dim] Ollama fix: OLLAMA_CONTEXT_LENGTH=32768 ollama serve[/]" ) elif "1234" in base_url: - self.console.print( + self._console_print( "[dim] LM Studio fix: Set context length in model settings → reload model[/]" ) else: - self.console.print( + self._console_print( "[dim] Fix: Set model.context_length in config.yaml, or increase your server's context setting[/]" ) @@ -3122,20 +3122,20 @@ class HermesCLI: model_name = getattr(self, "model", "") or "" if is_nous_hermes_non_agentic(model_name): - self.console.print() - self.console.print( + self._console_print() + self._console_print( "[bold yellow]⚠ Nous Research Hermes 3 & 4 models are NOT agentic and are not " "designed for use with Hermes Agent.[/]" ) - self.console.print( + self._console_print( "[dim] They lack tool-calling capabilities required for agent workflows. " "Consider using an agentic model (Claude, GPT, Gemini, DeepSeek, etc.).[/]" ) - self.console.print( + self._console_print( "[dim] Switch with: /model sonnet or /model gpt5[/]" ) - self.console.print() + self._console_print() def _preload_resumed_session(self) -> bool: """Load a resumed session's history from the DB early (before first chat). @@ -3153,10 +3153,10 @@ class HermesCLI: session_meta = self._session_db.get_session(self.session_id) if not session_meta: - self.console.print( + self._console_print( f"[bold red]Session not found: {self.session_id}[/]" ) - self.console.print( + self._console_print( "[dim]Use a session ID from a previous CLI run " "(hermes sessions list).[/]" ) @@ -3171,7 +3171,7 @@ class HermesCLI: if session_meta.get("title"): title_part = f' "{session_meta["title"]}"' accent_color = _accent_hex() - self.console.print( + self._console_print( f"[{accent_color}]↻ Resumed session [bold]{self.session_id}[/bold]" f"{title_part} " f"({msg_count} user message{'s' if msg_count != 1 else ''}, " @@ -3179,7 +3179,7 @@ class HermesCLI: ) else: accent_color = _accent_hex() - self.console.print( + self._console_print( f"[{accent_color}]Session {self.session_id} found but has no " f"messages. Starting fresh.[/]" ) @@ -3354,7 +3354,7 @@ class HermesCLI: padding=(0, 1), style=_history_text_c, ) - self.console.print(panel) + self._console_print(panel) def _try_attach_clipboard_image(self) -> bool: """Check clipboard for an image and attach it if found. @@ -3790,14 +3790,14 @@ class HermesCLI: api_key_missing = [u for u in unavailable if u["missing_vars"]] if api_key_missing: - self.console.print() - self.console.print("[yellow]⚠️ Some tools disabled (missing API keys):[/]") + self._console_print() + self._console_print("[yellow]⚠️ Some tools disabled (missing API keys):[/]") for item in api_key_missing: tools_str = ", ".join(item["tools"][:2]) # Show first 2 tools if len(item["tools"]) > 2: tools_str += f", +{len(item['tools'])-2} more" - self.console.print(f" [dim]• {item['name']}[/] [dim italic]({', '.join(item['missing_vars'])})[/]") - self.console.print("[dim] Run 'hermes setup' to configure[/]") + self._console_print(f" [dim]• {item['name']}[/] [dim italic]({', '.join(item['missing_vars'])})[/]") + self._console_print("[dim] Run 'hermes setup' to configure[/]") except Exception: pass # Don't crash on import errors @@ -3835,7 +3835,7 @@ class HermesCLI: if self._provider_source: provider_info += f" [dim {separator_color}]·[/] [dim]auth: {self._provider_source}[/]" - self.console.print( + self._console_print( f" {api_indicator} [{accent_color}]{model_short}[/] " f"[dim {separator_color}]·[/] [bold {label_color}]{tool_count} tools[/]" f"{toolsets_info}{provider_info}" @@ -3892,7 +3892,7 @@ class HermesCLI: f"Tokens: {total_tokens:,}", f"Agent Running: {'Yes' if is_running else 'No'}", ]) - self.console.print("\n".join(lines), highlight=False, markup=False) + self._console_print("\n".join(lines), highlight=False, markup=False) def _fast_command_available(self) -> bool: try: @@ -5090,8 +5090,15 @@ class HermesCLI: print(" To change model or provider, use: hermes model") + def _output_console(self): + """Use prompt_toolkit-safe Rich rendering once the TUI is live.""" + if getattr(self, "_app", None): + return ChatConsole() + return self.console - + def _console_print(self, *args, **kwargs): + """Print through the active command-safe console.""" + self._output_console().print(*args, **kwargs) @staticmethod def _resolve_personality_prompt(value) -> str: @@ -5111,14 +5118,14 @@ class HermesCLI: from agent.google_oauth import get_valid_access_token, GoogleOAuthError, load_credentials from agent.google_code_assist import retrieve_user_quota, CodeAssistError except ImportError as exc: - self.console.print(f" [red]Gemini modules unavailable: {exc}[/]") + self._console_print(f" [red]Gemini modules unavailable: {exc}[/]") return try: access_token = get_valid_access_token() except GoogleOAuthError as exc: - self.console.print(f" [yellow]{exc}[/]") - self.console.print(" Run [bold]/model[/] and pick 'Google Gemini (OAuth)' to sign in.") + self._console_print(f" [yellow]{exc}[/]") + self._console_print(" Run [bold]/model[/] and pick 'Google Gemini (OAuth)' to sign in.") return creds = load_credentials() @@ -5127,18 +5134,18 @@ class HermesCLI: try: buckets = retrieve_user_quota(access_token, project_id=project_id) except CodeAssistError as exc: - self.console.print(f" [red]Quota lookup failed:[/] {exc}") + self._console_print(f" [red]Quota lookup failed:[/] {exc}") return if not buckets: - self.console.print(" [dim]No quota buckets reported (account may be on legacy/unmetered tier).[/]") + self._console_print(" [dim]No quota buckets reported (account may be on legacy/unmetered tier).[/]") return # Sort for stable display, group by model buckets.sort(key=lambda b: (b.model_id, b.token_type)) - self.console.print() - self.console.print(f" [bold]Gemini Code Assist quota[/] (project: {project_id or '(auto / free-tier)'})") - self.console.print() + self._console_print() + self._console_print(f" [bold]Gemini Code Assist quota[/] (project: {project_id or '(auto / free-tier)'})") + self._console_print() for b in buckets: pct = max(0.0, min(1.0, b.remaining_fraction)) width = 20 @@ -5148,8 +5155,8 @@ class HermesCLI: header = b.model_id if b.token_type: header += f" [{b.token_type}]" - self.console.print(f" {header:40s} {bar} {pct_str}") - self.console.print() + self._console_print(f" {header:40s} {bar} {pct_str}") + self._console_print() def _handle_personality_command(self, cmd: str): """Handle the /personality command to set predefined personalities.""" @@ -5597,7 +5604,7 @@ class HermesCLI: _tip_color = get_active_skin().get_color("banner_dim", "#B8860B") except Exception: _tip_color = "#B8860B" - self.console.print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]") + self._console_print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]") except Exception: pass elif canonical == "history": @@ -5691,7 +5698,7 @@ class HermesCLI: elif canonical == "statusbar": self._status_bar_visible = not self._status_bar_visible state = "visible" if self._status_bar_visible else "hidden" - self.console.print(f" Status bar {state}") + self._console_print(f" Status bar {state}") elif canonical == "verbose": self._toggle_verbose() elif canonical == "yolo": @@ -5814,15 +5821,15 @@ class HermesCLI: ) output = result.stdout.strip() or result.stderr.strip() if output: - self.console.print(_rich_text_from_ansi(output)) + self._console_print(_rich_text_from_ansi(output)) else: - self.console.print("[dim]Command returned no output[/]") + self._console_print("[dim]Command returned no output[/]") except subprocess.TimeoutExpired: - self.console.print("[bold red]Quick command timed out (30s)[/]") + self._console_print("[bold red]Quick command timed out (30s)[/]") except Exception as e: - self.console.print(f"[bold red]Quick command error: {e}[/]") + self._console_print(f"[bold red]Quick command error: {e}[/]") else: - self.console.print(f"[bold red]Quick command '{base_cmd}' has no command defined[/]") + self._console_print(f"[bold red]Quick command '{base_cmd}' has no command defined[/]") elif qcmd.get("type") == "alias": target = qcmd.get("target", "").strip() if target: @@ -5831,9 +5838,9 @@ class HermesCLI: aliased_command = f"{target} {user_args}".strip() return self.process_command(aliased_command) else: - self.console.print(f"[bold red]Quick command '{base_cmd}' has no target defined[/]") + self._console_print(f"[bold red]Quick command '{base_cmd}' has no target defined[/]") else: - self.console.print(f"[bold red]Quick command '{base_cmd}' has unsupported type (supported: 'exec', 'alias')[/]") + self._console_print(f"[bold red]Quick command '{base_cmd}' has unsupported type (supported: 'exec', 'alias')[/]") # Check for plugin-registered slash commands elif base_cmd.lstrip("/") in _get_plugin_cmd_handler_names(): from hermes_cli.plugins import get_plugin_command_handler @@ -8603,7 +8610,7 @@ class HermesCLI: except Exception: _welcome_text = "Welcome to Hermes Agent! Type your message or /help for commands." _welcome_color = "#FFF8DC" - self.console.print(f"[{_welcome_color}]{_welcome_text}[/]") + self._console_print(f"[{_welcome_color}]{_welcome_text}[/]") # Show a random tip to help users discover features try: from hermes_cli.tips import get_random_tip @@ -8612,16 +8619,16 @@ class HermesCLI: _tip_color = _welcome_skin.get_color("banner_dim", "#B8860B") except Exception: _tip_color = "#B8860B" - self.console.print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]") + self._console_print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]") except Exception: pass # Tips are non-critical — never break startup if self.preloaded_skills and not self._startup_skills_line_shown: skills_label = ", ".join(self.preloaded_skills) - self.console.print( + self._console_print( f"[bold {_accent_hex()}]Activated skills:[/] {skills_label}" ) self._startup_skills_line_shown = True - self.console.print() + self._console_print() # State for async operation self._agent_running = False diff --git a/tests/cli/test_gquota_command.py b/tests/cli/test_gquota_command.py new file mode 100644 index 000000000..0740e0012 --- /dev/null +++ b/tests/cli/test_gquota_command.py @@ -0,0 +1,21 @@ +from unittest.mock import MagicMock, patch + + +def test_gquota_uses_chat_console_when_tui_is_live(): + from agent.google_oauth import GoogleOAuthError + from cli import HermesCLI + + cli = HermesCLI.__new__(HermesCLI) + cli.console = MagicMock() + cli._app = object() + + live_console = MagicMock() + + with patch("cli.ChatConsole", return_value=live_console), \ + patch("agent.google_oauth.get_valid_access_token", side_effect=GoogleOAuthError("No Google OAuth credentials found")), \ + patch("agent.google_oauth.load_credentials", return_value=None), \ + patch("agent.google_code_assist.retrieve_user_quota"): + cli._handle_gquota_command("/gquota") + + assert live_console.print.call_count == 2 + cli.console.print.assert_not_called() diff --git a/tests/cli/test_quick_commands.py b/tests/cli/test_quick_commands.py index 7a89d4ca2..1c94cb1b0 100644 --- a/tests/cli/test_quick_commands.py +++ b/tests/cli/test_quick_commands.py @@ -33,6 +33,20 @@ class TestCLIQuickCommands: printed = self._printed_plain(cli.console.print.call_args[0][0]) assert printed == "daily-note" + def test_exec_command_uses_chat_console_when_tui_is_live(self): + cli = self._make_cli({"dn": {"type": "exec", "command": "echo daily-note"}}) + cli._app = object() + live_console = MagicMock() + + with patch("cli.ChatConsole", return_value=live_console): + result = cli.process_command("/dn") + + assert result is True + live_console.print.assert_called_once() + printed = self._printed_plain(live_console.print.call_args[0][0]) + assert printed == "daily-note" + cli.console.print.assert_not_called() + def test_exec_command_stderr_shown_on_no_stdout(self): cli = self._make_cli({"err": {"type": "exec", "command": "echo error >&2"}}) result = cli.process_command("/err")