mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(cli): bracketed-paste timeout prevents permanent input freeze (#16263)
When the terminal drops the ESC[201~ end mark during a bracketed paste (terminal race, torn write, SSH glitch, macOS sleep/wake), prompt_toolkit's Vt100Parser keeps buffering all later input in _paste_buffer forever. From the user's perspective, the CLI appears frozen — the only recovery was closing the tab/session. This patch monkey-patches Vt100Parser.feed() so that bracketed-paste mode flushes buffered content as a normal BracketedPaste event after 2 seconds without an end marker, then restores normal parsing. Includes 8 regression tests covering normal paste, timeout recovery, torn end marks, and edge cases. Surgical reapply of PR #27518. Original branch was many months stale (1193 files / 172k LOC of unrelated reverts); the substantive ~77 LOC patch in cli.py plus the new 157-line test file were reapplied onto current main with the contributor's authorship preserved via --author.
This commit is contained in:
parent
8697471419
commit
1b12cd5241
2 changed files with 244 additions and 0 deletions
87
cli.py
87
cli.py
|
|
@ -2360,6 +2360,89 @@ def _strip_leaked_bracketed_paste_wrappers(text: str) -> str:
|
|||
return text
|
||||
|
||||
|
||||
def _apply_bracketed_paste_timeout_patch() -> None:
|
||||
"""Patch prompt_toolkit to recover from torn bracketed-paste sequences.
|
||||
|
||||
prompt_toolkit's ``Vt100Parser.feed()`` buffers all input while waiting
|
||||
for the ESC[201~ end mark. If a terminal drops that end mark (terminal
|
||||
race, torn write, SSH glitch, macOS sleep/wake), input appears frozen
|
||||
forever — the only recovery used to be killing the tab.
|
||||
|
||||
This patch wraps ``Vt100Parser.feed`` so that bracketed-paste mode
|
||||
flushes buffered content as a normal ``BracketedPaste`` event after
|
||||
``_BP_TIMEOUT_S`` seconds without an end marker, then resumes normal
|
||||
parsing. See upstream issue #16263.
|
||||
|
||||
The patch is idempotent — repeated calls are no-ops via the
|
||||
``_hermes_bp_timeout_patched`` sentinel on the module.
|
||||
"""
|
||||
try:
|
||||
import prompt_toolkit.input.vt100_parser as _vt100_mod
|
||||
from prompt_toolkit.keys import Keys as _PtKeys
|
||||
from prompt_toolkit.key_binding.key_processor import KeyPress as _PtKeyPress
|
||||
|
||||
if getattr(_vt100_mod, "_hermes_bp_timeout_patched", False):
|
||||
return
|
||||
|
||||
_BP_TIMEOUT_S = 2.0 # max time to wait for ESC[201~ before flushing
|
||||
|
||||
def _patched_vt100_feed(self_parser, data: str) -> None:
|
||||
if self_parser._in_bracketed_paste:
|
||||
self_parser._paste_buffer += data
|
||||
end_mark = "\x1b[201~"
|
||||
|
||||
if end_mark in self_parser._paste_buffer:
|
||||
end_index = self_parser._paste_buffer.index(end_mark)
|
||||
paste_content = self_parser._paste_buffer[:end_index]
|
||||
self_parser.feed_key_callback(
|
||||
_PtKeyPress(_PtKeys.BracketedPaste, paste_content)
|
||||
)
|
||||
self_parser._in_bracketed_paste = False
|
||||
remaining = self_parser._paste_buffer[
|
||||
end_index + len(end_mark):
|
||||
]
|
||||
self_parser._paste_buffer = ""
|
||||
self_parser._hermes_bp_start = None
|
||||
if remaining:
|
||||
_patched_vt100_feed(self_parser, remaining)
|
||||
else:
|
||||
bp_start = getattr(self_parser, "_hermes_bp_start", None)
|
||||
now = time.monotonic()
|
||||
if bp_start is None:
|
||||
self_parser._hermes_bp_start = now
|
||||
elif now - bp_start > _BP_TIMEOUT_S:
|
||||
paste_content = self_parser._paste_buffer
|
||||
self_parser._in_bracketed_paste = False
|
||||
self_parser._paste_buffer = ""
|
||||
self_parser._hermes_bp_start = None
|
||||
if paste_content:
|
||||
self_parser.feed_key_callback(
|
||||
_PtKeyPress(_PtKeys.BracketedPaste, paste_content)
|
||||
)
|
||||
logger.warning(
|
||||
"Bracketed-paste timeout (%.1fs) — flushed %d bytes "
|
||||
"without end mark. Terminal may have dropped ESC[201~ "
|
||||
"(see #16263).",
|
||||
now - bp_start,
|
||||
len(paste_content),
|
||||
)
|
||||
else:
|
||||
# Normal mode — re-inline prompt_toolkit's normal feed path.
|
||||
# Calling the original feed here would double-buffer after the
|
||||
# bracketed-paste entry transition.
|
||||
for i, c in enumerate(data):
|
||||
if self_parser._in_bracketed_paste:
|
||||
_patched_vt100_feed(self_parser, data[i:])
|
||||
break
|
||||
self_parser._input_parser.send(c)
|
||||
|
||||
_vt100_mod.Vt100Parser.feed = _patched_vt100_feed
|
||||
_vt100_mod._hermes_bp_timeout_patched = True
|
||||
logger.debug("Applied Vt100Parser bracketed-paste timeout patch (#16263)")
|
||||
except Exception as exc: # noqa: BLE001 — defensive: never break startup
|
||||
logger.debug("Bracketed-paste timeout patch skipped: %s", exc)
|
||||
|
||||
|
||||
# Cursor Position Report (CPR / DSR) response, format ``ESC[<row>;<col>R``.
|
||||
# prompt_toolkit's _on_resize() + renderer send ``ESC[6n`` queries to the
|
||||
# terminal; under resize storms or tab switches the terminal's reply can
|
||||
|
|
@ -14151,6 +14234,10 @@ class HermesCLI:
|
|||
except Exception:
|
||||
pass
|
||||
|
||||
# Apply bracketed-paste timeout recovery so torn ESC[201~ end marks
|
||||
# don't permanently freeze the input (issue #16263). Idempotent.
|
||||
_apply_bracketed_paste_timeout_patch()
|
||||
|
||||
_original_on_resize = app._on_resize
|
||||
|
||||
def _resize_clear_ghosts():
|
||||
|
|
|
|||
157
tests/cli/test_bracketed_paste_timeout.py
Normal file
157
tests/cli/test_bracketed_paste_timeout.py
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
"""Tests for bracketed-paste timeout safety valve (#16263).
|
||||
|
||||
Verifies the production helper in cli.py monkey-patches prompt_toolkit's
|
||||
Vt100Parser.feed() so the parser auto-escapes from bracketed-paste mode when
|
||||
the ESC[201~ end mark is never received.
|
||||
"""
|
||||
import ast
|
||||
import importlib
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from prompt_toolkit.keys import Keys
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
CLI_PATH = ROOT / "cli.py"
|
||||
|
||||
|
||||
def _load_production_patch_helper():
|
||||
"""Load cli._apply_bracketed_paste_timeout_patch without importing cli.
|
||||
|
||||
Importing cli.py pulls optional runtime deps that aren't required for this
|
||||
parser-level regression. AST-loading the exact helper keeps the test tied
|
||||
to production code while avoiding unrelated import side effects. If the
|
||||
production helper is removed, this test fails.
|
||||
"""
|
||||
source = CLI_PATH.read_text(encoding="utf-8")
|
||||
tree = ast.parse(source)
|
||||
helper_node = next(
|
||||
(
|
||||
node
|
||||
for node in tree.body
|
||||
if isinstance(node, ast.FunctionDef)
|
||||
and node.name == "_apply_bracketed_paste_timeout_patch"
|
||||
),
|
||||
None,
|
||||
)
|
||||
assert helper_node is not None, (
|
||||
"cli.py must define _apply_bracketed_paste_timeout_patch()"
|
||||
)
|
||||
helper_source = ast.get_source_segment(source, helper_node)
|
||||
namespace = {"time": time, "logger": logging.getLogger("test.cli")}
|
||||
exec(helper_source, namespace)
|
||||
return namespace["_apply_bracketed_paste_timeout_patch"]
|
||||
|
||||
|
||||
def _reset_and_apply_production_patch():
|
||||
"""Reload prompt_toolkit's parser and apply Hermes' production patch."""
|
||||
import prompt_toolkit.input.vt100_parser as vt100_mod
|
||||
|
||||
vt100_mod = importlib.reload(vt100_mod)
|
||||
# importlib.reload() preserves module dict entries that the reloaded source
|
||||
# does not redefine, so clear Hermes' sentinel before re-applying.
|
||||
if hasattr(vt100_mod, "_hermes_bp_timeout_patched"):
|
||||
delattr(vt100_mod, "_hermes_bp_timeout_patched")
|
||||
_load_production_patch_helper()()
|
||||
assert getattr(vt100_mod, "_hermes_bp_timeout_patched", False)
|
||||
return vt100_mod
|
||||
|
||||
|
||||
class TestBracketedPasteTimeout:
|
||||
"""Verify the Vt100Parser monkey-patch prevents frozen bracketed-paste."""
|
||||
|
||||
def _make_parser(self):
|
||||
"""Create a Vt100Parser after applying the production patch."""
|
||||
vt100_mod = _reset_and_apply_production_patch()
|
||||
callback = MagicMock()
|
||||
parser = vt100_mod.Vt100Parser(callback)
|
||||
return parser, callback
|
||||
|
||||
def test_normal_bracketed_paste_works(self):
|
||||
"""A complete bracketed-paste sequence should work normally."""
|
||||
parser, callback = self._make_parser()
|
||||
parser.feed("\x1b[200~hello world\x1b[201~")
|
||||
callback.assert_called_once()
|
||||
call_args = callback.call_args[0][0]
|
||||
assert call_args.data == "hello world"
|
||||
|
||||
def test_incomplete_paste_times_out(self):
|
||||
"""If ESC[201~ is never received, parser should recover after timeout."""
|
||||
parser, callback = self._make_parser()
|
||||
parser.feed("\x1b[200~some pasted text")
|
||||
assert parser._in_bracketed_paste
|
||||
|
||||
parser._hermes_bp_start = time.monotonic() - 3.0
|
||||
parser.feed("more data")
|
||||
|
||||
assert not parser._in_bracketed_paste
|
||||
assert callback.called
|
||||
|
||||
def test_timeout_preserves_buffered_content(self):
|
||||
"""Auto-escape should flush buffered content, not lose it."""
|
||||
parser, callback = self._make_parser()
|
||||
content = "line1\nline2\nline3"
|
||||
parser.feed(f"\x1b[200~{content}")
|
||||
parser._hermes_bp_start = time.monotonic() - 3.0
|
||||
parser.feed("")
|
||||
|
||||
paste_events = [
|
||||
c[0][0]
|
||||
for c in callback.call_args_list
|
||||
if hasattr(c[0][0], "key") and c[0][0].key == Keys.BracketedPaste
|
||||
]
|
||||
assert len(paste_events) >= 1
|
||||
assert content in paste_events[0].data
|
||||
|
||||
def test_normal_keys_after_timeout_recovery(self):
|
||||
"""After timeout recovery, normal key processing should resume."""
|
||||
parser, callback = self._make_parser()
|
||||
parser.feed("\x1b[200~stuck")
|
||||
parser._hermes_bp_start = time.monotonic() - 3.0
|
||||
parser.feed("")
|
||||
|
||||
assert not parser._in_bracketed_paste
|
||||
callback.reset_mock()
|
||||
parser.feed("a")
|
||||
assert not parser._in_bracketed_paste
|
||||
|
||||
def test_no_timeout_when_end_mark_arrives_quickly(self):
|
||||
"""No timeout should fire if end mark arrives within the window."""
|
||||
parser, callback = self._make_parser()
|
||||
parser.feed("\x1b[200~quick paste\x1b[201~")
|
||||
assert not parser._in_bracketed_paste
|
||||
callback.assert_called_once()
|
||||
|
||||
def test_subsequent_data_after_incomplete_paste(self):
|
||||
"""Data arriving after a stuck paste should be processable."""
|
||||
parser, callback = self._make_parser()
|
||||
parser.feed("\x1b[200~content")
|
||||
parser._hermes_bp_start = time.monotonic() - 5.0
|
||||
parser.feed("x")
|
||||
|
||||
assert not parser._in_bracketed_paste
|
||||
assert callback.call_count >= 1
|
||||
|
||||
def test_torn_end_mark_recovers(self):
|
||||
"""If end mark arrives split across feeds within timeout, it still works."""
|
||||
parser, callback = self._make_parser()
|
||||
parser.feed("\x1b[200~some content\x1b[20")
|
||||
assert parser._in_bracketed_paste
|
||||
|
||||
parser.feed("1~")
|
||||
assert not parser._in_bracketed_paste
|
||||
callback.assert_called_once()
|
||||
assert callback.call_args[0][0].data == "some content"
|
||||
|
||||
def test_no_timeout_under_threshold(self):
|
||||
"""Bracketed-paste mode should not timeout within the 2s window."""
|
||||
parser, callback = self._make_parser()
|
||||
parser.feed("\x1b[200~waiting")
|
||||
parser._hermes_bp_start = time.monotonic() - 0.5
|
||||
parser.feed("more waiting")
|
||||
|
||||
assert parser._in_bracketed_paste
|
||||
assert not callback.called
|
||||
Loading…
Add table
Add a link
Reference in a new issue