import io
import json
import sys
import threading
import time
from unittest.mock import MagicMock, patch

import pytest


# ── dispatch(): pool routing for long handlers (#12546) ──────────────


def _await_json_response(buf, timeout: float = 2.0, interval: float = 0.01) -> dict:
    """Poll *buf* until it holds one complete JSON document and return it.

    Pool-dispatched handlers write their response from a worker thread, so
    the test thread has to wait for it.  Parsing is retried until it
    succeeds: that covers both "nothing written yet" and "the writer is
    half-way through", which a plain truthiness check on the buffer does not.

    Args:
        buf: Object with a ``getvalue()`` method that receives the
            gateway's output (the second element of the ``capture`` fixture).
        timeout: Maximum number of seconds to wait.
        interval: Sleep between polls, in seconds.

    Returns:
        The decoded JSON-RPC response.

    Raises:
        AssertionError: If no parseable response arrived within *timeout*.
            The message includes the raw buffer to make failures debuggable.
    """
    deadline = time.monotonic() + timeout
    while True:
        raw = buf.getvalue()
        if raw.strip():
            try:
                return json.loads(raw)
            except json.JSONDecodeError:
                # Payload may still be mid-write; keep polling until deadline.
                pass
        if time.monotonic() >= deadline:
            raise AssertionError(
                f"no complete JSON response within {timeout:.1f}s; buffer={raw!r}"
            )
        time.sleep(interval)


def test_dispatch_runs_short_handlers_inline(server):
    """Non-long handlers return their response synchronously from dispatch()."""
    handler_threads = []

    def ping(rid, params):
        handler_threads.append(threading.get_ident())
        return server._ok(rid, {"pong": True})

    server._methods["fast.ping"] = ping

    resp = server.dispatch({"id": "r1", "method": "fast.ping", "params": {}})

    assert resp == {"jsonrpc": "2.0", "id": "r1", "result": {"pong": True}}
    # "Inline" means the handler ran exactly once, on the calling thread.
    assert handler_threads == [threading.get_ident()]


def test_dispatch_offloads_long_handlers_and_emits_via_stdout(capture):
    """Long handlers run on the pool and write their response via write_json."""
    server, buf = capture
    server._methods["slash.exec"] = lambda rid, params: server._ok(rid, {"output": "hi"})

    resp = server.dispatch({"id": "r2", "method": "slash.exec", "params": {}})
    assert resp is None

    written = _await_json_response(buf)
    assert written == {"jsonrpc": "2.0", "id": "r2", "result": {"output": "hi"}}


def test_dispatch_long_handler_does_not_block_fast_handler(server):
    """A slow long handler must not prevent a concurrent fast handler from completing."""
    released = threading.Event()

    def slow(rid, params):
        released.wait(timeout=5)
        return server._ok(rid, {"done": True})

    server._methods["slash.exec"] = slow
    server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})

    try:
        t0 = time.monotonic()
        assert server.dispatch({"id": "slow", "method": "slash.exec", "params": {}}) is None

        fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}})
        fast_elapsed = time.monotonic() - t0

        assert fast_resp["result"] == {"pong": True}
        assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind slow handler"
    finally:
        # Always unblock the worker, even when an assertion above fails;
        # otherwise it keeps a pool slot busy for the full 5 s wait.
        released.set()


def test_dispatch_long_handler_exception_produces_error_response(capture):
    """An exception inside a pool-dispatched handler still yields a JSON-RPC error."""
    server, buf = capture

    def boom(rid, params):
        raise RuntimeError("kaboom")

    server._methods["slash.exec"] = boom

    server.dispatch({"id": "r3", "method": "slash.exec", "params": {}})

    written = _await_json_response(buf)
    assert written["id"] == "r3"
    assert written["error"]["code"] == -32000
    assert "kaboom" in written["error"]["message"]


