mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-03 02:11:48 +00:00
fix(cli): recover from leaked mouse tracking escapes
Detect leaked SGR mouse-report fragments in CLI input, strip them, and reset terminal modes in-place so scroll and typing recover without reopening the tab. Add regression tests for escaped, visible, and bare leak forms.
This commit is contained in:
parent
8cce85b819
commit
98a428fd61
2 changed files with 101 additions and 6 deletions
87
cli.py
87
cli.py
|
|
@ -1540,9 +1540,27 @@ def _strip_leaked_bracketed_paste_wrappers(text: str) -> str:
|
||||||
# that appears when the ESC byte was stripped by a prior filter.
|
# that appears when the ESC byte was stripped by a prior filter.
|
||||||
_DSR_CPR_ESC_RE = re.compile(r"\x1b\[\d+;\d+R")
|
_DSR_CPR_ESC_RE = re.compile(r"\x1b\[\d+;\d+R")
|
||||||
_DSR_CPR_VISIBLE_RE = re.compile(r"\^\[\[\d+;\d+R")
|
_DSR_CPR_VISIBLE_RE = re.compile(r"\^\[\[\d+;\d+R")
|
||||||
|
_SGR_MOUSE_ESC_RE = re.compile(r"\x1b\[<\d{1,3};\d{1,4};\d{1,4}[Mm]")
|
||||||
|
_SGR_MOUSE_VISIBLE_RE = re.compile(r"\^\[\[<\d{1,3};\d{1,4};\d{1,4}[Mm]")
|
||||||
|
# Some terminals/filters can drop ESC and literal "^[[", leaving only
|
||||||
|
# "<btn;col;rowM" fragments in the buffer. Keep this broad on purpose:
|
||||||
|
# these fragments are extremely unlikely to be intentional user input, and
|
||||||
|
# stripping them is better than sending corrupted prompts.
|
||||||
|
_SGR_MOUSE_BARE_RE = re.compile(r"<\d{1,3};\d{1,4};\d{1,4}[Mm]")
|
||||||
|
_TERMINAL_INPUT_MODE_RESET_SEQ = (
|
||||||
|
"\x1b[?1006l" # disable SGR mouse
|
||||||
|
"\x1b[?1003l" # disable any-motion tracking
|
||||||
|
"\x1b[?1002l" # disable button-motion tracking
|
||||||
|
"\x1b[?1000l" # disable click tracking
|
||||||
|
"\x1b[?1004l" # disable focus events
|
||||||
|
"\x1b[?2004l" # disable bracketed paste
|
||||||
|
"\x1b[?1049l" # leave alt screen (if stuck there)
|
||||||
|
"\x1b[0m" # reset text attributes
|
||||||
|
"\x1b[?25h" # ensure cursor visible
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _strip_leaked_terminal_responses(text: str) -> str:
|
def _strip_leaked_terminal_responses_with_meta(text: str) -> tuple[str, bool]:
|
||||||
"""Strip leaked terminal control-response sequences from user input.
|
"""Strip leaked terminal control-response sequences from user input.
|
||||||
|
|
||||||
Covers Cursor Position Report (CPR / DSR) responses — ``ESC[<row>;<col>R``
|
Covers Cursor Position Report (CPR / DSR) responses — ``ESC[<row>;<col>R``
|
||||||
|
|
@ -1552,12 +1570,30 @@ def _strip_leaked_terminal_responses(text: str) -> str:
|
||||||
(resize storms, multiplexer focus changes, slow PTYs) the response
|
(resize storms, multiplexer focus changes, slow PTYs) the response
|
||||||
lands in the input buffer as literal text and corrupts what the user
|
lands in the input buffer as literal text and corrupts what the user
|
||||||
typed.
|
typed.
|
||||||
|
|
||||||
|
Also strips leaked SGR mouse-report fragments (``ESC[<...M/m`` and
|
||||||
|
degraded visible forms). Returns ``(cleaned_text, had_mouse_reports)``
|
||||||
|
so callers can trigger an in-place terminal mode recovery when needed.
|
||||||
"""
|
"""
|
||||||
if not text:
|
if not text:
|
||||||
return text
|
return text, False
|
||||||
|
had_mouse_reports = bool(
|
||||||
|
_SGR_MOUSE_ESC_RE.search(text)
|
||||||
|
or _SGR_MOUSE_VISIBLE_RE.search(text)
|
||||||
|
or _SGR_MOUSE_BARE_RE.search(text)
|
||||||
|
)
|
||||||
text = _DSR_CPR_ESC_RE.sub("", text)
|
text = _DSR_CPR_ESC_RE.sub("", text)
|
||||||
text = _DSR_CPR_VISIBLE_RE.sub("", text)
|
text = _DSR_CPR_VISIBLE_RE.sub("", text)
|
||||||
return text
|
text = _SGR_MOUSE_ESC_RE.sub("", text)
|
||||||
|
text = _SGR_MOUSE_VISIBLE_RE.sub("", text)
|
||||||
|
text = _SGR_MOUSE_BARE_RE.sub("", text)
|
||||||
|
return text, had_mouse_reports
|
||||||
|
|
||||||
|
|
||||||
|
def _strip_leaked_terminal_responses(text: str) -> str:
|
||||||
|
"""Compatibility wrapper returning only cleaned text."""
|
||||||
|
cleaned, _ = _strip_leaked_terminal_responses_with_meta(text)
|
||||||
|
return cleaned
|
||||||
|
|
||||||
|
|
||||||
def _collect_query_images(query: str | None, image_arg: str | None = None) -> tuple[str, list[Path]]:
|
def _collect_query_images(query: str | None, image_arg: str | None = None) -> tuple[str, list[Path]]:
|
||||||
|
|
@ -1931,6 +1967,8 @@ class HermesCLI:
|
||||||
self._stream_box_opened = False # True once the response box header is printed
|
self._stream_box_opened = False # True once the response box header is printed
|
||||||
self._reasoning_preview_buf = "" # Coalesce tiny reasoning chunks for [thinking] output
|
self._reasoning_preview_buf = "" # Coalesce tiny reasoning chunks for [thinking] output
|
||||||
self._pending_edit_snapshots = {}
|
self._pending_edit_snapshots = {}
|
||||||
|
self._last_input_mode_recovery = 0.0
|
||||||
|
self._input_mode_recovery_notice_shown = False
|
||||||
|
|
||||||
# Configuration - priority: CLI args > env vars > config file
|
# Configuration - priority: CLI args > env vars > config file
|
||||||
# Model comes from: CLI arg or config.yaml (single source of truth).
|
# Model comes from: CLI arg or config.yaml (single source of truth).
|
||||||
|
|
@ -4120,6 +4158,37 @@ class HermesCLI:
|
||||||
sys.stdout.write(seq)
|
sys.stdout.write(seq)
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
def _recover_terminal_input_modes(self, *, reason: str) -> None:
|
||||||
|
"""Best-effort reset when leaked mouse reports indicate mode drift."""
|
||||||
|
now = time.monotonic()
|
||||||
|
# Rate-limit to avoid thrashing if a terminal floods reports.
|
||||||
|
if now - self._last_input_mode_recovery < 0.5:
|
||||||
|
return
|
||||||
|
self._last_input_mode_recovery = now
|
||||||
|
|
||||||
|
out = getattr(self, "_app", None)
|
||||||
|
output = getattr(out, "output", None) if out else None
|
||||||
|
try:
|
||||||
|
if output and hasattr(output, "write_raw"):
|
||||||
|
output.write_raw(_TERMINAL_INPUT_MODE_RESET_SEQ)
|
||||||
|
output.flush()
|
||||||
|
elif output and hasattr(output, "write"):
|
||||||
|
output.write(_TERMINAL_INPUT_MODE_RESET_SEQ)
|
||||||
|
output.flush()
|
||||||
|
else:
|
||||||
|
sys.stdout.write(_TERMINAL_INPUT_MODE_RESET_SEQ)
|
||||||
|
sys.stdout.flush()
|
||||||
|
except Exception:
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.warning("Recovered terminal input modes after leak: %s", reason)
|
||||||
|
if not self._input_mode_recovery_notice_shown:
|
||||||
|
self._input_mode_recovery_notice_shown = True
|
||||||
|
_cprint(
|
||||||
|
f" {_DIM}Recovered terminal input modes after leaked mouse reports. "
|
||||||
|
f"If this repeats, run /new or restart this tab.{_RST}"
|
||||||
|
)
|
||||||
|
|
||||||
def _handle_copy_command(self, cmd_original: str) -> None:
|
def _handle_copy_command(self, cmd_original: str) -> None:
|
||||||
"""Handle /copy [number] — copy assistant output to clipboard."""
|
"""Handle /copy [number] — copy assistant output to clipboard."""
|
||||||
parts = cmd_original.split(maxsplit=1)
|
parts = cmd_original.split(maxsplit=1)
|
||||||
|
|
@ -10035,7 +10104,9 @@ class HermesCLI:
|
||||||
# so the 5-line collapse threshold and display are consistent.
|
# so the 5-line collapse threshold and display are consistent.
|
||||||
pasted_text = pasted_text.replace('\r\n', '\n').replace('\r', '\n')
|
pasted_text = pasted_text.replace('\r\n', '\n').replace('\r', '\n')
|
||||||
pasted_text = _strip_leaked_bracketed_paste_wrappers(pasted_text)
|
pasted_text = _strip_leaked_bracketed_paste_wrappers(pasted_text)
|
||||||
pasted_text = _strip_leaked_terminal_responses(pasted_text)
|
pasted_text, _had_mouse_reports = _strip_leaked_terminal_responses_with_meta(pasted_text)
|
||||||
|
if _had_mouse_reports:
|
||||||
|
self._recover_terminal_input_modes(reason="mouse reports leaked into bracketed paste payload")
|
||||||
if _should_auto_attach_clipboard_image_on_paste(pasted_text) and self._try_attach_clipboard_image():
|
if _should_auto_attach_clipboard_image_on_paste(pasted_text) and self._try_attach_clipboard_image():
|
||||||
event.app.invalidate()
|
event.app.invalidate()
|
||||||
if pasted_text:
|
if pasted_text:
|
||||||
|
|
@ -10189,7 +10260,9 @@ class HermesCLI:
|
||||||
event so it never triggers this.
|
event so it never triggers this.
|
||||||
"""
|
"""
|
||||||
text = _strip_leaked_bracketed_paste_wrappers(buf.text)
|
text = _strip_leaked_bracketed_paste_wrappers(buf.text)
|
||||||
text = _strip_leaked_terminal_responses(text)
|
text, _had_mouse_reports = _strip_leaked_terminal_responses_with_meta(text)
|
||||||
|
if _had_mouse_reports:
|
||||||
|
self._recover_terminal_input_modes(reason="mouse reports leaked into prompt buffer")
|
||||||
if text != buf.text:
|
if text != buf.text:
|
||||||
cursor = min(buf.cursor_position, len(text))
|
cursor = min(buf.cursor_position, len(text))
|
||||||
_paste_just_collapsed[0] = True
|
_paste_just_collapsed[0] = True
|
||||||
|
|
@ -10942,7 +11015,9 @@ class HermesCLI:
|
||||||
|
|
||||||
if isinstance(user_input, str):
|
if isinstance(user_input, str):
|
||||||
user_input = _strip_leaked_bracketed_paste_wrappers(user_input)
|
user_input = _strip_leaked_bracketed_paste_wrappers(user_input)
|
||||||
user_input = _strip_leaked_terminal_responses(user_input)
|
user_input, _had_mouse_reports = _strip_leaked_terminal_responses_with_meta(user_input)
|
||||||
|
if _had_mouse_reports:
|
||||||
|
self._recover_terminal_input_modes(reason="mouse reports leaked into submitted input")
|
||||||
|
|
||||||
# Check for commands — but detect dragged/pasted file paths first.
|
# Check for commands — but detect dragged/pasted file paths first.
|
||||||
# See _detect_file_drop() for details.
|
# See _detect_file_drop() for details.
|
||||||
|
|
|
||||||
|
|
@ -55,3 +55,23 @@ class TestStripLeakedTerminalResponses:
|
||||||
def test_preserves_multiline_content(self):
|
def test_preserves_multiline_content(self):
|
||||||
text = "line 1\n\x1b[53;1Rline 2"
|
text = "line 1\n\x1b[53;1Rline 2"
|
||||||
assert _strip_leaked_terminal_responses(text) == "line 1\nline 2"
|
assert _strip_leaked_terminal_responses(text) == "line 1\nline 2"
|
||||||
|
|
||||||
|
def test_strips_sgr_mouse_report_esc_form(self):
|
||||||
|
text = "abc\x1b[<65;1;49Mdef"
|
||||||
|
assert _strip_leaked_terminal_responses(text) == "abcdef"
|
||||||
|
|
||||||
|
def test_strips_sgr_mouse_report_visible_form(self):
|
||||||
|
text = "abc^[[<65;1;49Mdef"
|
||||||
|
assert _strip_leaked_terminal_responses(text) == "abcdef"
|
||||||
|
|
||||||
|
def test_strips_sgr_mouse_report_bare_form(self):
|
||||||
|
text = "abc<65;1;49Mdef"
|
||||||
|
assert _strip_leaked_terminal_responses(text) == "abcdef"
|
||||||
|
|
||||||
|
def test_strips_multiple_concatenated_sgr_mouse_reports(self):
|
||||||
|
text = "<65;1;49M<35;1;42Mhello<64;1;40m"
|
||||||
|
assert _strip_leaked_terminal_responses(text) == "hello"
|
||||||
|
|
||||||
|
def test_does_not_strip_regular_angle_bracket_text(self):
|
||||||
|
text = "render <div class='hero'> literal"
|
||||||
|
assert _strip_leaked_terminal_responses(text) == text
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue