Merge pull request #55501 from NousResearch/bb/desktop-split-message-stream

refactor(desktop): split use-message-stream (utils + gateway-event sub-hook)
This commit is contained in:
brooklyn! 2026-06-30 03:22:21 -05:00 committed by GitHub
commit 90f59ecdbb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 830 additions and 681 deletions

View file

@ -1,34 +1,16 @@
import type { QueryClient } from '@tanstack/react-query'
import { type MutableRefObject, useCallback, useEffect, useRef } from 'react'
import { type MutableRefObject, useCallback } from 'react'
import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream'
import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals'
import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer'
import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals'
import { translateNow } from '@/i18n'
import {
appendAssistantTextPart,
appendReasoningPart,
assistantTextPart,
type ChatMessage,
type ChatMessagePart,
chatMessageText,
type GatewayEventPayload,
reasoningPart,
renderMediaTags,
textPart,
upsertToolPart
} from '@/lib/chat-messages'
import { type GatewayEventPayload, textPart } from '@/lib/chat-messages'
import { coerceGatewayText, coerceThinkingText, normalizePersonalityValue } from '@/lib/chat-runtime'
import { playCompletionSound } from '@/lib/completion-sound'
import { gatewayEventRequiresSessionId } from '@/lib/gateway-events'
import {
dedupeGeneratedImageEchoesInParts,
generatedImageEchoSources,
stripGeneratedImageEchoes
} from '@/lib/generated-images'
import { triggerHaptic } from '@/lib/haptics'
import { isProviderSetupErrorMessage } from '@/lib/provider-setup-errors'
import { parseTodos } from '@/lib/todos'
import { clearClarifyRequest, setClarifyRequest } from '@/store/clarify'
import { setSessionCompacting } from '@/store/compaction'
import { refreshBackgroundProcesses } from '@/store/composer-status'
@ -54,674 +36,61 @@ import {
setTurnStartedAt,
setYoloActive
} from '@/store/session'
import { broadcastSessionsChanged } from '@/store/session-sync'
import { clearSessionSubagents, pruneDelegateFallbackSubagents, upsertSubagent } from '@/store/subagents'
import { setSessionTodos } from '@/store/todos'
import { recordToolDiff } from '@/store/tool-diffs'
import { notifyWorkspaceChanged, toolMayMutateFiles } from '@/store/workspace-events'
import type { RpcEvent } from '@/types/hermes'
import type { ClientSessionState } from '../../types'
import type { ClientSessionState } from '../../../types'
interface MessageStreamOptions {
import { hasSessionInfoStatePatch, sessionInfoStatePatch, SUBAGENT_EVENT_TYPES, toTodoPayload } from './utils'
/**
 * Everything `useGatewayEventHandler` receives from its caller, bundled as a
 * single `deps` object: shared refs, stream-mutation callbacks and refresh
 * helpers.
 *
 * NOTE(review): `hydrateFromStoredSession`, `refreshSessions` and
 * `sessionStateByRuntimeIdRef` are declared here but are not among the names
 * `useGatewayEventHandler` destructures from `deps` — confirm they are still
 * required on this interface.
 */
interface GatewayEventDeps {
activeSessionIdRef: MutableRefObject<string | null>
hydrateFromStoredSession: (
attempts?: number,
storedSessionId?: string | null,
runtimeSessionId?: string | null
) => Promise<void>
compactedTurnRef: MutableRefObject<Set<string>>
lastCwdInfoSessionRef: MutableRefObject<string | null>
nativeSubagentSessionsRef: MutableRefObject<Set<string>>
appendAssistantDelta: (sessionId: string, delta: string) => void
// `replace` is optional; when omitted the callee decides the default.
appendReasoningDelta: (sessionId: string, delta: string, replace?: boolean) => void
completeAssistantMessage: (sessionId: string, text: string) => void
failAssistantMessage: (sessionId: string, errorMessage: string) => void
// With no `sessionId` the callee is free to act on every session it tracks.
flushQueuedDeltas: (sessionId?: string) => void
queryClient: QueryClient
refreshHermesConfig: () => Promise<void>
refreshSessions: () => Promise<void>
sessionStateByRuntimeIdRef: MutableRefObject<Map<string, ClientSessionState>>
sessionInterrupted: (sessionId: string) => boolean
// Runs `updater` against the session's state and returns a `ClientSessionState`
// (presumably the updated one — confirm against the implementation).
updateSessionState: (
sessionId: string,
updater: (state: ClientSessionState) => ClientSessionState,
storedSessionId?: string | null
) => ClientSessionState
upsertToolCall: (
sessionId: string,
payload: GatewayEventPayload | undefined,
phase: 'running' | 'complete',
sourceEventType?: string
) => void
}
/**
 * Text buffered for one session between two flushes. `queueDelta` appends
 * incoming deltas to the matching field and `flushQueuedDeltas` drains both.
 */
interface QueuedStreamDeltas {
// Concatenated assistant-text deltas not yet applied to the message.
assistant: string
// Concatenated reasoning deltas not yet applied to the message.
reasoning: string
}
/**
 * Optional subset of `ClientSessionState` fields that `sessionInfoStatePatch`
 * can fill in from a gateway payload. Every key is optional, so an empty
 * object is a valid "nothing to change" patch.
 */
type SessionRuntimeStatePatch = Partial<
Pick<
ClientSessionState,
'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo'
>
>
/**
 * Builds a session-state patch from the fields a gateway payload carries.
 *
 * A field is copied only when the payload holds a value of the expected
 * primitive type (string for the textual fields, boolean for `fast` / `yolo`);
 * anything else is left out of the patch. `personality` is passed through
 * `normalizePersonalityValue`, and the snake_case payload keys
 * `reasoning_effort` / `service_tier` map to their camelCase state keys.
 *
 * @param payload Gateway event payload, or undefined when the event had none.
 * @returns A patch containing only the recognised fields; empty when the
 *   payload is missing or carries none of them.
 */
function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch {
  const next: SessionRuntimeStatePatch = {}

  if (!payload) {
    return next
  }

  const {
    model,
    provider,
    cwd,
    branch,
    personality,
    reasoning_effort: reasoningEffort,
    service_tier: serviceTier,
    fast,
    yolo
  } = payload

  // Textual fields: copied verbatim once known to be strings.
  if (typeof model === 'string') {
    next.model = model
  }

  if (typeof provider === 'string') {
    next.provider = provider
  }

  if (typeof cwd === 'string') {
    next.cwd = cwd
  }

  if (typeof branch === 'string') {
    next.branch = branch
  }

  if (typeof personality === 'string') {
    next.personality = normalizePersonalityValue(personality)
  }

  if (typeof reasoningEffort === 'string') {
    next.reasoningEffort = reasoningEffort
  }

  if (typeof serviceTier === 'string') {
    next.serviceTier = serviceTier
  }

  // Flags: only genuine booleans are accepted.
  if (typeof fast === 'boolean') {
    next.fast = fast
  }

  if (typeof yolo === 'boolean') {
    next.yolo = yolo
  }

  return next
}
/**
 * Tells whether a patch would change anything.
 *
 * @param patch Patch to inspect.
 * @returns true when the patch has at least one own enumerable key.
 */
function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean {
  const patchedKeys = Object.keys(patch)

  return patchedKeys.length !== 0
}
// Minimum gap, in milliseconds, between two assistant-text flushes during a
// stream.
//
// The previous value was 16ms (rAF only). At typical LLM token rates of
// ~30-80 tok/sec that meant every token got its own React commit + Streamdown
// markdown re-parse, a cost that scales linearly with the growing last-block
// length. At 33ms, ~2 tokens batch into one commit at 60 tok/sec without
// introducing visible lag on the streaming text (still 30 fps of visible text
// growth). This is a big perceived-smoothness win on long messages with big
// trailing paragraphs; see `scripts/profile-typing-lag.md` for the
// measurement work behind this.
const STREAM_DELTA_FLUSH_MS = 33
// Some gateway/provider failures show up as the text of a message.complete
// event instead of as an explicit error event. Text matching one of these
// patterns is treated as an inline assistant error so it persists like a real
// error event and is not erased by the hydrate fallback.
const COMPLETION_ERROR_PATTERNS = [
  /^API call failed after \d+ retries:/i,
  /^HTTP\s+\d{3}\b/i,
  /^(Provider|Gateway)\s+error:/i
]

/**
 * Detects completion text that is really an error report.
 *
 * @param finalText Final text of a completed assistant message.
 * @returns The trimmed text when it is non-empty and matches one of
 *   `COMPLETION_ERROR_PATTERNS`; otherwise null.
 */
function completionErrorText(finalText: string): string | null {
  const candidate = finalText.trim()

  if (!candidate) {
    return null
  }

  for (const pattern of COMPLETION_ERROR_PATTERNS) {
    if (pattern.test(candidate)) {
      return candidate
    }
  }

  return null
}
// Names of the `subagent.*` gateway event types, held in a Set (presumably
// for membership checks by the event dispatcher — the consumer is not visible
// alongside this definition).
const SUBAGENT_EVENT_TYPES = new Set([
'subagent.spawn_requested',
'subagent.start',
'subagent.thinking',
'subagent.tool',
'subagent.progress',
'subagent.complete'
])
/**
 * Re-shapes a payload that belongs to the todo stream, or rejects it.
 *
 * A payload qualifies when it is named `todo`, or when it has no name at all
 * but owns a `todos` property (anonymous progress events that carry todos
 * still belong to the todo stream).
 *
 * @param payload Gateway event payload, possibly undefined.
 * @returns A shallow copy with `name` forced to `'todo'` and `tool_id`
 *   defaulting to `'todo-live'` when it is missing or empty; undefined when
 *   the payload is absent or is not a todo payload.
 */
function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined {
  if (!payload) {
    return undefined
  }

  if (payload.name !== 'todo') {
    // Not explicitly named: only an anonymous payload that owns `todos` counts.
    const anonymousWithTodos = !payload.name && Object.hasOwn(payload, 'todos')

    if (!anonymousWithTodos) {
      return undefined
    }
  }

  return { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' }
}
/**
 * Views a value as a string-keyed record.
 *
 * @param value Anything.
 * @returns The same object when `value` is a non-null, non-array object;
 *   otherwise a fresh empty object.
 */
function asRecord(value: unknown): Record<string, unknown> {
  if (!value || typeof value !== 'object' || Array.isArray(value)) {
    return {}
  }

  return value as Record<string, unknown>
}

/**
 * Like `asRecord`, but first JSON-decodes string input.
 *
 * @param value A record, a JSON string encoding one, or anything else.
 * @returns The decoded/original record, or an empty object when the string is
 *   not valid JSON or the (decoded) value is not a plain record.
 */
function parseMaybeRecord(value: unknown): Record<string, unknown> {
  if (typeof value !== 'string') {
    return asRecord(value)
  }

  let decoded: unknown

  try {
    decoded = JSON.parse(value)
  } catch {
    // Malformed JSON is treated the same as "no data".
    return {}
  }

  return asRecord(decoded)
}
/**
 * Picks the first usable string out of a list of candidates.
 *
 * @param candidates Values of any type, checked in order.
 * @returns The first candidate that is a non-empty string, or `''` when
 *   there is none.
 */
const firstString = (...candidates: unknown[]): string => {
  const hit = candidates.find(candidate => typeof candidate === 'string' && candidate !== '')

  return typeof hit === 'string' ? hit : ''
}
/**
 * Expands a `delegate_task` tool payload into one subagent-style record per
 * delegated task.
 *
 * The tool arguments come from `payload.args` (falling back to
 * `payload.input`) and may be an object or a JSON string. When the arguments
 * hold a non-empty `tasks` array, each entry becomes a task; otherwise the
 * arguments themselves are treated as the single task.
 *
 * @param payload Tool event payload; anything not named `delegate_task`
 *   yields an empty list.
 * @param phase `'complete'` marks the records completed (or failed when
 *   `payload.error` is truthy); `'running'` marks them running.
 * @param sourceEventType Originating event type. While running, `'tool.start'`
 *   produces `subagent.start` records and anything else `subagent.progress`.
 * @returns One record per task, with ids of the form
 *   `delegate-tool:<toolId>:<index>`.
 */
function delegateTaskPayloads(
  payload: GatewayEventPayload | undefined,
  phase: 'running' | 'complete',
  sourceEventType?: string
): Record<string, unknown>[] {
  if (payload?.name !== 'delegate_task') {
    return []
  }

  const isComplete = phase === 'complete'
  const args = parseMaybeRecord(payload.args ?? payload.input)
  const result = parseMaybeRecord(payload.result)

  const listedTasks = Array.isArray(args.tasks) ? args.tasks : []
  const tasks = listedTasks.length > 0 ? listedTasks.map(entry => parseMaybeRecord(entry)) : [args]

  let status = 'running'

  if (isComplete) {
    status = payload.error ? 'failed' : 'completed'
  }

  let eventType = 'subagent.progress'

  if (isComplete) {
    eventType = 'subagent.complete'
  } else if (sourceEventType === 'tool.start') {
    eventType = 'subagent.start'
  }

  const isStart = eventType === 'subagent.start'
  const isProgress = eventType === 'subagent.progress'

  const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task'
  const progressText = firstString(payload.preview, payload.message, payload.context)
  // The summary does not depend on the individual task, so resolve it once.
  const summary = firstString(result.summary, payload.summary, payload.message)

  return tasks.map((task, index) => {
    const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task'

    // Task-level toolsets win over the shared ones; default to an empty list.
    let toolsets: unknown[] = []

    if (Array.isArray(task.toolsets)) {
      toolsets = task.toolsets
    } else if (Array.isArray(args.toolsets)) {
      toolsets = args.toolsets
    }

    // Only a completed delegation with a summary gets an output tail entry.
    const outputTail =
      isComplete && summary
        ? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }]
        : undefined

    return {
      depth: 0,
      duration_seconds: payload.duration_s,
      goal,
      status,
      subagent_id: `delegate-tool:${toolId}:${index}`,
      summary: summary || undefined,
      task_count: tasks.length,
      task_index: index,
      text: isProgress ? progressText || goal : undefined,
      tool_name: isStart ? 'delegate_task' : undefined,
      tool_preview: isStart ? progressText : undefined,
      toolsets,
      event_type: eventType,
      output_tail: outputTail
    }
  })
}
export function useMessageStream({
activeSessionIdRef,
hydrateFromStoredSession,
queryClient,
refreshHermesConfig,
refreshSessions,
sessionStateByRuntimeIdRef,
updateSessionState
}: MessageStreamOptions) {
const sessionInterrupted = useCallback(
(sessionId: string) => sessionStateByRuntimeIdRef.current.get(sessionId)?.interrupted ?? false,
[sessionStateByRuntimeIdRef]
)
// Patch the in-flight assistant message (or seed it). Centralises the
// streamId/groupId bookkeeping every event callback would otherwise repeat.
const mutateStream = useCallback(
(
sessionId: string,
transform: (parts: ChatMessagePart[], message: ChatMessage) => ChatMessagePart[],
seed: () => ChatMessagePart[],
opts: {
pending?: (message: ChatMessage) => boolean
} = {}
) => {
const apply = () => {
updateSessionState(sessionId, state => {
// After a stop, drop any late deltas / tool events for the
// cancelled turn so they don't keep growing the (now finalized)
// assistant bubble or, worse, seed a brand-new bubble that
// appears to belong to the next user message.
if (state.interrupted) {
return state
}
const streamId = state.streamId ?? `assistant-stream-${Date.now()}`
const groupId = state.pendingBranchGroup ?? undefined
const prev = state.messages
let nextMessages: ChatMessage[]
if (!prev.some(m => m.id === streamId)) {
nextMessages = [
...prev,
{
id: streamId,
role: 'assistant',
parts: seed(),
pending: true,
branchGroupId: groupId
}
]
} else {
nextMessages = prev.map(m =>
m.id === streamId
? {
...m,
parts: transform(m.parts, m),
pending: opts.pending ? opts.pending(m) : true
}
: m
)
}
return {
...state,
messages: nextMessages,
streamId,
sawAssistantPayload: true,
awaitingResponse: false
}
})
}
apply()
},
[updateSessionState]
)
const queuedDeltasRef = useRef<Map<string, QueuedStreamDeltas>>(new Map())
const flushHandleRef = useRef<number | null>(null)
const lastFlushAtRef = useRef<number>(0)
const nativeSubagentSessionsRef = useRef<Set<string>>(new Set())
// Turns that auto-compacted: skip post-turn hydrate so live scrollback survives.
const compactedTurnRef = useRef<Set<string>>(new Set())
// Last session we applied a session.info cwd for — lets us tell an agent
// relocating the SAME session (follow it) from a session switch (don't yank).
const lastCwdInfoSessionRef = useRef<null | string>(null)
const flushQueuedDeltas = useCallback(
(sessionId?: string) => {
const queue = queuedDeltasRef.current
const ids = sessionId ? [sessionId] : [...queue.keys()]
for (const id of ids) {
const queued = queue.get(id)
if (!queued) {
continue
}
queue.delete(id)
if (queued.assistant) {
mutateStream(
id,
parts => dedupeGeneratedImageEchoesInParts(appendAssistantTextPart(parts, queued.assistant)),
() => [assistantTextPart(queued.assistant)]
)
}
if (queued.reasoning) {
mutateStream(
id,
parts => appendReasoningPart(parts, queued.reasoning),
() => [reasoningPart(queued.reasoning)]
)
}
}
},
[mutateStream]
)
const scheduleDeltaFlush = useCallback(() => {
if (flushHandleRef.current !== null) {
return
}
if (typeof window === 'undefined') {
flushQueuedDeltas()
return
}
// Enforce a floor on the gap between two flushes. Without it, an LLM
// emitting tokens slower than the rAF cadence (~30-80 tok/sec is typical)
// forces one React commit + Streamdown re-parse per token, and the
// last-block markdown re-parse cost is roughly linear in current block
// length. With this floor, slower streams still coalesce ~2 tokens per
// commit and the synthetic harness shows longtask counts drop from ~5/5s
// to ~1/5s on big sessions (see scripts/profile-typing-lag.md).
const sinceLast = performance.now() - lastFlushAtRef.current
const runFlush = () => {
flushHandleRef.current = null
lastFlushAtRef.current = performance.now()
flushQueuedDeltas()
}
if (sinceLast >= STREAM_DELTA_FLUSH_MS && typeof window.requestAnimationFrame === 'function') {
flushHandleRef.current = window.requestAnimationFrame(runFlush)
return
}
flushHandleRef.current = window.setTimeout(runFlush, Math.max(0, STREAM_DELTA_FLUSH_MS - sinceLast))
}, [flushQueuedDeltas])
const queueDelta = useCallback(
(sessionId: string, key: keyof QueuedStreamDeltas, delta: string) => {
if (!delta) {
return
}
const queued = queuedDeltasRef.current.get(sessionId) ?? { assistant: '', reasoning: '' }
queued[key] += delta
queuedDeltasRef.current.set(sessionId, queued)
scheduleDeltaFlush()
},
[scheduleDeltaFlush]
)
useEffect(
() => () => {
if (flushHandleRef.current !== null && typeof window !== 'undefined') {
if (typeof window.cancelAnimationFrame === 'function') {
window.cancelAnimationFrame(flushHandleRef.current)
} else {
window.clearTimeout(flushHandleRef.current)
}
}
flushHandleRef.current = null
flushQueuedDeltas()
},
[flushQueuedDeltas]
)
const appendAssistantDelta = useCallback(
(sessionId: string, delta: string) => {
if (!delta) {
return
}
queueDelta(sessionId, 'assistant', delta)
},
[queueDelta]
)
const appendReasoningDelta = useCallback(
(sessionId: string, delta: string, replace = false) => {
if (!delta) {
return
}
if (!replace) {
queueDelta(sessionId, 'reasoning', delta)
return
}
flushQueuedDeltas(sessionId)
mutateStream(
sessionId,
(parts, message) => {
if (replace && chatMessageText(message).trim()) {
return parts
}
if (replace) {
return [...parts.filter(part => part.type !== 'reasoning'), reasoningPart(delta)]
}
return appendReasoningPart(parts, delta)
},
() => [reasoningPart(delta)]
)
},
[flushQueuedDeltas, mutateStream, queueDelta]
)
const upsertToolCall = useCallback(
(
sessionId: string,
payload: GatewayEventPayload | undefined,
phase: 'running' | 'complete',
sourceEventType?: string
) => {
// Text deltas flush on a timer but tool events apply now; flush first so
// a tool part can't jump ahead of the text that preceded it.
flushQueuedDeltas(sessionId)
if (sessionInterrupted(sessionId)) {
return
}
// The composer status stack owns todo display now (no inline panel) —
// mirror every todo state the tool reports into its session store.
if (payload?.name === 'todo') {
const todos = parseTodos(payload.todos) ?? parseTodos(payload.result) ?? parseTodos(payload.args)
if (todos) {
setSessionTodos(sessionId, todos)
}
}
if (!nativeSubagentSessionsRef.current.has(sessionId)) {
for (const subagentPayload of delegateTaskPayloads(payload, phase, sourceEventType)) {
upsertSubagent(
sessionId,
subagentPayload,
true,
phase === 'complete' ? 'delegate.complete' : 'delegate.running'
)
}
}
mutateStream(
sessionId,
parts => dedupeGeneratedImageEchoesInParts(upsertToolPart(parts, payload, phase)),
() => upsertToolPart([], payload, phase),
{ pending: m => phase !== 'complete' || (m.pending ?? false) }
)
},
[flushQueuedDeltas, mutateStream, sessionInterrupted]
)
const completeAssistantMessage = useCallback(
(sessionId: string, text: string) => {
let shouldHydrate = false
const completedState = updateSessionState(sessionId, state => {
// Late completion from an already-cancelled turn: cancelRun has
// already finalized the bubble (kept the partial text, dropped it if
// empty). Re-running the dedupe below would replace the partial with
// the just-cancelled full text, so we settle and bail instead.
if (state.interrupted) {
return {
...state,
awaitingResponse: false,
busy: false,
needsInput: false,
pendingBranchGroup: null,
streamId: null,
turnStartedAt: null
}
}
const streamId = state.streamId
const finalText = renderMediaTags(text).trim()
const completionError = completionErrorText(finalText)
const normalize = (value: string) => value.replace(/\s+/g, ' ').trim()
const replaceTextPart = (parts: ChatMessagePart[]) => {
const visibleFinalText = stripGeneratedImageEchoes(finalText, generatedImageEchoSources(parts)).trim()
const dedupeReference = normalize(visibleFinalText)
const kept = parts.filter(part => {
if (part.type === 'text') {
return false
}
if (part.type !== 'reasoning' || !dedupeReference) {
return true
}
const r = normalize(part.text)
return !(r && (dedupeReference.startsWith(r) || r.startsWith(dedupeReference)))
})
return visibleFinalText ? [...kept, assistantTextPart(visibleFinalText)] : kept
}
const completeMessage = (message: ChatMessage): ChatMessage =>
completionError
? {
...message,
error: completionError,
parts: message.parts.filter(part => part.type !== 'text'),
pending: false
}
: {
...message,
parts: replaceTextPart(message.parts),
pending: false
}
const newAssistantFromCompletion = (): ChatMessage => ({
id: `assistant-${Date.now()}`,
role: 'assistant',
parts: completionError ? [] : [assistantTextPart(finalText)],
branchGroupId: state.pendingBranchGroup ?? undefined,
...(completionError && { error: completionError })
})
const prev = state.messages
let nextMessages = prev
if (streamId && prev.some(m => m.id === streamId)) {
nextMessages = prev.map(m => (m.id === streamId ? completeMessage(m) : m))
} else {
const fallbackIndex = [...prev]
.reverse()
.findIndex(message => message.role === 'assistant' && !message.hidden)
if (fallbackIndex >= 0) {
const index = prev.length - 1 - fallbackIndex
const existing = prev[index]
const existingText = chatMessageText(existing).trim()
if (existing.pending || (finalText && existingText === finalText)) {
nextMessages = prev.map((message, messageIndex) =>
messageIndex === index ? completeMessage(message) : message
)
} else if (finalText) {
nextMessages = [...prev, newAssistantFromCompletion()]
}
} else if (finalText) {
nextMessages = [...prev, newAssistantFromCompletion()]
}
}
const hasInlineError = nextMessages.some(m => m.role === 'assistant' && m.error && !m.hidden)
const lastVisible = [...nextMessages].reverse().find(m => !m.hidden)
const unresolvedUserTail = lastVisible?.role === 'user'
shouldHydrate =
!completionError && !hasInlineError && !unresolvedUserTail && (!state.sawAssistantPayload || !finalText)
return {
...state,
messages: nextMessages,
streamId: null,
pendingBranchGroup: null,
awaitingResponse: false,
busy: false,
needsInput: false,
turnStartedAt: null
}
})
void refreshSessions().catch(() => undefined)
// Sync the freshly-titled row to other windows (e.g. main, when the turn
// ran in the pop-out).
broadcastSessionsChanged()
if (compactedTurnRef.current.delete(sessionId)) {
shouldHydrate = false
}
if (shouldHydrate) {
void hydrateFromStoredSession(3, completedState.storedSessionId, sessionId)
}
dispatchNativeNotification({
body: text.slice(0, 140) || translateNow('notifications.native.turnDoneBody'),
kind: 'turnDone',
sessionId,
title: translateNow('notifications.native.turnDoneTitle')
})
},
[hydrateFromStoredSession, refreshSessions, updateSessionState]
)
const failAssistantMessage = useCallback(
(sessionId: string, errorMessage: string) => {
updateSessionState(sessionId, state => {
const streamId = state.streamId ?? `assistant-error-${Date.now()}`
const groupId = state.pendingBranchGroup ?? undefined
const prev = state.messages
const error = errorMessage.trim() || 'Hermes reported an error'
const nextMessages = prev.some(m => m.id === streamId)
? prev.map(message =>
message.id === streamId
? {
...message,
error,
pending: false
}
: message
)
: [
...prev,
{
id: streamId,
role: 'assistant' as const,
parts: [],
error,
pending: false,
branchGroupId: groupId
}
]
return {
...state,
messages: nextMessages,
streamId: null,
pendingBranchGroup: null,
sawAssistantPayload: true,
awaitingResponse: false,
busy: false,
needsInput: false,
turnStartedAt: null
}
})
},
[updateSessionState]
)
const handleGatewayEvent = useCallback(
/** The gateway-event dispatcher, extracted from useMessageStream. */
export function useGatewayEventHandler(deps: GatewayEventDeps) {
const {
appendAssistantDelta,
appendReasoningDelta,
activeSessionIdRef,
compactedTurnRef,
lastCwdInfoSessionRef,
nativeSubagentSessionsRef,
completeAssistantMessage,
failAssistantMessage,
flushQueuedDeltas,
queryClient,
refreshHermesConfig,
sessionInterrupted,
updateSessionState,
upsertToolCall
} = deps
return useCallback(
(event: RpcEvent) => {
const payload = event.payload as GatewayEventPayload | undefined
const explicitSid = event.session_id || ''
@ -1264,9 +633,12 @@ export function useMessageStream({
appendAssistantDelta,
appendReasoningDelta,
activeSessionIdRef,
compactedTurnRef,
completeAssistantMessage,
failAssistantMessage,
flushQueuedDeltas,
lastCwdInfoSessionRef,
nativeSubagentSessionsRef,
queryClient,
refreshHermesConfig,
sessionInterrupted,
@ -1274,12 +646,4 @@ export function useMessageStream({
upsertToolCall
]
)
return {
appendAssistantDelta,
appendReasoningDelta,
completeAssistantMessage,
handleGatewayEvent,
upsertToolCall
}
}

View file

@ -0,0 +1,540 @@
import type { QueryClient } from '@tanstack/react-query'
import { type MutableRefObject, useCallback, useEffect, useRef } from 'react'
import { translateNow } from '@/i18n'
import {
appendAssistantTextPart,
appendReasoningPart,
assistantTextPart,
type ChatMessage,
type ChatMessagePart,
chatMessageText,
type GatewayEventPayload,
reasoningPart,
renderMediaTags,
upsertToolPart
} from '@/lib/chat-messages'
import {
dedupeGeneratedImageEchoesInParts,
generatedImageEchoSources,
stripGeneratedImageEchoes
} from '@/lib/generated-images'
import { parseTodos } from '@/lib/todos'
import { dispatchNativeNotification } from '@/store/native-notifications'
import { broadcastSessionsChanged } from '@/store/session-sync'
import { upsertSubagent } from '@/store/subagents'
import { setSessionTodos } from '@/store/todos'
import type { ClientSessionState } from '../../../types'
import { useGatewayEventHandler } from './gateway-event'
import { completionErrorText, delegateTaskPayloads, STREAM_DELTA_FLUSH_MS } from './utils'
/**
 * Options accepted by `useMessageStream`: shared session refs plus the
 * callbacks it uses to refresh, hydrate and update session state.
 */
interface MessageStreamOptions {
activeSessionIdRef: MutableRefObject<string | null>
// All three arguments are optional; `completeAssistantMessage` calls this with
// an attempt count, the stored session id and the runtime session id.
hydrateFromStoredSession: (
attempts?: number,
storedSessionId?: string | null,
runtimeSessionId?: string | null
) => Promise<void>
queryClient: QueryClient
refreshHermesConfig: () => Promise<void>
refreshSessions: () => Promise<void>
// Session state keyed by session id; read to check the `interrupted` flag.
sessionStateByRuntimeIdRef: MutableRefObject<Map<string, ClientSessionState>>
// Runs `updater` against the session's state and returns a `ClientSessionState`
// (presumably the updated one — confirm against the implementation).
updateSessionState: (
sessionId: string,
updater: (state: ClientSessionState) => ClientSessionState,
storedSessionId?: string | null
) => ClientSessionState
}
/**
 * Text buffered for one session between two flushes. `queueDelta` appends
 * incoming deltas to the matching field and `flushQueuedDeltas` drains both.
 */
interface QueuedStreamDeltas {
// Concatenated assistant-text deltas not yet applied to the message.
assistant: string
// Concatenated reasoning deltas not yet applied to the message.
reasoning: string
}
export function useMessageStream({
activeSessionIdRef,
hydrateFromStoredSession,
queryClient,
refreshHermesConfig,
refreshSessions,
sessionStateByRuntimeIdRef,
updateSessionState
}: MessageStreamOptions) {
const sessionInterrupted = useCallback(
(sessionId: string) => sessionStateByRuntimeIdRef.current.get(sessionId)?.interrupted ?? false,
[sessionStateByRuntimeIdRef]
)
// Patch the in-flight assistant message (or seed it). Centralises the
// streamId/groupId bookkeeping every event callback would otherwise repeat.
const mutateStream = useCallback(
(
sessionId: string,
transform: (parts: ChatMessagePart[], message: ChatMessage) => ChatMessagePart[],
seed: () => ChatMessagePart[],
opts: {
pending?: (message: ChatMessage) => boolean
} = {}
) => {
const apply = () => {
updateSessionState(sessionId, state => {
// After a stop, drop any late deltas / tool events for the
// cancelled turn so they don't keep growing the (now finalized)
// assistant bubble or, worse, seed a brand-new bubble that
// appears to belong to the next user message.
if (state.interrupted) {
return state
}
const streamId = state.streamId ?? `assistant-stream-${Date.now()}`
const groupId = state.pendingBranchGroup ?? undefined
const prev = state.messages
let nextMessages: ChatMessage[]
if (!prev.some(m => m.id === streamId)) {
nextMessages = [
...prev,
{
id: streamId,
role: 'assistant',
parts: seed(),
pending: true,
branchGroupId: groupId
}
]
} else {
nextMessages = prev.map(m =>
m.id === streamId
? {
...m,
parts: transform(m.parts, m),
pending: opts.pending ? opts.pending(m) : true
}
: m
)
}
return {
...state,
messages: nextMessages,
streamId,
sawAssistantPayload: true,
awaitingResponse: false
}
})
}
apply()
},
[updateSessionState]
)
const queuedDeltasRef = useRef<Map<string, QueuedStreamDeltas>>(new Map())
const flushHandleRef = useRef<number | null>(null)
const lastFlushAtRef = useRef<number>(0)
const nativeSubagentSessionsRef = useRef<Set<string>>(new Set())
// Turns that auto-compacted: skip post-turn hydrate so live scrollback survives.
const compactedTurnRef = useRef<Set<string>>(new Set())
// Last session we applied a session.info cwd for — lets us tell an agent
// relocating the SAME session (follow it) from a session switch (don't yank).
const lastCwdInfoSessionRef = useRef<null | string>(null)
const flushQueuedDeltas = useCallback(
(sessionId?: string) => {
const queue = queuedDeltasRef.current
const ids = sessionId ? [sessionId] : [...queue.keys()]
for (const id of ids) {
const queued = queue.get(id)
if (!queued) {
continue
}
queue.delete(id)
if (queued.assistant) {
mutateStream(
id,
parts => dedupeGeneratedImageEchoesInParts(appendAssistantTextPart(parts, queued.assistant)),
() => [assistantTextPart(queued.assistant)]
)
}
if (queued.reasoning) {
mutateStream(
id,
parts => appendReasoningPart(parts, queued.reasoning),
() => [reasoningPart(queued.reasoning)]
)
}
}
},
[mutateStream]
)
// Which scheduler produced the handle currently stored in `flushHandleRef`.
// requestAnimationFrame ids and setTimeout ids are cancelled through different
// APIs, so a pending flush can only be cancelled reliably when we remember
// where its handle came from.
const flushHandleKindRef = useRef<'frame' | 'timeout' | null>(null)

/**
 * Schedules a single flush of all queued deltas, unless one is already
 * pending. Outside a browser (`window` undefined) it flushes synchronously.
 */
const scheduleDeltaFlush = useCallback(() => {
  if (flushHandleRef.current !== null) {
    return
  }

  if (typeof window === 'undefined') {
    flushQueuedDeltas()

    return
  }

  // Enforce a floor on the gap between two flushes. Without it, an LLM
  // emitting tokens slower than the rAF cadence (~30-80 tok/sec is typical)
  // forces one React commit + Streamdown re-parse per token, and the
  // last-block markdown re-parse cost is roughly linear in current block
  // length. With this floor, slower streams still coalesce ~2 tokens per
  // commit and the synthetic harness shows longtask counts drop from ~5/5s
  // to ~1/5s on big sessions (see scripts/profile-typing-lag.md).
  const sinceLast = performance.now() - lastFlushAtRef.current

  const runFlush = () => {
    flushHandleRef.current = null
    flushHandleKindRef.current = null
    lastFlushAtRef.current = performance.now()
    flushQueuedDeltas()
  }

  // Far enough from the previous flush: ride the next animation frame.
  if (sinceLast >= STREAM_DELTA_FLUSH_MS && typeof window.requestAnimationFrame === 'function') {
    flushHandleKindRef.current = 'frame'
    flushHandleRef.current = window.requestAnimationFrame(runFlush)

    return
  }

  // Too soon (or no rAF available): wait out the remainder of the floor.
  flushHandleKindRef.current = 'timeout'
  flushHandleRef.current = window.setTimeout(runFlush, Math.max(0, STREAM_DELTA_FLUSH_MS - sinceLast))
}, [flushQueuedDeltas])

/**
 * Appends `delta` to the session's queued text for `key` and makes sure a
 * flush is scheduled. Empty deltas are ignored.
 */
const queueDelta = useCallback(
  (sessionId: string, key: keyof QueuedStreamDeltas, delta: string) => {
    if (!delta) {
      return
    }

    const queued = queuedDeltasRef.current.get(sessionId) ?? { assistant: '', reasoning: '' }
    queued[key] += delta
    queuedDeltasRef.current.set(sessionId, queued)
    scheduleDeltaFlush()
  },
  [scheduleDeltaFlush]
)

// Cleanup (unmount, or whenever `flushQueuedDeltas` changes identity): cancel
// the pending flush with the API that matches how it was scheduled, then
// flush synchronously so no queued text is lost. Previously a handle created
// by setTimeout was handed to cancelAnimationFrame whenever that function
// existed, which left the timer armed and passed a timer id to the wrong API.
useEffect(
  () => () => {
    const handle = flushHandleRef.current

    if (handle !== null && typeof window !== 'undefined') {
      if (flushHandleKindRef.current === 'frame') {
        if (typeof window.cancelAnimationFrame === 'function') {
          window.cancelAnimationFrame(handle)
        }
      } else {
        window.clearTimeout(handle)
      }
    }

    flushHandleRef.current = null
    flushHandleKindRef.current = null
    flushQueuedDeltas()
  },
  [flushQueuedDeltas]
)
const appendAssistantDelta = useCallback(
(sessionId: string, delta: string) => {
if (!delta) {
return
}
queueDelta(sessionId, 'assistant', delta)
},
[queueDelta]
)
const appendReasoningDelta = useCallback(
(sessionId: string, delta: string, replace = false) => {
if (!delta) {
return
}
if (!replace) {
queueDelta(sessionId, 'reasoning', delta)
return
}
flushQueuedDeltas(sessionId)
mutateStream(
sessionId,
(parts, message) => {
if (replace && chatMessageText(message).trim()) {
return parts
}
if (replace) {
return [...parts.filter(part => part.type !== 'reasoning'), reasoningPart(delta)]
}
return appendReasoningPart(parts, delta)
},
() => [reasoningPart(delta)]
)
},
[flushQueuedDeltas, mutateStream, queueDelta]
)
const upsertToolCall = useCallback(
(
sessionId: string,
payload: GatewayEventPayload | undefined,
phase: 'running' | 'complete',
sourceEventType?: string
) => {
// Text deltas flush on a timer but tool events apply now; flush first so
// a tool part can't jump ahead of the text that preceded it.
flushQueuedDeltas(sessionId)
if (sessionInterrupted(sessionId)) {
return
}
// The composer status stack owns todo display now (no inline panel) —
// mirror every todo state the tool reports into its session store.
if (payload?.name === 'todo') {
const todos = parseTodos(payload.todos) ?? parseTodos(payload.result) ?? parseTodos(payload.args)
if (todos) {
setSessionTodos(sessionId, todos)
}
}
if (!nativeSubagentSessionsRef.current.has(sessionId)) {
for (const subagentPayload of delegateTaskPayloads(payload, phase, sourceEventType)) {
upsertSubagent(
sessionId,
subagentPayload,
true,
phase === 'complete' ? 'delegate.complete' : 'delegate.running'
)
}
}
mutateStream(
sessionId,
parts => dedupeGeneratedImageEchoesInParts(upsertToolPart(parts, payload, phase)),
() => upsertToolPart([], payload, phase),
{ pending: m => phase !== 'complete' || (m.pending ?? false) }
)
},
[flushQueuedDeltas, mutateStream, sessionInterrupted]
)
// Finalizes the assistant bubble for `sessionId` once the gateway delivers the
// turn's complete `text`, then runs the post-turn side effects: session list
// refresh, cross-window broadcast, an optional hydrate from the stored
// session, and a native "turn done" notification.
const completeAssistantMessage = useCallback(
(sessionId: string, text: string) => {
// Assigned inside the state updater below and read afterwards to decide
// whether to hydrate.
// NOTE(review): relies on updateSessionState invoking the updater
// synchronously — confirm.
let shouldHydrate = false
const completedState = updateSessionState(sessionId, state => {
// Late completion from an already-cancelled turn: cancelRun has
// already finalized the bubble (kept the partial text, dropped it if
// empty). Re-running the dedupe below would replace the partial with
// the just-cancelled full text, so we settle and bail instead.
if (state.interrupted) {
return {
...state,
awaitingResponse: false,
busy: false,
needsInput: false,
pendingBranchGroup: null,
streamId: null,
turnStartedAt: null
}
}
const streamId = state.streamId
const finalText = renderMediaTags(text).trim()
// Non-null when the final text matches a known failure pattern; the
// message is then completed as an inline error instead of as text.
const completionError = completionErrorText(finalText)
// Whitespace-insensitive form, used only for the reasoning/text dedupe.
const normalize = (value: string) => value.replace(/\s+/g, ' ').trim()
// Drops every existing text part, drops any reasoning part whose
// normalized text is a prefix of (or is prefixed by) the visible final
// text, then appends the visible final text as one assistant text part
// when it is non-empty.
const replaceTextPart = (parts: ChatMessagePart[]) => {
const visibleFinalText = stripGeneratedImageEchoes(finalText, generatedImageEchoSources(parts)).trim()
const dedupeReference = normalize(visibleFinalText)
const kept = parts.filter(part => {
if (part.type === 'text') {
return false
}
if (part.type !== 'reasoning' || !dedupeReference) {
return true
}
const r = normalize(part.text)
return !(r && (dedupeReference.startsWith(r) || r.startsWith(dedupeReference)))
})
return visibleFinalText ? [...kept, assistantTextPart(visibleFinalText)] : kept
}
// Error completions strip the text parts and carry the text in `error`;
// normal completions swap in the final text. Both clear `pending`.
const completeMessage = (message: ChatMessage): ChatMessage =>
completionError
? {
...message,
error: completionError,
parts: message.parts.filter(part => part.type !== 'text'),
pending: false
}
: {
...message,
parts: replaceTextPart(message.parts),
pending: false
}
// Builds a fresh assistant message for the cases below where no existing
// bubble is reused.
const newAssistantFromCompletion = (): ChatMessage => ({
id: `assistant-${Date.now()}`,
role: 'assistant',
parts: completionError ? [] : [assistantTextPart(finalText)],
branchGroupId: state.pendingBranchGroup ?? undefined,
...(completionError && { error: completionError })
})
const prev = state.messages
let nextMessages = prev
// Preferred path: the message with the current stream id is still listed.
if (streamId && prev.some(m => m.id === streamId)) {
nextMessages = prev.map(m => (m.id === streamId ? completeMessage(m) : m))
} else {
// No live stream bubble: look at the most recent non-hidden assistant
// message instead.
const fallbackIndex = [...prev]
.reverse()
.findIndex(message => message.role === 'assistant' && !message.hidden)
if (fallbackIndex >= 0) {
// findIndex ran on the reversed copy; map it back to an index in `prev`.
const index = prev.length - 1 - fallbackIndex
const existing = prev[index]
const existingText = chatMessageText(existing).trim()
// Reuse that bubble only if it is still pending or already shows
// exactly this text; otherwise append a new bubble (when there is text).
if (existing.pending || (finalText && existingText === finalText)) {
nextMessages = prev.map((message, messageIndex) =>
messageIndex === index ? completeMessage(message) : message
)
} else if (finalText) {
nextMessages = [...prev, newAssistantFromCompletion()]
}
} else if (finalText) {
nextMessages = [...prev, newAssistantFromCompletion()]
}
}
const hasInlineError = nextMessages.some(m => m.role === 'assistant' && m.error && !m.hidden)
const lastVisible = [...nextMessages].reverse().find(m => !m.hidden)
const unresolvedUserTail = lastVisible?.role === 'user'
// Hydrate only when the completion was not an error, no visible assistant
// message carries an error, the visible transcript does not end on a user
// message, and either no assistant payload was seen or the final text is
// empty.
shouldHydrate =
!completionError && !hasInlineError && !unresolvedUserTail && (!state.sawAssistantPayload || !finalText)
return {
...state,
messages: nextMessages,
streamId: null,
pendingBranchGroup: null,
awaitingResponse: false,
busy: false,
needsInput: false,
turnStartedAt: null
}
})
// Fire-and-forget: a failed session refresh is deliberately ignored.
void refreshSessions().catch(() => undefined)
// Sync the freshly-titled row to other windows (e.g. main, when the turn
// ran in the pop-out).
broadcastSessionsChanged()
// Removes this session from compactedTurnRef; if it was present, the
// hydrate is skipped for this turn.
if (compactedTurnRef.current.delete(sessionId)) {
shouldHydrate = false
}
if (shouldHydrate) {
void hydrateFromStoredSession(3, completedState.storedSessionId, sessionId)
}
// Body is the first 140 characters of the raw completion text, or the
// translated fallback when that text is empty.
dispatchNativeNotification({
body: text.slice(0, 140) || translateNow('notifications.native.turnDoneBody'),
kind: 'turnDone',
sessionId,
title: translateNow('notifications.native.turnDoneTitle')
})
},
[hydrateFromStoredSession, refreshSessions, updateSessionState]
)
// Marks the current turn of `sessionId` as failed: the streaming bubble (or a
// newly appended assistant bubble when none matches) receives `errorMessage`
// as its inline error, and the session's in-flight flags are cleared.
const failAssistantMessage = useCallback(
  (sessionId: string, errorMessage: string) => {
    updateSessionState(sessionId, state => {
      // Without a live stream id, mint one so the error still gets a bubble.
      const targetId = state.streamId ?? `assistant-error-${Date.now()}`
      const branchGroupId = state.pendingBranchGroup ?? undefined
      const error = errorMessage.trim() || 'Hermes reported an error'
      const existing = state.messages
      const targetExists = existing.some(m => m.id === targetId)

      let messages
      if (targetExists) {
        // Attach the error to the existing bubble, keeping its parts.
        messages = existing.map(message => {
          if (message.id !== targetId) {
            return message
          }

          return { ...message, error, pending: false }
        })
      } else {
        // Nothing to attach to: append an empty assistant bubble for the error.
        messages = [
          ...existing,
          {
            id: targetId,
            role: 'assistant' as const,
            parts: [],
            error,
            pending: false,
            branchGroupId
          }
        ]
      }

      return {
        ...state,
        messages,
        streamId: null,
        pendingBranchGroup: null,
        sawAssistantPayload: true,
        awaitingResponse: false,
        busy: false,
        needsInput: false,
        turnStartedAt: null
      }
    })
  },
  [updateSessionState]
)
// Builds the gateway event handler through the useGatewayEventHandler
// sub-hook, handing it the callbacks, refs, and helpers it needs from this
// hook's scope.
const handleGatewayEvent = useGatewayEventHandler({
appendAssistantDelta,
appendReasoningDelta,
activeSessionIdRef,
compactedTurnRef,
lastCwdInfoSessionRef,
nativeSubagentSessionsRef,
completeAssistantMessage,
failAssistantMessage,
flushQueuedDeltas,
queryClient,
refreshHermesConfig,
sessionInterrupted,
updateSessionState,
upsertToolCall
})
return {
appendAssistantDelta,
appendReasoningDelta,
completeAssistantMessage,
handleGatewayEvent,
upsertToolCall
}
}

View file

@ -0,0 +1,66 @@
import { describe, expect, it } from 'vitest'
import type { GatewayEventPayload } from '@/lib/chat-messages'
import {
completionErrorText,
delegateTaskPayloads,
hasSessionInfoStatePatch,
sessionInfoStatePatch,
toTodoPayload
} from './utils'
/** Test helper: passes a loose record off as a GatewayEventPayload without copying it. */
function payload(over: Record<string, unknown>): GatewayEventPayload {
  return over as GatewayEventPayload
}
describe('completionErrorText', () => {
  it('flags provider/HTTP/retry failures, ignores normal text', () => {
    // Each failure-looking input must come back with its leading marker intact.
    const flagged: Array<[input: string, prefix: RegExp]> = [
      ['API call failed after 3 retries: boom', /^API call failed/],
      ['HTTP 500 upstream', /^HTTP 500/],
      ['Gateway error: nope', /^Gateway error/]
    ]

    for (const [input, prefix] of flagged) {
      expect(completionErrorText(input)).toMatch(prefix)
    }

    // Ordinary answers and whitespace-only text are not errors.
    for (const benign of ['here is your answer', ' ']) {
      expect(completionErrorText(benign)).toBeNull()
    }
  })
})
describe('toTodoPayload', () => {
  it('routes named todo and anonymous todos-bearing events to the todo stream', () => {
    const named = toTodoPayload(payload({ name: 'todo' }))
    const anonymous = toTodoPayload(payload({ todos: [] }))
    const unrelated = toTodoPayload(payload({ name: 'web_search' }))
    const missing = toTodoPayload(undefined)

    // A named todo without a tool id receives the default live id.
    expect(named?.tool_id).toBe('todo-live')
    // A nameless event that carries `todos` is relabelled as a todo.
    expect(anonymous?.name).toBe('todo')
    expect(unrelated).toBeUndefined()
    expect(missing).toBeUndefined()
  })
})
describe('sessionInfoStatePatch / hasSessionInfoStatePatch', () => {
  it('extracts only present runtime fields', () => {
    const populated = sessionInfoStatePatch(payload({ model: 'gpt', fast: true, branch: 'main' }))
    const empty = sessionInfoStatePatch(payload({}))

    expect(populated).toMatchObject({ model: 'gpt', fast: true, branch: 'main' })
    expect(hasSessionInfoStatePatch(populated)).toBe(true)
    // A payload with none of the runtime fields yields an empty patch.
    expect(hasSessionInfoStatePatch(empty)).toBe(false)
  })
})
describe('delegateTaskPayloads', () => {
  it('returns [] for non-delegate events', () => {
    const event = payload({ name: 'web_search' })

    expect(delegateTaskPayloads(event, 'running')).toEqual([])
  })

  it('maps a running tool.start to a subagent.start spec', () => {
    const event = payload({ name: 'delegate_task', tool_id: 't1', args: { goal: 'do it' } })
    const specs = delegateTaskPayloads(event, 'running', 'tool.start')

    expect(specs[0]).toMatchObject({ event_type: 'subagent.start', goal: 'do it', status: 'running' })
  })

  it('maps completion (with error) to a failed subagent.complete', () => {
    const event = payload({ name: 'delegate_task', error: 'boom', result: { summary: 'failed run' } })
    const specs = delegateTaskPayloads(event, 'complete')

    expect(specs[0]).toMatchObject({ event_type: 'subagent.complete', status: 'failed' })
  })
})

View file

@ -0,0 +1,179 @@
import type { GatewayEventPayload } from '@/lib/chat-messages'
import { normalizePersonalityValue } from '@/lib/chat-runtime'
import type { ClientSessionState } from '../../../types'
// Optional subset of ClientSessionState limited to the runtime fields listed
// below. It is the shape sessionInfoStatePatch returns and
// hasSessionInfoStatePatch inspects.
type SessionRuntimeStatePatch = Partial<
Pick<
ClientSessionState,
'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo'
>
>
/**
 * Collects the runtime fields carried by a gateway payload into a session
 * state patch.
 *
 * A field is copied only when the payload holds it with the expected primitive
 * type (string, or boolean for `fast` / `yolo`); anything else is left out of
 * the patch entirely. The snake_case payload keys `reasoning_effort` and
 * `service_tier` map to `reasoningEffort` and `serviceTier`.
 *
 * @param payload Gateway event payload, or undefined.
 * @returns The patch; empty when the payload is missing or carries no usable field.
 */
export function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch {
  const patch: SessionRuntimeStatePatch = {}

  if (!payload) {
    return patch
  }

  const {
    model,
    provider,
    cwd,
    branch,
    personality,
    reasoning_effort: reasoningEffort,
    service_tier: serviceTier,
    fast,
    yolo
  } = payload

  if (typeof model === 'string') {
    patch.model = model
  }

  if (typeof provider === 'string') {
    patch.provider = provider
  }

  if (typeof cwd === 'string') {
    patch.cwd = cwd
  }

  if (typeof branch === 'string') {
    patch.branch = branch
  }

  if (typeof personality === 'string') {
    // Personality is the only field that is normalized rather than copied.
    patch.personality = normalizePersonalityValue(personality)
  }

  if (typeof reasoningEffort === 'string') {
    patch.reasoningEffort = reasoningEffort
  }

  if (typeof serviceTier === 'string') {
    patch.serviceTier = serviceTier
  }

  if (typeof fast === 'boolean') {
    patch.fast = fast
  }

  if (typeof yolo === 'boolean') {
    patch.yolo = yolo
  }

  return patch
}
/** Reports whether the patch has at least one own enumerable key. */
export function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean {
  const presentFields = Object.keys(patch)

  return presentFields.length !== 0
}
// Minimum gap, in milliseconds, between two assistant-text flushes during a
// stream. This used to be 16ms (rAF only); at typical LLM token rates of
// ~30-80 tok/sec that gave every token its own React commit and Streamdown
// markdown re-parse, with cost scaling linearly with the growing last-block
// length. At 33ms roughly two tokens batch into one commit at 60 tok/sec
// without visible lag on the streaming text (still ~30 fps of visible text
// growth). This is a big perceived-smoothness win on long messages with large
// trailing paragraphs; see `scripts/profile-typing-lag.md` for the
// measurements behind this value.
export const STREAM_DELTA_FLUSH_MS = 33
// Some gateway/provider failures are delivered as ordinary message.complete
// text rather than as an explicit error event. Text starting with one of these
// markers is surfaced as an inline assistant error, so it persists like a real
// error event and is not erased by the hydrate fallback.
const COMPLETION_ERROR_PATTERNS = [
  /^API call failed after \d+ retries:/i,
  /^HTTP\s+\d{3}\b/i,
  /^(Provider|Gateway)\s+error:/i
]

/**
 * Classifies a completion's final text as a failure report.
 *
 * @param finalText Text delivered with the completion.
 * @returns The trimmed text when it starts with a known failure marker;
 *   otherwise null (including for empty or whitespace-only text).
 */
export function completionErrorText(finalText: string): string | null {
  const trimmed = finalText.trim()

  if (!trimmed) {
    return null
  }

  for (const pattern of COMPLETION_ERROR_PATTERNS) {
    if (pattern.test(trimmed)) {
      return trimmed
    }
  }

  return null
}
// Event type names in the `subagent.*` family, held in a Set for membership
// checks.
// NOTE(review): presumably consulted by the gateway event handler to route
// subagent events — confirm against callers.
export const SUBAGENT_EVENT_TYPES = new Set([
'subagent.spawn_requested',
'subagent.start',
'subagent.thinking',
'subagent.tool',
'subagent.progress',
'subagent.complete'
])
/**
 * Routes a gateway payload to the todo stream when it belongs there.
 *
 * A payload qualifies when it is named `todo`, or when it has no name but owns
 * a `todos` property (anonymous progress events that carry todos).
 *
 * @param payload Gateway event payload, or undefined.
 * @returns A copy of the payload with `name` forced to `'todo'` and `tool_id`
 *   defaulted to `'todo-live'`, or undefined when the payload is not a todo event.
 */
export function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined {
  if (!payload) {
    return undefined
  }

  if (payload.name !== 'todo') {
    const anonymousWithTodos = !payload.name && Object.hasOwn(payload, 'todos')

    if (!anonymousWithTodos) {
      return undefined
    }
  }

  return { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' }
}
/**
 * Views `value` as a string-keyed record.
 *
 * @returns `value` itself when it is a non-null, non-array object; otherwise a
 *   fresh empty object.
 */
function asRecord(value: unknown): Record<string, unknown> {
  const isObjectLike = typeof value === 'object' && value !== null && !Array.isArray(value)

  if (!isObjectLike) {
    return {}
  }

  return value as Record<string, unknown>
}
/**
 * Coerces a value that may be a record, or a JSON string encoding one, into a
 * record.
 *
 * @returns The record view of `value` (after JSON-parsing it when it is a
 *   string); an empty object when the string is not valid JSON or the value is
 *   not a plain record.
 */
function parseMaybeRecord(value: unknown): Record<string, unknown> {
  if (typeof value !== 'string') {
    return asRecord(value)
  }

  let decoded: unknown

  try {
    decoded = JSON.parse(value)
  } catch {
    // Malformed JSON is treated the same as "no record".
    return {}
  }

  return asRecord(decoded)
}
/** Returns the first candidate that is a non-empty string, or `''` when none is. */
const firstString = (...candidates: unknown[]): string => {
  const hit = candidates.find(
    (candidate): candidate is string => typeof candidate === 'string' && candidate.length > 0
  )

  return hit ?? ''
}
/**
 * Expands a `delegate_task` tool event into one subagent-style event spec per
 * delegated task.
 *
 * Tasks come from `args.tasks` when that is a non-empty array; otherwise the
 * args record itself is treated as the single task. Args are read from
 * `payload.args`, falling back to `payload.input`, and may be a record or a
 * JSON string.
 *
 * @param payload Gateway event payload; anything not named `delegate_task` yields `[]`.
 * @param phase `'running'` while the tool is in flight, `'complete'` once it finished.
 * @param sourceEventType Originating event type; `'tool.start'` turns a running
 *   event into `subagent.start` instead of `subagent.progress`.
 * @returns One spec per task, in task order.
 */
export function delegateTaskPayloads(
  payload: GatewayEventPayload | undefined,
  phase: 'running' | 'complete',
  sourceEventType?: string
): Record<string, unknown>[] {
  if (payload?.name !== 'delegate_task') {
    return []
  }

  const finished = phase === 'complete'
  const args = parseMaybeRecord(payload.args ?? payload.input)
  const result = parseMaybeRecord(payload.result)

  const declaredTasks = Array.isArray(args.tasks) ? args.tasks : []
  const tasks = declaredTasks.length > 0 ? declaredTasks.map(entry => parseMaybeRecord(entry)) : [args]

  let status = 'running'

  if (finished) {
    status = payload.error ? 'failed' : 'completed'
  }

  let eventType = 'subagent.progress'

  if (finished) {
    eventType = 'subagent.complete'
  } else if (sourceEventType === 'tool.start') {
    eventType = 'subagent.start'
  }

  const isStart = eventType === 'subagent.start'
  const isProgress = eventType === 'subagent.progress'
  const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task'
  const progressText = firstString(payload.preview, payload.message, payload.context)
  // The summary is shared by every task of the call, so it is resolved once.
  const summary = firstString(result.summary, payload.summary, payload.message)

  return tasks.map((task, index) => {
    const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task'

    // Per-task toolsets win over call-level ones; default to none.
    let toolsets: unknown[] = []

    if (Array.isArray(task.toolsets)) {
      toolsets = task.toolsets
    } else if (Array.isArray(args.toolsets)) {
      toolsets = args.toolsets
    }

    const outputTail =
      finished && summary ? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }] : undefined

    return {
      depth: 0,
      duration_seconds: payload.duration_s,
      goal,
      status,
      subagent_id: `delegate-tool:${toolId}:${index}`,
      summary: summary || undefined,
      task_count: tasks.length,
      task_index: index,
      text: isProgress ? progressText || goal : undefined,
      tool_name: isStart ? 'delegate_task' : undefined,
      tool_preview: isStart ? progressText : undefined,
      toolsets,
      event_type: eventType,
      output_tail: outputTail
    }
  })
}