From 086343854dbf723acfc529d2580c260cc3713d0b Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Tue, 30 Jun 2026 02:58:45 -0500 Subject: [PATCH] refactor(desktop): split use-message-stream into folder + utils Extract the standalone gateway-event helpers (session-info patch derivation, completion-error detection, todo-payload routing, delegate_task -> subagent spec mapping, + the stream-flush/subagent-event constants) out of the 1,285-line hook into a colocated, tested use-message-stream/utils.ts. index.ts keeps the stateful streaming hook and consumes the helpers. Pure restructuring, no behaviour change; folder index keeps the import path intact. index.ts: 1,285 -> ~1,120. Adds unit tests for the pure helpers. --- .../index.ts} | 189 ++---------------- .../hooks/use-message-stream/utils.test.ts | 66 ++++++ .../session/hooks/use-message-stream/utils.ts | 179 +++++++++++++++++ 3 files changed, 257 insertions(+), 177 deletions(-) rename apps/desktop/src/app/session/hooks/{use-message-stream.ts => use-message-stream/index.ts} (87%) create mode 100644 apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts create mode 100644 apps/desktop/src/app/session/hooks/use-message-stream/utils.ts diff --git a/apps/desktop/src/app/session/hooks/use-message-stream.ts b/apps/desktop/src/app/session/hooks/use-message-stream/index.ts similarity index 87% rename from apps/desktop/src/app/session/hooks/use-message-stream.ts rename to apps/desktop/src/app/session/hooks/use-message-stream/index.ts index a455bae06cc..51136a7451f 100644 --- a/apps/desktop/src/app/session/hooks/use-message-stream.ts +++ b/apps/desktop/src/app/session/hooks/use-message-stream/index.ts @@ -2,8 +2,8 @@ import type { QueryClient } from '@tanstack/react-query' import { type MutableRefObject, useCallback, useEffect, useRef } from 'react' import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream' -import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals' import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer' +import { closeAgentTerminalByProc } from '@/app/right-sidebar/terminal/terminals' import { translateNow } from '@/i18n' import { appendAssistantTextPart, @@ -61,7 +61,17 @@ import { recordToolDiff } from '@/store/tool-diffs' import { notifyWorkspaceChanged, toolMayMutateFiles } from '@/store/workspace-events' import type { RpcEvent } from '@/types/hermes' -import type { ClientSessionState } from '../../types' +import type { ClientSessionState } from '../../../types' + +import { + completionErrorText, + delegateTaskPayloads, + hasSessionInfoStatePatch, + sessionInfoStatePatch, + STREAM_DELTA_FLUSH_MS, + SUBAGENT_EVENT_TYPES, + toTodoPayload +} from './utils' interface MessageStreamOptions { activeSessionIdRef: MutableRefObject @@ -86,181 +96,6 @@ interface QueuedStreamDeltas { reasoning: string } -type SessionRuntimeStatePatch = Partial< - Pick< - ClientSessionState, - 'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo' - > -> - -function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch { - const patch: SessionRuntimeStatePatch = {} - - if (typeof payload?.model === 'string') { - patch.model = payload.model || '' - } - - if (typeof payload?.provider === 'string') { - patch.provider = payload.provider || '' - } - - if (typeof payload?.cwd === 'string') { - patch.cwd = payload.cwd - } - - if (typeof payload?.branch === 'string') { - patch.branch = payload.branch - } - - if (typeof payload?.personality === 'string') { - patch.personality = normalizePersonalityValue(payload.personality) - } - - if (typeof payload?.reasoning_effort === 'string') { - patch.reasoningEffort = payload.reasoning_effort - } - - if (typeof payload?.service_tier === 'string') { - patch.serviceTier = payload.service_tier - } - - if (typeof payload?.fast === 'boolean') { - patch.fast = payload.fast - } - - if (typeof payload?.yolo === 'boolean') { - patch.yolo = payload.yolo - } - - return patch -} - -function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean { - return Object.keys(patch).length > 0 -} - -// Minimum gap between two assistant-text flushes during a stream. Was 16ms -// (rAF only), which at typical LLM token rates of ~30-80 tok/sec meant every -// token got its own React commit + Streamdown markdown re-parse, scaling -// linearly with the growing last-block length. Bumping to 33ms lets ~2 tokens -// batch into one commit at 60 tok/sec without introducing visible lag on the -// streaming text (still 30 fps of visible text growth). Big perceived -// smoothness win on long messages with big trailing paragraphs; see -// `scripts/profile-typing-lag.md` for the measurement work behind this. -const STREAM_DELTA_FLUSH_MS = 33 - -// Gateway/provider failures sometimes arrive as message.complete text instead -// of an explicit error event. Treat matches as inline assistant errors so they -// persist like real error events and don't get erased by hydrate fallback. -const COMPLETION_ERROR_PATTERNS = [ - /^API call failed after \d+ retries:/i, - /^HTTP\s+\d{3}\b/i, - /^(Provider|Gateway)\s+error:/i -] - -function completionErrorText(finalText: string): string | null { - const text = finalText.trim() - - return text && COMPLETION_ERROR_PATTERNS.some(re => re.test(text)) ? text : null -} - -const SUBAGENT_EVENT_TYPES = new Set([ - 'subagent.spawn_requested', - 'subagent.start', - 'subagent.thinking', - 'subagent.tool', - 'subagent.progress', - 'subagent.complete' -]) - -// Anonymous progress events that carry todos but no name still belong to the -// todo stream; named todo events are obviously routed there too. -function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined { - if (!payload) { - return undefined - } - - const isTodo = payload.name === 'todo' || (!payload.name && Object.hasOwn(payload, 'todos')) - - return isTodo ? { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' } : undefined -} - -function asRecord(value: unknown): Record { - return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record) : {} -} - -function parseMaybeRecord(value: unknown): Record { - if (typeof value === 'string') { - try { - return asRecord(JSON.parse(value)) - } catch { - return {} - } - } - - return asRecord(value) -} - -const firstString = (...candidates: unknown[]): string => { - for (const v of candidates) { - if (typeof v === 'string' && v) { - return v - } - } - - return '' -} - -function delegateTaskPayloads( - payload: GatewayEventPayload | undefined, - phase: 'running' | 'complete', - sourceEventType?: string -): Record[] { - if (payload?.name !== 'delegate_task') { - return [] - } - - const args = parseMaybeRecord(payload.args ?? payload.input) - const result = parseMaybeRecord(payload.result) - const rawTasks = Array.isArray(args.tasks) ? args.tasks : [] - const tasks = rawTasks.length ? rawTasks.map(parseMaybeRecord) : [args] - const status = phase === 'complete' ? (payload.error ? 'failed' : 'completed') : 'running' - const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task' - const progressText = firstString(payload.preview, payload.message, payload.context) - - const eventType = - phase === 'complete' - ? 'subagent.complete' - : sourceEventType === 'tool.start' - ? 'subagent.start' - : 'subagent.progress' - - return tasks.map((task, index) => { - const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task' - const summary = firstString(result.summary, payload.summary, payload.message) - - return { - depth: 0, - duration_seconds: payload.duration_s, - goal, - status, - subagent_id: `delegate-tool:${toolId}:${index}`, - summary: summary || undefined, - task_count: tasks.length, - task_index: index, - text: eventType === 'subagent.progress' ? progressText || goal : undefined, - tool_name: eventType === 'subagent.start' ? 'delegate_task' : undefined, - tool_preview: eventType === 'subagent.start' ? progressText : undefined, - toolsets: Array.isArray(task.toolsets) ? task.toolsets : Array.isArray(args.toolsets) ? args.toolsets : [], - event_type: eventType, - output_tail: - phase === 'complete' && summary - ? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }] - : undefined - } - }) -} - export function useMessageStream({ activeSessionIdRef, hydrateFromStoredSession, diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts b/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts new file mode 100644 index 00000000000..47994355074 --- /dev/null +++ b/apps/desktop/src/app/session/hooks/use-message-stream/utils.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from 'vitest' + +import type { GatewayEventPayload } from '@/lib/chat-messages' + +import { + completionErrorText, + delegateTaskPayloads, + hasSessionInfoStatePatch, + sessionInfoStatePatch, + toTodoPayload +} from './utils' + +const payload = (over: Record): GatewayEventPayload => over as GatewayEventPayload + +describe('completionErrorText', () => { + it('flags provider/HTTP/retry failures, ignores normal text', () => { + expect(completionErrorText('API call failed after 3 retries: boom')).toMatch(/^API call failed/) + expect(completionErrorText('HTTP 500 upstream')).toMatch(/^HTTP 500/) + expect(completionErrorText('Gateway error: nope')).toMatch(/^Gateway error/) + expect(completionErrorText('here is your answer')).toBeNull() + expect(completionErrorText(' ')).toBeNull() + }) +}) + +describe('toTodoPayload', () => { + it('routes named todo and anonymous todos-bearing events to the todo stream', () => { + expect(toTodoPayload(payload({ name: 'todo' }))?.tool_id).toBe('todo-live') + expect(toTodoPayload(payload({ todos: [] }))?.name).toBe('todo') + expect(toTodoPayload(payload({ name: 'web_search' }))).toBeUndefined() + expect(toTodoPayload(undefined)).toBeUndefined() + }) +}) + +describe('sessionInfoStatePatch / hasSessionInfoStatePatch', () => { + it('extracts only present runtime fields', () => { + const patch = sessionInfoStatePatch(payload({ model: 'gpt', fast: true, branch: 'main' })) + expect(patch).toMatchObject({ model: 'gpt', fast: true, branch: 'main' }) + expect(hasSessionInfoStatePatch(patch)).toBe(true) + expect(hasSessionInfoStatePatch(sessionInfoStatePatch(payload({})))).toBe(false) + }) +}) + +describe('delegateTaskPayloads', () => { + it('returns [] for non-delegate events', () => { + expect(delegateTaskPayloads(payload({ name: 'web_search' }), 'running')).toEqual([]) + }) + + it('maps a running tool.start to a subagent.start spec', () => { + const [spec] = delegateTaskPayloads( + payload({ name: 'delegate_task', tool_id: 't1', args: { goal: 'do it' } }), + 'running', + 'tool.start' + ) + + expect(spec).toMatchObject({ event_type: 'subagent.start', goal: 'do it', status: 'running' }) + }) + + it('maps completion (with error) to a failed subagent.complete', () => { + const [spec] = delegateTaskPayloads( + payload({ name: 'delegate_task', error: 'boom', result: { summary: 'failed run' } }), + 'complete' + ) + + expect(spec).toMatchObject({ event_type: 'subagent.complete', status: 'failed' }) + }) +}) diff --git a/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts b/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts new file mode 100644 index 00000000000..d9a90b36692 --- /dev/null +++ b/apps/desktop/src/app/session/hooks/use-message-stream/utils.ts @@ -0,0 +1,179 @@ +import type { GatewayEventPayload } from '@/lib/chat-messages' +import { normalizePersonalityValue } from '@/lib/chat-runtime' + +import type { ClientSessionState } from '../../../types' + +type SessionRuntimeStatePatch = Partial< + Pick< + ClientSessionState, + 'branch' | 'cwd' | 'fast' | 'model' | 'personality' | 'provider' | 'reasoningEffort' | 'serviceTier' | 'yolo' + > +> + +export function sessionInfoStatePatch(payload: GatewayEventPayload | undefined): SessionRuntimeStatePatch { + const patch: SessionRuntimeStatePatch = {} + + if (typeof payload?.model === 'string') { + patch.model = payload.model || '' + } + + if (typeof payload?.provider === 'string') { + patch.provider = payload.provider || '' + } + + if (typeof payload?.cwd === 'string') { + patch.cwd = payload.cwd + } + + if (typeof payload?.branch === 'string') { + patch.branch = payload.branch + } + + if (typeof payload?.personality === 'string') { + patch.personality = normalizePersonalityValue(payload.personality) + } + + if (typeof payload?.reasoning_effort === 'string') { + patch.reasoningEffort = payload.reasoning_effort + } + + if (typeof payload?.service_tier === 'string') { + patch.serviceTier = payload.service_tier + } + + if (typeof payload?.fast === 'boolean') { + patch.fast = payload.fast + } + + if (typeof payload?.yolo === 'boolean') { + patch.yolo = payload.yolo + } + + return patch +} + +export function hasSessionInfoStatePatch(patch: SessionRuntimeStatePatch): boolean { + return Object.keys(patch).length > 0 +} + +// Minimum gap between two assistant-text flushes during a stream. Was 16ms +// (rAF only), which at typical LLM token rates of ~30-80 tok/sec meant every +// token got its own React commit + Streamdown markdown re-parse, scaling +// linearly with the growing last-block length. Bumping to 33ms lets ~2 tokens +// batch into one commit at 60 tok/sec without introducing visible lag on the +// streaming text (still 30 fps of visible text growth). Big perceived +// smoothness win on long messages with big trailing paragraphs; see +// `scripts/profile-typing-lag.md` for the measurement work behind this. +export const STREAM_DELTA_FLUSH_MS = 33 + +// Gateway/provider failures sometimes arrive as message.complete text instead +// of an explicit error event. Treat matches as inline assistant errors so they +// persist like real error events and don't get erased by hydrate fallback. +const COMPLETION_ERROR_PATTERNS = [ + /^API call failed after \d+ retries:/i, + /^HTTP\s+\d{3}\b/i, + /^(Provider|Gateway)\s+error:/i +] + +export function completionErrorText(finalText: string): string | null { + const text = finalText.trim() + + return text && COMPLETION_ERROR_PATTERNS.some(re => re.test(text)) ? text : null +} + +export const SUBAGENT_EVENT_TYPES = new Set([ + 'subagent.spawn_requested', + 'subagent.start', + 'subagent.thinking', + 'subagent.tool', + 'subagent.progress', + 'subagent.complete' +]) + +// Anonymous progress events that carry todos but no name still belong to the +// todo stream; named todo events are obviously routed there too. +export function toTodoPayload(payload: GatewayEventPayload | undefined): GatewayEventPayload | undefined { + if (!payload) { + return undefined + } + + const isTodo = payload.name === 'todo' || (!payload.name && Object.hasOwn(payload, 'todos')) + + return isTodo ? { ...payload, name: 'todo', tool_id: payload.tool_id || 'todo-live' } : undefined +} + +function asRecord(value: unknown): Record { + return value && typeof value === 'object' && !Array.isArray(value) ? (value as Record) : {} +} + +function parseMaybeRecord(value: unknown): Record { + if (typeof value === 'string') { + try { + return asRecord(JSON.parse(value)) + } catch { + return {} + } + } + + return asRecord(value) +} + +const firstString = (...candidates: unknown[]): string => { + for (const v of candidates) { + if (typeof v === 'string' && v) { + return v + } + } + + return '' +} + +export function delegateTaskPayloads( + payload: GatewayEventPayload | undefined, + phase: 'running' | 'complete', + sourceEventType?: string +): Record[] { + if (payload?.name !== 'delegate_task') { + return [] + } + + const args = parseMaybeRecord(payload.args ?? payload.input) + const result = parseMaybeRecord(payload.result) + const rawTasks = Array.isArray(args.tasks) ? args.tasks : [] + const tasks = rawTasks.length ? rawTasks.map(parseMaybeRecord) : [args] + const status = phase === 'complete' ? (payload.error ? 'failed' : 'completed') : 'running' + const toolId = payload.tool_id || payload.tool_call_id || payload.id || 'delegate_task' + const progressText = firstString(payload.preview, payload.message, payload.context) + + const eventType = + phase === 'complete' + ? 'subagent.complete' + : sourceEventType === 'tool.start' + ? 'subagent.start' + : 'subagent.progress' + + return tasks.map((task, index) => { + const goal = firstString(task.goal, args.goal, payload.context) || 'Delegated task' + const summary = firstString(result.summary, payload.summary, payload.message) + + return { + depth: 0, + duration_seconds: payload.duration_s, + goal, + status, + subagent_id: `delegate-tool:${toolId}:${index}`, + summary: summary || undefined, + task_count: tasks.length, + task_index: index, + text: eventType === 'subagent.progress' ? progressText || goal : undefined, + tool_name: eventType === 'subagent.start' ? 'delegate_task' : undefined, + tool_preview: eventType === 'subagent.start' ? progressText : undefined, + toolsets: Array.isArray(task.toolsets) ? task.toolsets : Array.isArray(args.toolsets) ? args.toolsets : [], + event_type: eventType, + output_tail: + phase === 'complete' && summary + ? [{ is_error: Boolean(payload.error), preview: summary, tool: 'delegate_task' }] + : undefined + } + }) +}