def test_dispatch_unknown_long_method_still_goes_inline(server):
    """A method name that is not in _LONG_HANDLERS takes the synchronous path."""
    handler_threads = []

    def handler(rid, params):
        handler_threads.append(threading.get_ident())
        return server._ok(rid, {"ok": True})

    server._methods["some.method"] = handler

    resp = server.dispatch({"id": "r4", "method": "some.method", "params": {}})

    assert resp["result"] == {"ok": True}
    # Verify the sync path was really taken, not just that a result came back.
    assert handler_threads == [threading.get_ident()]
import atexit
import concurrent.futures
import copy
import json
import os


# ── Async RPC dispatch (#12546) ──────────────────────────────────────
# A handful of handlers block the dispatcher loop in entry.py for seconds
# to minutes (slash.exec, cli.exec, shell.exec, session.resume,
# session.branch). While they're running, inbound RPCs — notably
# approval.respond and session.interrupt — sit unread in the stdin pipe.
# We route only those slow handlers onto a small thread pool; everything
# else stays on the main thread so ordering stays sane for the fast path.
# write_json is already _stdout_lock-guarded, so concurrent response
# writes are safe.
_LONG_HANDLERS = frozenset({
    "cli.exec",
    "session.branch",
    "session.resume",
    "shell.exec",
    "slash.exec",
})


def _rpc_pool_workers(default: int = 4) -> int:
    """Return the RPC pool size from ``HERMES_TUI_RPC_POOL_WORKERS``.

    An unset or empty variable yields *default*; values below 2 are raised
    to 2.  A non-numeric value also falls back to *default* instead of
    raising ``ValueError`` while the module is being imported, which would
    take the whole gateway down over a configuration typo.
    """
    raw = os.environ.get("HERMES_TUI_RPC_POOL_WORKERS", "")
    try:
        return max(2, int(raw or default))
    except ValueError:
        return max(2, default)


_RPC_POOL_WORKERS = _rpc_pool_workers()
_pool = concurrent.futures.ThreadPoolExecutor(
    max_workers=_RPC_POOL_WORKERS,
    thread_name_prefix="tui-rpc",
)
# Don't let queued or running long handlers hold up interpreter exit.
atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True))


def _run_and_emit(req: dict) -> None:
    """Run a handler on the RPC pool and write its response directly.

    Nobody inspects the Future returned by ``_pool.submit``, so an
    exception escaping this function would vanish and the client would
    never get a reply.  Both the handler call and the write are therefore
    guarded, and failures are converted into a JSON-RPC error (-32000).

    Args:
        req: The decoded JSON-RPC request.
    """
    rid = req.get("id")
    try:
        resp = handle_request(req)
    except Exception as exc:
        resp = _err(rid, -32000, f"handler error: {exc}")
    if resp is None:
        return
    try:
        write_json(resp)
    except Exception as exc:
        # NOTE(review): presumably reachable when the handler returned a
        # payload that cannot be serialised — confirm against write_json.
        # Answer the request anyway; if this second write also raises, the
        # output channel itself is unusable and nothing more can be done.
        write_json(_err(rid, -32000, f"handler error: {exc}"))


def dispatch(req: dict) -> dict | None:
    """Route an inbound RPC — long handlers to the pool, everything else inline.

    Args:
        req: The decoded JSON-RPC request.

    Returns:
        The response for requests handled synchronously, so the caller
        (entry.py) can write it.  ``None`` when the request has been
        scheduled on the pool; the worker writes the response itself.
    """
    method = req.get("method", "")
    # The isinstance guard keeps an unhashable "method" (malformed input)
    # from raising TypeError in the frozenset membership test.
    if isinstance(method, str) and method in _LONG_HANDLERS:
        try:
            _pool.submit(_run_and_emit, req)
            return None
        except RuntimeError:
            # The executor refuses new work once it has been shut down.
            # Degrade to the synchronous path rather than letting the
            # exception escape into the reader loop.
            pass
    return handle_request(req)