mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-11 08:42:11 +00:00
The setup wizard's provider/model pickers (curses_radiolist via prompt_choice) bailed to the numbered "Select [1-N]" fallback the moment a user pressed up or down. Root cause: even with keypad(True) — which curses.wrapper sets — many terminals/terminfo entries deliver cursor keys to getch() as raw CSI/SS3 byte sequences (e.g. 27, 91, 66 for arrow-down) rather than the translated curses.KEY_DOWN. The menus matched only curses.KEY_UP/KEY_DOWN and treated the leading 27 (ESC) as cancel, so navigation dropped into the text fallback and the trailing bytes leaked into the next input(). Add a shared read_menu_key() helper that decodes CSI/SS3 escape sequences into normalized NAV_* actions (only a lone ESC, with no continuation byte within a short timeout, still cancels) and consumes the tail of unhandled sequences so stray bytes can't corrupt later input(). Route all three curses menus (checklist, radiolist, single_select) through it. Add regression tests covering raw CSI/SS3 arrows, translated KEY_* constants, vim keys, lone-ESC cancel, and full consumption of unhandled sequences (Delete/Home/End).
544 lines
19 KiB
Python
544 lines
19 KiB
Python
"""Shared curses-based UI components for Hermes CLI.
|
|
|
|
Used by `hermes tools` and `hermes skills` for interactive checklists.
|
|
Provides a curses multi-select with keyboard navigation, plus a
|
|
text-based numbered fallback for terminals without curses support.
|
|
"""
|
|
import sys
|
|
from typing import Callable, List, Optional, Set
|
|
|
|
from hermes_cli.colors import Colors, color
|
|
|
|
|
|
def flush_stdin() -> None:
|
|
"""Flush any stray bytes from the stdin input buffer.
|
|
|
|
Must be called after ``curses.wrapper()`` (or any terminal-mode library
|
|
like simple_term_menu) returns, **before** the next ``input()`` /
|
|
``getpass.getpass()`` call. ``curses.endwin()`` restores the terminal
|
|
but does NOT drain the OS input buffer — leftover escape-sequence bytes
|
|
(from arrow keys, terminal mode-switch responses, or rapid keypresses)
|
|
remain buffered and silently get consumed by the next ``input()`` call,
|
|
corrupting user data (e.g. writing ``^[^[`` into .env files).
|
|
|
|
On non-TTY stdin (piped, redirected) or Windows, this is a no-op.
|
|
"""
|
|
try:
|
|
if not sys.stdin.isatty():
|
|
return
|
|
import termios
|
|
termios.tcflush(sys.stdin, termios.TCIFLUSH)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# Normalized menu actions returned by ``read_menu_key``. Using sentinels keeps
|
|
# every menu's key-handling branch identical and free of raw escape-byte logic.
|
|
NAV_UP = "up"
|
|
NAV_DOWN = "down"
|
|
NAV_SELECT = "select"
|
|
NAV_TOGGLE = "toggle"
|
|
NAV_CANCEL = "cancel"
|
|
NAV_NONE = "none"
|
|
|
|
|
|
def read_menu_key(stdscr) -> str:
|
|
"""Read one keypress and normalize it to a menu action.
|
|
|
|
Decodes raw arrow-key escape sequences in addition to the translated
|
|
``curses.KEY_*`` values. Even with ``keypad(True)`` (which
|
|
``curses.wrapper`` sets), some terminals/terminfo entries deliver cursor
|
|
keys as raw CSI/SS3 byte sequences — ``getch()`` then returns ``27`` (ESC)
|
|
followed by e.g. ``[`` ``A``. Treating that leading ``27`` as a cancel is
|
|
what made the setup wizard's provider/model pickers bail to the numbered
|
|
fallback the moment a user pressed up/down.
|
|
|
|
Returns one of the ``NAV_*`` constants. A lone ESC (no continuation byte
|
|
within a short window) is the only thing that maps to ``NAV_CANCEL`` via
|
|
the escape path; ``q`` also cancels. Unknown sequences map to
|
|
``NAV_NONE`` so the caller simply ignores them rather than misfiring.
|
|
"""
|
|
import curses
|
|
|
|
key = stdscr.getch()
|
|
|
|
if key in (curses.KEY_UP, ord("k")):
|
|
return NAV_UP
|
|
if key in (curses.KEY_DOWN, ord("j")):
|
|
return NAV_DOWN
|
|
if key in (curses.KEY_ENTER, 10, 13):
|
|
return NAV_SELECT
|
|
if key == ord(" "):
|
|
return NAV_TOGGLE
|
|
if key == ord("q"):
|
|
return NAV_CANCEL
|
|
|
|
if key == 27: # ESC — could be a lone ESC (cancel) or an escape sequence.
|
|
# Wait briefly for a continuation byte. On slow PTYs (SSH/tmux) the
|
|
# bytes of an arrow key can arrive across separate reads, so a tiny
|
|
# timeout avoids misreading a split sequence as a bare ESC.
|
|
try:
|
|
stdscr.timeout(60)
|
|
nxt = stdscr.getch()
|
|
finally:
|
|
stdscr.timeout(-1) # restore blocking mode
|
|
|
|
if nxt == -1:
|
|
return NAV_CANCEL # genuine lone ESC
|
|
|
|
if nxt in (ord("["), ord("O")): # CSI / SS3 introducer
|
|
final = stdscr.getch()
|
|
if final in (ord("A"), ord("k")):
|
|
return NAV_UP
|
|
if final in (ord("B"), ord("j")):
|
|
return NAV_DOWN
|
|
# Consume the tail of any other CSI sequence (e.g. ``[3~`` Delete,
|
|
# ``[H`` Home) up to its terminator so stray bytes don't leak into
|
|
# the next input() and corrupt it.
|
|
while 0x20 <= final <= 0x3F: # CSI parameter/intermediate bytes
|
|
final = stdscr.getch()
|
|
return NAV_NONE
|
|
# ESC followed by some other byte we don't handle — swallow it.
|
|
return NAV_NONE
|
|
|
|
return NAV_NONE
|
|
|
|
|
|
def curses_checklist(
|
|
title: str,
|
|
items: List[str],
|
|
selected: Set[int],
|
|
*,
|
|
cancel_returns: Set[int] | None = None,
|
|
status_fn: Optional[Callable[[Set[int]], str]] = None,
|
|
) -> Set[int]:
|
|
"""Curses multi-select checklist. Returns set of selected indices.
|
|
|
|
Args:
|
|
title: Header line displayed above the checklist.
|
|
items: Display labels for each row.
|
|
selected: Indices that start checked (pre-selected).
|
|
cancel_returns: Returned on ESC/q. Defaults to the original *selected*.
|
|
status_fn: Optional callback ``f(chosen_indices) -> str`` whose return
|
|
value is rendered on the bottom row of the terminal. Use this for
|
|
live aggregate info (e.g. estimated token counts).
|
|
"""
|
|
if cancel_returns is None:
|
|
cancel_returns = set(selected)
|
|
|
|
# Safety: curses and input() both hang or spin when stdin is not a
|
|
# terminal (e.g. subprocess pipe). Return defaults immediately.
|
|
if not sys.stdin.isatty():
|
|
return cancel_returns
|
|
|
|
try:
|
|
import curses
|
|
chosen = set(selected)
|
|
result_holder: list = [None]
|
|
|
|
def _draw(stdscr):
|
|
curses.curs_set(0)
|
|
if curses.has_colors():
|
|
curses.start_color()
|
|
curses.use_default_colors()
|
|
curses.init_pair(1, curses.COLOR_GREEN, -1)
|
|
curses.init_pair(2, curses.COLOR_YELLOW, -1)
|
|
curses.init_pair(3, 8 if curses.COLORS > 8 else curses.COLOR_WHITE, -1) # dim gray
|
|
cursor = 0
|
|
scroll_offset = 0
|
|
|
|
while True:
|
|
stdscr.clear()
|
|
max_y, max_x = stdscr.getmaxyx()
|
|
|
|
# Reserve bottom row for status bar when status_fn provided
|
|
footer_rows = 1 if status_fn else 0
|
|
|
|
# Header
|
|
try:
|
|
hattr = curses.A_BOLD
|
|
if curses.has_colors():
|
|
hattr |= curses.color_pair(2)
|
|
stdscr.addnstr(0, 0, title, max_x - 1, hattr)
|
|
stdscr.addnstr(
|
|
1, 0,
|
|
" ↑↓ navigate SPACE toggle ENTER confirm ESC cancel",
|
|
max_x - 1, curses.A_DIM,
|
|
)
|
|
except curses.error:
|
|
pass
|
|
|
|
# Scrollable item list
|
|
visible_rows = max_y - 3 - footer_rows
|
|
if cursor < scroll_offset:
|
|
scroll_offset = cursor
|
|
elif cursor >= scroll_offset + visible_rows:
|
|
scroll_offset = cursor - visible_rows + 1
|
|
|
|
for draw_i, i in enumerate(
|
|
range(scroll_offset, min(len(items), scroll_offset + visible_rows))
|
|
):
|
|
y = draw_i + 3
|
|
if y >= max_y - 1 - footer_rows:
|
|
break
|
|
check = "✓" if i in chosen else " "
|
|
arrow = "→" if i == cursor else " "
|
|
line = f" {arrow} [{check}] {items[i]}"
|
|
attr = curses.A_NORMAL
|
|
if i == cursor:
|
|
attr = curses.A_BOLD
|
|
if curses.has_colors():
|
|
attr |= curses.color_pair(1)
|
|
try:
|
|
stdscr.addnstr(y, 0, line, max_x - 1, attr)
|
|
except curses.error:
|
|
pass
|
|
|
|
# Status bar (bottom row, right-aligned)
|
|
if status_fn:
|
|
try:
|
|
status_text = status_fn(chosen)
|
|
if status_text:
|
|
# Right-align on the bottom row
|
|
sx = max(0, max_x - len(status_text) - 1)
|
|
sattr = curses.A_DIM
|
|
if curses.has_colors():
|
|
sattr |= curses.color_pair(3)
|
|
stdscr.addnstr(max_y - 1, sx, status_text, max_x - sx - 1, sattr)
|
|
except curses.error:
|
|
pass
|
|
|
|
stdscr.refresh()
|
|
action = read_menu_key(stdscr)
|
|
|
|
if action == NAV_UP:
|
|
cursor = (cursor - 1) % len(items)
|
|
elif action == NAV_DOWN:
|
|
cursor = (cursor + 1) % len(items)
|
|
elif action == NAV_TOGGLE:
|
|
chosen.symmetric_difference_update({cursor})
|
|
elif action == NAV_SELECT:
|
|
result_holder[0] = set(chosen)
|
|
return
|
|
elif action == NAV_CANCEL:
|
|
result_holder[0] = cancel_returns
|
|
return
|
|
|
|
curses.wrapper(_draw)
|
|
flush_stdin()
|
|
return result_holder[0] if result_holder[0] is not None else cancel_returns
|
|
|
|
except KeyboardInterrupt:
|
|
return cancel_returns
|
|
except Exception:
|
|
return _numbered_fallback(title, items, selected, cancel_returns, status_fn)
|
|
|
|
|
|
def curses_radiolist(
|
|
title: str,
|
|
items: List[str],
|
|
selected: int = 0,
|
|
*,
|
|
cancel_returns: int | None = None,
|
|
description: str | None = None,
|
|
) -> int:
|
|
"""Curses single-select radio list. Returns the selected index.
|
|
|
|
Args:
|
|
title: Header line displayed above the list.
|
|
items: Display labels for each row.
|
|
selected: Index that starts selected (pre-selected).
|
|
cancel_returns: Returned on ESC/q. Defaults to the original *selected*.
|
|
description: Optional multi-line text shown between the title and
|
|
the item list. Useful for context that should survive the
|
|
curses screen clear.
|
|
"""
|
|
if cancel_returns is None:
|
|
cancel_returns = selected
|
|
|
|
if not sys.stdin.isatty():
|
|
return cancel_returns
|
|
|
|
desc_lines: list[str] = []
|
|
if description:
|
|
desc_lines = description.splitlines()
|
|
|
|
try:
|
|
import curses
|
|
result_holder: list = [None]
|
|
|
|
def _draw(stdscr):
|
|
curses.curs_set(0)
|
|
if curses.has_colors():
|
|
curses.start_color()
|
|
curses.use_default_colors()
|
|
curses.init_pair(1, curses.COLOR_GREEN, -1)
|
|
curses.init_pair(2, curses.COLOR_YELLOW, -1)
|
|
cursor = selected
|
|
scroll_offset = 0
|
|
|
|
while True:
|
|
stdscr.clear()
|
|
max_y, max_x = stdscr.getmaxyx()
|
|
|
|
row = 0
|
|
|
|
# Header
|
|
try:
|
|
hattr = curses.A_BOLD
|
|
if curses.has_colors():
|
|
hattr |= curses.color_pair(2)
|
|
stdscr.addnstr(row, 0, title, max_x - 1, hattr)
|
|
row += 1
|
|
|
|
# Description lines
|
|
for dline in desc_lines:
|
|
if row >= max_y - 1:
|
|
break
|
|
stdscr.addnstr(row, 0, dline, max_x - 1, curses.A_NORMAL)
|
|
row += 1
|
|
|
|
stdscr.addnstr(
|
|
row, 0,
|
|
" \u2191\u2193 navigate ENTER/SPACE select ESC cancel",
|
|
max_x - 1, curses.A_DIM,
|
|
)
|
|
row += 1
|
|
except curses.error:
|
|
pass
|
|
|
|
# Scrollable item list
|
|
items_start = row + 1
|
|
visible_rows = max_y - items_start - 1
|
|
if cursor < scroll_offset:
|
|
scroll_offset = cursor
|
|
elif cursor >= scroll_offset + visible_rows:
|
|
scroll_offset = cursor - visible_rows + 1
|
|
|
|
for draw_i, i in enumerate(
|
|
range(scroll_offset, min(len(items), scroll_offset + visible_rows))
|
|
):
|
|
y = draw_i + items_start
|
|
if y >= max_y - 1:
|
|
break
|
|
radio = "\u25cf" if i == selected else "\u25cb"
|
|
arrow = "\u2192" if i == cursor else " "
|
|
line = f" {arrow} ({radio}) {items[i]}"
|
|
attr = curses.A_NORMAL
|
|
if i == cursor:
|
|
attr = curses.A_BOLD
|
|
if curses.has_colors():
|
|
attr |= curses.color_pair(1)
|
|
try:
|
|
stdscr.addnstr(y, 0, line, max_x - 1, attr)
|
|
except curses.error:
|
|
pass
|
|
|
|
stdscr.refresh()
|
|
action = read_menu_key(stdscr)
|
|
|
|
if action == NAV_UP:
|
|
cursor = (cursor - 1) % len(items)
|
|
elif action == NAV_DOWN:
|
|
cursor = (cursor + 1) % len(items)
|
|
elif action in (NAV_SELECT, NAV_TOGGLE):
|
|
result_holder[0] = cursor
|
|
return
|
|
elif action == NAV_CANCEL:
|
|
result_holder[0] = cancel_returns
|
|
return
|
|
|
|
curses.wrapper(_draw)
|
|
flush_stdin()
|
|
return result_holder[0] if result_holder[0] is not None else cancel_returns
|
|
|
|
except KeyboardInterrupt:
|
|
return cancel_returns
|
|
except Exception:
|
|
return _radio_numbered_fallback(title, items, selected, cancel_returns)
|
|
|
|
|
|
def _radio_numbered_fallback(
|
|
title: str,
|
|
items: List[str],
|
|
selected: int,
|
|
cancel_returns: int,
|
|
) -> int:
|
|
"""Text-based numbered fallback for radio selection."""
|
|
print(color(f"\n {title}", Colors.YELLOW))
|
|
print(color(" Select by number, Enter to confirm.\n", Colors.DIM))
|
|
|
|
for i, label in enumerate(items):
|
|
marker = color("(\u25cf)", Colors.GREEN) if i == selected else "(\u25cb)"
|
|
print(f" {marker} {i + 1:>2}. {label}")
|
|
print()
|
|
try:
|
|
val = input(color(f" Choice [default {selected + 1}]: ", Colors.DIM)).strip()
|
|
if not val:
|
|
return selected
|
|
idx = int(val) - 1
|
|
if 0 <= idx < len(items):
|
|
return idx
|
|
return selected
|
|
except (ValueError, KeyboardInterrupt, EOFError):
|
|
return cancel_returns
|
|
|
|
|
|
def curses_single_select(
|
|
title: str,
|
|
items: List[str],
|
|
default_index: int = 0,
|
|
*,
|
|
cancel_label: str = "Cancel",
|
|
) -> int | None:
|
|
"""Curses single-select menu. Returns selected index or None on cancel.
|
|
|
|
Works inside prompt_toolkit because curses.wrapper() restores the terminal
|
|
safely, unlike simple_term_menu which conflicts with /dev/tty.
|
|
"""
|
|
if not sys.stdin.isatty():
|
|
return None
|
|
|
|
try:
|
|
import curses
|
|
result_holder: list = [None]
|
|
|
|
all_items = list(items) + [cancel_label]
|
|
cancel_idx = len(items)
|
|
|
|
def _draw(stdscr):
|
|
curses.curs_set(0)
|
|
if curses.has_colors():
|
|
curses.start_color()
|
|
curses.use_default_colors()
|
|
curses.init_pair(1, curses.COLOR_GREEN, -1)
|
|
curses.init_pair(2, curses.COLOR_YELLOW, -1)
|
|
cursor = min(default_index, len(all_items) - 1)
|
|
scroll_offset = 0
|
|
|
|
while True:
|
|
stdscr.clear()
|
|
max_y, max_x = stdscr.getmaxyx()
|
|
|
|
try:
|
|
hattr = curses.A_BOLD
|
|
if curses.has_colors():
|
|
hattr |= curses.color_pair(2)
|
|
stdscr.addnstr(0, 0, title, max_x - 1, hattr)
|
|
stdscr.addnstr(
|
|
1, 0,
|
|
" ↑↓ navigate ENTER confirm ESC/q cancel",
|
|
max_x - 1, curses.A_DIM,
|
|
)
|
|
except curses.error:
|
|
pass
|
|
|
|
visible_rows = max_y - 3
|
|
if cursor < scroll_offset:
|
|
scroll_offset = cursor
|
|
elif cursor >= scroll_offset + visible_rows:
|
|
scroll_offset = cursor - visible_rows + 1
|
|
|
|
for draw_i, i in enumerate(
|
|
range(scroll_offset, min(len(all_items), scroll_offset + visible_rows))
|
|
):
|
|
y = draw_i + 3
|
|
if y >= max_y - 1:
|
|
break
|
|
arrow = "→" if i == cursor else " "
|
|
line = f" {arrow} {all_items[i]}"
|
|
attr = curses.A_NORMAL
|
|
if i == cursor:
|
|
attr = curses.A_BOLD
|
|
if curses.has_colors():
|
|
attr |= curses.color_pair(1)
|
|
try:
|
|
stdscr.addnstr(y, 0, line, max_x - 1, attr)
|
|
except curses.error:
|
|
pass
|
|
|
|
stdscr.refresh()
|
|
action = read_menu_key(stdscr)
|
|
|
|
if action == NAV_UP:
|
|
cursor = (cursor - 1) % len(all_items)
|
|
elif action == NAV_DOWN:
|
|
cursor = (cursor + 1) % len(all_items)
|
|
elif action == NAV_SELECT:
|
|
result_holder[0] = cursor
|
|
return
|
|
elif action == NAV_CANCEL:
|
|
result_holder[0] = None
|
|
return
|
|
|
|
curses.wrapper(_draw)
|
|
flush_stdin()
|
|
if result_holder[0] is not None and result_holder[0] >= cancel_idx:
|
|
return None
|
|
return result_holder[0]
|
|
|
|
except KeyboardInterrupt:
|
|
return None
|
|
except Exception:
|
|
all_items = list(items) + [cancel_label]
|
|
cancel_idx = len(items)
|
|
return _numbered_single_fallback(title, all_items, cancel_idx)
|
|
|
|
|
|
def _numbered_single_fallback(
|
|
title: str,
|
|
items: List[str],
|
|
cancel_idx: int,
|
|
) -> int | None:
|
|
"""Text-based numbered fallback for single-select."""
|
|
print(f"\n {title}\n")
|
|
for i, label in enumerate(items, 1):
|
|
print(f" {i}. {label}")
|
|
print()
|
|
try:
|
|
val = input(f" Choice [1-{len(items)}]: ").strip()
|
|
if not val:
|
|
return None
|
|
idx = int(val) - 1
|
|
if 0 <= idx < len(items) and idx < cancel_idx:
|
|
return idx
|
|
if idx == cancel_idx:
|
|
return None
|
|
except (ValueError, KeyboardInterrupt, EOFError):
|
|
pass
|
|
return None
|
|
|
|
|
|
def _numbered_fallback(
|
|
title: str,
|
|
items: List[str],
|
|
selected: Set[int],
|
|
cancel_returns: Set[int],
|
|
status_fn: Optional[Callable[[Set[int]], str]] = None,
|
|
) -> Set[int]:
|
|
"""Text-based toggle fallback for terminals without curses."""
|
|
chosen = set(selected)
|
|
print(color(f"\n {title}", Colors.YELLOW))
|
|
print(color(" Toggle by number, Enter to confirm.\n", Colors.DIM))
|
|
|
|
while True:
|
|
for i, label in enumerate(items):
|
|
marker = color("[✓]", Colors.GREEN) if i in chosen else "[ ]"
|
|
print(f" {marker} {i + 1:>2}. {label}")
|
|
if status_fn:
|
|
status_text = status_fn(chosen)
|
|
if status_text:
|
|
print(color(f"\n {status_text}", Colors.DIM))
|
|
print()
|
|
try:
|
|
val = input(color(" Toggle # (or Enter to confirm): ", Colors.DIM)).strip()
|
|
if not val:
|
|
break
|
|
idx = int(val) - 1
|
|
if 0 <= idx < len(items):
|
|
chosen.symmetric_difference_update({idx})
|
|
except (ValueError, KeyboardInterrupt, EOFError):
|
|
return cancel_returns
|
|
print()
|
|
|
|
return chosen
|