diff --git a/tests/tui_gateway/test_protocol.py b/tests/tui_gateway/test_protocol.py index 926dfadf17..da154cc168 100644 --- a/tests/tui_gateway/test_protocol.py +++ b/tests/tui_gateway/test_protocol.py @@ -4,6 +4,7 @@ import io import json import sys import threading +import time from unittest.mock import MagicMock, patch import pytest @@ -432,3 +433,81 @@ def test_command_dispatch_returns_skill_payload(server): assert result["type"] == "skill" assert result["message"] == fake_msg assert result["name"] == "hermes-agent-dev" + + +# ── dispatch(): pool routing for long handlers (#12546) ────────────── + + +def test_dispatch_runs_short_handlers_inline(server): + """Non-long handlers return their response synchronously from dispatch().""" + server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True}) + + resp = server.dispatch({"id": "r1", "method": "fast.ping", "params": {}}) + + assert resp == {"jsonrpc": "2.0", "id": "r1", "result": {"pong": True}} + + +def test_dispatch_offloads_long_handlers_and_emits_via_stdout(capture): + """Long handlers run on the pool and write their response via write_json.""" + server, buf = capture + server._methods["slash.exec"] = lambda rid, params: server._ok(rid, {"output": "hi"}) + + resp = server.dispatch({"id": "r2", "method": "slash.exec", "params": {}}) + assert resp is None + + for _ in range(50): + if buf.getvalue(): + break + time.sleep(0.01) + + written = json.loads(buf.getvalue()) + assert written == {"jsonrpc": "2.0", "id": "r2", "result": {"output": "hi"}} + + +def test_dispatch_long_handler_does_not_block_fast_handler(server): + """A slow long handler must not prevent a concurrent fast handler from completing.""" + released = threading.Event() + server._methods["slash.exec"] = lambda rid, params: (released.wait(timeout=5), server._ok(rid, {"done": True}))[1] + server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True}) + + t0 = time.monotonic() + assert server.dispatch({"id": "slow", "method": "slash.exec", "params": {}}) is None + + fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}}) + fast_elapsed = time.monotonic() - t0 + + assert fast_resp["result"] == {"pong": True} + assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind slow handler" + + released.set() + + +def test_dispatch_long_handler_exception_produces_error_response(capture): + """An exception inside a pool-dispatched handler still yields a JSON-RPC error.""" + server, buf = capture + + def boom(rid, params): + raise RuntimeError("kaboom") + + server._methods["slash.exec"] = boom + + server.dispatch({"id": "r3", "method": "slash.exec", "params": {}}) + + for _ in range(50): + if buf.getvalue(): + break + time.sleep(0.01) + + written = json.loads(buf.getvalue()) + assert written["id"] == "r3" + assert written["error"]["code"] == -32000 + assert "kaboom" in written["error"]["message"] + + +def test_dispatch_unknown_long_method_still_goes_inline(server): + """Method name not in _LONG_HANDLERS takes the sync path even if handler is slow.""" + server._methods["some.method"] = lambda rid, params: server._ok(rid, {"ok": True}) + + resp = server.dispatch({"id": "r4", "method": "some.method", "params": {}}) + + assert resp["result"] == {"ok": True} diff --git a/tui_gateway/entry.py b/tui_gateway/entry.py index a9667528de..d2b82b9dab 100644 --- a/tui_gateway/entry.py +++ b/tui_gateway/entry.py @@ -2,7 +2,7 @@ import json import signal import sys -from tui_gateway.server import handle_request, resolve_skin, write_json +from tui_gateway.server import dispatch, resolve_skin, write_json signal.signal(signal.SIGPIPE, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -28,7 +28,7 @@ def main(): sys.exit(0) continue - resp = handle_request(req) + resp = dispatch(req) if resp is not None: if not write_json(resp): sys.exit(0) diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 70dff3b17b..3a48e381e8 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -1,4 +1,5 @@ import atexit +import concurrent.futures import copy import json import os @@ -36,6 +37,23 @@ _cfg_cache: dict | None = None _cfg_mtime: float | None = None _SLASH_WORKER_TIMEOUT_S = max(5.0, float(os.environ.get("HERMES_TUI_SLASH_TIMEOUT_S", "45") or 45)) +# ── Async RPC dispatch (#12546) ────────────────────────────────────── +# A handful of handlers block the dispatcher loop in entry.py for seconds +# to minutes (slash.exec, cli.exec, shell.exec, session.resume, +# session.branch). While they're running, inbound RPCs — notably +# approval.respond and session.interrupt — sit unread in the stdin pipe. +# We route only those slow handlers onto a small thread pool; everything +# else stays on the main thread so ordering stays sane for the fast path. +# write_json is already _stdout_lock-guarded, so concurrent response +# writes are safe. +_LONG_HANDLERS = frozenset({"cli.exec", "session.branch", "session.resume", "shell.exec", "slash.exec"}) + +_pool = concurrent.futures.ThreadPoolExecutor( + max_workers=max(2, int(os.environ.get("HERMES_TUI_RPC_POOL_WORKERS", "4") or 4)), + thread_name_prefix="tui-rpc", +) +atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True)) + # Reserve real stdout for JSON-RPC only; redirect Python's stdout to stderr # so stray print() from libraries/tools becomes harmless gateway.stderr instead # of corrupting the JSON protocol. @@ -200,6 +218,29 @@ def handle_request(req: dict) -> dict | None: return fn(req.get("id"), req.get("params", {})) +def dispatch(req: dict) -> dict | None: + """Route inbound RPCs — long handlers to the pool, everything else inline. + + Returns a response dict when handled inline. Returns None when the + handler was scheduled on the pool; the worker writes its own + response via write_json when done. + """ + if req.get("method") not in _LONG_HANDLERS: + return handle_request(req) + + def run(): + try: + resp = handle_request(req) + except Exception as exc: + resp = _err(req.get("id"), -32000, f"handler error: {exc}") + if resp is not None: + write_json(resp) + + _pool.submit(run) + + return None + + def _wait_agent(session: dict, rid: str, timeout: float = 30.0) -> dict | None: ready = session.get("agent_ready") if ready is not None and not ready.wait(timeout=timeout): diff --git a/ui-tui/src/__tests__/providers.test.ts b/ui-tui/src/__tests__/providers.test.ts index a46102e893..2dfd76d022 100644 --- a/ui-tui/src/__tests__/providers.test.ts +++ b/ui-tui/src/__tests__/providers.test.ts @@ -4,9 +4,12 @@ import { providerDisplayNames } from '../domain/providers.js' describe('providerDisplayNames', () => { it('returns bare names when all are unique', () => { - expect(providerDisplayNames([{ name: 'Anthropic', slug: 'anthropic' }, { name: 'OpenAI', slug: 'openai' }])).toEqual( - ['Anthropic', 'OpenAI'] - ) + expect( + providerDisplayNames([ + { name: 'Anthropic', slug: 'anthropic' }, + { name: 'OpenAI', slug: 'openai' } + ]) + ).toEqual(['Anthropic', 'OpenAI']) }) it('appends slug to every collision so the disambiguation is symmetric', () => { diff --git a/ui-tui/src/app/createGatewayEventHandler.ts b/ui-tui/src/app/createGatewayEventHandler.ts index 699a3794de..8f45bb3d7e 100644 --- a/ui-tui/src/app/createGatewayEventHandler.ts +++ b/ui-tui/src/app/createGatewayEventHandler.ts @@ -46,7 +46,6 @@ const pushNote = pushUnique(6) const pushTool = pushUnique(8) export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev: GatewayEvent) => void { - const { dequeue, queueEditRef, sendQueued } = ctx.composer const { rpc } = ctx.gateway const { STARTUP_RESUME_ID, newSession, resumeById, setCatalog } = ctx.session const { bellOnComplete, stdout, sys } = ctx.system @@ -394,16 +393,6 @@ export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev: patchUiState(state => ({ ...state, usage: { ...state.usage, ...ev.payload!.usage } })) } - if (queueEditRef.current !== null) { - return - } - - const next = dequeue() - - if (next) { - sendQueued(next) - } - return } diff --git a/ui-tui/src/app/interfaces.ts b/ui-tui/src/app/interfaces.ts index 353c56535b..af13e047c7 100644 --- a/ui-tui/src/app/interfaces.ts +++ b/ui-tui/src/app/interfaces.ts @@ -193,11 +193,6 @@ export interface InputHandlerResult { } export interface GatewayEventHandlerContext { - composer: { - dequeue: () => string | undefined - queueEditRef: MutableRefObject - sendQueued: (text: string) => void - } gateway: GatewayServices session: { STARTUP_RESUME_ID: string diff --git a/ui-tui/src/app/useInputHandlers.ts b/ui-tui/src/app/useInputHandlers.ts index b71a1dc392..258cf7cee3 100644 --- a/ui-tui/src/app/useInputHandlers.ts +++ b/ui-tui/src/app/useInputHandlers.ts @@ -7,7 +7,6 @@ import type { SudoRespondResponse, VoiceRecordResponse } from '../gatewayTypes.js' - import { writeOsc52Clipboard } from '../lib/osc52.js' import { getInputSelection } from './inputSelectionStore.js' diff --git a/ui-tui/src/app/useMainApp.ts b/ui-tui/src/app/useMainApp.ts index fb48badea9..e0c18dec64 100644 --- a/ui-tui/src/app/useMainApp.ts +++ b/ui-tui/src/app/useMainApp.ts @@ -380,12 +380,13 @@ export function useMainApp(gw: GatewayClient) { sys }) - const prevSidRef = useRef(null) + // Drain one queued message whenever the session settles (busy → false): + // agent turn ends, interrupt, shell.exec finishes, error recovered, or the + // session first comes up with pre-queued messages. Without this, shell.exec + // and error paths never emit message.complete, so anything enqueued while + // `!sleep` / a failed turn was running would stay stuck forever. useEffect(() => { - const prev = prevSidRef.current - prevSidRef.current = ui.sid - - if (prev !== null || !ui.sid || ui.busy || composerRefs.queueEditRef.current !== null) { + if (!ui.sid || ui.busy || composerRefs.queueEditRef.current !== null) { return } @@ -416,7 +417,6 @@ export function useMainApp(gw: GatewayClient) { const onEvent = useMemo( () => createGatewayEventHandler({ - composer: { dequeue: composerActions.dequeue, queueEditRef: composerRefs.queueEditRef, sendQueued }, gateway, session: { STARTUP_RESUME_ID, @@ -432,11 +432,8 @@ export function useMainApp(gw: GatewayClient) { [ appendMessage, bellOnComplete, - composerActions, - composerRefs, gateway, panel, - sendQueued, session.newSession, session.resetSession, session.resumeById, diff --git a/ui-tui/src/components/modelPicker.tsx b/ui-tui/src/components/modelPicker.tsx index 406047bc11..5ee19e407c 100644 --- a/ui-tui/src/components/modelPicker.tsx +++ b/ui-tui/src/components/modelPicker.tsx @@ -181,7 +181,10 @@ export function ModelPicker({ gw, onCancel, onSelect, sessionId, t }: ModelPicke const idx = off + i return ( - + {providerIdx === idx ? '▸ ' : ' '} {i + 1}. {row} @@ -212,7 +215,10 @@ export function ModelPicker({ gw, onCancel, onSelect, sessionId, t }: ModelPicke const idx = off + i return ( - + {modelIdx === idx ? '▸ ' : ' '} {i + 1}. {row} diff --git a/ui-tui/src/components/prompts.tsx b/ui-tui/src/components/prompts.tsx index cd9c3a2d1d..f9d00dbfe3 100644 --- a/ui-tui/src/components/prompts.tsx +++ b/ui-tui/src/components/prompts.tsx @@ -155,31 +155,21 @@ export function ConfirmPrompt({ onCancel, onConfirm, req, t }: ConfirmPromptProp const [sel, setSel] = useState(0) useInput((ch, key) => { - if (key.escape || (key.ctrl && ch.toLowerCase() === 'c')) { - onCancel() - - return - } - const lower = ch.toLowerCase() + if (key.escape || (key.ctrl && lower === 'c') || lower === 'n') { + return onCancel() + } + if (lower === 'y') { - onConfirm() - - return + return onConfirm() } - if (lower === 'n') { - onCancel() - - return - } - - if (key.upArrow && sel > 0) { + if (key.upArrow) { setSel(0) } - if (key.downArrow && sel < 1) { + if (key.downArrow) { setSel(1) } @@ -189,12 +179,10 @@ export function ConfirmPrompt({ onCancel, onConfirm, req, t }: ConfirmPromptProp }) const accent = req.danger ? t.color.error : t.color.warn - const confirmLabel = req.confirmLabel ?? 'Yes' - const cancelLabel = req.cancelLabel ?? 'No' const rows = [ - { color: t.color.cornsilk, label: cancelLabel }, - { color: req.danger ? t.color.error : t.color.cornsilk, label: confirmLabel } + { color: t.color.cornsilk, label: req.cancelLabel ?? 'No' }, + { color: req.danger ? t.color.error : t.color.cornsilk, label: req.confirmLabel ?? 'Yes' } ] return ( diff --git a/ui-tui/src/config/env.ts b/ui-tui/src/config/env.ts index 999607dacf..60f1e80c53 100644 --- a/ui-tui/src/config/env.ts +++ b/ui-tui/src/config/env.ts @@ -1,5 +1,3 @@ export const STARTUP_RESUME_ID = (process.env.HERMES_TUI_RESUME ?? '').trim() export const MOUSE_TRACKING = !/^(?:1|true|yes|on)$/i.test((process.env.HERMES_TUI_DISABLE_MOUSE ?? '').trim()) -export const NO_CONFIRM_DESTRUCTIVE = /^(?:1|true|yes|on)$/i.test( - (process.env.HERMES_TUI_NO_CONFIRM ?? '').trim() -) +export const NO_CONFIRM_DESTRUCTIVE = /^(?:1|true|yes|on)$/i.test((process.env.HERMES_TUI_NO_CONFIRM ?? '').trim()) diff --git a/ui-tui/src/domain/paths.ts b/ui-tui/src/domain/paths.ts index 6b95dcbac1..43c023b6ba 100644 --- a/ui-tui/src/domain/paths.ts +++ b/ui-tui/src/domain/paths.ts @@ -10,8 +10,7 @@ export const fmtCwdBranch = (cwd: string, branch: null | string, max = 40) => { return shortCwd(cwd, max) } - const b = branch.length > 16 ? `…${branch.slice(-15)}` : branch - const tag = ` (${b})` + const tag = ` (${branch.length > 16 ? `…${branch.slice(-15)}` : branch})` return `${shortCwd(cwd, Math.max(8, max - tag.length))}${tag}` } diff --git a/ui-tui/src/domain/providers.ts b/ui-tui/src/domain/providers.ts index 02cc99b922..83ac016ff1 100644 --- a/ui-tui/src/domain/providers.ts +++ b/ui-tui/src/domain/providers.ts @@ -5,13 +5,7 @@ export const providerDisplayNames = (providers: readonly { name: string; slug: s counts.set(p.name, (counts.get(p.name) ?? 0) + 1) } - return providers.map(p => { - const dup = (counts.get(p.name) ?? 0) > 1 - - if (!dup || !p.slug || p.slug === p.name) { - return p.name - } - - return `${p.name} (${p.slug})` - }) + return providers.map(p => + (counts.get(p.name) ?? 0) > 1 && p.slug && p.slug !== p.name ? `${p.name} (${p.slug})` : p.name + ) }