Merge pull request #12560 from NousResearch/bb/tui-gateway-rpc-pool

fix(tui-gateway): dispatch slow RPC handlers on a thread pool (#12546)
This commit is contained in:
brooklyn! 2026-04-19 09:49:39 -05:00 committed by GitHub
commit 6af04474a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 156 additions and 68 deletions

View file

@ -4,6 +4,7 @@ import io
import json
import sys
import threading
import time
from unittest.mock import MagicMock, patch
import pytest
@ -432,3 +433,81 @@ def test_command_dispatch_returns_skill_payload(server):
assert result["type"] == "skill"
assert result["message"] == fake_msg
assert result["name"] == "hermes-agent-dev"
# ── dispatch(): pool routing for long handlers (#12546) ──────────────
def test_dispatch_runs_short_handlers_inline(server):
"""Non-long handlers return their response synchronously from dispatch()."""
server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
resp = server.dispatch({"id": "r1", "method": "fast.ping", "params": {}})
assert resp == {"jsonrpc": "2.0", "id": "r1", "result": {"pong": True}}
def test_dispatch_offloads_long_handlers_and_emits_via_stdout(capture):
"""Long handlers run on the pool and write their response via write_json."""
server, buf = capture
server._methods["slash.exec"] = lambda rid, params: server._ok(rid, {"output": "hi"})
resp = server.dispatch({"id": "r2", "method": "slash.exec", "params": {}})
assert resp is None
for _ in range(50):
if buf.getvalue():
break
time.sleep(0.01)
written = json.loads(buf.getvalue())
assert written == {"jsonrpc": "2.0", "id": "r2", "result": {"output": "hi"}}
def test_dispatch_long_handler_does_not_block_fast_handler(server):
"""A slow long handler must not prevent a concurrent fast handler from completing."""
released = threading.Event()
server._methods["slash.exec"] = lambda rid, params: (released.wait(timeout=5), server._ok(rid, {"done": True}))[1]
server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
t0 = time.monotonic()
assert server.dispatch({"id": "slow", "method": "slash.exec", "params": {}}) is None
fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}})
fast_elapsed = time.monotonic() - t0
assert fast_resp["result"] == {"pong": True}
assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind slow handler"
released.set()
def test_dispatch_long_handler_exception_produces_error_response(capture):
"""An exception inside a pool-dispatched handler still yields a JSON-RPC error."""
server, buf = capture
def boom(rid, params):
raise RuntimeError("kaboom")
server._methods["slash.exec"] = boom
server.dispatch({"id": "r3", "method": "slash.exec", "params": {}})
for _ in range(50):
if buf.getvalue():
break
time.sleep(0.01)
written = json.loads(buf.getvalue())
assert written["id"] == "r3"
assert written["error"]["code"] == -32000
assert "kaboom" in written["error"]["message"]
def test_dispatch_unknown_long_method_still_goes_inline(server):
"""Method name not in _LONG_HANDLERS takes the sync path even if handler is slow."""
server._methods["some.method"] = lambda rid, params: server._ok(rid, {"ok": True})
resp = server.dispatch({"id": "r4", "method": "some.method", "params": {}})
assert resp["result"] == {"ok": True}

View file

@ -2,7 +2,7 @@ import json
import signal
import sys
from tui_gateway.server import handle_request, resolve_skin, write_json
from tui_gateway.server import dispatch, resolve_skin, write_json
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@ -28,7 +28,7 @@ def main():
sys.exit(0)
continue
resp = handle_request(req)
resp = dispatch(req)
if resp is not None:
if not write_json(resp):
sys.exit(0)

View file

