fix(tui-gateway): dispatch slow RPC handlers on a thread pool (#12546)

The stdin-read loop in entry.py calls handle_request() inline, so the
five handlers that can block for seconds to minutes
(slash.exec, cli.exec, shell.exec, session.resume, session.branch)
freeze the dispatcher. While one is running, any inbound RPC —
notably approval.respond and session.interrupt — sits unread in the
pipe buffer and lands only after the slow handler returns.

Route only those five onto a small ThreadPoolExecutor; every other
handler stays on the main thread so the fast-path ordering is
unchanged and the audit surface stays small. write_json is already
_stdout_lock-guarded, so concurrent response writes are safe. Pool
size defaults to 4 (overridable via HERMES_TUI_RPC_POOL_WORKERS).

- add _LONG_HANDLERS set + ThreadPoolExecutor + atexit shutdown
- new dispatch(req) function: pool for long handlers, inline for rest
- _run_and_emit wraps pool work in a try/except so a misbehaving
  handler still surfaces as a JSON-RPC error instead of silently
  dying in a worker
- entry.py swaps handle_request → dispatch
- 5 new tests: sync path still inline, long handlers emit via stdout,
  fast handler not blocked behind slow one, handler exceptions map to
  error responses, non-long methods always take the sync path

Manual repro confirms the fix: shell.exec(sleep 3) + terminal.resize
sent back-to-back now returns the resize response at t=0s while the
sleep finishes independently at t=3s. Before, both landed together
at t=3s.

Fixes #12546.
This commit is contained in:
Brooklyn Nicholson 2026-04-19 07:47:15 -05:00
parent c567adb58a
commit a6fe5d0872
3 changed files with 132 additions and 2 deletions

View file

@ -4,6 +4,7 @@ import io
import json
import sys
import threading
import time
from unittest.mock import MagicMock, patch
import pytest
@ -432,3 +433,81 @@ def test_command_dispatch_returns_skill_payload(server):
assert result["type"] == "skill"
assert result["message"] == fake_msg
assert result["name"] == "hermes-agent-dev"
# ── dispatch(): pool routing for long handlers (#12546) ──────────────
def test_dispatch_runs_short_handlers_inline(server):
    """dispatch() must answer synchronously for methods outside _LONG_HANDLERS."""
    server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
    response = server.dispatch({"id": "r1", "method": "fast.ping", "params": {}})
    expected = {"jsonrpc": "2.0", "id": "r1", "result": {"pong": True}}
    assert response == expected
def test_dispatch_offloads_long_handlers_and_emits_via_stdout(capture):
    """A _LONG_HANDLERS method makes dispatch() return None; the pool worker
    writes the JSON-RPC response itself via write_json."""
    server, buf = capture
    server._methods["slash.exec"] = lambda rid, params: server._ok(rid, {"output": "hi"})

    assert server.dispatch({"id": "r2", "method": "slash.exec", "params": {}}) is None

    # Poll (up to ~0.5 s) for the worker thread to flush its response.
    attempts_left = 50
    while attempts_left and not buf.getvalue():
        time.sleep(0.01)
        attempts_left -= 1

    emitted = json.loads(buf.getvalue())
    assert emitted == {"jsonrpc": "2.0", "id": "r2", "result": {"output": "hi"}}
def test_dispatch_long_handler_does_not_block_fast_handler(server):
    """A slow long handler must not prevent a concurrent fast handler from completing.

    Bug fix over the original version: `released.set()` was only reached
    after the asserts, so any assertion failure left the pool worker
    parked for its full 5 s wait — slowing the run and occupying a pool
    slot for later tests. The release now happens in a `finally` block.
    """
    released = threading.Event()

    def slow(rid, params):
        # Park the pool worker until the test releases it.
        released.wait(timeout=5)
        return server._ok(rid, {"done": True})

    server._methods["slash.exec"] = slow
    server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
    try:
        t0 = time.monotonic()
        assert server.dispatch({"id": "slow", "method": "slash.exec", "params": {}}) is None
        fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}})
        fast_elapsed = time.monotonic() - t0
        assert fast_resp["result"] == {"pong": True}
        assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind slow handler"
    finally:
        # Always unblock the worker, even when an assert above fails.
        released.set()
