refactor(desktop): split use-message-stream into folder + utils

Extract the standalone gateway-event helpers (session-info patch derivation,
completion-error detection, todo-payload routing, delegate_task -> subagent
spec mapping, + the stream-flush/subagent-event constants) out of the
1,285-line hook into a colocated, tested use-message-stream/utils.ts. index.ts
keeps the stateful streaming hook and consumes the helpers.

Pure restructuring, no behaviour change; folder index keeps the import path
intact. index.ts: 1,285 -> ~1,120. Adds unit tests for the pure helpers.
This commit is contained in:
Brooklyn Nicholson 2026-06-30 02:58:45 -05:00
parent 3a83b6bc5d
commit 086343854d
3 changed files with 257 additions and 177 deletions

View file

@ -2,8 +2,8 @@ import type { QueryClient } from '@tanstack/react-query'
import { type MutableRefObject, useCallback, useEffect, useRef } from 'react'
import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream'
import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals'
import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer'
import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals'
import { translateNow } from '@/i18n'
import {
appendAssistantTextPart,
@ -61,7 +61,17 @@ import { recordToolDiff } from '@/store/tool-diffs'
import { notifyWorkspaceChanged, toolMayMutateFiles } from '@/store/workspace-events'
import type { RpcEvent } from '@/types/hermes'
import type { ClientSessionState } from '../../types'
import type { ClientSessionState } from '../../../types'
import {
completionErrorText,
delegateTaskPayloads,
hasSessionInfoStatePatch,
sessionInfoStatePatch,
STREAM_DELTA_FLUSH_MS,
SUBAGENT_EVENT_TYPES,
toTodoPayload
} from './utils'
interface MessageStreamOptions {
activeSessionIdRef: MutableRefObject<string | null>
@ -86,181 +96,6 @@ interface QueuedStreamDeltas {
reasoning: string
}
type SessionRuntimeStatePatch = Partial<
Pick<
ClientSessionState,
'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo'
>
>
function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch {
const patch: SessionRuntimeStatePatch = {}
if (typeof payload?.model === 'string') {
patch.model = payload.model || ''
}
if (typeof payload?.provider === 'string') {
patch.provider = payload.provider || ''
}
if (typeof payload?.cwd === 'string') {
patch.cwd = payload.cwd
}
if (typeof payload?.branch === 'string') {
patch.branch = payload.branch
}
if (typeof payload?.personality === 'string') {
patch.personality = normalizePersonalityValue(payload.personality)
}
if (typeof payload?.reasoning_effort === 'string') {
patch.reasoningEffort = payload.reasoning_effort
}
if (typeof payload?.service_tier === 'string') {
patch.serviceTier = payload.service_tier
}
if (typeof payload?.fast === 'boolean') {
patch.fast = payload.fast
}
if (typeof payload?.yolo === 'boolean') {
patch.yolo = payload.yolo
}
return patch
}
function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean {
return Object.keys(patch).length > 0
}
// Minimum gap between two assistant-text flushes during a stream. Was 16ms
// (rAF only), which at typical LLM token rates of ~30-80 tok/sec meant every
// token got its own React commit + Streamdown markdown re-parse, scaling
// linearly with the growing last-block length. Bumping to 33ms lets ~2 tokens
// batch into one commit at 60 tok/sec without introducing visible lag on the
// streaming text (still 30 fps of visible text growth). Big perceived
// smoothness win on long messages with big trailing paragraphs; see
// `scripts/profile-typing-lag.md` for the measurement work behind this.
const STREAM_DELTA_FLUSH_MS = 33
// Gateway/provider failures sometimes arrive as message.complete text instead
// of an explicit error event. Treat matches as inline assistant errors so they
// persist like real error events and don't get erased by hydrate fallback.
const COMPLETION_ERROR_PATTERNS = [
/^API call failed after \d+ retries:/i,
/^HTTP\s+\d{3}\b/i,
/^(Provider|Gateway)\s+error:/i
]
function completionErrorText(finalText: string): string | null {
const text = finalText.trim()
return text && COMPLETION_ERROR_PATTERNS.some(re => re.test(text)) ? text : null
}
const SUBAGENT_EVENT_TYPES = new Set([
'subagent.spawn_requested',
'subagent.start',
'subagent.thinking',
'subagent.tool',
'subagent.progress',
'subagent.complete'
])
// Anonymous progress events that carry todos but no name still belong to the
// todo stream; named todo events are obviously routed there too.
function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined {
if (!payload) {
return undefined
}
const isTodo = payload.name === 'todo' || (!payload.name && Object.hasOwn(payload, 'todos'))
return isTodo ? { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' } : undefined
}
function asRecord(value: unknown): Record<string, unknown> {
return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record<string, unknown>) : {}
}
function parseMaybeRecord(value: unknown): Record<string, unknown> {
if (typeof value === 'string') {
try {
return asRecord(JSON.parse(value))
} catch {
return {}
}
}
return asRecord(value)
}
const firstString = (...candidates: unknown[]): string => {
for (const v of candidates) {
if (typeof v === 'string' && v) {
return v
}
}
return ''
}
function delegateTaskPayloads(
payload: GatewayEventPayload | undefined,
phase: 'running' | 'complete',
sourceEventType?: string
): Record<string, unknown>[] {
if (payload?.name !== 'delegate_task') {
return []
}
const args = parseMaybeRecord(payload.args ?? payload.input)
const result = parseMaybeRecord(payload.result)
const rawTasks = Array.isArray(args.tasks) ? args.tasks : []
const tasks = rawTasks.length ? rawTasks.map(parseMaybeRecord) : [args]
const status = phase === 'complete' ? (payload.error ? 'failed' : 'completed') : 'running'
const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task'
const progressText = firstString(payload.preview, payload.message, payload.context)
const eventType =
phase === 'complete'
? 'subagent.complete'
: sourceEventType === 'tool.start'
? 'subagent.start'
: 'subagent.progress'
return tasks.map((task, index) => {
const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task'
const summary = firstString(result.summary, payload.summary, payload.message)
return {
depth: 0,
duration_seconds: payload.duration_s,
goal,
status,
subagent_id: `delegate-tool:${toolId}:${index}`,
summary: summary || undefined,
task_count: tasks.length,
task_index: index,
text: eventType === 'subagent.progress' ? progressText || goal : undefined,
tool_name: eventType === 'subagent.start' ? 'delegate_task' : undefined,
tool_preview: eventType === 'subagent.start' ? progressText : undefined,
toolsets: Array.isArray(task.toolsets) ? task.toolsets : Array.isArray(args.toolsets) ? args.toolsets : [],
event_type: eventType,
output_tail:
phase === 'complete' && summary
? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }]
: undefined
}
})
}
export function useMessageStream({
activeSessionIdRef,
hydrateFromStoredSession,

View file

@ -0,0 +1,66 @@
import { describe, expect, it } from 'vitest'
import type { GatewayEventPayload } from '@/lib/chat-messages'
import {
completionErrorText,
delegateTaskPayloads,
hasSessionInfoStatePatch,
sessionInfoStatePatch,
toTodoPayload
} from './utils'
const payload = (over: Record<string, unknown>): GatewayEventPayload => over as GatewayEventPayload
describe('completionErrorText', () => {
it('flags provider/HTTP/retry failures, ignores normal text', () => {
expect(completionErrorText('API call failed after 3 retries: boom')).toMatch(/^API call failed/)
expect(completionErrorText('HTTP 500 upstream')).toMatch(/^HTTP 500/)
expect(completionErrorText('Gateway error: nope')).toMatch(/^Gateway error/)
expect(completionErrorText('here is your answer')).toBeNull()
expect(completionErrorText(' ')).toBeNull()
})
})
describe('toTodoPayload', () => {
it('routes named todo and anonymous todos-bearing events to the todo stream', () => {
expect(toTodoPayload(payload({ name: 'todo' }))?.tool_id).toBe('todo-live')
expect(toTodoPayload(payload({ todos: [] }))?.name).toBe('todo')
expect(toTodoPayload(payload({ name: 'web_search' }))).toBeUndefined()
expect(toTodoPayload(undefined)).toBeUndefined()
})
})
describe('sessionInfoStatePatch / hasSessionInfoStatePatch', () => {
it('extracts only present runtime fields', () => {
const patch = sessionInfoStatePatch(payload({ model: 'gpt', fast: true, branch: 'main' }))
expect(patch).toMatchObject({ model: 'gpt', fast: true, branch: 'main' })
expect(hasSessionInfoStatePatch(patch)).toBe(true)
expect(hasSessionInfoStatePatch(sessionInfoStatePatch(payload({})))).toBe(false)
})
})
describe('delegateTaskPayloads', () => {
it('returns [] for non-delegate events', () => {
expect(delegateTaskPayloads(payload({ name: 'web_search' }), 'running')).toEqual([])
})
it('maps a running tool.start to a subagent.start spec', () => {
const [spec] = delegateTaskPayloads(
payload({ name: 'delegate_task', tool_id: 't1', args: { goal: 'do it' } }),
'running',
'tool.start'
)
expect(spec).toMatchObject({ event_type: 'subagent.start', goal: 'do it', status: 'running' })
})
it('maps completion (with error) to a failed subagent.complete', () => {
const [spec] = delegateTaskPayloads(
payload({ name: 'delegate_task', error: 'boom', result: { summary: 'failed run' } }),
'complete'
)
expect(spec).toMatchObject({ event_type: 'subagent.complete', status: 'failed' })
})
})

View file

@ -0,0 +1,179 @@
import type { GatewayEventPayload } from '@/lib/chat-messages'
import { normalizePersonalityValue } from '@/lib/chat-runtime'
import type { ClientSessionState } from '../../../types'
type SessionRuntimeStatePatch = Partial<
Pick<
ClientSessionState,
'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo'
>
>
export function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch {
const patch: SessionRuntimeStatePatch = {}
if (typeof payload?.model === 'string') {
patch.model = payload.model || ''
}
if (typeof payload?.provider === 'string') {
patch.provider = payload.provider || ''
}
if (typeof payload?.cwd === 'string') {
patch.cwd = payload.cwd
}
if (typeof payload?.branch === 'string') {
patch.branch = payload.branch
}
if (typeof payload?.personality === 'string') {
patch.personality = normalizePersonalityValue(payload.personality)
}
if (typeof payload?.reasoning_effort === 'string') {
patch.reasoningEffort = payload.reasoning_effort
}
if (typeof payload?.service_tier === 'string') {
patch.serviceTier = payload.service_tier
}
if (typeof payload?.fast === 'boolean') {
patch.fast = payload.fast
}
if (typeof payload?.yolo === 'boolean') {
patch.yolo = payload.yolo
}
return patch
}
export function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean {
return Object.keys(patch).length > 0
}
// Minimum gap between two assistant-text flushes during a stream. Was 16ms
// (rAF only), which at typical LLM token rates of ~30-80 tok/sec meant every
// token got its own React commit + Streamdown markdown re-parse, scaling
// linearly with the growing last-block length. Bumping to 33ms lets ~2 tokens
// batch into one commit at 60 tok/sec without introducing visible lag on the
// streaming text (still 30 fps of visible text growth). Big perceived
// smoothness win on long messages with big trailing paragraphs; see
// `scripts/profile-typing-lag.md` for the measurement work behind this.
export const STREAM_DELTA_FLUSH_MS = 33
// Gateway/provider failures sometimes arrive as message.complete text instead
// of an explicit error event. Treat matches as inline assistant errors so they
// persist like real error events and don't get erased by hydrate fallback.
const COMPLETION_ERROR_PATTERNS = [
/^API call failed after \d+ retries:/i,
/^HTTP\s+\d{3}\b/i,
/^(Provider|Gateway)\s+error:/i
]
export function completionErrorText(finalText: string): string | null {
const text = finalText.trim()
return text && COMPLETION_ERROR_PATTERNS.some(re => re.test(text)) ? text : null
}
export const SUBAGENT_EVENT_TYPES = new Set([
'subagent.spawn_requested',
'subagent.start',
'subagent.thinking',
'subagent.tool',
'subagent.progress',
'subagent.complete'
])
// Anonymous progress events that carry todos but no name still belong to the
// todo stream; named todo events are obviously routed there too.
export function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined {
if (!payload) {
return undefined
}
const isTodo = payload.name === 'todo' || (!payload.name && Object.hasOwn(payload, 'todos'))
return isTodo ? { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' } : undefined
}
function asRecord(value: unknown): Record<string, unknown> {
return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record<string, unknown>) : {}
}
function parseMaybeRecord(value: unknown): Record<string, unknown> {
if (typeof value === 'string') {
try {
return asRecord(JSON.parse(value))
} catch {
return {}
}
}
return asRecord(value)
}
const firstString = (...candidates: unknown[]): string => {
for (const v of candidates) {
if (typeof v === 'string' && v) {
return v
}
}
return ''
}
export function delegateTaskPayloads(
payload: GatewayEventPayload | undefined,
phase: 'running' | 'complete',
sourceEventType?: string
): Record<string, unknown>[] {
if (payload?.name !== 'delegate_task') {
return []
}
const args = parseMaybeRecord(payload.args ?? payload.input)
const result = parseMaybeRecord(payload.result)
const rawTasks = Array.isArray(args.tasks) ? args.tasks : []
const tasks = rawTasks.length ? rawTasks.map(parseMaybeRecord) : [args]
const status = phase === 'complete' ? (payload.error ? 'failed' : 'completed') : 'running'
const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task'
const progressText = firstString(payload.preview, payload.message, payload.context)
const eventType =
phase === 'complete'
? 'subagent.complete'
: sourceEventType === 'tool.start'
? 'subagent.start'
: 'subagent.progress'
return tasks.map((task, index) => {
const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task'
const summary = firstString(result.summary, payload.summary, payload.message)
return {
depth: 0,
duration_seconds: payload.duration_s,
goal,
status,
subagent_id: `delegate-tool:${toolId}:${index}`,
summary: summary || undefined,
task_count: tasks.length,
task_index: index,
text: eventType === 'subagent.progress' ? progressText || goal : undefined,
tool_name: eventType === 'subagent.start' ? 'delegate_task' : undefined,
tool_preview: eventType === 'subagent.start' ? progressText : undefined,
toolsets: Array.isArray(task.toolsets) ? task.toolsets : Array.isArray(args.toolsets) ? args.toolsets : [],
event_type: eventType,
output_tail:
phase === 'complete' && summary
? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }]
: undefined
}
})
}