hermes-agent/ui-tui/src/app/createGatewayEventHandler.ts
2026-04-16 19:14:05 -05:00

435 lines
12 KiB
TypeScript

import { STREAM_BATCH_MS } from '../config/timing.js'
import { introMsg, toTranscriptMessages } from '../domain/messages.js'
import type { CommandsCatalogResponse, GatewayEvent, GatewaySkin, SessionResumeResponse } from '../gatewayTypes.js'
import { asRpcResult, rpcErrorMessage } from '../lib/rpc.js'
import { formatToolCall } from '../lib/text.js'
import { fromSkin } from '../theme.js'
import type { SubagentProgress } from '../types.js'
import type { GatewayEventHandlerContext } from './interfaces.js'
import { patchOverlayState } from './overlayStore.js'
import { turnController } from './turnController.js'
import { getUiState, patchUiState } from './uiStore.js'
const ERRLIKE_RE = /\b(error|traceback|exception|failed|spawn)\b/i
const statusFromBusy = () => (getUiState().busy ? 'running…' : 'ready')
const applySkin = (s: GatewaySkin) =>
patchUiState({ theme: fromSkin(s.colors ?? {}, s.branding ?? {}, s.banner_logo ?? '', s.banner_hero ?? '') })
const dropBgTask = (taskId: string) =>
patchUiState(state => {
const next = new Set(state.bgTasks)
next.delete(taskId)
return { ...state, bgTasks: next }
})
const statusToneFrom = (kind: string): 'error' | 'info' | 'warn' =>
kind === 'error' ? 'error' : kind === 'warn' || kind === 'approval' ? 'warn' : 'info'
const pushUnique =
(max: number) =>
<T>(xs: T[], x: T): T[] =>
xs.at(-1) === x ? xs : [...xs, x].slice(-max)
const pushThinking = pushUnique(6)
const pushNote = pushUnique(6)
const pushTool = pushUnique(8)
export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev: GatewayEvent) => void {
const { dequeue, queueEditRef, sendQueued } = ctx.composer
const { gw, rpc } = ctx.gateway
const { STARTUP_RESUME_ID, colsRef, newSession, resetSession, setCatalog } = ctx.session
const { bellOnComplete, stdout, sys } = ctx.system
const { appendMessage, setHistoryItems } = ctx.transcript
let pendingThinkingStatus = ''
let thinkingStatusTimer: null | ReturnType<typeof setTimeout> = null
const setStatus = (status: string) => {
pendingThinkingStatus = ''
if (thinkingStatusTimer) {
clearTimeout(thinkingStatusTimer)
thinkingStatusTimer = null
}
patchUiState({ status })
}
const scheduleThinkingStatus = (status: string) => {
pendingThinkingStatus = status
if (thinkingStatusTimer) {
return
}
thinkingStatusTimer = setTimeout(() => {
thinkingStatusTimer = null
patchUiState({ status: pendingThinkingStatus || statusFromBusy() })
}, STREAM_BATCH_MS)
}
const restoreStatusAfter = (ms: number) => {
turnController.clearStatusTimer()
turnController.statusTimer = setTimeout(() => {
turnController.statusTimer = null
patchUiState({ status: statusFromBusy() })
}, ms)
}
const keepCompletedElseRunning = (s: SubagentProgress['status']) => (s === 'completed' ? s : 'running')
const handleReady = (skin?: GatewaySkin) => {
if (skin) {
applySkin(skin)
}
rpc<CommandsCatalogResponse>('commands.catalog', {})
.then(r => {
if (!r?.pairs) {
return
}
setCatalog({
canon: (r.canon ?? {}) as Record<string, string>,
categories: r.categories ?? [],
pairs: r.pairs as [string, string][],
skillCount: (r.skill_count ?? 0) as number,
sub: (r.sub ?? {}) as Record<string, string[]>
})
if (r.warning) {
turnController.pushActivity(String(r.warning), 'warn')
}
})
.catch((e: unknown) => turnController.pushActivity(`command catalog unavailable: ${rpcErrorMessage(e)}`, 'warn'))
if (!STARTUP_RESUME_ID) {
patchUiState({ status: 'forging session…' })
newSession()
return
}
patchUiState({ status: 'resuming…' })
gw.request<SessionResumeResponse>('session.resume', { cols: colsRef.current, session_id: STARTUP_RESUME_ID })
.then(raw => {
const r = asRpcResult<SessionResumeResponse>(raw)
if (!r) {
throw new Error('invalid response: session.resume')
}
resetSession()
patchUiState({
info: r.info ?? null,
sid: r.session_id,
status: 'ready',
usage: r.info?.usage ?? getUiState().usage
})
setHistoryItems(
r.info ? [introMsg(r.info), ...toTranscriptMessages(r.messages)] : toTranscriptMessages(r.messages)
)
})
.catch((e: unknown) => {
sys(`resume failed: ${rpcErrorMessage(e)}`)
patchUiState({ status: 'forging session…' })
newSession('started a new session')
})
}
return (ev: GatewayEvent) => {
const sid = getUiState().sid
if (ev.session_id && sid && ev.session_id !== sid && !ev.type.startsWith('gateway.')) {
return
}
switch (ev.type) {
case 'gateway.ready':
handleReady(ev.payload?.skin)
return
case 'skin.changed':
if (ev.payload) {
applySkin(ev.payload)
}
return
case 'session.info': {
const info = ev.payload
patchUiState(state => ({
...state,
info,
status: state.status === 'starting agent…' ? 'ready' : state.status,
usage: info.usage ? { ...state.usage, ...info.usage } : state.usage
}))
setHistoryItems(prev => prev.map(m => (m.kind === 'intro' ? { ...m, info } : m)))
return
}
case 'thinking.delta': {
const text = ev.payload?.text
if (text !== undefined) {
scheduleThinkingStatus(text ? String(text) : statusFromBusy())
}
return
}
case 'message.start':
turnController.startMessage()
return
case 'status.update': {
const p = ev.payload
if (!p?.text) {
return
}
setStatus(p.text)
if (!p.kind || p.kind === 'status') {
return
}
if (turnController.lastStatusNote !== p.text) {
turnController.lastStatusNote = p.text
turnController.pushActivity(p.text, statusToneFrom(p.kind))
}
restoreStatusAfter(4000)
return
}
case 'gateway.stderr': {
const line = String(ev.payload.line).slice(0, 120)
turnController.pushActivity(line, ERRLIKE_RE.test(line) ? 'error' : 'warn')
return
}
case 'gateway.start_timeout': {
const { cwd, python } = ev.payload ?? {}
const trace = python || cwd ? ` · ${String(python || '')} ${String(cwd || '')}`.trim() : ''
setStatus('gateway startup timeout')
turnController.pushActivity(`gateway startup timed out${trace} · /logs to inspect`, 'error')
return
}
case 'gateway.protocol_error':
setStatus('protocol warning')
restoreStatusAfter(4000)
if (!turnController.protocolWarned) {
turnController.protocolWarned = true
turnController.pushActivity('protocol noise detected · /logs to inspect', 'warn')
}
if (ev.payload?.preview) {
turnController.pushActivity(`protocol noise: ${String(ev.payload.preview).slice(0, 120)}`, 'warn')
}
return
case 'reasoning.delta':
if (ev.payload?.text) {
turnController.recordReasoningDelta(ev.payload.text)
}
return
case 'reasoning.available':
turnController.recordReasoningAvailable(String(ev.payload?.text ?? ''))
return
case 'tool.progress':
if (ev.payload?.preview && ev.payload.name) {
turnController.recordToolProgress(ev.payload.name, ev.payload.preview)
}
return
case 'tool.generating':
if (ev.payload?.name) {
turnController.pushTrail(`drafting ${ev.payload.name}`)
}
return
case 'tool.start':
turnController.recordToolStart(ev.payload.tool_id, ev.payload.name ?? 'tool', ev.payload.context ?? '')
return
case 'tool.complete':
turnController.recordToolComplete(ev.payload.tool_id, ev.payload.name, ev.payload.error, ev.payload.summary)
if (ev.payload.inline_diff) {
sys(ev.payload.inline_diff)
}
return
case 'clarify.request':
patchOverlayState({
clarify: { choices: ev.payload.choices, question: ev.payload.question, requestId: ev.payload.request_id }
})
setStatus('waiting for input…')
return
case 'approval.request':
patchOverlayState({ approval: { command: ev.payload.command, description: ev.payload.description } })
setStatus('approval needed')
return
case 'sudo.request':
patchOverlayState({ sudo: { requestId: ev.payload.request_id } })
setStatus('sudo password needed')
return
case 'secret.request':
patchOverlayState({
secret: { envVar: ev.payload.env_var, prompt: ev.payload.prompt, requestId: ev.payload.request_id }
})
setStatus('secret input needed')
return
case 'background.complete':
dropBgTask(ev.payload.task_id)
sys(`[bg ${ev.payload.task_id}] ${ev.payload.text}`)
return
case 'btw.complete':
dropBgTask('btw:x')
sys(`[btw] ${ev.payload.text}`)
return
case 'subagent.start':
turnController.upsertSubagent(ev.payload, () => ({ status: 'running' }))
return
case 'subagent.thinking': {
const text = String(ev.payload.text ?? '').trim()
if (!text) {
return
}
turnController.upsertSubagent(ev.payload, c => ({
status: keepCompletedElseRunning(c.status),
thinking: pushThinking(c.thinking, text)
}))
return
}
case 'subagent.tool': {
const line = formatToolCall(
ev.payload.tool_name ?? 'delegate_task',
ev.payload.tool_preview ?? ev.payload.text ?? ''
)
turnController.upsertSubagent(ev.payload, c => ({
status: keepCompletedElseRunning(c.status),
tools: pushTool(c.tools, line)
}))
return
}
case 'subagent.progress': {
const text = String(ev.payload.text ?? '').trim()
if (!text) {
return
}
turnController.upsertSubagent(ev.payload, c => ({
notes: pushNote(c.notes, text),
status: keepCompletedElseRunning(c.status)
}))
return
}
case 'subagent.complete':
turnController.upsertSubagent(ev.payload, c => ({
durationSeconds: ev.payload.duration_seconds ?? c.durationSeconds,
status: ev.payload.status ?? 'completed',
summary: ev.payload.summary || ev.payload.text || c.summary
}))
return
case 'message.delta':
turnController.recordMessageDelta(ev.payload ?? {})
return
case 'message.complete': {
const { finalText, savedReasoning, savedReasoningTokens, savedTools, savedToolTokens, wasInterrupted } =
turnController.recordMessageComplete(ev.payload ?? {})
if (!wasInterrupted) {
appendMessage({
role: 'assistant',
text: finalText,
thinking: savedReasoning || undefined,
thinkingTokens: savedReasoning ? savedReasoningTokens : undefined,
toolTokens: savedTools.length ? savedToolTokens : undefined,
tools: savedTools.length ? savedTools : undefined
})
if (bellOnComplete && stdout?.isTTY) {
stdout.write('\x07')
}
}
setStatus('ready')
if (ev.payload?.usage) {
patchUiState(state => ({ ...state, usage: { ...state.usage, ...ev.payload!.usage } }))
}
if (queueEditRef.current !== null) {
return
}
const next = dequeue()
if (next) {
sendQueued(next)
}
return
}
case 'error':
turnController.recordError()
turnController.pushActivity(String(ev.payload?.message || 'unknown error'), 'error')
sys(`error: ${ev.payload?.message}`)
setStatus('ready')
}
}
}