fix(desktop): stop stranding queued prompts across backend bounces

A prompt typed mid-turn ("ghost bubble") could stick forever and never
send when the backend restarted/reconnected during the turn. Two fragile
assumptions in the composer queue drain caused it:

1. Drain fired ONLY on an observed busy true→false edge. A remount/
   reconnect resets `previousBusyRef` to the current busy value, so the
   settle edge is swallowed and the queue never drains. Replace
   `shouldAutoDrainOnSettle` with the edge-independent `shouldAutoDrain`
   (idle + non-empty), driven on the settle edge, on mount/reconnect, and
   after a re-key. The drain lock still serializes sends.

2. The queue is keyed by `queueSessionKey || sessionId`. When a backend
   resume mints a new runtime session id for the same conversation, the
   entry strands under the dead key. Pass the *stable* stored id as
   `queueSessionKey` so the composer can tell runtime churn from a real
   session switch, and `migrateQueuedPrompts` re-keys pending entries on a
   runtime-id change only (never on a deliberate switch).

Also make the drain resilient to a thrown/rejected onSubmit (e.g. a stale-
session 404): the entry stays queued and is retried on the next idle, with
a per-entry attempt cap (MAX_AUTO_DRAIN_ATTEMPTS) to avoid spin-loops and a
quiet toast once it gives up. A manual send clears the backoff.

Tests: composer-queue covers edge-free drain + re-key migration;
use-prompt-actions covers rejected-drain-keeps-entry + idle retry sends.
This commit is contained in:
Brooklyn Nicholson 2026-06-13 00:20:51 -05:00
parent 7d183f6497
commit bf090deed3
10 changed files with 218 additions and 70 deletions

View file