def test_dispatch_long_handler_exception_produces_error_response(capture):
    """An exception inside a pool-dispatched handler still yields a JSON-RPC error."""
    server, buf = capture

    def boom(rid, params):
        raise RuntimeError("kaboom")

    server._methods["slash.exec"] = boom
    server.dispatch({"id": "r3", "method": "slash.exec", "params": {}})

    # Wait (up to ~0.5 s) for the worker thread to emit the error response.
    attempts_left = 50
    while attempts_left and not buf.getvalue():
        time.sleep(0.01)
        attempts_left -= 1

    error_payload = json.loads(buf.getvalue())
    assert error_payload["id"] == "r3"
    assert error_payload["error"]["code"] == -32000
    assert "kaboom" in error_payload["error"]["message"]
def test_dispatch_unknown_long_method_still_goes_inline(server):
    """A method name absent from _LONG_HANDLERS takes the sync path even if slow."""
    server._methods["some.method"] = lambda rid, params: server._ok(rid, {"ok": True})
    payload = server.dispatch({"id": "r4", "method": "some.method", "params": {}})
    assert payload["result"] == {"ok": True}

View file

@ -2,7 +2,7 @@ import json
import signal
import sys
from tui_gateway.server import handle_request, resolve_skin, write_json
from tui_gateway.server import dispatch, resolve_skin, write_json
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@ -28,7 +28,7 @@ def main():
sys.exit(0)
continue
resp = handle_request(req)
resp = dispatch(req)
if resp is not None:
if not write_json(resp):
sys.exit(0)

View file

@ -1,4 +1,5 @@
import atexit
import concurrent.futures
import copy
import json
import os
@ -36,6 +37,29 @@ _cfg_cache: dict | None = None
_cfg_mtime: float | None = None
_SLASH_WORKER_TIMEOUT_S = max(5.0, float(os.environ.get("HERMES_TUI_SLASH_TIMEOUT_S", "45") or 45))
# ── Async RPC dispatch (#12546) ──────────────────────────────────────
# A handful of handlers block the dispatcher loop in entry.py for seconds
# to minutes (slash.exec, cli.exec, shell.exec, session.resume,
# session.branch). While they're running, inbound RPCs — notably
# approval.respond and session.interrupt — sit unread in the stdin pipe.
# We route only those slow handlers onto a small thread pool; everything
# else stays on the main thread so ordering stays sane for the fast path.
# write_json is already _stdout_lock-guarded, so concurrent response
# writes are safe.
_LONG_HANDLERS = frozenset({
"cli.exec",
"session.branch",
"session.resume",
"shell.exec",
"slash.exec",
})
_RPC_POOL_WORKERS = max(2, int(os.environ.get("HERMES_TUI_RPC_POOL_WORKERS", "4") or 4))
_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=_RPC_POOL_WORKERS,
thread_name_prefix="tui-rpc",
)
atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True))
# Reserve real stdout for JSON-RPC only; redirect Python's stdout to stderr
# so stray print() from libraries/tools becomes harmless gateway.stderr instead
# of corrupting the JSON protocol.
@ -200,6 +224,33 @@ def handle_request(req: dict) -> dict | None:
return fn(req.get("id"), req.get("params", {}))
def _run_and_emit(req: dict) -> None:
    """Run a handler on the RPC pool and write its response directly.

    Any unexpected exception is caught so a misbehaving handler cannot
    die silently in its worker thread; the caller still receives a
    JSON-RPC error response.
    """
    resp: dict | None
    try:
        resp = handle_request(req)
    except Exception as exc:
        resp = _err(req.get("id"), -32000, f"handler error: {exc}")
    if resp is None:
        return
    write_json(resp)
def dispatch(req: dict) -> dict | None:
    """Route an inbound RPC — long handlers to the pool, everything else inline.

    Returns the response dict for sync-dispatched requests so the caller
    (entry.py) can write it; returns None when the request was scheduled
    on the pool, in which case the worker writes the response itself.
    """
    method = req.get("method", "")
    if method not in _LONG_HANDLERS:
        return handle_request(req)
    _pool.submit(_run_and_emit, req)
    return None
def _wait_agent(session: dict, rid: str, timeout: float = 30.0) -> dict | None:
ready = session.get("agent_ready")
if ready is not None and not ready.wait(timeout=timeout):