From 9f086173befafdbc18c8a8825a93c1f775db1d65 Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Mon, 9 Mar 2026 19:52:28 -0500 Subject: [PATCH] fix: formatting output newlines etc --- cli.py | 1048 ++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 972 insertions(+), 76 deletions(-) diff --git a/cli.py b/cli.py index 5b66fdc295..70e5cfff4f 100755 --- a/cli.py +++ b/cli.py @@ -31,9 +31,19 @@ os.environ["HERMES_QUIET"] = "1" # Our own modules import yaml -# prompt_toolkit for input only (readline replacement) -from prompt_toolkit import PromptSession +# prompt_toolkit for fixed input area TUI from prompt_toolkit.history import FileHistory +from prompt_toolkit.styles import Style as PTStyle +from prompt_toolkit.patch_stdout import patch_stdout +from prompt_toolkit.application import Application +from prompt_toolkit.layout import Layout, HSplit, Window, FormattedTextControl, ConditionalContainer +from prompt_toolkit.layout.processors import Processor, Transformation, PasswordProcessor, ConditionalProcessor +from prompt_toolkit.filters import Condition +from prompt_toolkit.layout.dimension import Dimension +from prompt_toolkit.layout.menus import CompletionsMenu +from prompt_toolkit.widgets import TextArea +from prompt_toolkit.key_binding import KeyBindings +from prompt_toolkit import print_formatted_text as _pt_print from prompt_toolkit.formatted_text import ANSI as _PT_ANSI import threading import queue @@ -148,6 +158,7 @@ def load_cli_config() -> Dict[str, Any]: "singularity_image": "docker://python:3.11", "modal_image": "python:3.11", "daytona_image": "nikolaik/python-nodejs:python3.11-nodejs20", + "docker_volumes": [], # host:container volume mounts for Docker backend }, "browser": { "inactivity_timeout": 120, # Auto-cleanup inactive browser sessions after 2 min @@ -658,7 +669,36 @@ _DIM = "\033[2m" _RST = "\033[0m" def _cprint(text: str): - print(text) + """Print ANSI-colored text through prompt_toolkit's native renderer. + + Raw ANSI escapes written via print() are swallowed by patch_stdout's + StdoutProxy. Routing through print_formatted_text(ANSI(...)) lets + prompt_toolkit parse the escapes and render real colors. + """ + _pt_print(_PT_ANSI(text)) + + +class ChatConsole: + """Rich Console adapter for prompt_toolkit's patch_stdout context. + + Captures Rich's rendered ANSI output and routes it through _cprint + so colors and markup render correctly inside the interactive chat loop. + Drop-in replacement for Rich Console — just pass this to any function + that expects a console.print() interface. + """ + + def __init__(self): + from io import StringIO + self._buffer = StringIO() + self._inner = Console(file=self._buffer, force_terminal=True, highlight=False) + + def print(self, *args, **kwargs): + self._buffer.seek(0) + self._buffer.truncate() + self._inner.print(*args, **kwargs) + output = self._buffer.getvalue() + for line in output.rstrip("\n").split("\n"): + _cprint(line) # ASCII Art - HERMES-AGENT logo (full width, single line - requires ~95 char terminal) HERMES_AGENT_LOGO = """[bold #FFD700]██╗ ██╗███████╗██████╗ ███╗ ███╗███████╗███████╗ █████╗ ██████╗ ███████╗███╗ ██╗████████╗[/] @@ -1118,7 +1158,7 @@ class HermesCLI: # Agent will be initialized on first use self.agent: Optional[AIAgent] = None - self._app = None + self._app = None # prompt_toolkit Application (set in run()) # Conversation state self.conversation_history: List[Dict[str, Any]] = [] @@ -1146,8 +1186,16 @@ class HermesCLI: # History file for persistent input recall across sessions self._history_file = Path.home() / ".hermes_history" + self._last_invalidate: float = 0.0 # throttle UI repaints + self._stream_buf = "" + def _invalidate(self, min_interval: float = 0.25) -> None: - pass + """Throttled UI repaint — prevents terminal blinking on slow/SSH connections.""" + import time as _time + now = _time.monotonic() + if hasattr(self, "_app") and self._app and (now - self._last_invalidate) >= min_interval: + self._last_invalidate = now + self._app.invalidate() def _normalize_model_for_provider(self, resolved_provider: str) -> bool: """Strip provider prefixes and swap the default model for Codex. @@ -2268,7 +2316,7 @@ class HermesCLI: def _handle_skills_command(self, cmd: str): """Handle /skills slash command — delegates to hermes_cli.skills_hub.""" from hermes_cli.skills_hub import handle_skills_slash - handle_skills_slash(cmd, self.console) + handle_skills_slash(cmd, ChatConsole()) def _show_gateway_status(self): """Show status of the gateway and connected messaging platforms.""" @@ -2358,10 +2406,47 @@ class HermesCLI: self.agent.flush_memories(self.conversation_history) except Exception: pass - self.console.clear() + # Clear terminal screen. Inside the TUI, Rich's console.clear() + # goes through patch_stdout's StdoutProxy which swallows the + # screen-clear escape sequences. Use prompt_toolkit's output + # object directly to actually clear the terminal. + if self._app: + out = self._app.output + out.erase_screen() + out.cursor_goto(0, 0) + out.flush() + else: + self.console.clear() + # Reset conversation self.conversation_history = [] - self.show_banner() - print(" ✨ (◕‿◕)✨ Fresh start! Screen cleared and conversation reset.\n") + # Show fresh banner. Inside the TUI we must route Rich output + # through ChatConsole (which uses prompt_toolkit's native ANSI + # renderer) instead of self.console (which writes raw to stdout + # and gets mangled by patch_stdout). + if self._app: + cc = ChatConsole() + term_w = shutil.get_terminal_size().columns + if self.compact or term_w < 80: + cc.print(_build_compact_banner()) + else: + tools = get_tool_definitions(enabled_toolsets=self.enabled_toolsets, quiet_mode=True) + cwd = os.getenv("TERMINAL_CWD", os.getcwd()) + ctx_len = None + if hasattr(self, 'agent') and self.agent and hasattr(self.agent, 'context_compressor'): + ctx_len = self.agent.context_compressor.context_length + build_welcome_banner( + console=cc, + model=self.model, + cwd=cwd, + tools=tools, + enabled_toolsets=self.enabled_toolsets, + session_id=self.session_id, + context_length=ctx_len, + ) + _cprint(" ✨ (◕‿◕)✨ Fresh start! Screen cleared and conversation reset.\n") + else: + self.show_banner() + print(" ✨ (◕‿◕)✨ Fresh start! Screen cleared and conversation reset.\n") elif cmd_lower == "/history": self.show_history() elif cmd_lower.startswith("/title"): @@ -2822,9 +2907,27 @@ class HermesCLI: except Exception as e: print(f" ❌ MCP reload failed: {e}") + _stream_started = False + def _stream_delta(self, text: str): - sys.stdout.write(text) - sys.stdout.flush() + """Buffer streaming tokens; emit complete lines via _cprint.""" + if not text: + return + if not self._stream_started: + text = text.lstrip("\n") + if not text: + return + self._stream_started = True + self._stream_buf += text + while "\n" in self._stream_buf: + line, self._stream_buf = self._stream_buf.split("\n", 1) + _cprint(line) + + def _flush_stream(self): + """Emit any remaining partial line from the stream buffer.""" + if self._stream_buf: + _cprint(self._stream_buf) + self._stream_buf = "" def _clarify_callback(self, question, choices): """ @@ -2962,29 +3065,96 @@ class HermesCLI: self._approval_state = None self._approval_deadline = 0 self._invalidate() - def chat(self, message, images: list = None) -> Optional[str]: - """Send a message and stream the response to stdout.""" + """ + Send a message to the agent and get a response. + + Handles streaming output, interrupt detection (user typing while agent + is working), and re-queueing of interrupted messages. + + Uses a dedicated _interrupt_queue (separate from _pending_input) to avoid + race conditions between the process_loop and interrupt monitoring. Messages + typed while the agent is running go to _interrupt_queue; messages typed while + idle go to _pending_input. + + Args: + message: The user's message (str or multimodal content list) + images: Optional list of Path objects for attached images + + Returns: + The agent's response, or None on error + """ + # Refresh provider credentials if needed (handles key rotation transparently) if not self._ensure_runtime_credentials(): return None + + # Initialize agent if needed if not self._init_agent(): return None - + + # Pre-process images through the vision tool (Gemini Flash) so the + # main model receives text descriptions instead of raw base64 image + # content — works with any model, not just vision-capable ones. if images: message = self._preprocess_images_with_vision( - message if isinstance(message, str) else "", images) + message if isinstance(message, str) else "", images + ) self.conversation_history.append({"role": "user", "content": message}) + self._stream_buf = "" + self._stream_started = False w = shutil.get_terminal_size().columns - print(f"{_GOLD}{'─' * w}{_RST}", flush=True) - + _cprint(f"\n{_GOLD}╭─ ⚕ Hermes {'─' * max(w - 15, 0)}╮{_RST}") + try: - result = self.agent.run_conversation( - user_message=message, - conversation_history=self.conversation_history[:-1], - task_id=self.session_id, - ) + # Run the conversation with interrupt monitoring + result = None + + def run_agent(): + nonlocal result + result = self.agent.run_conversation( + user_message=message, + conversation_history=self.conversation_history[:-1], # Exclude the message we just added + task_id=self.session_id, + ) + + # Start agent in background thread + agent_thread = threading.Thread(target=run_agent) + agent_thread.start() + + # Monitor the dedicated interrupt queue while the agent runs. + # _interrupt_queue is separate from _pending_input, so process_loop + # and chat() never compete for the same queue. + # When a clarify question is active, user input is handled entirely + # by the Enter key binding (routed to the clarify response queue), + # so we skip interrupt processing to avoid stealing that input. + interrupt_msg = None + while agent_thread.is_alive(): + if hasattr(self, '_interrupt_queue'): + try: + interrupt_msg = self._interrupt_queue.get(timeout=0.1) + if interrupt_msg: + # If clarify is active, the Enter handler routes + # input directly; this queue shouldn't have anything. + # But if it does (race condition), don't interrupt. + if self._clarify_state or self._clarify_freetext: + continue + print(f"\n⚡ New message detected, interrupting...") + self.agent.interrupt(interrupt_msg) + break + except queue.Empty: + pass # Queue empty or timeout, continue waiting + else: + # Fallback for non-interactive mode (e.g., single-query) + agent_thread.join(0.1) + + agent_thread.join() # Ensure agent thread completes + + self._flush_stream() + sys.stdout.flush() + import time as _time + _time.sleep(0.15) self.conversation_history = result.get("messages", self.conversation_history) if result else self.conversation_history response = result.get("final_response", "") if result else "" @@ -2992,24 +3162,41 @@ class HermesCLI: if result and result.get("failed") and not response: response = f"Error: {result.get('error', 'Unknown error')}" - # If streaming was active, tokens were already printed to stdout. - # If not (codex path, non-streaming fallback), print the response. - if response and not self.agent.stream_delta_callback: - print(f"\n{response}") - print(flush=True) - w = shutil.get_terminal_size().columns - print(f"{_GOLD}{'─' * w}{_RST}") + pending_message = None + if result and result.get("interrupted"): + pending_message = result.get("interrupt_message") or interrupt_msg + if response and pending_message: + response += "\n\n---\n_[Interrupted - processing new message]_" + if response and not (self.agent and self.agent.stream_delta_callback): + _cprint(f"\n{response}") + + w = shutil.get_terminal_size().columns + _cprint(f"{_GOLD}╰{'─' * (w - 2)}╯{_RST}") + + # Play terminal bell when agent finishes (if enabled). + # Works over SSH — the bell propagates to the user's terminal. if self.bell_on_complete: sys.stdout.write("\a") sys.stdout.flush() - + + # Combine all interrupt messages (user may have typed multiple while waiting) + # and re-queue as one prompt for process_loop + if pending_message and hasattr(self, '_pending_input'): + all_parts = [pending_message] + while not self._interrupt_queue.empty(): + try: + extra = self._interrupt_queue.get_nowait() + if extra: + all_parts.append(extra) + except queue.Empty: + break + combined = "\n".join(all_parts) + print(f"\n📨 Queued: '{combined[:50]}{'...' if len(combined) > 50 else ''}'") + self._pending_input.put(combined) + return response - except KeyboardInterrupt: - if self.agent: - self.agent.interrupt() - print("\n⚡ Interrupted") - return None + except Exception as e: print(f"Error: {e}") return None @@ -3041,11 +3228,11 @@ class HermesCLI: print("Goodbye! ⚕") def run(self): - """Run the interactive CLI loop. Uses PromptSession for input, plain - stdout for output. Streaming tokens go directly to stdout — no TUI - framework, no proxy, no layout. Copy/paste and scrolling work natively.""" + """Run the interactive CLI loop with persistent input at bottom.""" self.show_banner() + # If resuming a session, load history and display it immediately + # so the user has context before typing their first message. if self._resumed: if self._preload_resumed_session(): self._display_resumed_history() @@ -3053,71 +3240,780 @@ class HermesCLI: self.console.print("[#FFF8DC]Welcome to Hermes Agent! Type your message or /help for commands.[/]") self.console.print() + # State for async operation self._agent_running = False + self._pending_input = queue.Queue() # For normal input (commands + new queries) + self._interrupt_queue = queue.Queue() # For messages typed while agent is running self._should_exit = False - self._last_ctrl_c_time = 0 - self._clarify_state = None - self._clarify_freetext = False - self._clarify_deadline = 0 - self._sudo_state = None + self._last_ctrl_c_time = 0 # Track double Ctrl+C for force exit + + # Clarify tool state: interactive question/answer with the user. + # When the agent calls the clarify tool, _clarify_state is set and + # the prompt_toolkit UI switches to a selection mode. + self._clarify_state = None # dict with question, choices, selected, response_queue + self._clarify_freetext = False # True when user chose "Other" and is typing + self._clarify_deadline = 0 # monotonic timestamp when the clarify times out + + # Sudo password prompt state (similar mechanism to clarify) + self._sudo_state = None # dict with response_queue when active self._sudo_deadline = 0 - self._approval_state = None + + # Dangerous command approval state (similar mechanism to clarify) + self._approval_state = None # dict with command, description, choices, selected, response_queue self._approval_deadline = 0 + + # Clipboard image attachments (paste images into the CLI) self._attached_images: list[Path] = [] self._image_counter = 0 + # Register callbacks so terminal_tool prompts route through our UI set_sudo_password_callback(self._sudo_password_callback) set_approval_callback(self._approval_callback) + + # Key bindings for the input area + kb = KeyBindings() + + @kb.add('enter') + def handle_enter(event): + """Handle Enter key - submit input. + + Routes to the correct queue based on active UI state: + - Sudo password prompt: password goes to sudo response queue + - Approval selection: selected choice goes to approval response queue + - Clarify freetext mode: answer goes to the clarify response queue + - Clarify choice mode: selected choice goes to the clarify response queue + - Agent running: goes to _interrupt_queue (chat() monitors this) + - Agent idle: goes to _pending_input (process_loop monitors this) + Commands (starting with /) always go to _pending_input so they're + handled as commands, not sent as interrupt text to the agent. + """ + # --- Sudo password prompt: submit the typed password --- + if self._sudo_state: + text = event.app.current_buffer.text + self._sudo_state["response_queue"].put(text) + self._sudo_state = None + event.app.current_buffer.reset() + event.app.invalidate() + return - from hermes_cli.commands import SlashCommandCompleter - session = PromptSession( - history=FileHistory(str(self._history_file)), - completer=SlashCommandCompleter(skill_commands_provider=lambda: _skill_commands), + # --- Approval selection: confirm the highlighted choice --- + if self._approval_state: + state = self._approval_state + selected = state["selected"] + choices = state["choices"] + if 0 <= selected < len(choices): + state["response_queue"].put(choices[selected]) + self._approval_state = None + event.app.invalidate() + return + + # --- Clarify freetext mode: user typed their own answer --- + if self._clarify_freetext and self._clarify_state: + text = event.app.current_buffer.text.strip() + if text: + self._clarify_state["response_queue"].put(text) + self._clarify_state = None + self._clarify_freetext = False + event.app.current_buffer.reset() + event.app.invalidate() + return + + # --- Clarify choice mode: confirm the highlighted selection --- + if self._clarify_state and not self._clarify_freetext: + state = self._clarify_state + selected = state["selected"] + choices = state.get("choices") or [] + if selected < len(choices): + state["response_queue"].put(choices[selected]) + self._clarify_state = None + event.app.invalidate() + else: + # "Other" selected → switch to freetext + self._clarify_freetext = True + event.app.invalidate() + return + + # --- Normal input routing --- + text = event.app.current_buffer.text.strip() + has_images = bool(self._attached_images) + if text or has_images: + # Snapshot and clear attached images + images = list(self._attached_images) + self._attached_images.clear() + event.app.invalidate() + # Bundle text + images as a tuple when images are present + payload = (text, images) if images else text + if self._agent_running and not (text and text.startswith("/")): + self._interrupt_queue.put(payload) + else: + self._pending_input.put(payload) + event.app.current_buffer.reset(append_to_history=True) + + @kb.add('escape', 'enter') + def handle_alt_enter(event): + """Alt+Enter inserts a newline for multi-line input.""" + event.current_buffer.insert_text('\n') + + @kb.add('c-j') + def handle_ctrl_enter(event): + """Ctrl+Enter (c-j) inserts a newline. Most terminals send c-j for Ctrl+Enter.""" + event.current_buffer.insert_text('\n') + + # --- Clarify tool: arrow-key navigation for multiple-choice questions --- + + @kb.add('up', filter=Condition(lambda: bool(self._clarify_state) and not self._clarify_freetext)) + def clarify_up(event): + """Move selection up in clarify choices.""" + if self._clarify_state: + self._clarify_state["selected"] = max(0, self._clarify_state["selected"] - 1) + event.app.invalidate() + + @kb.add('down', filter=Condition(lambda: bool(self._clarify_state) and not self._clarify_freetext)) + def clarify_down(event): + """Move selection down in clarify choices.""" + if self._clarify_state: + choices = self._clarify_state.get("choices") or [] + max_idx = len(choices) # last index is the "Other" option + self._clarify_state["selected"] = min(max_idx, self._clarify_state["selected"] + 1) + event.app.invalidate() + + # --- Dangerous command approval: arrow-key navigation --- + + @kb.add('up', filter=Condition(lambda: bool(self._approval_state))) + def approval_up(event): + if self._approval_state: + self._approval_state["selected"] = max(0, self._approval_state["selected"] - 1) + event.app.invalidate() + + @kb.add('down', filter=Condition(lambda: bool(self._approval_state))) + def approval_down(event): + if self._approval_state: + max_idx = len(self._approval_state["choices"]) - 1 + self._approval_state["selected"] = min(max_idx, self._approval_state["selected"] + 1) + event.app.invalidate() + + # --- History navigation: up/down browse history in normal input mode --- + # The TextArea is multiline, so by default up/down only move the cursor. + # Buffer.auto_up/auto_down handle both: cursor movement when multi-line, + # history browsing when on the first/last line (or single-line input). + _normal_input = Condition( + lambda: not self._clarify_state and not self._approval_state and not self._sudo_state ) - atexit.register(_run_cleanup) - try: + @kb.add('up', filter=_normal_input) + def history_up(event): + """Up arrow: browse history when on first line, else move cursor up.""" + event.app.current_buffer.auto_up(count=event.arg) + + @kb.add('down', filter=_normal_input) + def history_down(event): + """Down arrow: browse history when on last line, else move cursor down.""" + event.app.current_buffer.auto_down(count=event.arg) + + @kb.add('c-c') + def handle_ctrl_c(event): + """Handle Ctrl+C - cancel interactive prompts, interrupt agent, or exit. + + Priority: + 1. Cancel active sudo/approval/clarify prompt + 2. Interrupt the running agent (first press) + 3. Force exit (second press within 2s, or when idle) + """ + import time as _time + now = _time.time() + + # Cancel sudo prompt + if self._sudo_state: + self._sudo_state["response_queue"].put("") + self._sudo_state = None + event.app.current_buffer.reset() + event.app.invalidate() + return + + # Cancel approval prompt (deny) + if self._approval_state: + self._approval_state["response_queue"].put("deny") + self._approval_state = None + event.app.invalidate() + return + + # Cancel clarify prompt + if self._clarify_state: + self._clarify_state["response_queue"].put( + "The user cancelled. Use your best judgement to proceed." + ) + self._clarify_state = None + self._clarify_freetext = False + event.app.current_buffer.reset() + event.app.invalidate() + return + + if self._agent_running and self.agent: + if now - self._last_ctrl_c_time < 2.0: + print("\n⚡ Force exiting...") + self._should_exit = True + event.app.exit() + return + + self._last_ctrl_c_time = now + print("\n⚡ Interrupting agent... (press Ctrl+C again to force exit)") + self.agent.interrupt() + else: + # If there's text or images, clear them (like bash). + # If everything is already empty, exit. + if event.app.current_buffer.text or self._attached_images: + event.app.current_buffer.reset() + self._attached_images.clear() + event.app.invalidate() + else: + self._should_exit = True + event.app.exit() + + @kb.add('c-d') + def handle_ctrl_d(event): + """Handle Ctrl+D - exit.""" + self._should_exit = True + event.app.exit() + + from prompt_toolkit.keys import Keys + + @kb.add(Keys.BracketedPaste, eager=True) + def handle_paste(event): + """Handle terminal paste — detect clipboard images. + + When the terminal supports bracketed paste, Ctrl+V / Cmd+V + triggers this with the pasted text. We also check the + clipboard for an image on every paste event. + """ + pasted_text = event.data or "" + if self._try_attach_clipboard_image(): + event.app.invalidate() + if pasted_text: + event.current_buffer.insert_text(pasted_text) + + @kb.add('c-v') + def handle_ctrl_v(event): + """Fallback image paste for terminals without bracketed paste. + + On Linux terminals (GNOME Terminal, Konsole, etc.), Ctrl+V + sends raw byte 0x16 instead of triggering a paste. This + binding catches that and checks the clipboard for images. + On terminals that DO intercept Ctrl+V for paste (macOS + Terminal, iTerm2, VSCode, Windows Terminal), the bracketed + paste handler fires instead and this binding never triggers. + """ + if self._try_attach_clipboard_image(): + event.app.invalidate() + + @kb.add('escape', 'v') + def handle_alt_v(event): + """Alt+V — paste image from clipboard. + + Alt key combos pass through all terminal emulators (sent as + ESC + key), unlike Ctrl+V which terminals intercept for text + paste. This is the reliable way to attach clipboard images + on WSL2, VSCode, and any terminal over SSH where Ctrl+V + can't reach the application for image-only clipboard. + """ + if self._try_attach_clipboard_image(): + event.app.invalidate() + else: + # No image found — show a hint + pass # silent when no image (avoid noise on accidental press) + + # Dynamic prompt: shows Hermes symbol when agent is working, + # or answer prompt when clarify freetext mode is active. + cli_ref = self + + def get_prompt(): + if cli_ref._sudo_state: + return [('class:sudo-prompt', '🔐 ❯ ')] + if cli_ref._approval_state: + return [('class:prompt-working', '⚠ ❯ ')] + if cli_ref._clarify_freetext: + return [('class:clarify-selected', '✎ ❯ ')] + if cli_ref._clarify_state: + return [('class:prompt-working', '? ❯ ')] + if cli_ref._agent_running: + return [('class:prompt-working', '⚕ ❯ ')] + return [('class:prompt', '❯ ')] + + # Create the input area with multiline (shift+enter), autocomplete, and paste handling + input_area = TextArea( + height=Dimension(min=1, max=8, preferred=1), + prompt=get_prompt, + style='class:input-area', + multiline=True, + wrap_lines=True, + history=FileHistory(str(self._history_file)), + completer=SlashCommandCompleter(skill_commands_provider=lambda: _skill_commands), + complete_while_typing=True, + ) + + # Dynamic height: accounts for both explicit newlines AND visual + # wrapping of long lines so the input area always fits its content. + # The prompt characters ("❯ " etc.) consume ~4 columns. + def _input_height(): + try: + doc = input_area.buffer.document + available_width = shutil.get_terminal_size().columns - 4 # subtract prompt width + if available_width < 10: + available_width = 40 + visual_lines = 0 + for line in doc.lines: + # Each logical line takes at least 1 visual row; long lines wrap + if len(line) == 0: + visual_lines += 1 + else: + visual_lines += max(1, -(-len(line) // available_width)) # ceil division + return min(max(visual_lines, 1), 8) + except Exception: + return 1 + + input_area.window.height = _input_height + + # Paste collapsing: detect large pastes and save to temp file + _paste_counter = [0] + _prev_text_len = [0] + + def _on_text_changed(buf): + """Detect large pastes and collapse them to a file reference.""" + text = buf.text + line_count = text.count('\n') + chars_added = len(text) - _prev_text_len[0] + _prev_text_len[0] = len(text) + # Heuristic: a real paste adds many characters at once (not just a + # single newline from Alt+Enter) AND the result has 5+ lines. + if line_count >= 5 and chars_added > 1 and not text.startswith('/'): + _paste_counter[0] += 1 + # Save to temp file + paste_dir = Path(os.path.expanduser("~/.hermes/pastes")) + paste_dir.mkdir(parents=True, exist_ok=True) + paste_file = paste_dir / f"paste_{_paste_counter[0]}_{datetime.now().strftime('%H%M%S')}.txt" + paste_file.write_text(text, encoding="utf-8") + # Replace buffer with compact reference + buf.text = f"[Pasted text #{_paste_counter[0]}: {line_count + 1} lines → {paste_file}]" + buf.cursor_position = len(buf.text) + + input_area.buffer.on_text_changed += _on_text_changed + + # --- Input processors for password masking and inline placeholder --- + + # Mask input with '*' when the sudo password prompt is active + input_area.control.input_processors.append( + ConditionalProcessor( + PasswordProcessor(), + filter=Condition(lambda: bool(cli_ref._sudo_state)), + ) + ) + + class _PlaceholderProcessor(Processor): + """Render grayed-out placeholder text inside the input when empty.""" + def __init__(self, get_text): + self._get_text = get_text + + def apply_transformation(self, ti): + if not ti.document.text and ti.lineno == 0: + text = self._get_text() + if text: + # Append after existing fragments (preserves the ❯ prompt) + return Transformation(fragments=ti.fragments + [('class:placeholder', text)]) + return Transformation(fragments=ti.fragments) + + def _get_placeholder(): + if cli_ref._sudo_state: + return "type password (hidden), Enter to skip" + if cli_ref._approval_state: + return "" + if cli_ref._clarify_state: + return "" + if cli_ref._agent_running: + return "type a message + Enter to interrupt, Ctrl+C to cancel" + return "Ask Hermes anything... (Alt+Enter for newline)" + + input_area.control.input_processors.append(_PlaceholderProcessor(_get_placeholder)) + + # Hint line above input: shown only for interactive prompts that need + # extra instructions (sudo countdown, approval navigation, clarify). + # The agent-running interrupt hint is now an inline placeholder above. + def get_hint_text(): + import time as _time + + if cli_ref._sudo_state: + remaining = max(0, int(cli_ref._sudo_deadline - _time.monotonic())) + return [ + ('class:hint', ' password hidden · Enter to skip'), + ('class:clarify-countdown', f' ({remaining}s)'), + ] + + if cli_ref._approval_state: + remaining = max(0, int(cli_ref._approval_deadline - _time.monotonic())) + return [ + ('class:hint', ' ↑/↓ to select, Enter to confirm'), + ('class:clarify-countdown', f' ({remaining}s)'), + ] + + if cli_ref._clarify_state: + remaining = max(0, int(cli_ref._clarify_deadline - _time.monotonic())) + countdown = f' ({remaining}s)' if cli_ref._clarify_deadline else '' + if cli_ref._clarify_freetext: + return [ + ('class:hint', ' type your answer and press Enter'), + ('class:clarify-countdown', countdown), + ] + return [ + ('class:hint', ' ↑/↓ to select, Enter to confirm'), + ('class:clarify-countdown', countdown), + ] + + return [] + + def get_hint_height(): + if cli_ref._sudo_state or cli_ref._approval_state or cli_ref._clarify_state: + return 1 + # Keep a 1-line spacer while agent runs so output doesn't push + # right up against the top rule of the input area + return 1 if cli_ref._agent_running else 0 + + spacer = Window( + content=FormattedTextControl(get_hint_text), + height=get_hint_height, + ) + + # --- Clarify tool: dynamic display widget for questions + choices --- + + def _get_clarify_display(): + """Build styled text for the clarify question/choices panel.""" + state = cli_ref._clarify_state + if not state: + return [] + + question = state["question"] + choices = state.get("choices") or [] + selected = state.get("selected", 0) + + lines = [] + # Box top border + lines.append(('class:clarify-border', '╭─ ')) + lines.append(('class:clarify-title', 'Hermes needs your input')) + lines.append(('class:clarify-border', ' ─────────────────────────────╮\n')) + lines.append(('class:clarify-border', '│\n')) + + # Question text + lines.append(('class:clarify-border', '│ ')) + lines.append(('class:clarify-question', question)) + lines.append(('', '\n')) + lines.append(('class:clarify-border', '│\n')) + + if choices: + # Multiple-choice mode: show selectable options + for i, choice in enumerate(choices): + lines.append(('class:clarify-border', '│ ')) + if i == selected and not cli_ref._clarify_freetext: + lines.append(('class:clarify-selected', f'❯ {choice}')) + else: + lines.append(('class:clarify-choice', f' {choice}')) + lines.append(('', '\n')) + + # "Other" option (5th line, only shown when choices exist) + other_idx = len(choices) + lines.append(('class:clarify-border', '│ ')) + if selected == other_idx and not cli_ref._clarify_freetext: + lines.append(('class:clarify-selected', '❯ Other (type your answer)')) + elif cli_ref._clarify_freetext: + lines.append(('class:clarify-active-other', '❯ Other (type below)')) + else: + lines.append(('class:clarify-choice', ' Other (type your answer)')) + lines.append(('', '\n')) + + lines.append(('class:clarify-border', '│\n')) + lines.append(('class:clarify-border', '╰──────────────────────────────────────────────────╯\n')) + return lines + + clarify_widget = ConditionalContainer( + Window( + FormattedTextControl(_get_clarify_display), + wrap_lines=True, + ), + filter=Condition(lambda: cli_ref._clarify_state is not None), + ) + + # --- Sudo password: display widget --- + + def _get_sudo_display(): + state = cli_ref._sudo_state + if not state: + return [] + lines = [] + lines.append(('class:sudo-border', '╭─ ')) + lines.append(('class:sudo-title', '🔐 Sudo Password Required')) + lines.append(('class:sudo-border', ' ──────────────────────────╮\n')) + lines.append(('class:sudo-border', '│\n')) + lines.append(('class:sudo-border', '│ ')) + lines.append(('class:sudo-text', 'Enter password below (hidden), or press Enter to skip')) + lines.append(('', '\n')) + lines.append(('class:sudo-border', '│\n')) + lines.append(('class:sudo-border', '╰──────────────────────────────────────────────────╯\n')) + return lines + + sudo_widget = ConditionalContainer( + Window( + FormattedTextControl(_get_sudo_display), + wrap_lines=True, + ), + filter=Condition(lambda: cli_ref._sudo_state is not None), + ) + + # --- Dangerous command approval: display widget --- + + def _get_approval_display(): + state = cli_ref._approval_state + if not state: + return [] + command = state["command"] + description = state["description"] + choices = state["choices"] + selected = state.get("selected", 0) + + cmd_display = command[:70] + '...' if len(command) > 70 else command + choice_labels = { + "once": "Allow once", + "session": "Allow for this session", + "always": "Add to permanent allowlist", + "deny": "Deny", + } + + lines = [] + lines.append(('class:approval-border', '╭─ ')) + lines.append(('class:approval-title', '⚠️ Dangerous Command')) + lines.append(('class:approval-border', ' ───────────────────────────────╮\n')) + lines.append(('class:approval-border', '│\n')) + lines.append(('class:approval-border', '│ ')) + lines.append(('class:approval-desc', description)) + lines.append(('', '\n')) + lines.append(('class:approval-border', '│ ')) + lines.append(('class:approval-cmd', cmd_display)) + lines.append(('', '\n')) + lines.append(('class:approval-border', '│\n')) + for i, choice in enumerate(choices): + lines.append(('class:approval-border', '│ ')) + label = choice_labels.get(choice, choice) + if i == selected: + lines.append(('class:approval-selected', f'❯ {label}')) + else: + lines.append(('class:approval-choice', f' {label}')) + lines.append(('', '\n')) + lines.append(('class:approval-border', '│\n')) + lines.append(('class:approval-border', '╰──────────────────────────────────────────────────────╯\n')) + return lines + + approval_widget = ConditionalContainer( + Window( + FormattedTextControl(_get_approval_display), + wrap_lines=True, + ), + filter=Condition(lambda: cli_ref._approval_state is not None), + ) + + # Horizontal rules above and below the input (bronze, 1 line each). + # The bottom rule moves down as the TextArea grows with newlines. + # Using char='─' instead of hardcoded repetition so the rule + # always spans the full terminal width on any screen size. + input_rule_top = Window( + char='─', + height=1, + style='class:input-rule', + ) + input_rule_bot = Window( + char='─', + height=1, + style='class:input-rule', + ) + + # Image attachment indicator — shows badges like [📎 Image #1] above input + cli_ref = self + + def _get_image_bar(): + if not cli_ref._attached_images: + return [] + base = cli_ref._image_counter - len(cli_ref._attached_images) + 1 + badges = " ".join( + f"[📎 Image #{base + i}]" + for i in range(len(cli_ref._attached_images)) + ) + return [("class:image-badge", f" {badges} ")] + + image_bar = Window( + content=FormattedTextControl(_get_image_bar), + height=Condition(lambda: bool(cli_ref._attached_images)), + ) + + # Layout: interactive prompt widgets + ruled input at bottom. + # The sudo, approval, and clarify widgets appear above the input when + # the corresponding interactive prompt is active. + layout = Layout( + HSplit([ + Window(height=0), + sudo_widget, + approval_widget, + clarify_widget, + spacer, + input_rule_top, + image_bar, + input_area, + input_rule_bot, + CompletionsMenu(max_height=12, scroll_offset=1), + ]) + ) + + # Style for the application + style = PTStyle.from_dict({ + 'input-area': '#FFF8DC', + 'placeholder': '#555555 italic', + 'prompt': '#FFF8DC', + 'prompt-working': '#888888 italic', + 'hint': '#555555 italic', + # Bronze horizontal rules around the input area + 'input-rule': '#CD7F32', + # Clipboard image attachment badges + 'image-badge': '#87CEEB bold', + 'completion-menu': 'bg:#1a1a2e #FFF8DC', + 'completion-menu.completion': 'bg:#1a1a2e #FFF8DC', + 'completion-menu.completion.current': 'bg:#333355 #FFD700', + 'completion-menu.meta.completion': 'bg:#1a1a2e #888888', + 'completion-menu.meta.completion.current': 'bg:#333355 #FFBF00', + # Clarify question panel + 'clarify-border': '#CD7F32', + 'clarify-title': '#FFD700 bold', + 'clarify-question': '#FFF8DC bold', + 'clarify-choice': '#AAAAAA', + 'clarify-selected': '#FFD700 bold', + 'clarify-active-other': '#FFD700 italic', + 'clarify-countdown': '#CD7F32', + # Sudo password panel + 'sudo-prompt': '#FF6B6B bold', + 'sudo-border': '#CD7F32', + 'sudo-title': '#FF6B6B bold', + 'sudo-text': '#FFF8DC', + # Dangerous command approval panel + 'approval-border': '#CD7F32', + 'approval-title': '#FF8C00 bold', + 'approval-desc': '#FFF8DC bold', + 'approval-cmd': '#AAAAAA italic', + 'approval-choice': '#AAAAAA', + 'approval-selected': '#FFD700 bold', + }) + + # Create the application + app = Application( + layout=layout, + key_bindings=kb, + style=style, + full_screen=False, + mouse_support=False, + ) + self._app = app # Store reference for clarify_callback + + # Background thread to process inputs and run agent + def process_loop(): while not self._should_exit: try: - text = session.prompt(_PT_ANSI(f"{_GOLD}❯{_RST} ")) - except KeyboardInterrupt: - if self._agent_running and self.agent: - self.agent.interrupt() - print("\n⚡ Interrupted") + # Check for pending input with timeout + try: + user_input = self._pending_input.get(timeout=0.1) + except queue.Empty: + continue + + if not user_input: + continue + + # Unpack image payload: (text, [Path, ...]) or plain str + submit_images = [] + if isinstance(user_input, tuple): + user_input, submit_images = user_input + + # Check for commands + if isinstance(user_input, str) and user_input.startswith("/"): + print(f"\n⚙️ {user_input}") + if not self.process_command(user_input): + self._should_exit = True + # Schedule app exit + if app.is_running: + app.exit() + continue + + # Expand paste references back to full content + import re as _re + paste_match = _re.match(r'\[Pasted text #\d+: \d+ lines → (.+)\]', user_input) if isinstance(user_input, str) else None + if paste_match: + paste_path = Path(paste_match.group(1)) + if paste_path.exists(): + full_text = paste_path.read_text(encoding="utf-8") + line_count = full_text.count('\n') + 1 + print() + _cprint(f"{_GOLD}●{_RST} {_BOLD}[Pasted text: {line_count} lines]{_RST}") + user_input = full_text + else: + print() + _cprint(f"{_GOLD}●{_RST} {_BOLD}{user_input}{_RST}") else: - break - continue - except EOFError: - break + if '\n' in user_input: + first_line = user_input.split('\n')[0] + line_count = user_input.count('\n') + 1 + print() + _cprint(f"{_GOLD}●{_RST} {_BOLD}{first_line}{_RST} {_DIM}(+{line_count - 1} lines){_RST}") + else: + print() + _cprint(f"{_GOLD}●{_RST} {_BOLD}{user_input}{_RST}") + + # Show image attachment count + if submit_images: + n = len(submit_images) + _cprint(f" {_DIM}📎 {n} image{'s' if n > 1 else ''} attached{_RST}") - text = text.strip() - if not text: - continue - - if text.startswith("/"): - if not self.process_command(text): - break - continue - - print(f"\n{_GOLD}●{_RST} {_BOLD}{text[:80]}{'...' if len(text) > 80 else ''}{_RST}") - self._agent_running = True - try: - self.chat(text) - finally: - self._agent_running = False + # Regular chat - run agent + self._agent_running = True + app.invalidate() # Refresh status line + + try: + self.chat(user_input, images=submit_images or None) + finally: + self._agent_running = False + app.invalidate() # Refresh status line + + except Exception as e: + print(f"Error: {e}") + + # Start processing thread + process_thread = threading.Thread(target=process_loop, daemon=True) + process_thread.start() + + # Register atexit cleanup so resources are freed even on unexpected exit + atexit.register(_run_cleanup) + + # Run the application with patch_stdout for proper output handling + try: + with patch_stdout(): + app.run() + except (EOFError, KeyboardInterrupt): + pass finally: + self._should_exit = True + # Flush memories before exit (only for substantial conversations) if self.agent and self.conversation_history: try: self.agent.flush_memories(self.conversation_history) except Exception: pass + # Unregister terminal_tool callbacks to avoid dangling references set_sudo_password_callback(None) set_approval_callback(None) + # Close session in SQLite if hasattr(self, '_session_db') and self._session_db and self.agent: try: self._session_db.end_session(self.agent.session_id, "cli_close") - except Exception: - pass + except Exception as e: + logger.debug("Could not close session in DB: %s", e) _run_cleanup() self._print_exit_summary()