@ -43,13 +43,16 @@ import {
import {
$queuedPromptsBySession,
enqueueQueuedPrompt,
MAX_AUTO_DRAIN_ATTEMPTS,
migrateQueuedPrompts,
promoteQueuedPrompt,
type QueuedPromptEntry,
removeQueuedPrompt,
shouldAutoDrainOnSettle,
shouldAutoDrain,
updateQueuedPrompt
} from '@/store/composer-queue'
import { $statusItemsBySession } from '@/store/composer-status'
import { notify } from '@/store/notifications'
import { $gatewayState, $messages, setSessionPickerOpen } from '@/store/session'
import { $threadScrolledUp } from '@/store/thread-scroll'
import { useTheme } from '@/themes'
@ -196,11 +199,14 @@ export function ChatBar({
const composerSurfaceRef = useRef<HTMLDivElement | null>(null)
const editorRef = useRef<HTMLDivElement | null>(null)
const draftRef = useRef(draft)
const previousBusyRef = useRef(busy)
const pendingDraftPersistRef = useRef<{ scope: string | null; text: string } | null>(null)
const activeQueueSessionKeyRef = useRef(activeQueueSessionKey)
activeQueueSessionKeyRef.current = activeQueueSessionKey
const prevQueueKeyRef = useRef(activeQueueSessionKey)
const drainingQueueRef = useRef(false)
// Per-entry auto-drain failure counts; bounds retries so a persistent 404
// can't spin-loop. Cleared on success; reset naturally on remount/reconnect.
const drainFailuresRef = useRef(new Map<string, number>())
const urlInputRef = useRef<HTMLInputElement | null>(null)
const [urlOpen, setUrlOpen] = useState(false)
@ -1326,6 +1332,7 @@ export function ChatBar({
return false
}
drainFailuresRef.current.delete(entry.id)
removeQueuedPrompt(activeQueueSessionKey, entry.id)
resetBrowseState(sessionId)
@ -1337,16 +1344,17 @@ export function ChatBar({
[activeQueueSessionKey, onSubmit, queuedPrompts, sessionId]
)
const drainNextQueued = useCallback(
() =>
runDrain(entries => {
const skip = queueEdit?.entryId
const pickDrainHead = useCallback(
(entries: QueuedPromptEntry[]) => {
const skip = queueEditRef.current?.entryId
return skip ? entries.find(e => e.id !== skip) : entries[0]
}),
[queueEdit, runDrain]
return skip ? entries.find(e => e.id !== skip) : entries[0]
},
[] // reads the edit id off a ref so the lock-holder always sees the latest
)
const drainNextQueued = useCallback(() => runDrain(pickDrainHead), [pickDrainHead, runDrain])
const sendQueuedNow = useCallback(
(id: string) => {
if (!activeQueueSessionKey || id === queueEdit?.entryId) {
@ -1364,30 +1372,72 @@ export function ChatBar({
return true
}
// A manual send clears the auto-drain backoff so a stuck entry the user
// taps gets a fresh attempt (and re-enables auto-retry on success).
drainFailuresRef.current.delete(id)
return runDrain(entries => entries.find(e => e.id === id))
},
[activeQueueSessionKey, busy, onCancel, queueEdit, runDrain]
)
// Auto-drain on busy → false (turn settled). Queued turns always flow once
// the session is idle again — whether the turn finished naturally or the
// user interrupted it. Interrupting to reach a queued message is the whole
// point of the queue, so we never suppress the drain. To cancel queued
// turns, the user deletes them from the panel.
useEffect(() => {
const wasBusy = previousBusyRef.current
previousBusyRef.current = busy
if (
shouldAutoDrainOnSettle({
isBusy: busy,
queueLength: queuedPrompts.length,
wasBusy
})
) {
void drainNextQueued()
// Edge-independent auto-drain: send the head whenever the session is idle and
// the queue is non-empty, bounding retries so a thrown/rejected onSubmit (e.g.
// a stale-session 404) can't strand the entry permanently nor spin-loop. The
// drain lock serializes sends; a remount/reconnect resets the failure counts.
const autoDrainNext = useCallback(() => {
if (busy || drainingQueueRef.current || !activeQueueSessionKey) {
return
}
}, [busy, drainNextQueued, queuedPrompts.length])
const entry = pickDrainHead(queuedPrompts)
if (!entry || (drainFailuresRef.current.get(entry.id) ?? 0) >= MAX_AUTO_DRAIN_ATTEMPTS) {
return
}
const onFail = () => {
const fails = (drainFailuresRef.current.get(entry.id) ?? 0) + 1
drainFailuresRef.current.set(entry.id, fails)
if (fails >= MAX_AUTO_DRAIN_ATTEMPTS) {
notify({
id: 'composer-queue-stuck',
kind: 'error',
title: t.composer.queueStuckTitle,
message: t.composer.queueStuckBody
})
}
}
void runDrain(() => entry)
.then(sent => void (sent ? undefined : onFail()))
.catch(onFail)
}, [activeQueueSessionKey, busy, pickDrainHead, queuedPrompts, runDrain, t])
// Re-key on a runtime session-id change. A stable stored id (queueSessionKey)
// never churns, so a change there is a real session switch and must NOT
// migrate; only the runtime-derived key (queueSessionKey falsy → key is
// sessionId) churns on a backend bounce/resume of the same conversation.
useEffect(() => {
const prev = prevQueueKeyRef.current
prevQueueKeyRef.current = activeQueueSessionKey
if (queueSessionKey || !prev || !activeQueueSessionKey || prev === activeQueueSessionKey) {
return
}
migrateQueuedPrompts(prev, activeQueueSessionKey)
}, [activeQueueSessionKey, queueSessionKey])
// Queued turns flow whenever the session is idle — on the busy→false settle
// edge, on mount/reconnect, and after a re-key — so a swallowed edge can't
// strand them. To cancel queued turns, the user deletes them from the panel.
useEffect(() => {
if (shouldAutoDrain({ isBusy: busy, queueLength: queuedPrompts.length })) {
autoDrainNext()
}
}, [activeQueueSessionKey, autoDrainNext, busy, queuedPrompts.length])
// Queue-edit cleanup: on session swap the scope effect already stashed the
// edit snapshot; only restore into the composer when still on the same scope.

View file

@ -436,7 +436,7 @@ export function ChatView({
onSteer={onSteer}
onSubmit={onSubmit}
onTranscribeAudio={onTranscribeAudio}
queueSessionKey={selectedSessionId || activeSessionId}
queueSessionKey={selectedSessionId}
sessionId={activeSessionId}
state={chatBarState}
/>

View file

@ -325,6 +325,45 @@ describe('usePromptActions submit / queue drain semantics', () => {
})
})
it('a rejected fromQueue drain returns false (entry stays queued) and a later retry sends it', async () => {
// A stale-session 404 must not strand the queued entry: submitPrompt returns
// false on failure so the composer keeps it, and the edge-independent
// auto-drain re-attempts once the session is idle again. storedSessionId is
// null so the session.resume recovery path is skipped and the error surfaces.
let attempt = 0
const requestGateway = vi.fn(async (method: string) => {
if (method === 'prompt.submit') {
attempt += 1
if (attempt === 1) {
throw new Error('404: {"detail":"Session not found"}')
}
}
return {} as never
})
let handle: HarnessHandle | null = null
render(
<Harness
onReady={h => (handle = h)}
refreshSessions={async () => undefined}
requestGateway={requestGateway}
storedSessionId={null}
/>
)
const first = await handle!.submitText('please send me', { fromQueue: true })
expect(first).toBe(false)
const second = await handle!.submitText('please send me', { fromQueue: true })
expect(second).toBe(true)
expect(requestGateway).toHaveBeenCalledWith('prompt.submit', {
session_id: RUNTIME_SESSION_ID,
text: 'please send me'
})
})
it('a normal (non-queue) submit still respects the busyRef guard', async () => {
const busyRef = { current: true }
const requestGateway = vi.fn(async () => ({}) as never)

View file

@ -1210,6 +1210,8 @@ export const en: Translations = {
queueSendNext: 'Next',
queueSend: 'Send',
queueDelete: 'Delete',
queueStuckTitle: 'Queued message not sent',
queueStuckBody: 'A queued turn kept failing to send. It is still in the queue — try sending it again.',
previewUnavailable: 'Preview unavailable',
previewLabel: label => `Preview ${label}`,
couldNotPreview: label => `Could not preview ${label}`,

View file

@ -1348,6 +1348,8 @@ export const ja = defineLocale({
queueSendNext: '次に送信',
queueSend: '送信',
queueDelete: '削除',
queueStuckTitle: 'キュー内のメッセージを送信できません',
queueStuckBody: 'キューに入れたターンの送信が繰り返し失敗しました。まだキューに残っています。もう一度送信してください。',
previewUnavailable: 'プレビューは利用できません',
previewLabel: label => `${label} のプレビュー`,
couldNotPreview: label => `${label} をプレビューできませんでした`,

View file

@ -925,6 +925,8 @@ export interface Translations {
queueSendNext: string
queueSend: string
queueDelete: string
queueStuckTitle: string
queueStuckBody: string
previewUnavailable: string
previewLabel: (label: string) => string
couldNotPreview: (label: string) => string

View file

@ -1304,6 +1304,8 @@ export const zhHant = defineLocale({
queueSendNext: '下一個',
queueSend: '傳送',
queueDelete: '刪除',
queueStuckTitle: '佇列訊息未送出',
queueStuckBody: '佇列中的對話多次傳送失敗。它仍在佇列中,請重試傳送。',
previewUnavailable: '預覽不可用',
previewLabel: label => `預覽 ${label}`,
couldNotPreview: label => `無法預覽 ${label}`,

View file

@ -1398,6 +1398,8 @@ export const zh: Translations = {
queueSendNext: '下一个',
queueSend: '发送',
queueDelete: '删除',
queueStuckTitle: '排队消息未发送',
queueStuckBody: '排队的对话多次发送失败。它仍在队列中,请重试发送。',
previewUnavailable: '预览不可用',
previewLabel: label => `预览 ${label}`,
couldNotPreview: label => `无法预览 ${label}`,

View file

@ -7,9 +7,10 @@ import {
dequeueQueuedPrompt,
enqueueQueuedPrompt,
getQueuedPrompts,
migrateQueuedPrompts,
promoteQueuedPrompt,
removeQueuedPrompt,
shouldAutoDrainOnSettle,
shouldAutoDrain,
updateQueuedPrompt,
updateQueuedPromptText
} from './composer-queue'
@ -117,32 +118,53 @@ describe('composer queue store', () => {
})
})
describe('shouldAutoDrainOnSettle', () => {
const base = { isBusy: false, queueLength: 1, wasBusy: true }
it('drains the next queued prompt when a turn settles', () => {
expect(shouldAutoDrainOnSettle(base)).toBe(true)
describe('migrateQueuedPrompts', () => {
beforeEach(() => {
window.localStorage.removeItem(QUEUE_STORAGE_KEY)
$queuedPromptsBySession.set({})
})
it('drains after an interrupt — the settle edge is the same', () => {
// Interrupting to reach a queued message is the point of the queue; the
// gateway emits the same settle whether the turn finished or was stopped.
expect(shouldAutoDrainOnSettle(base)).toBe(true)
it('moves entries from a dead runtime key onto the live one', () => {
enqueueQueuedPrompt('rt-old', { attachments: [], text: 'stranded' })
expect(migrateQueuedPrompts('rt-old', 'rt-new')).toBe(true)
expect(getQueuedPrompts('rt-old')).toEqual([])
expect(getQueuedPrompts('rt-new').map(e => e.text)).toEqual(['stranded'])
// The dead key is dropped from the store entirely.
expect($queuedPromptsBySession.get()['rt-old']).toBeUndefined()
})
it('does not drain when the queue is empty', () => {
expect(shouldAutoDrainOnSettle({ ...base, queueLength: 0 })).toBe(false)
it('appends after existing target entries (FIFO preserved)', () => {
enqueueQueuedPrompt('rt-new', { attachments: [], text: 'already here' })
enqueueQueuedPrompt('rt-old', { attachments: [], text: 'migrated' })
migrateQueuedPrompts('rt-old', 'rt-new')
expect(getQueuedPrompts('rt-new').map(e => e.text)).toEqual(['already here', 'migrated'])
})
it('ignores steady busy state (no true → false transition)', () => {
expect(shouldAutoDrainOnSettle({ ...base, isBusy: true })).toBe(false)
})
it('ignores busy entry (false → true, not a settle)', () => {
expect(shouldAutoDrainOnSettle({ ...base, isBusy: true, wasBusy: false })).toBe(false)
})
it('ignores steady idle state (was not busy)', () => {
expect(shouldAutoDrainOnSettle({ ...base, wasBusy: false })).toBe(false)
it('is a no-op when source is empty or keys match', () => {
expect(migrateQueuedPrompts('rt-old', 'rt-new')).toBe(false)
expect(migrateQueuedPrompts('rt-x', 'rt-x')).toBe(false)
})
})
describe('shouldAutoDrain', () => {
it('drains whenever idle with a non-empty queue', () => {
expect(shouldAutoDrain({ isBusy: false, queueLength: 1 })).toBe(true)
})
it('drains on mount/reconnect with no observed busy edge', () => {
// The whole point of dropping the edge: a remount resets the busy ref, so an
// edge-gated drain would strand the entry. Idle + non-empty must still fire.
expect(shouldAutoDrain({ isBusy: false, queueLength: 2 })).toBe(true)
})
it('does not drain mid-turn', () => {
expect(shouldAutoDrain({ isBusy: true, queueLength: 1 })).toBe(false)
})
it('does not drain an empty queue', () => {
expect(shouldAutoDrain({ isBusy: false, queueLength: 0 })).toBe(false)
})
})

View file

@ -209,31 +209,58 @@ export const clearQueuedPrompts = (key: string | null | undefined) => {
writeSession(sid, [])
}
/** Inputs to {@link shouldAutoDrainOnSettle}, captured at a `busy` transition. */
export interface AutoDrainSettleInput {
wasBusy: boolean
/**
* Move pending entries from a dead session key onto a live one, preserving FIFO
* (existing target entries first, migrated entries appended). A backend bounce /
* resume can mint a fresh runtime session id for the *same* conversation; the
* entries enqueued under the old id would otherwise be stranded under a key
* nothing reads anymore. No-op unless both keys resolve and differ.
*/
export const migrateQueuedPrompts = (
fromKey: string | null | undefined,
toKey: string | null | undefined
): boolean => {
const from = sidOf(fromKey)
const to = sidOf(toKey)
if (!from || !to || from === to) {
return false
}
const pending = queueFor(from)
if (pending.length === 0) {
return false
}
const next = { ...$queuedPromptsBySession.get() }
delete next[from]
next[to] = [...queueFor(to), ...pending]
$queuedPromptsBySession.set(next)
save(next)
return true
}
/** Inputs to {@link shouldAutoDrain}. */
export interface AutoDrainInput {
isBusy: boolean
queueLength: number
}
/**
* Decide whether the composer should auto-drain the next queued prompt when a
* turn settles (busy transitions true false).
* Decide whether the composer should auto-drain the next queued prompt.
*
* Queued turns always advance once the session is idle again, whether the turn
* finished naturally or the user interrupted it. Interrupting to reach a queued
* message is the whole point of the queue, so we never suppress the drain. The
* gateway guarantees a settle (message.complete + session.info running:false)
* even after an interrupt, so this single edge reliably advances the queue. To
* cancel queued turns the user deletes them from the panel.
* Edge-independent on purpose: the queue must advance whenever the session is
* idle and has pending entries, NOT only on an observed busy true false edge.
* A backend bounce / websocket reconnect remounts the composer and resets the
* busy ref to the current value, swallowing the settle edge an edge-gated
* drain would then strand the entry forever. The caller's drain lock
* (`drainingQueueRef`) serializes sends so being edge-free can't double-submit.
*/
export const shouldAutoDrainOnSettle = (params: AutoDrainSettleInput): boolean => {
const { isBusy, queueLength, wasBusy } = params
export const shouldAutoDrain = ({ isBusy, queueLength }: AutoDrainInput): boolean => !isBusy && queueLength > 0
// Only react to a true → false transition; ignore steady state and entry.
if (isBusy || !wasBusy) {
return false
}
return queueLength > 0
}
/** Auto-drain attempts for one entry before we stop retrying and toast. The
* entry stays queued for a manual send; a remount/reconnect resets the count. */
export const MAX_AUTO_DRAIN_ATTEMPTS = 4