diff --git a/apps/desktop/src/app/session/hooks/use-message-stream.ts b/apps/desktop/src/app/session/hooks/use-message-stream/gateway-event.ts similarity index 53% rename from apps/desktop/src/app/session/hooks/use-message-stream.ts rename to apps/desktop/src/app/session/hooks/use-message-stream/gateway-event.ts index a455bae06cc..8bb4010937f 100644 --- a/apps/desktop/src/app/session/hooks/use-message-stream.ts +++ b/apps/desktop/src/app/session/hooks/use-message-stream/gateway-event.ts @@ -1,34 +1,16 @@ import type { QueryClient } from '@tanstack/react-query' -import { type MutableRefObject, useCallback, useEffect, useRef } from 'react' +import { type MutableRefObject, useCallback } from 'react' import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream' -import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals' import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer' +import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals' import { translateNow } from '@/i18n' -import { - appendAssistantTextPart, - appendReasoningPart, - assistantTextPart, - type ChatMessage, - type ChatMessagePart, - chatMessageText, - type GatewayEventPayload, - reasoningPart, - renderMediaTags, - textPart, - upsertToolPart -} from '@/lib/chat-messages' +import { type GatewayEventPayload, textPart } from '@/lib/chat-messages' import { coerceGatewayText, coerceThinkingText, normalizePersonalityValue } from '@/lib/chat-runtime' import { playCompletionSound } from '@/lib/completion-sound' import { gatewayEventRequiresSessionId } from '@/lib/gateway-events' -import { - dedupeGeneratedImageEchoesInParts, - generatedImageEchoSources, - stripGeneratedImageEchoes -} from '@/lib/generated-images' import { triggerHaptic } from '@/lib/haptics' import { isProviderSetupErrorMessage } from '@/lib/provider-setup-errors' -import { parseTodos } from '@/lib/todos' import { clearClarifyRequest, setClarifyRequest } from '@/store/clarify' import { setSessionCompacting } from '@/store/compaction' import { refreshBackgroundProcesses } from '@/store/composer-status' @@ -54,674 +36,61 @@ import { setTurnStartedAt, setYoloActive } from '@/store/session' -import { broadcastSessionsChanged } from '@/store/session-sync' import { clearSessionSubagents, pruneDelegateFallbackSubagents, upsertSubagent } from '@/store/subagents' -import { setSessionTodos } from '@/store/todos' import { recordToolDiff } from '@/store/tool-diffs' import { notifyWorkspaceChanged, toolMayMutateFiles } from '@/store/workspace-events' import type { RpcEvent } from '@/types/hermes' -import type { ClientSessionState } from '../../types' +import type { ClientSessionState } from '../../../types' -interface MessageStreamOptions { +import { hasSessionInfoStatePatch, sessionInfoStatePatch, SUBAGENT_EVENT_TYPES, toTodoPayload } from './utils' + +interface GatewayEventDeps { activeSessionIdRef: MutableRefObject - hydrateFromStoredSession: ( - attempts?: number, - storedSessionId?: string | null, - runtimeSessionId?: string | null - ) => Promise + compactedTurnRef: MutableRefObject> + lastCwdInfoSessionRef: MutableRefObject + nativeSubagentSessionsRef: MutableRefObject> + appendAssistantDelta: (sessionId: string, delta: string) => void + appendReasoningDelta: (sessionId: string, delta: string, replace?: boolean) => void + completeAssistantMessage: (sessionId: string, text: string) => void + failAssistantMessage: (sessionId: string, errorMessage: string) => void + flushQueuedDeltas: (sessionId?: string) => void queryClient: QueryClient refreshHermesConfig: () => Promise - refreshSessions: () => Promise - sessionStateByRuntimeIdRef: MutableRefObject> + sessionInterrupted: (sessionId: string) => boolean updateSessionState: ( sessionId: string, updater: (state: ClientSessionState) => ClientSessionState, storedSessionId?: string | null ) => ClientSessionState + upsertToolCall: ( + sessionId: string, + payload: GatewayEventPayload | undefined, + phase: 'running' | 'complete', + sourceEventType?: string + ) => void } -interface QueuedStreamDeltas { - assistant: string - reasoning: string -} - -type SessionRuntimeStatePatch = Partial< - Pick< - ClientSessionState, - 'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo' - > -> - -function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch { - const patch: SessionRuntimeStatePatch = {} - - if (typeof payload?.model === 'string') { - patch.model = payload.model || '' - } - - if (typeof payload?.provider === 'string') { - patch.provider = payload.provider || '' - } - - if (typeof payload?.cwd === 'string') { - patch.cwd = payload.cwd - } - - if (typeof payload?.branch === 'string') { - patch.branch = payload.branch - } - - if (typeof payload?.personality === 'string') { - patch.personality = normalizePersonalityValue(payload.personality) - } - - if (typeof payload?.reasoning_effort === 'string') { - patch.reasoningEffort = payload.reasoning_effort - } - - if (typeof payload?.service_tier === 'string') { - patch.serviceTier = payload.service_tier - } - - if (typeof payload?.fast === 'boolean') { - patch.fast = payload.fast - } - - if (typeof payload?.yolo === 'boolean') { - patch.yolo = payload.yolo - } - - return patch -} - -function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean { - return Object.keys(patch).length > 0 -} - -// Minimum gap between two assistant-text flushes during a stream. Was 16ms -// (rAF only), which at typical LLM token rates of ~30-80 tok/sec meant every -// token got its own React commit + Streamdown markdown re-parse, scaling -// linearly with the growing last-block length. Bumping to 33ms lets ~2 tokens -// batch into one commit at 60 tok/sec without introducing visible lag on the -// streaming text (still 30 fps of visible text growth). Big perceived -// smoothness win on long messages with big trailing paragraphs; see -// `scripts/profile-typing-lag.md` for the measurement work behind this. -const STREAM_DELTA_FLUSH_MS = 33 - -// Gateway/provider failures sometimes arrive as message.complete text instead -// of an explicit error event. Treat matches as inline assistant errors so they -// persist like real error events and don't get erased by hydrate fallback. -const COMPLETION_ERROR_PATTERNS = [ - /^API call failed after \d+ retries:/i, - /^HTTP\s+\d{3}\b/i, - /^(Provider|Gateway)\s+error:/i -] - -function completionErrorText(finalText: string): string | null { - const text = finalText.trim() - - return text && COMPLETION_ERROR_PATTERNS.some(re => re.test(text)) ? text : null -} - -const SUBAGENT_EVENT_TYPES = new Set([ - 'subagent.spawn_requested', - 'subagent.start', - 'subagent.thinking', - 'subagent.tool', - 'subagent.progress', - 'subagent.complete' -]) - -// Anonymous progress events that carry todos but no name still belong to the -// todo stream; named todo events are obviously routed there too. -function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined { - if (!payload) { - return undefined - } - - const isTodo = payload.name === 'todo' || (!payload.name && Object.hasOwn(payload, 'todos')) - - return isTodo ? { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' } : undefined -} - -function asRecord(value: unknown): Record { - return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record) : {} -} - -function parseMaybeRecord(value: unknown): Record { - if (typeof value === 'string') { - try { - return asRecord(JSON.parse(value)) - } catch { - return {} - } - } - - return asRecord(value) -} - -const firstString = (...candidates: unknown[]): string => { - for (const v of candidates) { - if (typeof v === 'string' && v) { - return v - } - } - - return '' -} - -function delegateTaskPayloads( - payload: GatewayEventPayload | undefined, - phase: 'running' | 'complete', - sourceEventType?: string -): Record[] { - if (payload?.name !== 'delegate_task') { - return [] - } - - const args = parseMaybeRecord(payload.args ?? payload.input) - const result = parseMaybeRecord(payload.result) - const rawTasks = Array.isArray(args.tasks) ? args.tasks : [] - const tasks = rawTasks.length ? rawTasks.map(parseMaybeRecord) : [args] - const status = phase === 'complete' ? (payload.error ? 'failed' : 'completed') : 'running' - const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task' - const progressText = firstString(payload.preview, payload.message, payload.context) - - const eventType = - phase === 'complete' - ? 'subagent.complete' - : sourceEventType === 'tool.start' - ? 'subagent.start' - : 'subagent.progress' - - return tasks.map((task, index) => { - const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task' - const summary = firstString(result.summary, payload.summary, payload.message) - - return { - depth: 0, - duration_seconds: payload.duration_s, - goal, - status, - subagent_id: `delegate-tool:${toolId}:${index}`, - summary: summary || undefined, - task_count: tasks.length, - task_index: index, - text: eventType === 'subagent.progress' ? progressText || goal : undefined, - tool_name: eventType === 'subagent.start' ? 'delegate_task' : undefined, - tool_preview: eventType === 'subagent.start' ? progressText : undefined, - toolsets: Array.isArray(task.toolsets) ? task.toolsets : Array.isArray(args.toolsets) ? args.toolsets : [], - event_type: eventType, - output_tail: - phase === 'complete' && summary - ? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }] - : undefined - } - }) -} - -export function useMessageStream({ - activeSessionIdRef, - hydrateFromStoredSession, - queryClient, - refreshHermesConfig, - refreshSessions, - sessionStateByRuntimeIdRef, - updateSessionState -}: MessageStreamOptions) { - const sessionInterrupted = useCallback( - (sessionId: string) => sessionStateByRuntimeIdRef.current.get(sessionId)?.interrupted ?? false, - [sessionStateByRuntimeIdRef] - ) - - // Patch the in-flight assistant message (or seed it). Centralises the - // streamId/groupId bookkeeping every event callback would otherwise repeat. - const mutateStream = useCallback( - ( - sessionId: string, - transform: (parts: ChatMessagePart[], message: ChatMessage) => ChatMessagePart[], - seed: () => ChatMessagePart[], - opts: { - pending?: (message: ChatMessage) => boolean - } = {} - ) => { - const apply = () => { - updateSessionState(sessionId, state => { - // After a stop, drop any late deltas / tool events for the - // cancelled turn so they don't keep growing the (now finalized) - // assistant bubble or, worse, seed a brand-new bubble that - // appears to belong to the next user message. - if (state.interrupted) { - return state - } - - const streamId = state.streamId ?? `assistant-stream-${Date.now()}` - const groupId = state.pendingBranchGroup ?? undefined - const prev = state.messages - let nextMessages: ChatMessage[] - - if (!prev.some(m => m.id === streamId)) { - nextMessages = [ - ...prev, - { - id: streamId, - role: 'assistant', - parts: seed(), - pending: true, - branchGroupId: groupId - } - ] - } else { - nextMessages = prev.map(m => - m.id === streamId - ? { - ...m, - parts: transform(m.parts, m), - pending: opts.pending ? opts.pending(m) : true - } - : m - ) - } - - return { - ...state, - messages: nextMessages, - streamId, - sawAssistantPayload: true, - awaitingResponse: false - } - }) - } - - apply() - }, - [updateSessionState] - ) - - const queuedDeltasRef = useRef>(new Map()) - const flushHandleRef = useRef(null) - const lastFlushAtRef = useRef(0) - const nativeSubagentSessionsRef = useRef>(new Set()) - // Turns that auto-compacted: skip post-turn hydrate so live scrollback survives. - const compactedTurnRef = useRef>(new Set()) - // Last session we applied a session.info cwd for — lets us tell an agent - // relocating the SAME session (follow it) from a session switch (don't yank). - const lastCwdInfoSessionRef = useRef(null) - - const flushQueuedDeltas = useCallback( - (sessionId?: string) => { - const queue = queuedDeltasRef.current - const ids = sessionId ? [sessionId] : [...queue.keys()] - - for (const id of ids) { - const queued = queue.get(id) - - if (!queued) { - continue - } - - queue.delete(id) - - if (queued.assistant) { - mutateStream( - id, - parts => dedupeGeneratedImageEchoesInParts(appendAssistantTextPart(parts, queued.assistant)), - () => [assistantTextPart(queued.assistant)] - ) - } - - if (queued.reasoning) { - mutateStream( - id, - parts => appendReasoningPart(parts, queued.reasoning), - () => [reasoningPart(queued.reasoning)] - ) - } - } - }, - [mutateStream] - ) - - const scheduleDeltaFlush = useCallback(() => { - if (flushHandleRef.current !== null) { - return - } - - if (typeof window === 'undefined') { - flushQueuedDeltas() - - return - } - - // Enforce a floor on the gap between two flushes. Without it, an LLM - // emitting tokens slower than the rAF cadence (~30-80 tok/sec is typical) - // forces one React commit + Streamdown re-parse per token, and the - // last-block markdown re-parse cost is roughly linear in current block - // length. With this floor, slower streams still coalesce ~2 tokens per - // commit and the synthetic harness shows longtask counts drop from ~5/5s - // to ~1/5s on big sessions (see scripts/profile-typing-lag.md). - const sinceLast = performance.now() - lastFlushAtRef.current - - const runFlush = () => { - flushHandleRef.current = null - lastFlushAtRef.current = performance.now() - flushQueuedDeltas() - } - - if (sinceLast >= STREAM_DELTA_FLUSH_MS && typeof window.requestAnimationFrame === 'function') { - flushHandleRef.current = window.requestAnimationFrame(runFlush) - - return - } - - flushHandleRef.current = window.setTimeout(runFlush, Math.max(0, STREAM_DELTA_FLUSH_MS - sinceLast)) - }, [flushQueuedDeltas]) - - const queueDelta = useCallback( - (sessionId: string, key: keyof QueuedStreamDeltas, delta: string) => { - if (!delta) { - return - } - - const queued = queuedDeltasRef.current.get(sessionId) ?? { assistant: '', reasoning: '' } - queued[key] += delta - queuedDeltasRef.current.set(sessionId, queued) - scheduleDeltaFlush() - }, - [scheduleDeltaFlush] - ) - - useEffect( - () => () => { - if (flushHandleRef.current !== null && typeof window !== 'undefined') { - if (typeof window.cancelAnimationFrame === 'function') { - window.cancelAnimationFrame(flushHandleRef.current) - } else { - window.clearTimeout(flushHandleRef.current) - } - } - - flushHandleRef.current = null - flushQueuedDeltas() - }, - [flushQueuedDeltas] - ) - - const appendAssistantDelta = useCallback( - (sessionId: string, delta: string) => { - if (!delta) { - return - } - - queueDelta(sessionId, 'assistant', delta) - }, - [queueDelta] - ) - - const appendReasoningDelta = useCallback( - (sessionId: string, delta: string, replace = false) => { - if (!delta) { - return - } - - if (!replace) { - queueDelta(sessionId, 'reasoning', delta) - - return - } - - flushQueuedDeltas(sessionId) - - mutateStream( - sessionId, - (parts, message) => { - if (replace && chatMessageText(message).trim()) { - return parts - } - - if (replace) { - return [...parts.filter(part => part.type !== 'reasoning'), reasoningPart(delta)] - } - - return appendReasoningPart(parts, delta) - }, - () => [reasoningPart(delta)] - ) - }, - [flushQueuedDeltas, mutateStream, queueDelta] - ) - - const upsertToolCall = useCallback( - ( - sessionId: string, - payload: GatewayEventPayload | undefined, - phase: 'running' | 'complete', - sourceEventType?: string - ) => { - // Text deltas flush on a timer but tool events apply now; flush first so - // a tool part can't jump ahead of the text that preceded it. - flushQueuedDeltas(sessionId) - - if (sessionInterrupted(sessionId)) { - return - } - - // The composer status stack owns todo display now (no inline panel) — - // mirror every todo state the tool reports into its session store. - if (payload?.name === 'todo') { - const todos = parseTodos(payload.todos) ?? parseTodos(payload.result) ?? parseTodos(payload.args) - - if (todos) { - setSessionTodos(sessionId, todos) - } - } - - if (!nativeSubagentSessionsRef.current.has(sessionId)) { - for (const subagentPayload of delegateTaskPayloads(payload, phase, sourceEventType)) { - upsertSubagent( - sessionId, - subagentPayload, - true, - phase === 'complete' ? 'delegate.complete' : 'delegate.running' - ) - } - } - - mutateStream( - sessionId, - parts => dedupeGeneratedImageEchoesInParts(upsertToolPart(parts, payload, phase)), - () => upsertToolPart([], payload, phase), - { pending: m => phase !== 'complete' || (m.pending ?? false) } - ) - }, - [flushQueuedDeltas, mutateStream, sessionInterrupted] - ) - - const completeAssistantMessage = useCallback( - (sessionId: string, text: string) => { - let shouldHydrate = false - - const completedState = updateSessionState(sessionId, state => { - // Late completion from an already-cancelled turn: cancelRun has - // already finalized the bubble (kept the partial text, dropped it if - // empty). Re-running the dedupe below would replace the partial with - // the just-cancelled full text, so we settle and bail instead. - if (state.interrupted) { - return { - ...state, - awaitingResponse: false, - busy: false, - needsInput: false, - pendingBranchGroup: null, - streamId: null, - turnStartedAt: null - } - } - - const streamId = state.streamId - const finalText = renderMediaTags(text).trim() - const completionError = completionErrorText(finalText) - const normalize = (value: string) => value.replace(/\s+/g, ' ').trim() - - const replaceTextPart = (parts: ChatMessagePart[]) => { - const visibleFinalText = stripGeneratedImageEchoes(finalText, generatedImageEchoSources(parts)).trim() - const dedupeReference = normalize(visibleFinalText) - - const kept = parts.filter(part => { - if (part.type === 'text') { - return false - } - - if (part.type !== 'reasoning' || !dedupeReference) { - return true - } - - const r = normalize(part.text) - - return !(r && (dedupeReference.startsWith(r) || r.startsWith(dedupeReference))) - }) - - return visibleFinalText ? [...kept, assistantTextPart(visibleFinalText)] : kept - } - - const completeMessage = (message: ChatMessage): ChatMessage => - completionError - ? { - ...message, - error: completionError, - parts: message.parts.filter(part => part.type !== 'text'), - pending: false - } - : { - ...message, - parts: replaceTextPart(message.parts), - pending: false - } - - const newAssistantFromCompletion = (): ChatMessage => ({ - id: `assistant-${Date.now()}`, - role: 'assistant', - parts: completionError ? [] : [assistantTextPart(finalText)], - branchGroupId: state.pendingBranchGroup ?? undefined, - ...(completionError && { error: completionError }) - }) - - const prev = state.messages - let nextMessages = prev - - if (streamId && prev.some(m => m.id === streamId)) { - nextMessages = prev.map(m => (m.id === streamId ? completeMessage(m) : m)) - } else { - const fallbackIndex = [...prev] - .reverse() - .findIndex(message => message.role === 'assistant' && !message.hidden) - - if (fallbackIndex >= 0) { - const index = prev.length - 1 - fallbackIndex - const existing = prev[index] - const existingText = chatMessageText(existing).trim() - - if (existing.pending || (finalText && existingText === finalText)) { - nextMessages = prev.map((message, messageIndex) => - messageIndex === index ? completeMessage(message) : message - ) - } else if (finalText) { - nextMessages = [...prev, newAssistantFromCompletion()] - } - } else if (finalText) { - nextMessages = [...prev, newAssistantFromCompletion()] - } - } - - const hasInlineError = nextMessages.some(m => m.role === 'assistant' && m.error && !m.hidden) - const lastVisible = [...nextMessages].reverse().find(m => !m.hidden) - const unresolvedUserTail = lastVisible?.role === 'user' - shouldHydrate = - !completionError && !hasInlineError && !unresolvedUserTail && (!state.sawAssistantPayload || !finalText) - - return { - ...state, - messages: nextMessages, - streamId: null, - pendingBranchGroup: null, - awaitingResponse: false, - busy: false, - needsInput: false, - turnStartedAt: null - } - }) - - void refreshSessions().catch(() => undefined) - // Sync the freshly-titled row to other windows (e.g. main, when the turn - // ran in the pop-out). - broadcastSessionsChanged() - - if (compactedTurnRef.current.delete(sessionId)) { - shouldHydrate = false - } - - if (shouldHydrate) { - void hydrateFromStoredSession(3, completedState.storedSessionId, sessionId) - } - - dispatchNativeNotification({ - body: text.slice(0, 140) || translateNow('notifications.native.turnDoneBody'), - kind: 'turnDone', - sessionId, - title: translateNow('notifications.native.turnDoneTitle') - }) - }, - [hydrateFromStoredSession, refreshSessions, updateSessionState] - ) - - const failAssistantMessage = useCallback( - (sessionId: string, errorMessage: string) => { - updateSessionState(sessionId, state => { - const streamId = state.streamId ?? `assistant-error-${Date.now()}` - const groupId = state.pendingBranchGroup ?? undefined - const prev = state.messages - const error = errorMessage.trim() || 'Hermes reported an error' - - const nextMessages = prev.some(m => m.id === streamId) - ? prev.map(message => - message.id === streamId - ? { - ...message, - error, - pending: false - } - : message - ) - : [ - ...prev, - { - id: streamId, - role: 'assistant' as const, - parts: [], - error, - pending: false, - branchGroupId: groupId - } - ] - - return { - ...state, - messages: nextMessages, - streamId: null, - pendingBranchGroup: null, - sawAssistantPayload: true, - awaitingResponse: false, - busy: false, - needsInput: false, - turnStartedAt: null - } - }) - }, - [updateSessionState] - ) - - const handleGatewayEvent = useCallback( +/** The gateway-event dispatcher, extracted from useMessageStream. */ +export function useGatewayEventHandler(deps: GatewayEventDeps) { + const { + appendAssistantDelta, + appendReasoningDelta, + activeSessionIdRef, + compactedTurnRef, + lastCwdInfoSessionRef, + nativeSubagentSessionsRef, + completeAssistantMessage, + failAssistantMessage, + flushQueuedDeltas, + queryClient, + refreshHermesConfig, + sessionInterrupted, + updateSessionState, + upsertToolCall + } = deps + + return useCallback( (event: RpcEvent) => { const payload = event.payload as GatewayEventPayload | undefined const explicitSid = event.session_id || '' @@ -1264,9 +633,12 @@ export function useMessageStream({ appendAssistantDelta, appendReasoningDelta, activeSessionIdRef, + compactedTurnRef, completeAssistantMessage, failAssistantMessage, flushQueuedDeltas, + lastCwdInfoSessionRef, + nativeSubagentSessionsRef, queryClient, refreshHermesConfig, sessionInterrupted, @@ -1274,12 +646,4 @@ export function useMessageStream({ upsertToolCall ] ) - - return { - appendAssistantDelta, - appendReasoningDelta, - completeAssistantMessage, - handleGatewayEvent, - upsertToolCall - } } diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/index.ts b/apps/desktop/src/app/session/hooks/use-message-stream/index.ts new file mode 100644 index 00000000000..65a203a215e --- /dev/null +++ b/apps/desktop/src/app/session/hooks/use-message-stream/index.ts @@ -0,0 +1,540 @@ +import type { QueryClient } from '@tanstack/react-query' +import { type MutableRefObject, useCallback, useEffect, useRef } from 'react' + +import { translateNow } from '@/i18n' +import { + appendAssistantTextPart, + appendReasoningPart, + assistantTextPart, + type ChatMessage, + type ChatMessagePart, + chatMessageText, + type GatewayEventPayload, + reasoningPart, + renderMediaTags, + upsertToolPart +} from '@/lib/chat-messages' +import { + dedupeGeneratedImageEchoesInParts, + generatedImageEchoSources, + stripGeneratedImageEchoes +} from '@/lib/generated-images' +import { parseTodos } from '@/lib/todos' +import { dispatchNativeNotification } from '@/store/native-notifications' +import { broadcastSessionsChanged } from '@/store/session-sync' +import { upsertSubagent } from '@/store/subagents' +import { setSessionTodos } from '@/store/todos' + +import type { ClientSessionState } from '../../../types' + +import { useGatewayEventHandler } from './gateway-event' +import { completionErrorText, delegateTaskPayloads, STREAM_DELTA_FLUSH_MS } from './utils' + +interface MessageStreamOptions { + activeSessionIdRef: MutableRefObject + hydrateFromStoredSession: ( + attempts?: number, + storedSessionId?: string | null, + runtimeSessionId?: string | null + ) => Promise + queryClient: QueryClient + refreshHermesConfig: () => Promise + refreshSessions: () => Promise + sessionStateByRuntimeIdRef: MutableRefObject> + updateSessionState: ( + sessionId: string, + updater: (state: ClientSessionState) => ClientSessionState, + storedSessionId?: string | null + ) => ClientSessionState +} + +interface QueuedStreamDeltas { + assistant: string + reasoning: string +} + +export function useMessageStream({ + activeSessionIdRef, + hydrateFromStoredSession, + queryClient, + refreshHermesConfig, + refreshSessions, + sessionStateByRuntimeIdRef, + updateSessionState +}: MessageStreamOptions) { + const sessionInterrupted = useCallback( + (sessionId: string) => sessionStateByRuntimeIdRef.current.get(sessionId)?.interrupted ?? false, + [sessionStateByRuntimeIdRef] + ) + + // Patch the in-flight assistant message (or seed it). Centralises the + // streamId/groupId bookkeeping every event callback would otherwise repeat. + const mutateStream = useCallback( + ( + sessionId: string, + transform: (parts: ChatMessagePart[], message: ChatMessage) => ChatMessagePart[], + seed: () => ChatMessagePart[], + opts: { + pending?: (message: ChatMessage) => boolean + } = {} + ) => { + const apply = () => { + updateSessionState(sessionId, state => { + // After a stop, drop any late deltas / tool events for the + // cancelled turn so they don't keep growing the (now finalized) + // assistant bubble or, worse, seed a brand-new bubble that + // appears to belong to the next user message. + if (state.interrupted) { + return state + } + + const streamId = state.streamId ?? `assistant-stream-${Date.now()}` + const groupId = state.pendingBranchGroup ?? undefined + const prev = state.messages + let nextMessages: ChatMessage[] + + if (!prev.some(m => m.id === streamId)) { + nextMessages = [ + ...prev, + { + id: streamId, + role: 'assistant', + parts: seed(), + pending: true, + branchGroupId: groupId + } + ] + } else { + nextMessages = prev.map(m => + m.id === streamId + ? { + ...m, + parts: transform(m.parts, m), + pending: opts.pending ? opts.pending(m) : true + } + : m + ) + } + + return { + ...state, + messages: nextMessages, + streamId, + sawAssistantPayload: true, + awaitingResponse: false + } + }) + } + + apply() + }, + [updateSessionState] + ) + + const queuedDeltasRef = useRef>(new Map()) + const flushHandleRef = useRef(null) + const lastFlushAtRef = useRef(0) + const nativeSubagentSessionsRef = useRef>(new Set()) + // Turns that auto-compacted: skip post-turn hydrate so live scrollback survives. + const compactedTurnRef = useRef>(new Set()) + // Last session we applied a session.info cwd for — lets us tell an agent + // relocating the SAME session (follow it) from a session switch (don't yank). + const lastCwdInfoSessionRef = useRef(null) + + const flushQueuedDeltas = useCallback( + (sessionId?: string) => { + const queue = queuedDeltasRef.current + const ids = sessionId ? [sessionId] : [...queue.keys()] + + for (const id of ids) { + const queued = queue.get(id) + + if (!queued) { + continue + } + + queue.delete(id) + + if (queued.assistant) { + mutateStream( + id, + parts => dedupeGeneratedImageEchoesInParts(appendAssistantTextPart(parts, queued.assistant)), + () => [assistantTextPart(queued.assistant)] + ) + } + + if (queued.reasoning) { + mutateStream( + id, + parts => appendReasoningPart(parts, queued.reasoning), + () => [reasoningPart(queued.reasoning)] + ) + } + } + }, + [mutateStream] + ) + + const scheduleDeltaFlush = useCallback(() => { + if (flushHandleRef.current !== null) { + return + } + + if (typeof window === 'undefined') { + flushQueuedDeltas() + + return + } + + // Enforce a floor on the gap between two flushes. Without it, an LLM + // emitting tokens slower than the rAF cadence (~30-80 tok/sec is typical) + // forces one React commit + Streamdown re-parse per token, and the + // last-block markdown re-parse cost is roughly linear in current block + // length. With this floor, slower streams still coalesce ~2 tokens per + // commit and the synthetic harness shows longtask counts drop from ~5/5s + // to ~1/5s on big sessions (see scripts/profile-typing-lag.md). + const sinceLast = performance.now() - lastFlushAtRef.current + + const runFlush = () => { + flushHandleRef.current = null + lastFlushAtRef.current = performance.now() + flushQueuedDeltas() + } + + if (sinceLast >= STREAM_DELTA_FLUSH_MS && typeof window.requestAnimationFrame === 'function') { + flushHandleRef.current = window.requestAnimationFrame(runFlush) + + return + } + + flushHandleRef.current = window.setTimeout(runFlush, Math.max(0, STREAM_DELTA_FLUSH_MS - sinceLast)) + }, [flushQueuedDeltas]) + + const queueDelta = useCallback( + (sessionId: string, key: keyof QueuedStreamDeltas, delta: string) => { + if (!delta) { + return + } + + const queued = queuedDeltasRef.current.get(sessionId) ?? { assistant: '', reasoning: '' } + queued[key] += delta + queuedDeltasRef.current.set(sessionId, queued) + scheduleDeltaFlush() + }, + [scheduleDeltaFlush] + ) + + useEffect( + () => () => { + if (flushHandleRef.current !== null && typeof window !== 'undefined') { + if (typeof window.cancelAnimationFrame === 'function') { + window.cancelAnimationFrame(flushHandleRef.current) + } else { + window.clearTimeout(flushHandleRef.current) + } + } + + flushHandleRef.current = null + flushQueuedDeltas() + }, + [flushQueuedDeltas] + ) + + const appendAssistantDelta = useCallback( + (sessionId: string, delta: string) => { + if (!delta) { + return + } + + queueDelta(sessionId, 'assistant', delta) + }, + [queueDelta] + ) + + const appendReasoningDelta = useCallback( + (sessionId: string, delta: string, replace = false) => { + if (!delta) { + return + } + + if (!replace) { + queueDelta(sessionId, 'reasoning', delta) + + return + } + + flushQueuedDeltas(sessionId) + + mutateStream( + sessionId, + (parts, message) => { + if (replace && chatMessageText(message).trim()) { + return parts + } + + if (replace) { + return [...parts.filter(part => part.type !== 'reasoning'), reasoningPart(delta)] + } + + return appendReasoningPart(parts, delta) + }, + () => [reasoningPart(delta)] + ) + }, + [flushQueuedDeltas, mutateStream, queueDelta] + ) + + const upsertToolCall = useCallback( + ( + sessionId: string, + payload: GatewayEventPayload | undefined, + phase: 'running' | 'complete', + sourceEventType?: string + ) => { + // Text deltas flush on a timer but tool events apply now; flush first so + // a tool part can't jump ahead of the text that preceded it. + flushQueuedDeltas(sessionId) + + if (sessionInterrupted(sessionId)) { + return + } + + // The composer status stack owns todo display now (no inline panel) — + // mirror every todo state the tool reports into its session store. + if (payload?.name === 'todo') { + const todos = parseTodos(payload.todos) ?? parseTodos(payload.result) ?? parseTodos(payload.args) + + if (todos) { + setSessionTodos(sessionId, todos) + } + } + + if (!nativeSubagentSessionsRef.current.has(sessionId)) { + for (const subagentPayload of delegateTaskPayloads(payload, phase, sourceEventType)) { + upsertSubagent( + sessionId, + subagentPayload, + true, + phase === 'complete' ? 'delegate.complete' : 'delegate.running' + ) + } + } + + mutateStream( + sessionId, + parts => dedupeGeneratedImageEchoesInParts(upsertToolPart(parts, payload, phase)), + () => upsertToolPart([], payload, phase), + { pending: m => phase !== 'complete' || (m.pending ?? false) } + ) + }, + [flushQueuedDeltas, mutateStream, sessionInterrupted] + ) + + const completeAssistantMessage = useCallback( + (sessionId: string, text: string) => { + let shouldHydrate = false + + const completedState = updateSessionState(sessionId, state => { + // Late completion from an already-cancelled turn: cancelRun has + // already finalized the bubble (kept the partial text, dropped it if + // empty). Re-running the dedupe below would replace the partial with + // the just-cancelled full text, so we settle and bail instead. + if (state.interrupted) { + return { + ...state, + awaitingResponse: false, + busy: false, + needsInput: false, + pendingBranchGroup: null, + streamId: null, + turnStartedAt: null + } + } + + const streamId = state.streamId + const finalText = renderMediaTags(text).trim() + const completionError = completionErrorText(finalText) + const normalize = (value: string) => value.replace(/\s+/g, ' ').trim() + + const replaceTextPart = (parts: ChatMessagePart[]) => { + const visibleFinalText = stripGeneratedImageEchoes(finalText, generatedImageEchoSources(parts)).trim() + const dedupeReference = normalize(visibleFinalText) + + const kept = parts.filter(part => { + if (part.type === 'text') { + return false + } + + if (part.type !== 'reasoning' || !dedupeReference) { + return true + } + + const r = normalize(part.text) + + return !(r && (dedupeReference.startsWith(r) || r.startsWith(dedupeReference))) + }) + + return visibleFinalText ? [...kept, assistantTextPart(visibleFinalText)] : kept + } + + const completeMessage = (message: ChatMessage): ChatMessage => + completionError + ? { + ...message, + error: completionError, + parts: message.parts.filter(part => part.type !== 'text'), + pending: false + } + : { + ...message, + parts: replaceTextPart(message.parts), + pending: false + } + + const newAssistantFromCompletion = (): ChatMessage => ({ + id: `assistant-${Date.now()}`, + role: 'assistant', + parts: completionError ? [] : [assistantTextPart(finalText)], + branchGroupId: state.pendingBranchGroup ?? undefined, + ...(completionError && { error: completionError }) + }) + + const prev = state.messages + let nextMessages = prev + + if (streamId && prev.some(m => m.id === streamId)) { + nextMessages = prev.map(m => (m.id === streamId ? completeMessage(m) : m)) + } else { + const fallbackIndex = [...prev] + .reverse() + .findIndex(message => message.role === 'assistant' && !message.hidden) + + if (fallbackIndex >= 0) { + const index = prev.length - 1 - fallbackIndex + const existing = prev[index] + const existingText = chatMessageText(existing).trim() + + if (existing.pending || (finalText && existingText === finalText)) { + nextMessages = prev.map((message, messageIndex) => + messageIndex === index ? completeMessage(message) : message + ) + } else if (finalText) { + nextMessages = [...prev, newAssistantFromCompletion()] + } + } else if (finalText) { + nextMessages = [...prev, newAssistantFromCompletion()] + } + } + + const hasInlineError = nextMessages.some(m => m.role === 'assistant' && m.error && !m.hidden) + const lastVisible = [...nextMessages].reverse().find(m => !m.hidden) + const unresolvedUserTail = lastVisible?.role === 'user' + shouldHydrate = + !completionError && !hasInlineError && !unresolvedUserTail && (!state.sawAssistantPayload || !finalText) + + return { + ...state, + messages: nextMessages, + streamId: null, + pendingBranchGroup: null, + awaitingResponse: false, + busy: false, + needsInput: false, + turnStartedAt: null + } + }) + + void refreshSessions().catch(() => undefined) + // Sync the freshly-titled row to other windows (e.g. main, when the turn + // ran in the pop-out). + broadcastSessionsChanged() + + if (compactedTurnRef.current.delete(sessionId)) { + shouldHydrate = false + } + + if (shouldHydrate) { + void hydrateFromStoredSession(3, completedState.storedSessionId, sessionId) + } + + dispatchNativeNotification({ + body: text.slice(0, 140) || translateNow('notifications.native.turnDoneBody'), + kind: 'turnDone', + sessionId, + title: translateNow('notifications.native.turnDoneTitle') + }) + }, + [hydrateFromStoredSession, refreshSessions, updateSessionState] + ) + + const failAssistantMessage = useCallback( + (sessionId: string, errorMessage: string) => { + updateSessionState(sessionId, state => { + const streamId = state.streamId ?? `assistant-error-${Date.now()}` + const groupId = state.pendingBranchGroup ?? undefined + const prev = state.messages + const error = errorMessage.trim() || 'Hermes reported an error' + + const nextMessages = prev.some(m => m.id === streamId) + ? prev.map(message => + message.id === streamId + ? { + ...message, + error, + pending: false + } + : message + ) + : [ + ...prev, + { + id: streamId, + role: 'assistant' as const, + parts: [], + error, + pending: false, + branchGroupId: groupId + } + ] + + return { + ...state, + messages: nextMessages, + streamId: null, + pendingBranchGroup: null, + sawAssistantPayload: true, + awaitingResponse: false, + busy: false, + needsInput: false, + turnStartedAt: null + } + }) + }, + [updateSessionState] + ) + + const handleGatewayEvent = useGatewayEventHandler({ + appendAssistantDelta, + appendReasoningDelta, + activeSessionIdRef, + compactedTurnRef, + lastCwdInfoSessionRef, + nativeSubagentSessionsRef, + completeAssistantMessage, + failAssistantMessage, + flushQueuedDeltas, + queryClient, + refreshHermesConfig, + sessionInterrupted, + updateSessionState, + upsertToolCall + }) + + return { + appendAssistantDelta, + appendReasoningDelta, + completeAssistantMessage, + handleGatewayEvent, + upsertToolCall + } +} diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts b/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts new file mode 100644 index 00000000000..47994355074 --- /dev/null +++ b/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from 'vitest' + +import type { GatewayEventPayload } from '@/lib/chat-messages' + +import { + completionErrorText, + delegateTaskPayloads, + hasSessionInfoStatePatch, + sessionInfoStatePatch, + toTodoPayload +} from './utils' + +const payload = (over: Record): GatewayEventPayload => over as GatewayEventPayload + +describe('completionErrorText', () => { + it('flags provider/HTTP/retry failures, ignores normal text', () => { + expect(completionErrorText('API call failed after 3 retries: boom')).toMatch(/^API call failed/) + expect(completionErrorText('HTTP 500 upstream')).toMatch(/^HTTP 500/) + expect(completionErrorText('Gateway error: nope')).toMatch(/^Gateway error/) + expect(completionErrorText('here is your answer')).toBeNull() + expect(completionErrorText(' ')).toBeNull() + }) +}) + +describe('toTodoPayload', () => { + it('routes named todo and anonymous todos-bearing events to the todo stream', () => { + expect(toTodoPayload(payload({ name: 'todo' }))?.tool_id).toBe('todo-live') + expect(toTodoPayload(payload({ todos: [] }))?.name).toBe('todo') + expect(toTodoPayload(payload({ name: 'web_search' }))).toBeUndefined() + expect(toTodoPayload(undefined)).toBeUndefined() + }) +}) + +describe('sessionInfoStatePatch / hasSessionInfoStatePatch', () => { + it('extracts only present runtime fields', () => { + const patch = sessionInfoStatePatch(payload({ model: 'gpt', fast: true, branch: 'main' })) + expect(patch).toMatchObject({ model: 'gpt', fast: true, branch: 'main' }) + expect(hasSessionInfoStatePatch(patch)).toBe(true) + expect(hasSessionInfoStatePatch(sessionInfoStatePatch(payload({})))).toBe(false) + }) +}) + +describe('delegateTaskPayloads', () => { + it('returns [] for non-delegate events', () => { + expect(delegateTaskPayloads(payload({ name: 'web_search' }), 'running')).toEqual([]) + }) + + it('maps a running tool.start to a subagent.start spec', () => { + const [spec] = delegateTaskPayloads( + payload({ name: 'delegate_task', tool_id: 't1', args: { goal: 'do it' } }), + 'running', + 'tool.start' + ) + + expect(spec).toMatchObject({ event_type: 'subagent.start', goal: 'do it', status: 'running' }) + }) + + it('maps completion (with error) to a failed subagent.complete', () => { + const [spec] = delegateTaskPayloads( + payload({ name: 'delegate_task', error: 'boom', result: { summary: 'failed run' } }), + 'complete' + ) + + expect(spec).toMatchObject({ event_type: 'subagent.complete', status: 'failed' }) + }) +}) diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts b/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts new file mode 100644 index 00000000000..d9a90b36692 --- /dev/null +++ b/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts @@ -0,0 +1,179 @@ +import type { GatewayEventPayload } from '@/lib/chat-messages' +import { normalizePersonalityValue } from '@/lib/chat-runtime' + +import type { ClientSessionState } from '../../../types' + +type SessionRuntimeStatePatch = Partial< + Pick< + ClientSessionState, + 'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo' + > +> + +export function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch { + const patch: SessionRuntimeStatePatch = {} + + if (typeof payload?.model === 'string') { + patch.model = payload.model || '' + } + + if (typeof payload?.provider === 'string') { + patch.provider = payload.provider || '' + } + + if (typeof payload?.cwd === 'string') { + patch.cwd = payload.cwd + } + + if (typeof payload?.branch === 'string') { + patch.branch = payload.branch + } + + if (typeof payload?.personality === 'string') { + patch.personality = normalizePersonalityValue(payload.personality) + } + + if (typeof payload?.reasoning_effort === 'string') { + patch.reasoningEffort = payload.reasoning_effort + } + + if (typeof payload?.service_tier === 'string') { + patch.serviceTier = payload.service_tier + } + + if (typeof payload?.fast === 'boolean') { + patch.fast = payload.fast + } + + if (typeof payload?.yolo === 'boolean') { + patch.yolo = payload.yolo + } + + return patch +} + +export function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean { + return Object.keys(patch).length > 0 +} + +// Minimum gap between two assistant-text flushes during a stream. Was 16ms +// (rAF only), which at typical LLM token rates of ~30-80 tok/sec meant every +// token got its own React commit + Streamdown markdown re-parse, scaling +// linearly with the growing last-block length. Bumping to 33ms lets ~2 tokens +// batch into one commit at 60 tok/sec without introducing visible lag on the +// streaming text (still 30 fps of visible text growth). Big perceived +// smoothness win on long messages with big trailing paragraphs; see +// `scripts/profile-typing-lag.md` for the measurement work behind this. +export const STREAM_DELTA_FLUSH_MS = 33 + +// Gateway/provider failures sometimes arrive as message.complete text instead +// of an explicit error event. Treat matches as inline assistant errors so they +// persist like real error events and don't get erased by hydrate fallback. +const COMPLETION_ERROR_PATTERNS = [ + /^API call failed after \d+ retries:/i, + /^HTTP\s+\d{3}\b/i, + /^(Provider|Gateway)\s+error:/i +] + +export function completionErrorText(finalText: string): string | null { + const text = finalText.trim() + + return text && COMPLETION_ERROR_PATTERNS.some(re => re.test(text)) ? text : null +} + +export const SUBAGENT_EVENT_TYPES = new Set([ + 'subagent.spawn_requested', + 'subagent.start', + 'subagent.thinking', + 'subagent.tool', + 'subagent.progress', + 'subagent.complete' +]) + +// Anonymous progress events that carry todos but no name still belong to the +// todo stream; named todo events are obviously routed there too. +export function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined { + if (!payload) { + return undefined + } + + const isTodo = payload.name === 'todo' || (!payload.name && Object.hasOwn(payload, 'todos')) + + return isTodo ? { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' } : undefined +} + +function asRecord(value: unknown): Record { + return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record) : {} +} + +function parseMaybeRecord(value: unknown): Record { + if (typeof value === 'string') { + try { + return asRecord(JSON.parse(value)) + } catch { + return {} + } + } + + return asRecord(value) +} + +const firstString = (...candidates: unknown[]): string => { + for (const v of candidates) { + if (typeof v === 'string' && v) { + return v + } + } + + return '' +} + +export function delegateTaskPayloads( + payload: GatewayEventPayload | undefined, + phase: 'running' | 'complete', + sourceEventType?: string +): Record[] { + if (payload?.name !== 'delegate_task') { + return [] + } + + const args = parseMaybeRecord(payload.args ?? payload.input) + const result = parseMaybeRecord(payload.result) + const rawTasks = Array.isArray(args.tasks) ? args.tasks : [] + const tasks = rawTasks.length ? rawTasks.map(parseMaybeRecord) : [args] + const status = phase === 'complete' ? (payload.error ? 'failed' : 'completed') : 'running' + const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task' + const progressText = firstString(payload.preview, payload.message, payload.context) + + const eventType = + phase === 'complete' + ? 'subagent.complete' + : sourceEventType === 'tool.start' + ? 'subagent.start' + : 'subagent.progress' + + return tasks.map((task, index) => { + const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task' + const summary = firstString(result.summary, payload.summary, payload.message) + + return { + depth: 0, + duration_seconds: payload.duration_s, + goal, + status, + subagent_id: `delegate-tool:${toolId}:${index}`, + summary: summary || undefined, + task_count: tasks.length, + task_index: index, + text: eventType === 'subagent.progress' ? progressText || goal : undefined, + tool_name: eventType === 'subagent.start' ? 'delegate_task' : undefined, + tool_preview: eventType === 'subagent.start' ? progressText : undefined, + toolsets: Array.isArray(task.toolsets) ? task.toolsets : Array.isArray(args.toolsets) ? args.toolsets : [], + event_type: eventType, + output_tail: + phase === 'complete' && summary + ? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }] + : undefined + } + }) +}