@ -1,4 +1,5 @@
import atexit
import concurrent.futures
import copy
import json
import os
@ -36,6 +37,23 @@ _cfg_cache: dict | None = None
_cfg_mtime: float | None = None
_SLASH_WORKER_TIMEOUT_S = max(5.0, float(os.environ.get("HERMES_TUI_SLASH_TIMEOUT_S", "45") or 45))
# ── Async RPC dispatch (#12546) ──────────────────────────────────────
# A handful of handlers block the dispatcher loop in entry.py for seconds
# to minutes (slash.exec, cli.exec, shell.exec, session.resume,
# session.branch). While they're running, inbound RPCs — notably
# approval.respond and session.interrupt — sit unread in the stdin pipe.
# We route only those slow handlers onto a small thread pool; everything
# else stays on the main thread so ordering stays sane for the fast path.
# write_json is already _stdout_lock-guarded, so concurrent response
# writes are safe.
_LONG_HANDLERS = frozenset({"cli.exec", "session.branch", "session.resume", "shell.exec", "slash.exec"})
_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=max(2, int(os.environ.get("HERMES_TUI_RPC_POOL_WORKERS", "4") or 4)),
thread_name_prefix="tui-rpc",
)
atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True))
# Reserve real stdout for JSON-RPC only; redirect Python's stdout to stderr
# so stray print() from libraries/tools becomes harmless gateway.stderr instead
# of corrupting the JSON protocol.
@ -200,6 +218,29 @@ def handle_request(req: dict) -> dict | None:
return fn(req.get("id"), req.get("params", {}))
def dispatch(req: dict) -> dict | None:
"""Route inbound RPCs — long handlers to the pool, everything else inline.
Returns a response dict when handled inline. Returns None when the
handler was scheduled on the pool; the worker writes its own
response via write_json when done.
"""
if req.get("method") not in _LONG_HANDLERS:
return handle_request(req)
def run():
try:
resp = handle_request(req)
except Exception as exc:
resp = _err(req.get("id"), -32000, f"handler error: {exc}")
if resp is not None:
write_json(resp)
_pool.submit(run)
return None
def _wait_agent(session: dict, rid: str, timeout: float = 30.0) -> dict | None:
ready = session.get("agent_ready")
if ready is not None and not ready.wait(timeout=timeout):

View file

