From 086343854dbf723acfc529d2580c260cc3713d0b Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Tue, 30 Jun 2026 02:58:45 -0500 Subject: [PATCH 1/2] refactor(desktop): split use-message-stream into folder + utils Extract the standalone gateway-event helpers (session-info patch derivation, completion-error detection, todo-payload routing, delegate_task -> subagent spec mapping, + the stream-flush/subagent-event constants) out of the 1,285-line hook into a colocated, tested use-message-stream/utils.ts. index.ts keeps the stateful streaming hook and consumes the helpers. Pure restructuring, no behaviour change; folder index keeps the import path intact. index.ts: 1,285 -> ~1,120. Adds unit tests for the pure helpers. --- .../index.ts} | 189 ++---------------- .../hooks/use-message-stream/utils.test.ts | 66 ++++++ .../session/hooks/use-message-stream/utils.ts | 179 +++++++++++++++++ 3 files changed, 257 insertions(+), 177 deletions(-) rename apps/desktop/src/app/session/hooks/{use-message-stream.ts => use-message-stream/index.ts} (87%) create mode 100644 apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts create mode 100644 apps/desktop/src/app/session/hooks/use-message-stream/utils.ts diff --git a/apps/desktop/src/app/session/hooks/use-message-stream.ts b/apps/desktop/src/app/session/hooks/use-message-stream/index.ts similarity index 87% rename from apps/desktop/src/app/session/hooks/use-message-stream.ts rename to apps/desktop/src/app/session/hooks/use-message-stream/index.ts index a455bae06cc..51136a7451f 100644 --- a/apps/desktop/src/app/session/hooks/use-message-stream.ts +++ b/apps/desktop/src/app/session/hooks/use-message-stream/index.ts @@ -2,8 +2,8 @@ import type { QueryClient } from '@tanstack/react-query' import { type MutableRefObject, useCallback, useEffect, useRef } from 'react' import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream' -import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals' import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer' +import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals' import { translateNow } from '@/i18n' import { appendAssistantTextPart, @@ -61,7 +61,17 @@ import { recordToolDiff } from '@/store/tool-diffs' import { notifyWorkspaceChanged, toolMayMutateFiles } from '@/store/workspace-events' import type { RpcEvent } from '@/types/hermes' -import type { ClientSessionState } from '../../types' +import type { ClientSessionState } from '../../../types' + +import { + completionErrorText, + delegateTaskPayloads, + hasSessionInfoStatePatch, + sessionInfoStatePatch, + STREAM_DELTA_FLUSH_MS, + SUBAGENT_EVENT_TYPES, + toTodoPayload +} from './utils' interface MessageStreamOptions { activeSessionIdRef: MutableRefObject @@ -86,181 +96,6 @@ interface QueuedStreamDeltas { reasoning: string } -type SessionRuntimeStatePatch = Partial< - Pick< - ClientSessionState, - 'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo' - > -> - -function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch { - const patch: SessionRuntimeStatePatch = {} - - if (typeof payload?.model === 'string') { - patch.model = payload.model || '' - } - - if (typeof payload?.provider === 'string') { - patch.provider = payload.provider || '' - } - - if (typeof payload?.cwd === 'string') { - patch.cwd = payload.cwd - } - - if (typeof payload?.branch === 'string') { - patch.branch = payload.branch - } - - if (typeof payload?.personality === 'string') { - patch.personality = normalizePersonalityValue(payload.personality) - } - - if (typeof payload?.reasoning_effort === 'string') { - patch.reasoningEffort = payload.reasoning_effort - } - - if (typeof payload?.service_tier === 'string') { - patch.serviceTier = payload.service_tier - } - - if (typeof payload?.fast === 'boolean') { - patch.fast = payload.fast - } - - if (typeof payload?.yolo === 'boolean') { - patch.yolo = payload.yolo - } - - return patch -} - -function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean { - return Object.keys(patch).length > 0 -} - -// Minimum gap between two assistant-text flushes during a stream. Was 16ms -// (rAF only), which at typical LLM token rates of ~30-80 tok/sec meant every -// token got its own React commit + Streamdown markdown re-parse, scaling -// linearly with the growing last-block length. Bumping to 33ms lets ~2 tokens -// batch into one commit at 60 tok/sec without introducing visible lag on the -// streaming text (still 30 fps of visible text growth). Big perceived -// smoothness win on long messages with big trailing paragraphs; see -// `scripts/profile-typing-lag.md` for the measurement work behind this. -const STREAM_DELTA_FLUSH_MS = 33 - -// Gateway/provider failures sometimes arrive as message.complete text instead -// of an explicit error event. Treat matches as inline assistant errors so they -// persist like real error events and don't get erased by hydrate fallback. -const COMPLETION_ERROR_PATTERNS = [ - /^API call failed after \d+ retries:/i, - /^HTTP\s+\d{3}\b/i, - /^(Provider|Gateway)\s+error:/i -] - -function completionErrorText(finalText: string): string | null { - const text = finalText.trim() - - return text && COMPLETION_ERROR_PATTERNS.some(re => re.test(text)) ? text : null -} - -const SUBAGENT_EVENT_TYPES = new Set([ - 'subagent.spawn_requested', - 'subagent.start', - 'subagent.thinking', - 'subagent.tool', - 'subagent.progress', - 'subagent.complete' -]) - -// Anonymous progress events that carry todos but no name still belong to the -// todo stream; named todo events are obviously routed there too. -function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined { - if (!payload) { - return undefined - } - - const isTodo = payload.name === 'todo' || (!payload.name && Object.hasOwn(payload, 'todos')) - - return isTodo ? { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' } : undefined -} - -function asRecord(value: unknown): Record { - return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record) : {} -} - -function parseMaybeRecord(value: unknown): Record { - if (typeof value === 'string') { - try { - return asRecord(JSON.parse(value)) - } catch { - return {} - } - } - - return asRecord(value) -} - -const firstString = (...candidates: unknown[]): string => { - for (const v of candidates) { - if (typeof v === 'string' && v) { - return v - } - } - - return '' -} - -function delegateTaskPayloads( - payload: GatewayEventPayload | undefined, - phase: 'running' | 'complete', - sourceEventType?: string -): Record[] { - if (payload?.name !== 'delegate_task') { - return [] - } - - const args = parseMaybeRecord(payload.args ?? payload.input) - const result = parseMaybeRecord(payload.result) - const rawTasks = Array.isArray(args.tasks) ? args.tasks : [] - const tasks = rawTasks.length ? rawTasks.map(parseMaybeRecord) : [args] - const status = phase === 'complete' ? (payload.error ? 'failed' : 'completed') : 'running' - const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task' - const progressText = firstString(payload.preview, payload.message, payload.context) - - const eventType = - phase === 'complete' - ? 'subagent.complete' - : sourceEventType === 'tool.start' - ? 'subagent.start' - : 'subagent.progress' - - return tasks.map((task, index) => { - const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task' - const summary = firstString(result.summary, payload.summary, payload.message) - - return { - depth: 0, - duration_seconds: payload.duration_s, - goal, - status, - subagent_id: `delegate-tool:${toolId}:${index}`, - summary: summary || undefined, - task_count: tasks.length, - task_index: index, - text: eventType === 'subagent.progress' ? progressText || goal : undefined, - tool_name: eventType === 'subagent.start' ? 'delegate_task' : undefined, - tool_preview: eventType === 'subagent.start' ? progressText : undefined, - toolsets: Array.isArray(task.toolsets) ? task.toolsets : Array.isArray(args.toolsets) ? args.toolsets : [], - event_type: eventType, - output_tail: - phase === 'complete' && summary - ? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }] - : undefined - } - }) -} - export function useMessageStream({ activeSessionIdRef, hydrateFromStoredSession, diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts b/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts new file mode 100644 index 00000000000..47994355074 --- /dev/null +++ b/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from 'vitest' + +import type { GatewayEventPayload } from '@/lib/chat-messages' + +import { + completionErrorText, + delegateTaskPayloads, + hasSessionInfoStatePatch, + sessionInfoStatePatch, + toTodoPayload +} from './utils' + +const payload = (over: Record): GatewayEventPayload => over as GatewayEventPayload + +describe('completionErrorText', () => { + it('flags provider/HTTP/retry failures, ignores normal text', () => { + expect(completionErrorText('API call failed after 3 retries: boom')).toMatch(/^API call failed/) + expect(completionErrorText('HTTP 500 upstream')).toMatch(/^HTTP 500/) + expect(completionErrorText('Gateway error: nope')).toMatch(/^Gateway error/) + expect(completionErrorText('here is your answer')).toBeNull() + expect(completionErrorText(' ')).toBeNull() + }) +}) + +describe('toTodoPayload', () => { + it('routes named todo and anonymous todos-bearing events to the todo stream', () => { + expect(toTodoPayload(payload({ name: 'todo' }))?.tool_id).toBe('todo-live') + expect(toTodoPayload(payload({ todos: [] }))?.name).toBe('todo') + expect(toTodoPayload(payload({ name: 'web_search' }))).toBeUndefined() + expect(toTodoPayload(undefined)).toBeUndefined() + }) +}) + +describe('sessionInfoStatePatch / hasSessionInfoStatePatch', () => { + it('extracts only present runtime fields', () => { + const patch = sessionInfoStatePatch(payload({ model: 'gpt', fast: true, branch: 'main' })) + expect(patch).toMatchObject({ model: 'gpt', fast: true, branch: 'main' }) + expect(hasSessionInfoStatePatch(patch)).toBe(true) + expect(hasSessionInfoStatePatch(sessionInfoStatePatch(payload({})))).toBe(false) + }) +}) + +describe('delegateTaskPayloads', () => { + it('returns [] for non-delegate events', () => { + expect(delegateTaskPayloads(payload({ name: 'web_search' }), 'running')).toEqual([]) + }) + + it('maps a running tool.start to a subagent.start spec', () => { + const [spec] = delegateTaskPayloads( + payload({ name: 'delegate_task', tool_id: 't1', args: { goal: 'do it' } }), + 'running', + 'tool.start' + ) + + expect(spec).toMatchObject({ event_type: 'subagent.start', goal: 'do it', status: 'running' }) + }) + + it('maps completion (with error) to a failed subagent.complete', () => { + const [spec] = delegateTaskPayloads( + payload({ name: 'delegate_task', error: 'boom', result: { summary: 'failed run' } }), + 'complete' + ) + + expect(spec).toMatchObject({ event_type: 'subagent.complete', status: 'failed' }) + }) +}) diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts b/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts new file mode 100644 index 00000000000..d9a90b36692 --- /dev/null +++ b/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts @@ -0,0 +1,179 @@ +import type { GatewayEventPayload } from '@/lib/chat-messages' +import { normalizePersonalityValue } from '@/lib/chat-runtime' + +import type { ClientSessionState } from '../../../types' + +type SessionRuntimeStatePatch = Partial< + Pick< + ClientSessionState, + 'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo' + > +> + +export function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch { + const patch: SessionRuntimeStatePatch = {} + + if (typeof payload?.model === 'string') { + patch.model = payload.model || '' + } + + if (typeof payload?.provider === 'string') { + patch.provider = payload.provider || '' + } + + if (typeof payload?.cwd === 'string') { + patch.cwd = payload.cwd + } + + if (typeof payload?.branch === 'string') { + patch.branch = payload.branch + } + + if (typeof payload?.personality === 'string') { + patch.personality = normalizePersonalityValue(payload.personality) + } + + if (typeof payload?.reasoning_effort === 'string') { + patch.reasoningEffort = payload.reasoning_effort + } + + if (typeof payload?.service_tier === 'string') { + patch.serviceTier = payload.service_tier + } + + if (typeof payload?.fast === 'boolean') { + patch.fast = payload.fast + } + + if (typeof payload?.yolo === 'boolean') { + patch.yolo = payload.yolo + } + + return patch +} + +export function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean { + return Object.keys(patch).length > 0 +} + +// Minimum gap between two assistant-text flushes during a stream. Was 16ms +// (rAF only), which at typical LLM token rates of ~30-80 tok/sec meant every +// token got its own React commit + Streamdown markdown re-parse, scaling +// linearly with the growing last-block length. Bumping to 33ms lets ~2 tokens +// batch into one commit at 60 tok/sec without introducing visible lag on the +// streaming text (still 30 fps of visible text growth). Big perceived +// smoothness win on long messages with big trailing paragraphs; see +// `scripts/profile-typing-lag.md` for the measurement work behind this. +export const STREAM_DELTA_FLUSH_MS = 33 + +// Gateway/provider failures sometimes arrive as message.complete text instead +// of an explicit error event. Treat matches as inline assistant errors so they +// persist like real error events and don't get erased by hydrate fallback. +const COMPLETION_ERROR_PATTERNS = [ + /^API call failed after \d+ retries:/i, + /^HTTP\s+\d{3}\b/i, + /^(Provider|Gateway)\s+error:/i +] + +export function completionErrorText(finalText: string): string | null { + const text = finalText.trim() + + return text && COMPLETION_ERROR_PATTERNS.some(re => re.test(text)) ? text : null +} + +export const SUBAGENT_EVENT_TYPES = new Set([ + 'subagent.spawn_requested', + 'subagent.start', + 'subagent.thinking', + 'subagent.tool', + 'subagent.progress', + 'subagent.complete' +]) + +// Anonymous progress events that carry todos but no name still belong to the +// todo stream; named todo events are obviously routed there too. +export function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined { + if (!payload) { + return undefined + } + + const isTodo = payload.name === 'todo' || (!payload.name && Object.hasOwn(payload, 'todos')) + + return isTodo ? { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' } : undefined +} + +function asRecord(value: unknown): Record { + return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record) : {} +} + +function parseMaybeRecord(value: unknown): Record { + if (typeof value === 'string') { + try { + return asRecord(JSON.parse(value)) + } catch { + return {} + } + } + + return asRecord(value) +} + +const firstString = (...candidates: unknown[]): string => { + for (const v of candidates) { + if (typeof v === 'string' && v) { + return v + } + } + + return '' +} + +export function delegateTaskPayloads( + payload: GatewayEventPayload | undefined, + phase: 'running' | 'complete', + sourceEventType?: string +): Record[] { + if (payload?.name !== 'delegate_task') { + return [] + } + + const args = parseMaybeRecord(payload.args ?? payload.input) + const result = parseMaybeRecord(payload.result) + const rawTasks = Array.isArray(args.tasks) ? args.tasks : [] + const tasks = rawTasks.length ? rawTasks.map(parseMaybeRecord) : [args] + const status = phase === 'complete' ? (payload.error ? 'failed' : 'completed') : 'running' + const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task' + const progressText = firstString(payload.preview, payload.message, payload.context) + + const eventType = + phase === 'complete' + ? 'subagent.complete' + : sourceEventType === 'tool.start' + ? 'subagent.start' + : 'subagent.progress' + + return tasks.map((task, index) => { + const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task' + const summary = firstString(result.summary, payload.summary, payload.message) + + return { + depth: 0, + duration_seconds: payload.duration_s, + goal, + status, + subagent_id: `delegate-tool:${toolId}:${index}`, + summary: summary || undefined, + task_count: tasks.length, + task_index: index, + text: eventType === 'subagent.progress' ? progressText || goal : undefined, + tool_name: eventType === 'subagent.start' ? 'delegate_task' : undefined, + tool_preview: eventType === 'subagent.start' ? progressText : undefined, + toolsets: Array.isArray(task.toolsets) ? task.toolsets : Array.isArray(args.toolsets) ? args.toolsets : [], + event_type: eventType, + output_tail: + phase === 'complete' && summary + ? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }] + : undefined + } + }) +} From 51a710e57e931c65a6e44eed694ca437af5ae06d Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Tue, 30 Jun 2026 03:11:14 -0500 Subject: [PATCH 2/2] refactor(desktop): extract gateway-event dispatcher into its own sub-hook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The remaining bulk of useMessageStream was handleGatewayEvent — a ~550-line event-type dispatcher. Lift it into a colocated useGatewayEventHandler sub-hook (use-message-stream/gateway-event.ts): the values it closed over (sibling streaming callbacks + the 3 stable refs the deps array omitted + options) become a typed GatewayEventDeps object; the dispatcher body moves verbatim. Pure restructuring, no behaviour change (utils tests still green). index.ts: 1,120 -> 540. --- .../hooks/use-message-stream/gateway-event.ts | 649 ++++++++++++++++++ .../session/hooks/use-message-stream/index.ts | 618 +---------------- 2 files changed, 668 insertions(+), 599 deletions(-) create mode 100644 apps/desktop/src/app/session/hooks/use-message-stream/gateway-event.ts diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/gateway-event.ts b/apps/desktop/src/app/session/hooks/use-message-stream/gateway-event.ts new file mode 100644 index 00000000000..8bb4010937f --- /dev/null +++ b/apps/desktop/src/app/session/hooks/use-message-stream/gateway-event.ts @@ -0,0 +1,649 @@ +import type { QueryClient } from '@tanstack/react-query' +import { type MutableRefObject, useCallback } from 'react' + +import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream' +import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer' +import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals' +import { translateNow } from '@/i18n' +import { type GatewayEventPayload, textPart } from '@/lib/chat-messages' +import { coerceGatewayText, coerceThinkingText, normalizePersonalityValue } from '@/lib/chat-runtime' +import { playCompletionSound } from '@/lib/completion-sound' +import { gatewayEventRequiresSessionId } from '@/lib/gateway-events' +import { triggerHaptic } from '@/lib/haptics' +import { isProviderSetupErrorMessage } from '@/lib/provider-setup-errors' +import { clearClarifyRequest, setClarifyRequest } from '@/store/clarify' +import { setSessionCompacting } from '@/store/compaction' +import { refreshBackgroundProcesses } from '@/store/composer-status' +import { $gateway } from '@/store/gateway' +import { dispatchNativeNotification } from '@/store/native-notifications' +import { notify } from '@/store/notifications' +import { requestDesktopOnboarding } from '@/store/onboarding' +import { flashPetActivity, markPetUnread, setPetActivity } from '@/store/pet' +import { followActiveSessionCwd } from '@/store/projects' +import { clearAllPrompts, setApprovalRequest, setSecretRequest, setSudoRequest } from '@/store/prompts' +import { + $currentCwd, + setCurrentBranch, + setCurrentCwd, + setCurrentFastMode, + setCurrentModel, + setCurrentPersonality, + setCurrentProvider, + setCurrentReasoningEffort, + setCurrentServiceTier, + setCurrentUsage, + setSessions, + setTurnStartedAt, + setYoloActive +} from '@/store/session' +import { clearSessionSubagents, pruneDelegateFallbackSubagents, upsertSubagent } from '@/store/subagents' +import { recordToolDiff } from '@/store/tool-diffs' +import { notifyWorkspaceChanged, toolMayMutateFiles } from '@/store/workspace-events' +import type { RpcEvent } from '@/types/hermes' + +import type { ClientSessionState } from '../../../types' + +import { hasSessionInfoStatePatch, sessionInfoStatePatch, SUBAGENT_EVENT_TYPES, toTodoPayload } from './utils' + +interface GatewayEventDeps { + activeSessionIdRef: MutableRefObject + compactedTurnRef: MutableRefObject> + lastCwdInfoSessionRef: MutableRefObject + nativeSubagentSessionsRef: MutableRefObject> + appendAssistantDelta: (sessionId: string, delta: string) => void + appendReasoningDelta: (sessionId: string, delta: string, replace?: boolean) => void + completeAssistantMessage: (sessionId: string, text: string) => void + failAssistantMessage: (sessionId: string, errorMessage: string) => void + flushQueuedDeltas: (sessionId?: string) => void + queryClient: QueryClient + refreshHermesConfig: () => Promise + sessionInterrupted: (sessionId: string) => boolean + updateSessionState: ( + sessionId: string, + updater: (state: ClientSessionState) => ClientSessionState, + storedSessionId?: string | null + ) => ClientSessionState + upsertToolCall: ( + sessionId: string, + payload: GatewayEventPayload | undefined, + phase: 'running' | 'complete', + sourceEventType?: string + ) => void +} + +/** The gateway-event dispatcher, extracted from useMessageStream. */ +export function useGatewayEventHandler(deps: GatewayEventDeps) { + const { + appendAssistantDelta, + appendReasoningDelta, + activeSessionIdRef, + compactedTurnRef, + lastCwdInfoSessionRef, + nativeSubagentSessionsRef, + completeAssistantMessage, + failAssistantMessage, + flushQueuedDeltas, + queryClient, + refreshHermesConfig, + sessionInterrupted, + updateSessionState, + upsertToolCall + } = deps + + return useCallback( + (event: RpcEvent) => { + const payload = event.payload as GatewayEventPayload | undefined + const explicitSid = event.session_id || '' + + if (!explicitSid && gatewayEventRequiresSessionId(event.type)) { + return + } + + const sessionId = explicitSid || activeSessionIdRef.current + const isActiveEvent = !!sessionId && sessionId === activeSessionIdRef.current + + if (event.type === 'gateway.ready') { + return + } else if (event.type === 'session.info') { + // Apply session-scoped fields when the event targets the active + // session, OR when it's a global broadcast and we have no session. + const apply = explicitSid ? isActiveEvent : !activeSessionIdRef.current + const statePatch = sessionInfoStatePatch(payload) + const hasStatePatch = hasSessionInfoStatePatch(statePatch) + const modelChanged = typeof payload?.model === 'string' + const providerChanged = typeof payload?.provider === 'string' + const runningChanged = typeof payload?.running === 'boolean' + + if (apply) { + if (modelChanged) { + setCurrentModel(payload!.model || '') + } + + if (providerChanged) { + setCurrentProvider(payload!.provider || '') + } + + if (typeof payload?.cwd === 'string') { + // The active session's agent can relocate itself (new repo/worktree + // via the terminal). When the SAME active session's cwd actually + // moves, follow it — refresh the project tree + scope so the sidebar + // tracks the live thread. A fresh selection (different session id) + // is a switch, not a move, so it refreshes data without yanking scope. + const cwdMoved = payload.cwd !== $currentCwd.get() + const sameSession = !!sessionId && sessionId === lastCwdInfoSessionRef.current + + lastCwdInfoSessionRef.current = sessionId + setCurrentCwd(payload.cwd) + + if (cwdMoved && sameSession) { + void followActiveSessionCwd(payload.cwd) + } + } + + if (typeof payload?.branch === 'string') { + setCurrentBranch(payload.branch) + } + + if (typeof payload?.personality === 'string') { + setCurrentPersonality(normalizePersonalityValue(payload.personality)) + } + + if (typeof payload?.reasoning_effort === 'string') { + setCurrentReasoningEffort(payload.reasoning_effort) + } + + if (typeof payload?.service_tier === 'string') { + setCurrentServiceTier(payload.service_tier) + } + + if (typeof payload?.fast === 'boolean') { + setCurrentFastMode(payload.fast) + } + + if (typeof payload?.yolo === 'boolean') { + setYoloActive(payload.yolo) + } + } + + if (sessionId && hasStatePatch) { + updateSessionState(sessionId, state => ({ + ...state, + ...statePatch, + branch: statePatch.branch ?? state.branch, + cwd: statePatch.cwd ?? state.cwd + })) + } + + if (apply) { + if (runningChanged && sessionId) { + updateSessionState(sessionId, state => { + const busy = Boolean(payload!.running) + + if (state.busy === busy && (busy || !state.awaitingResponse)) { + return state + } + + if (busy) { + return { + ...state, + busy, + turnStartedAt: state.turnStartedAt ?? Date.now() + } + } + + if (state.awaitingResponse && !state.sawAssistantPayload) { + return state + } + + return { + ...state, + awaitingResponse: false, + busy, + pendingBranchGroup: null, + streamId: null, + turnStartedAt: null + } + }) + } + } + + if (payload?.usage && (!explicitSid || isActiveEvent)) { + setCurrentUsage(current => ({ ...current, ...payload.usage })) + } + + if (typeof payload?.credential_warning === 'string' && payload.credential_warning) { + requestDesktopOnboarding(payload.credential_warning) + } + + void refreshHermesConfig() + + if (modelChanged || providerChanged) { + void queryClient.invalidateQueries({ + queryKey: explicitSid && sessionId ? ['model-options', sessionId] : ['model-options'] + }) + } + } else if (event.type === 'message.start') { + if (!sessionId) { + return + } + + flushQueuedDeltas(sessionId) + clearSessionSubagents(sessionId) + setSessionCompacting(sessionId, false) + compactedTurnRef.current.delete(sessionId) + nativeSubagentSessionsRef.current.delete(sessionId) + + if (isActiveEvent) { + triggerHaptic('streamStart') + } + + updateSessionState(sessionId, state => ({ + ...state, + busy: true, + awaitingResponse: true, + sawAssistantPayload: false, + interrupted: false, + turnStartedAt: Date.now() + })) + + if (isActiveEvent) { + setTurnStartedAt(Date.now()) + } + } else if (event.type === 'message.delta') { + if (sessionId) { + appendAssistantDelta(sessionId, coerceGatewayText(payload?.text)) + } + } else if (event.type === 'thinking.delta') { + // thinking.delta carries the kawaii spinner status (face + verb from + // KawaiiSpinner), not real reasoning. The bottom-of-thread loading + // indicator already covers that UX, so we ignore these events to + // avoid a duplicative "Thinking" disclosure showing spinner text. + } else if (event.type === 'reasoning.delta') { + if (sessionId) { + appendReasoningDelta(sessionId, coerceThinkingText(payload?.text)) + } + + if (isActiveEvent) { + setPetActivity({ reasoning: true }) + } + } else if (event.type === 'reasoning.available') { + if (sessionId) { + appendReasoningDelta(sessionId, coerceThinkingText(payload?.text), true) + } + + if (isActiveEvent) { + setPetActivity({ reasoning: true }) + } + } else if (event.type === 'moa.reference') { + // MoA reference-model output — surface as a labelled thinking chunk + // (tagged with the source model) before the aggregator's response, so + // the mixture-of-agents process is visible. Reuses the reasoning + // disclosure rather than introducing a parallel surface. + if (sessionId) { + const label = coerceGatewayText(payload?.label) || 'reference' + const idx = typeof payload?.index === 'number' ? payload.index : undefined + const cnt = typeof payload?.count === 'number' ? payload.count : undefined + const header = idx && cnt ? `◇ Reference ${idx}/${cnt} — ${label}` : `◇ Reference — ${label}` + const body = coerceThinkingText(payload?.text) + appendReasoningDelta(sessionId, `${header}\n${body}\n\n`, true) + } + + if (isActiveEvent) { + setPetActivity({ reasoning: true }) + } + } else if (event.type === 'moa.aggregating') { + // Status transition only; the aggregator's reply arrives via the normal + // message stream. No reasoning/transcript mutation here. + if (isActiveEvent) { + setPetActivity({ reasoning: true }) + } + } else if (event.type === 'message.complete') { + if (!sessionId) { + return + } + + // Turn ended — drop any blocking prompt still open for THIS session + // (e.g. interrupted, or the approval already resolved). Scoped to the + // session so a background turn finishing can't wipe the active chat's + // prompt, and vice versa. + clearAllPrompts(sessionId) + clearClarifyRequest(undefined, sessionId) + setSessionCompacting(sessionId, false) + + flushQueuedDeltas(sessionId) + + playCompletionSound() + + const finalText = coerceGatewayText(payload?.text) || coerceGatewayText(payload?.rendered) + completeAssistantMessage(sessionId, finalText) + + if (isActiveEvent) { + setTurnStartedAt(null) + + // Pet beat: a finished turn always celebrates — go straight to the + // jump, never linger on the run/reason pose. One atom update (clears + // toolRunning/reasoning AND sets celebrate together) so no stray "run" + // frame leaks to the sprite — including the popped-out overlay, which + // mirrors each activity change. The jump runs ~2 loops, then settles. + flashPetActivity({ celebrate: true, reasoning: false, toolRunning: false }, 2200) + + // Light up the pet's mail icon if the user wasn't looking when the turn + // finished — a glanceable "new message" hint on the popped-out overlay. + // Cleared when they open the app via the mail icon or refocus the window. + if (typeof document !== 'undefined' && !document.hasFocus()) { + markPetUnread() + } + } + + if (payload?.usage) { + setCurrentUsage(current => ({ ...current, ...payload.usage })) + } + } else if (event.type === 'session.title') { + // Live auto-title push (titler runs async, after the turn's refresh). + const storedId = typeof payload?.session_id === 'string' ? payload.session_id : '' + const nextTitle = typeof payload?.title === 'string' ? payload.title.trim() : '' + + if (storedId && nextTitle) { + setSessions(prev => + prev.map(s => (s.id === storedId || s._lineage_root_id === storedId ? { ...s, title: nextTitle } : s)) + ) + } + } else if (event.type === 'tool.start' || event.type === 'tool.progress' || event.type === 'tool.generating') { + if (!sessionId) { + return + } + + flushQueuedDeltas(sessionId) + upsertToolCall(sessionId, toTodoPayload(payload) ?? payload, 'running', event.type) + + if (isActiveEvent) { + setPetActivity({ reasoning: false, toolRunning: true }) + } + } else if (event.type === 'tool.complete') { + if (sessionId) { + flushQueuedDeltas(sessionId) + upsertToolCall(sessionId, toTodoPayload(payload) ?? payload, 'complete', event.type) + + if (isActiveEvent) { + setPetActivity({ toolRunning: false }) + } + + // A pending clarify blocks the turn, so the first tool.complete after + // one is the clarify resolving — drop the "needs input" flag here so + // the sidebar indicator clears as soon as it's answered, not only at + // message.complete. + updateSessionState(sessionId, state => (state.needsInput ? { ...state, needsInput: false } : state)) + + // terminal/process tool calls are the only things that spawn or reap + // background processes — sync the composer status stack right after. + if (!sessionInterrupted(sessionId) && (payload?.name === 'terminal' || payload?.name === 'process')) { + void refreshBackgroundProcesses(sessionId) + } + } + + if (typeof payload?.inline_diff === 'string' && payload.inline_diff.trim()) { + recordToolDiff(payload.tool_id || payload.name || '', payload.inline_diff) + } + + // A file-mutating tool just finished — nudge the git-mirroring surfaces + // (coding rail, review pane, file tree) to refresh. Event-driven, not + // polled: fires exactly when the agent touches the tree. + if (payload && toolMayMutateFiles(payload)) { + notifyWorkspaceChanged() + } + } else if (SUBAGENT_EVENT_TYPES.has(event.type)) { + if (sessionId && payload && !sessionInterrupted(sessionId)) { + if (!nativeSubagentSessionsRef.current.has(sessionId)) { + pruneDelegateFallbackSubagents(sessionId) + } + + nativeSubagentSessionsRef.current.add(sessionId) + upsertSubagent( + sessionId, + payload as Record, + event.type === 'subagent.spawn_requested' || event.type === 'subagent.start', + event.type + ) + } + } else if (event.type === 'clarify.request') { + // Surface the clarify tool's overlay. The Python side is blocked on + // `clarify.respond`, so without this handler the agent would hang + // forever (see tools/clarify_tool.py + tui_gateway/server.py:_block). + // + // Store the request for whichever session raised it — even a background + // one. clarify.request is a one-shot event; if we dropped it for an + // unfocused session, that session would block on `clarify.respond` + // indefinitely and re-focusing it could never recover (the event is + // gone). Parking it per-session lets the user answer once they switch + // over; the inline ClarifyTool reads the active session's entry. + const requestId = typeof payload?.request_id === 'string' ? payload.request_id : '' + const question = typeof payload?.question === 'string' ? payload.question : '' + + if (requestId && question) { + setClarifyRequest({ + requestId, + question, + choices: Array.isArray(payload?.choices) ? payload!.choices!.filter(c => typeof c === 'string') : null, + sessionId: sessionId ?? null + }) + + // The transcript only renders the active session, so a background + // clarify is otherwise invisible (the row just keeps spinning like + // it's working). Flag the session so the sidebar shows a persistent + // "needs input" indicator on its row — works for the active session + // too, and survives alt-tab / window blur (unlike a toast). + if (sessionId) { + updateSessionState(sessionId, state => ({ ...state, needsInput: true })) + } + + dispatchNativeNotification({ + body: question, + kind: 'input', + sessionId, + title: translateNow('notifications.native.inputTitle') + }) + } + } else if (event.type === 'approval.request') { + // Dangerous-command / execute_code approval. The Python side is blocked + // in _await_gateway_decision() until approval.respond lands; without + // this the agent stalls until its 5-min timeout and the tool is BLOCKED. + // Park it per-session (like clarify) so a *background* profile's turn can + // raise it and wait — the sidebar flags "needs input" and the inline bar + // surfaces once the user focuses that chat. + const command = typeof payload?.command === 'string' ? payload.command : '' + const description = typeof payload?.description === 'string' ? payload.description : 'dangerous command' + + setApprovalRequest({ + // false only when a tirith warning forbids it; backend omits the field otherwise. + allowPermanent: payload?.allow_permanent !== false, + command, + description, + sessionId: sessionId ?? null + }) + + if (sessionId) { + updateSessionState(sessionId, state => ({ ...state, needsInput: true })) + } + + dispatchNativeNotification({ + actions: [ + { id: 'approve', text: translateNow('notifications.native.approveAction') }, + { id: 'reject', text: translateNow('notifications.native.rejectAction') } + ], + body: command || description, + kind: 'approval', + sessionId, + title: translateNow('notifications.native.approvalTitle') + }) + } else if (event.type === 'sudo.request') { + // Sudo password capture (tools/terminal_tool.py). Blocked on + // sudo.respond {request_id, password}. + const requestId = typeof payload?.request_id === 'string' ? payload.request_id : '' + + if (requestId) { + setSudoRequest({ requestId, sessionId: sessionId ?? null }) + + if (sessionId) { + updateSessionState(sessionId, state => ({ ...state, needsInput: true })) + } + + dispatchNativeNotification({ + body: translateNow('notifications.native.inputBody'), + kind: 'input', + sessionId, + title: translateNow('notifications.native.inputTitle') + }) + } + } else if (event.type === 'secret.request') { + // Skill credential capture (tools/skills_tool.py). Blocked on + // secret.respond {request_id, value}. + const requestId = typeof payload?.request_id === 'string' ? payload.request_id : '' + + if (requestId) { + const envVar = typeof payload?.env_var === 'string' ? payload.env_var : '' + const promptText = typeof payload?.prompt === 'string' ? payload.prompt : '' + + setSecretRequest({ + requestId, + envVar, + prompt: promptText, + sessionId: sessionId ?? null + }) + + if (sessionId) { + updateSessionState(sessionId, state => ({ ...state, needsInput: true })) + } + + dispatchNativeNotification({ + body: promptText || envVar || translateNow('notifications.native.inputBody'), + kind: 'input', + sessionId, + title: translateNow('notifications.native.inputTitle') + }) + } + } else if (event.type === 'terminal.read.request') { + // read_terminal tool: serialize the renderer's xterm buffer and answer + // immediately (Python blocks on the respond). Empty text = no live pane. + const requestId = typeof payload?.request_id === 'string' ? payload.request_id : '' + + if (requestId) { + const start = typeof payload?.start === 'number' ? payload.start : undefined + const count = typeof payload?.count === 'number' ? payload.count : undefined + const result = readActiveTerminal({ start, count }) + + void $gateway.get()?.request('terminal.read.respond', { + request_id: requestId, + text: result ? JSON.stringify(result) : '' + }) + } + } else if (event.type === 'agent.terminal.output') { + // Live chunk from a background process → its read-only agent terminal tab. + writeAgentTerminalChunk(payload?.process_id ?? '', payload?.chunk ?? '') + } else if (event.type === 'terminal.close') { + // Agent closed its own read-only tab via the desktop-gated close_terminal tool. + // The process is untouched — this only drops the view. + closeAgentTerminalByProc(payload?.process_id ?? '') + } else if (event.type === 'status.update') { + if (sessionId && payload?.kind === 'compacting') { + setSessionCompacting(sessionId, true) + compactedTurnRef.current.add(sessionId) + } else if (sessionId && payload?.kind === 'process') { + // The gateway's notification poller announces background process + // completions / watch matches here — re-sync the status stack. + void refreshBackgroundProcesses(sessionId) + } + } else if (event.type === 'review.summary') { + // Self-improvement background review saved something to memory/skills + // and emitted a persistent summary (Python formats it as + // "💾 Self-improvement review: …"). The CLI prints this via + // prompt_toolkit and the Ink TUI renders it as a system line; the + // desktop has neither, so without this handler the skill/memory + // change happens silently. Surface it as a persistent system message + // in the transcript so the user is always informed — it must not be a + // transient toast that can be missed. + const text = coerceGatewayText(payload?.text).trim() + + if (text && sessionId) { + flushQueuedDeltas(sessionId) + updateSessionState(sessionId, state => ({ + ...state, + messages: [ + ...state.messages, + { + id: `review-summary-${Date.now()}`, + role: 'system', + parts: [textPart(text)], + timestamp: Math.floor(Date.now() / 1000) + } + ] + })) + } + } else if (event.type === 'error') { + const errorMessage = payload?.message || 'Hermes reported an error' + const looksLikeProviderSetup = isProviderSetupErrorMessage(errorMessage) + + // A turn that errors out has also ended — drop any open blocking prompt + // for this session so an approval/sudo/secret overlay can't linger past + // the failed turn (same intent as the message.complete clear). + if (sessionId) { + clearAllPrompts(sessionId) + clearClarifyRequest(undefined, sessionId) + setSessionCompacting(sessionId, false) + compactedTurnRef.current.delete(sessionId) + } + + if (isActiveEvent) { + setPetActivity({ reasoning: false, toolRunning: false }) + flashPetActivity({ error: true }) + } + + dispatchNativeNotification({ + body: errorMessage, + kind: 'turnError', + sessionId, + title: translateNow('notifications.native.turnErrorTitle') + }) + + if (looksLikeProviderSetup) { + requestDesktopOnboarding(errorMessage) + } else { + // Toast globally, not just when the failing thread is focused: a + // turn-ending error (e.g. out of funds) blocks every thread, so the + // inline error alone is too easy to miss. The stable id collapses the + // same error from multiple blocked threads into one toast. + notify({ + id: `gateway-error:${errorMessage}`, + kind: 'error', + title: 'Hermes error', + message: errorMessage + }) + } + + if (sessionId) { + flushQueuedDeltas(sessionId) + failAssistantMessage(sessionId, errorMessage) + } + + if (isActiveEvent) { + setTurnStartedAt(null) + } + } + }, + [ + appendAssistantDelta, + appendReasoningDelta, + activeSessionIdRef, + compactedTurnRef, + completeAssistantMessage, + failAssistantMessage, + flushQueuedDeltas, + lastCwdInfoSessionRef, + nativeSubagentSessionsRef, + queryClient, + refreshHermesConfig, + sessionInterrupted, + updateSessionState, + upsertToolCall + ] + ) +} diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/index.ts b/apps/desktop/src/app/session/hooks/use-message-stream/index.ts index 51136a7451f..65a203a215e 100644 --- a/apps/desktop/src/app/session/hooks/use-message-stream/index.ts +++ b/apps/desktop/src/app/session/hooks/use-message-stream/index.ts @@ -1,9 +1,6 @@ import type { QueryClient } from '@tanstack/react-query' import { type MutableRefObject, useCallback, useEffect, useRef } from 'react' -import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream' -import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer' -import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals' import { translateNow } from '@/i18n' import { appendAssistantTextPart, @@ -15,63 +12,23 @@ import { type GatewayEventPayload, reasoningPart, renderMediaTags, - textPart, upsertToolPart } from '@/lib/chat-messages' -import { coerceGatewayText, coerceThinkingText, normalizePersonalityValue } from '@/lib/chat-runtime' -import { playCompletionSound } from '@/lib/completion-sound' -import { gatewayEventRequiresSessionId } from '@/lib/gateway-events' import { dedupeGeneratedImageEchoesInParts, generatedImageEchoSources, stripGeneratedImageEchoes } from '@/lib/generated-images' -import { triggerHaptic } from '@/lib/haptics' -import { isProviderSetupErrorMessage } from '@/lib/provider-setup-errors' import { parseTodos } from '@/lib/todos' -import { clearClarifyRequest, setClarifyRequest } from '@/store/clarify' -import { setSessionCompacting } from '@/store/compaction' -import { refreshBackgroundProcesses } from '@/store/composer-status' -import { $gateway } from '@/store/gateway' import { dispatchNativeNotification } from '@/store/native-notifications' -import { notify } from '@/store/notifications' -import { requestDesktopOnboarding } from '@/store/onboarding' -import { flashPetActivity, markPetUnread, setPetActivity } from '@/store/pet' -import { followActiveSessionCwd } from '@/store/projects' -import { clearAllPrompts, setApprovalRequest, setSecretRequest, setSudoRequest } from '@/store/prompts' -import { - $currentCwd, - setCurrentBranch, - setCurrentCwd, - setCurrentFastMode, - setCurrentModel, - setCurrentPersonality, - setCurrentProvider, - setCurrentReasoningEffort, - setCurrentServiceTier, - setCurrentUsage, - setSessions, - setTurnStartedAt, - setYoloActive -} from '@/store/session' import { broadcastSessionsChanged } from '@/store/session-sync' -import { clearSessionSubagents, pruneDelegateFallbackSubagents, upsertSubagent } from '@/store/subagents' +import { upsertSubagent } from '@/store/subagents' import { setSessionTodos } from '@/store/todos' -import { recordToolDiff } from '@/store/tool-diffs' -import { notifyWorkspaceChanged, toolMayMutateFiles } from '@/store/workspace-events' -import type { RpcEvent } from '@/types/hermes' import type { ClientSessionState } from '../../../types' -import { - completionErrorText, - delegateTaskPayloads, - hasSessionInfoStatePatch, - sessionInfoStatePatch, - STREAM_DELTA_FLUSH_MS, - SUBAGENT_EVENT_TYPES, - toTodoPayload -} from './utils' +import { useGatewayEventHandler } from './gateway-event' +import { completionErrorText, delegateTaskPayloads, STREAM_DELTA_FLUSH_MS } from './utils' interface MessageStreamOptions { activeSessionIdRef: MutableRefObject @@ -556,559 +513,22 @@ export function useMessageStream({ [updateSessionState] ) - const handleGatewayEvent = useCallback( - (event: RpcEvent) => { - const payload = event.payload as GatewayEventPayload | undefined - const explicitSid = event.session_id || '' - - if (!explicitSid && gatewayEventRequiresSessionId(event.type)) { - return - } - - const sessionId = explicitSid || activeSessionIdRef.current - const isActiveEvent = !!sessionId && sessionId === activeSessionIdRef.current - - if (event.type === 'gateway.ready') { - return - } else if (event.type === 'session.info') { - // Apply session-scoped fields when the event targets the active - // session, OR when it's a global broadcast and we have no session. - const apply = explicitSid ? isActiveEvent : !activeSessionIdRef.current - const statePatch = sessionInfoStatePatch(payload) - const hasStatePatch = hasSessionInfoStatePatch(statePatch) - const modelChanged = typeof payload?.model === 'string' - const providerChanged = typeof payload?.provider === 'string' - const runningChanged = typeof payload?.running === 'boolean' - - if (apply) { - if (modelChanged) { - setCurrentModel(payload!.model || '') - } - - if (providerChanged) { - setCurrentProvider(payload!.provider || '') - } - - if (typeof payload?.cwd === 'string') { - // The active session's agent can relocate itself (new repo/worktree - // via the terminal). When the SAME active session's cwd actually - // moves, follow it — refresh the project tree + scope so the sidebar - // tracks the live thread. A fresh selection (different session id) - // is a switch, not a move, so it refreshes data without yanking scope. - const cwdMoved = payload.cwd !== $currentCwd.get() - const sameSession = !!sessionId && sessionId === lastCwdInfoSessionRef.current - - lastCwdInfoSessionRef.current = sessionId - setCurrentCwd(payload.cwd) - - if (cwdMoved && sameSession) { - void followActiveSessionCwd(payload.cwd) - } - } - - if (typeof payload?.branch === 'string') { - setCurrentBranch(payload.branch) - } - - if (typeof payload?.personality === 'string') { - setCurrentPersonality(normalizePersonalityValue(payload.personality)) - } - - if (typeof payload?.reasoning_effort === 'string') { - setCurrentReasoningEffort(payload.reasoning_effort) - } - - if (typeof payload?.service_tier === 'string') { - setCurrentServiceTier(payload.service_tier) - } - - if (typeof payload?.fast === 'boolean') { - setCurrentFastMode(payload.fast) - } - - if (typeof payload?.yolo === 'boolean') { - setYoloActive(payload.yolo) - } - } - - if (sessionId && hasStatePatch) { - updateSessionState(sessionId, state => ({ - ...state, - ...statePatch, - branch: statePatch.branch ?? state.branch, - cwd: statePatch.cwd ?? state.cwd - })) - } - - if (apply) { - if (runningChanged && sessionId) { - updateSessionState(sessionId, state => { - const busy = Boolean(payload!.running) - - if (state.busy === busy && (busy || !state.awaitingResponse)) { - return state - } - - if (busy) { - return { - ...state, - busy, - turnStartedAt: state.turnStartedAt ?? Date.now() - } - } - - if (state.awaitingResponse && !state.sawAssistantPayload) { - return state - } - - return { - ...state, - awaitingResponse: false, - busy, - pendingBranchGroup: null, - streamId: null, - turnStartedAt: null - } - }) - } - } - - if (payload?.usage && (!explicitSid || isActiveEvent)) { - setCurrentUsage(current => ({ ...current, ...payload.usage })) - } - - if (typeof payload?.credential_warning === 'string' && payload.credential_warning) { - requestDesktopOnboarding(payload.credential_warning) - } - - void refreshHermesConfig() - - if (modelChanged || providerChanged) { - void queryClient.invalidateQueries({ - queryKey: explicitSid && sessionId ? ['model-options', sessionId] : ['model-options'] - }) - } - } else if (event.type === 'message.start') { - if (!sessionId) { - return - } - - flushQueuedDeltas(sessionId) - clearSessionSubagents(sessionId) - setSessionCompacting(sessionId, false) - compactedTurnRef.current.delete(sessionId) - nativeSubagentSessionsRef.current.delete(sessionId) - - if (isActiveEvent) { - triggerHaptic('streamStart') - } - - updateSessionState(sessionId, state => ({ - ...state, - busy: true, - awaitingResponse: true, - sawAssistantPayload: false, - interrupted: false, - turnStartedAt: Date.now() - })) - - if (isActiveEvent) { - setTurnStartedAt(Date.now()) - } - } else if (event.type === 'message.delta') { - if (sessionId) { - appendAssistantDelta(sessionId, coerceGatewayText(payload?.text)) - } - } else if (event.type === 'thinking.delta') { - // thinking.delta carries the kawaii spinner status (face + verb from - // KawaiiSpinner), not real reasoning. The bottom-of-thread loading - // indicator already covers that UX, so we ignore these events to - // avoid a duplicative "Thinking" disclosure showing spinner text. - } else if (event.type === 'reasoning.delta') { - if (sessionId) { - appendReasoningDelta(sessionId, coerceThinkingText(payload?.text)) - } - - if (isActiveEvent) { - setPetActivity({ reasoning: true }) - } - } else if (event.type === 'reasoning.available') { - if (sessionId) { - appendReasoningDelta(sessionId, coerceThinkingText(payload?.text), true) - } - - if (isActiveEvent) { - setPetActivity({ reasoning: true }) - } - } else if (event.type === 'moa.reference') { - // MoA reference-model output — surface as a labelled thinking chunk - // (tagged with the source model) before the aggregator's response, so - // the mixture-of-agents process is visible. Reuses the reasoning - // disclosure rather than introducing a parallel surface. - if (sessionId) { - const label = coerceGatewayText(payload?.label) || 'reference' - const idx = typeof payload?.index === 'number' ? payload.index : undefined - const cnt = typeof payload?.count === 'number' ? payload.count : undefined - const header = idx && cnt ? `◇ Reference ${idx}/${cnt} — ${label}` : `◇ Reference — ${label}` - const body = coerceThinkingText(payload?.text) - appendReasoningDelta(sessionId, `${header}\n${body}\n\n`, true) - } - - if (isActiveEvent) { - setPetActivity({ reasoning: true }) - } - } else if (event.type === 'moa.aggregating') { - // Status transition only; the aggregator's reply arrives via the normal - // message stream. No reasoning/transcript mutation here. - if (isActiveEvent) { - setPetActivity({ reasoning: true }) - } - } else if (event.type === 'message.complete') { - if (!sessionId) { - return - } - - // Turn ended — drop any blocking prompt still open for THIS session - // (e.g. interrupted, or the approval already resolved). Scoped to the - // session so a background turn finishing can't wipe the active chat's - // prompt, and vice versa. - clearAllPrompts(sessionId) - clearClarifyRequest(undefined, sessionId) - setSessionCompacting(sessionId, false) - - flushQueuedDeltas(sessionId) - - playCompletionSound() - - const finalText = coerceGatewayText(payload?.text) || coerceGatewayText(payload?.rendered) - completeAssistantMessage(sessionId, finalText) - - if (isActiveEvent) { - setTurnStartedAt(null) - - // Pet beat: a finished turn always celebrates — go straight to the - // jump, never linger on the run/reason pose. One atom update (clears - // toolRunning/reasoning AND sets celebrate together) so no stray "run" - // frame leaks to the sprite — including the popped-out overlay, which - // mirrors each activity change. The jump runs ~2 loops, then settles. - flashPetActivity({ celebrate: true, reasoning: false, toolRunning: false }, 2200) - - // Light up the pet's mail icon if the user wasn't looking when the turn - // finished — a glanceable "new message" hint on the popped-out overlay. - // Cleared when they open the app via the mail icon or refocus the window. - if (typeof document !== 'undefined' && !document.hasFocus()) { - markPetUnread() - } - } - - if (payload?.usage) { - setCurrentUsage(current => ({ ...current, ...payload.usage })) - } - } else if (event.type === 'session.title') { - // Live auto-title push (titler runs async, after the turn's refresh). - const storedId = typeof payload?.session_id === 'string' ? payload.session_id : '' - const nextTitle = typeof payload?.title === 'string' ? payload.title.trim() : '' - - if (storedId && nextTitle) { - setSessions(prev => - prev.map(s => (s.id === storedId || s._lineage_root_id === storedId ? { ...s, title: nextTitle } : s)) - ) - } - } else if (event.type === 'tool.start' || event.type === 'tool.progress' || event.type === 'tool.generating') { - if (!sessionId) { - return - } - - flushQueuedDeltas(sessionId) - upsertToolCall(sessionId, toTodoPayload(payload) ?? payload, 'running', event.type) - - if (isActiveEvent) { - setPetActivity({ reasoning: false, toolRunning: true }) - } - } else if (event.type === 'tool.complete') { - if (sessionId) { - flushQueuedDeltas(sessionId) - upsertToolCall(sessionId, toTodoPayload(payload) ?? payload, 'complete', event.type) - - if (isActiveEvent) { - setPetActivity({ toolRunning: false }) - } - - // A pending clarify blocks the turn, so the first tool.complete after - // one is the clarify resolving — drop the "needs input" flag here so - // the sidebar indicator clears as soon as it's answered, not only at - // message.complete. - updateSessionState(sessionId, state => (state.needsInput ? { ...state, needsInput: false } : state)) - - // terminal/process tool calls are the only things that spawn or reap - // background processes — sync the composer status stack right after. - if (!sessionInterrupted(sessionId) && (payload?.name === 'terminal' || payload?.name === 'process')) { - void refreshBackgroundProcesses(sessionId) - } - } - - if (typeof payload?.inline_diff === 'string' && payload.inline_diff.trim()) { - recordToolDiff(payload.tool_id || payload.name || '', payload.inline_diff) - } - - // A file-mutating tool just finished — nudge the git-mirroring surfaces - // (coding rail, review pane, file tree) to refresh. Event-driven, not - // polled: fires exactly when the agent touches the tree. - if (payload && toolMayMutateFiles(payload)) { - notifyWorkspaceChanged() - } - } else if (SUBAGENT_EVENT_TYPES.has(event.type)) { - if (sessionId && payload && !sessionInterrupted(sessionId)) { - if (!nativeSubagentSessionsRef.current.has(sessionId)) { - pruneDelegateFallbackSubagents(sessionId) - } - - nativeSubagentSessionsRef.current.add(sessionId) - upsertSubagent( - sessionId, - payload as Record, - event.type === 'subagent.spawn_requested' || event.type === 'subagent.start', - event.type - ) - } - } else if (event.type === 'clarify.request') { - // Surface the clarify tool's overlay. The Python side is blocked on - // `clarify.respond`, so without this handler the agent would hang - // forever (see tools/clarify_tool.py + tui_gateway/server.py:_block). - // - // Store the request for whichever session raised it — even a background - // one. clarify.request is a one-shot event; if we dropped it for an - // unfocused session, that session would block on `clarify.respond` - // indefinitely and re-focusing it could never recover (the event is - // gone). Parking it per-session lets the user answer once they switch - // over; the inline ClarifyTool reads the active session's entry. - const requestId = typeof payload?.request_id === 'string' ? payload.request_id : '' - const question = typeof payload?.question === 'string' ? payload.question : '' - - if (requestId && question) { - setClarifyRequest({ - requestId, - question, - choices: Array.isArray(payload?.choices) ? payload!.choices!.filter(c => typeof c === 'string') : null, - sessionId: sessionId ?? null - }) - - // The transcript only renders the active session, so a background - // clarify is otherwise invisible (the row just keeps spinning like - // it's working). Flag the session so the sidebar shows a persistent - // "needs input" indicator on its row — works for the active session - // too, and survives alt-tab / window blur (unlike a toast). - if (sessionId) { - updateSessionState(sessionId, state => ({ ...state, needsInput: true })) - } - - dispatchNativeNotification({ - body: question, - kind: 'input', - sessionId, - title: translateNow('notifications.native.inputTitle') - }) - } - } else if (event.type === 'approval.request') { - // Dangerous-command / execute_code approval. The Python side is blocked - // in _await_gateway_decision() until approval.respond lands; without - // this the agent stalls until its 5-min timeout and the tool is BLOCKED. - // Park it per-session (like clarify) so a *background* profile's turn can - // raise it and wait — the sidebar flags "needs input" and the inline bar - // surfaces once the user focuses that chat. - const command = typeof payload?.command === 'string' ? payload.command : '' - const description = typeof payload?.description === 'string' ? payload.description : 'dangerous command' - - setApprovalRequest({ - // false only when a tirith warning forbids it; backend omits the field otherwise. - allowPermanent: payload?.allow_permanent !== false, - command, - description, - sessionId: sessionId ?? null - }) - - if (sessionId) { - updateSessionState(sessionId, state => ({ ...state, needsInput: true })) - } - - dispatchNativeNotification({ - actions: [ - { id: 'approve', text: translateNow('notifications.native.approveAction') }, - { id: 'reject', text: translateNow('notifications.native.rejectAction') } - ], - body: command || description, - kind: 'approval', - sessionId, - title: translateNow('notifications.native.approvalTitle') - }) - } else if (event.type === 'sudo.request') { - // Sudo password capture (tools/terminal_tool.py). Blocked on - // sudo.respond {request_id, password}. - const requestId = typeof payload?.request_id === 'string' ? payload.request_id : '' - - if (requestId) { - setSudoRequest({ requestId, sessionId: sessionId ?? null }) - - if (sessionId) { - updateSessionState(sessionId, state => ({ ...state, needsInput: true })) - } - - dispatchNativeNotification({ - body: translateNow('notifications.native.inputBody'), - kind: 'input', - sessionId, - title: translateNow('notifications.native.inputTitle') - }) - } - } else if (event.type === 'secret.request') { - // Skill credential capture (tools/skills_tool.py). Blocked on - // secret.respond {request_id, value}. - const requestId = typeof payload?.request_id === 'string' ? payload.request_id : '' - - if (requestId) { - const envVar = typeof payload?.env_var === 'string' ? payload.env_var : '' - const promptText = typeof payload?.prompt === 'string' ? payload.prompt : '' - - setSecretRequest({ - requestId, - envVar, - prompt: promptText, - sessionId: sessionId ?? null - }) - - if (sessionId) { - updateSessionState(sessionId, state => ({ ...state, needsInput: true })) - } - - dispatchNativeNotification({ - body: promptText || envVar || translateNow('notifications.native.inputBody'), - kind: 'input', - sessionId, - title: translateNow('notifications.native.inputTitle') - }) - } - } else if (event.type === 'terminal.read.request') { - // read_terminal tool: serialize the renderer's xterm buffer and answer - // immediately (Python blocks on the respond). Empty text = no live pane. - const requestId = typeof payload?.request_id === 'string' ? payload.request_id : '' - - if (requestId) { - const start = typeof payload?.start === 'number' ? payload.start : undefined - const count = typeof payload?.count === 'number' ? payload.count : undefined - const result = readActiveTerminal({ start, count }) - - void $gateway.get()?.request('terminal.read.respond', { - request_id: requestId, - text: result ? JSON.stringify(result) : '' - }) - } - } else if (event.type === 'agent.terminal.output') { - // Live chunk from a background process → its read-only agent terminal tab. - writeAgentTerminalChunk(payload?.process_id ?? '', payload?.chunk ?? '') - } else if (event.type === 'terminal.close') { - // Agent closed its own read-only tab via the desktop-gated close_terminal tool. - // The process is untouched — this only drops the view. - closeAgentTerminalByProc(payload?.process_id ?? '') - } else if (event.type === 'status.update') { - if (sessionId && payload?.kind === 'compacting') { - setSessionCompacting(sessionId, true) - compactedTurnRef.current.add(sessionId) - } else if (sessionId && payload?.kind === 'process') { - // The gateway's notification poller announces background process - // completions / watch matches here — re-sync the status stack. - void refreshBackgroundProcesses(sessionId) - } - } else if (event.type === 'review.summary') { - // Self-improvement background review saved something to memory/skills - // and emitted a persistent summary (Python formats it as - // "💾 Self-improvement review: …"). The CLI prints this via - // prompt_toolkit and the Ink TUI renders it as a system line; the - // desktop has neither, so without this handler the skill/memory - // change happens silently. Surface it as a persistent system message - // in the transcript so the user is always informed — it must not be a - // transient toast that can be missed. - const text = coerceGatewayText(payload?.text).trim() - - if (text && sessionId) { - flushQueuedDeltas(sessionId) - updateSessionState(sessionId, state => ({ - ...state, - messages: [ - ...state.messages, - { - id: `review-summary-${Date.now()}`, - role: 'system', - parts: [textPart(text)], - timestamp: Math.floor(Date.now() / 1000) - } - ] - })) - } - } else if (event.type === 'error') { - const errorMessage = payload?.message || 'Hermes reported an error' - const looksLikeProviderSetup = isProviderSetupErrorMessage(errorMessage) - - // A turn that errors out has also ended — drop any open blocking prompt - // for this session so an approval/sudo/secret overlay can't linger past - // the failed turn (same intent as the message.complete clear). - if (sessionId) { - clearAllPrompts(sessionId) - clearClarifyRequest(undefined, sessionId) - setSessionCompacting(sessionId, false) - compactedTurnRef.current.delete(sessionId) - } - - if (isActiveEvent) { - setPetActivity({ reasoning: false, toolRunning: false }) - flashPetActivity({ error: true }) - } - - dispatchNativeNotification({ - body: errorMessage, - kind: 'turnError', - sessionId, - title: translateNow('notifications.native.turnErrorTitle') - }) - - if (looksLikeProviderSetup) { - requestDesktopOnboarding(errorMessage) - } else { - // Toast globally, not just when the failing thread is focused: a - // turn-ending error (e.g. out of funds) blocks every thread, so the - // inline error alone is too easy to miss. The stable id collapses the - // same error from multiple blocked threads into one toast. - notify({ - id: `gateway-error:${errorMessage}`, - kind: 'error', - title: 'Hermes error', - message: errorMessage - }) - } - - if (sessionId) { - flushQueuedDeltas(sessionId) - failAssistantMessage(sessionId, errorMessage) - } - - if (isActiveEvent) { - setTurnStartedAt(null) - } - } - }, - [ - appendAssistantDelta, - appendReasoningDelta, - activeSessionIdRef, - completeAssistantMessage, - failAssistantMessage, - flushQueuedDeltas, - queryClient, - refreshHermesConfig, - sessionInterrupted, - updateSessionState, - upsertToolCall - ] - ) + const handleGatewayEvent = useGatewayEventHandler({ + appendAssistantDelta, + appendReasoningDelta, + activeSessionIdRef, + compactedTurnRef, + lastCwdInfoSessionRef, + nativeSubagentSessionsRef, + completeAssistantMessage, + failAssistantMessage, + flushQueuedDeltas, + queryClient, + refreshHermesConfig, + sessionInterrupted, + updateSessionState, + upsertToolCall + }) return { appendAssistantDelta,