@ -4,9 +4,12 @@ import { providerDisplayNames } from '../domain/providers.js'
describe('providerDisplayNames', () => {
it('returns bare names when all are unique', () => {
expect(providerDisplayNames([{ name: 'Anthropic', slug: 'anthropic' }, { name: 'OpenAI', slug: 'openai' }])).toEqual(
['Anthropic', 'OpenAI']
)
expect(
providerDisplayNames([
{ name: 'Anthropic', slug: 'anthropic' },
{ name: 'OpenAI', slug: 'openai' }
])
).toEqual(['Anthropic', 'OpenAI'])
})
it('appends slug to every collision so the disambiguation is symmetric', () => {

View file

@ -46,7 +46,6 @@ const pushNote = pushUnique(6)
const pushTool = pushUnique(8)
export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev: GatewayEvent) => void {
const { dequeue, queueEditRef, sendQueued } = ctx.composer
const { rpc } = ctx.gateway
const { STARTUP_RESUME_ID, newSession, resumeById, setCatalog } = ctx.session
const { bellOnComplete, stdout, sys } = ctx.system
@ -394,16 +393,6 @@ export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev:
patchUiState(state => ({ ...state, usage: { ...state.usage, ...ev.payload!.usage } }))
}
if (queueEditRef.current !== null) {
return
}
const next = dequeue()
if (next) {
sendQueued(next)
}
return
}

View file

@ -193,11 +193,6 @@ export interface InputHandlerResult {
}
export interface GatewayEventHandlerContext {
composer: {
dequeue: () => string | undefined
queueEditRef: MutableRefObject<null | number>
sendQueued: (text: string) => void
}
gateway: GatewayServices
session: {
STARTUP_RESUME_ID: string

View file

@ -7,7 +7,6 @@ import type {
SudoRespondResponse,
VoiceRecordResponse
} from '../gatewayTypes.js'
import { writeOsc52Clipboard } from '../lib/osc52.js'
import { getInputSelection } from './inputSelectionStore.js'

View file

@ -380,12 +380,13 @@ export function useMainApp(gw: GatewayClient) {
sys
})
const prevSidRef = useRef<null | string>(null)
// Drain one queued message whenever the session settles (busy → false):
// agent turn ends, interrupt, shell.exec finishes, error recovered, or the
// session first comes up with pre-queued messages. Without this, shell.exec
// and error paths never emit message.complete, so anything enqueued while
// `!sleep` / a failed turn was running would stay stuck forever.
useEffect(() => {
const prev = prevSidRef.current
prevSidRef.current = ui.sid
if (prev !== null || !ui.sid || ui.busy || composerRefs.queueEditRef.current !== null) {
if (!ui.sid || ui.busy || composerRefs.queueEditRef.current !== null) {
return
}
@ -416,7 +417,6 @@ export function useMainApp(gw: GatewayClient) {
const onEvent = useMemo(
() =>
createGatewayEventHandler({
composer: { dequeue: composerActions.dequeue, queueEditRef: composerRefs.queueEditRef, sendQueued },
gateway,
session: {
STARTUP_RESUME_ID,
@ -432,11 +432,8 @@ export function useMainApp(gw: GatewayClient) {
[
appendMessage,
bellOnComplete,
composerActions,
composerRefs,
gateway,
panel,
sendQueued,
session.newSession,
session.resetSession,
session.resumeById,

View file

@ -181,7 +181,10 @@ export function ModelPicker({ gw, onCancel, onSelect, sessionId, t }: ModelPicke
const idx = off + i
return (
<Text color={providerIdx === idx ? t.color.cornsilk : t.color.dim} key={providers[idx]?.slug ?? `row-${idx}`}>
<Text
color={providerIdx === idx ? t.color.cornsilk : t.color.dim}
key={providers[idx]?.slug ?? `row-${idx}`}
>
{providerIdx === idx ? '▸ ' : ' '}
{i + 1}. {row}
</Text>
@ -212,7 +215,10 @@ export function ModelPicker({ gw, onCancel, onSelect, sessionId, t }: ModelPicke
const idx = off + i
return (
<Text color={modelIdx === idx ? t.color.cornsilk : t.color.dim} key={`${provider?.slug ?? 'prov'}:${idx}:${row}`}>
<Text
color={modelIdx === idx ? t.color.cornsilk : t.color.dim}
key={`${provider?.slug ?? 'prov'}:${idx}:${row}`}
>
{modelIdx === idx ? '▸ ' : ' '}
{i + 1}. {row}
</Text>

View file

@ -155,31 +155,21 @@ export function ConfirmPrompt({ onCancel, onConfirm, req, t }: ConfirmPromptProp
const [sel, setSel] = useState(0)
useInput((ch, key) => {
if (key.escape || (key.ctrl && ch.toLowerCase() === 'c')) {
onCancel()
return
}
const lower = ch.toLowerCase()
if (key.escape || (key.ctrl && lower === 'c') || lower === 'n') {
return onCancel()
}
if (lower === 'y') {
onConfirm()
return
return onConfirm()
}
if (lower === 'n') {
onCancel()
return
}
if (key.upArrow && sel > 0) {
if (key.upArrow) {
setSel(0)
}
if (key.downArrow && sel < 1) {
if (key.downArrow) {
setSel(1)
}
@ -189,12 +179,10 @@ export function ConfirmPrompt({ onCancel, onConfirm, req, t }: ConfirmPromptProp
})
const accent = req.danger ? t.color.error : t.color.warn
const confirmLabel = req.confirmLabel ?? 'Yes'
const cancelLabel = req.cancelLabel ?? 'No'
const rows = [
{ color: t.color.cornsilk, label: cancelLabel },
{ color: req.danger ? t.color.error : t.color.cornsilk, label: confirmLabel }
{ color: t.color.cornsilk, label: req.cancelLabel ?? 'No' },
{ color: req.danger ? t.color.error : t.color.cornsilk, label: req.confirmLabel ?? 'Yes' }
]
return (

View file

@ -1,5 +1,3 @@
export const STARTUP_RESUME_ID = (process.env.HERMES_TUI_RESUME ?? '').trim()
export const MOUSE_TRACKING = !/^(?:1|true|yes|on)$/i.test((process.env.HERMES_TUI_DISABLE_MOUSE ?? '').trim())
export const NO_CONFIRM_DESTRUCTIVE = /^(?:1|true|yes|on)$/i.test(
(process.env.HERMES_TUI_NO_CONFIRM ?? '').trim()
)
export const NO_CONFIRM_DESTRUCTIVE = /^(?:1|true|yes|on)$/i.test((process.env.HERMES_TUI_NO_CONFIRM ?? '').trim())

View file

@ -10,8 +10,7 @@ export const fmtCwdBranch = (cwd: string, branch: null | string, max = 40) => {
return shortCwd(cwd, max)
}
const b = branch.length > 16 ? `${branch.slice(-15)}` : branch
const tag = ` (${b})`
const tag = ` (${branch.length > 16 ? `${branch.slice(-15)}` : branch})`
return `${shortCwd(cwd, Math.max(8, max - tag.length))}${tag}`
}

View file

@ -5,13 +5,7 @@ export const providerDisplayNames = (providers: readonly { name: string; slug: s
counts.set(p.name, (counts.get(p.name) ?? 0) + 1)
}
return providers.map(p => {
const dup = (counts.get(p.name) ?? 0) > 1
if (!dup || !p.slug || p.slug === p.name) {
return p.name
}
return `${p.name} (${p.slug})`
})
return providers.map(p =>
(counts.get(p.name) ?? 0) > 1 && p.slug && p.slug !== p.name ? `${p.name} (${p.slug})` : p.name
)
}