fix(tui): clean force-send of queued messages (#40235)

Force-sending a queued message (double-empty-enter, or interrupt-mode
submit) flipped busy→false optimistically, so the queue drain raced the
still-unwinding turn: duplicate user bubble, a stray "queued: …" note, and
the cancelled turn's "Operation interrupted…" reply leaking in.

interruptTurn gains `keepBusy`: hold busy until the gateway's real settle
edge (message.complete, suppressed while interrupted), which drains the
queued message exactly once — desktop "send now" parity. The interrupt
paths now queue + interrupt instead of optimistically sending.
This commit is contained in:
brooklyn! 2026-06-05 20:39:10 -05:00 committed by GitHub
parent ac177cea87
commit e375c33f70
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 71 additions and 25 deletions

View file

@ -1114,6 +1114,43 @@ describe('createGatewayEventHandler', () => {
}
})
it('keepBusy interrupt holds busy until the gateway settles and suppresses the cancelled turns final_response', () => {
// Force-send: interrupt holds busy so the drain waits for the real settle
// instead of racing it (the race duplicated the bubble, leaked a "queued: …"
// note, and surfaced the cancelled turn's "Operation interrupted…" reply).
const appended: Msg[] = []
const ctx = buildCtx(appended)
ctx.gateway.gw.request = vi.fn(async () => ({ status: 'interrupted' }))
const onEvent = createGatewayEventHandler(ctx)
patchUiState({ sid: 'sess-1' })
onEvent({ payload: {}, type: 'message.start' } as any)
onEvent({ payload: { text: 'thinking…' }, type: 'reasoning.delta' } as any)
expect(getUiState().busy).toBe(true)
turnController.interruptTurn(
{ appendMessage: (msg: Msg) => appended.push(msg), gw: ctx.gateway.gw, sid: 'sess-1', sys: ctx.system.sys },
{ keepBusy: true }
)
// Held busy: the drain effect keys off busy→false, so it must not fire yet.
expect(getUiState().busy).toBe(true)
// The cancelled turn settles with a backend interrupted final_response.
const before = appended.length
onEvent({
payload: { text: 'Operation interrupted: waiting for model response (4.1s elapsed).' },
type: 'message.complete'
} as any)
// Settle flips busy false (the single drain edge) and the backend
// "Operation interrupted…" line is suppressed (not appended).
expect(getUiState().busy).toBe(false)
expect(appended.slice(before).some(m => typeof m.text === 'string' && m.text.includes('Operation interrupted'))).toBe(
false
)
})
it('persists an abandoned (timed-out) clarify into the transcript when the clarify tool completes', () => {
const appended: Msg[] = []
const onEvent = createGatewayEventHandler(buildCtx(appended))

View file

@ -182,7 +182,12 @@ class TurnController {
resetFlowOverlays()
}
interruptTurn({ appendMessage, gw, sid, sys }: InterruptDeps) {
// `keepBusy` holds the session busy after interrupting so a queued message
// drains on the gateway's real settle edge (message.complete, suppressed
// while `interrupted`) instead of racing the still-unwinding turn — the race
// duplicated the user bubble, leaked a "queued: …" note, and surfaced the
// cancelled turn's "[interrupted]" reply.
interruptTurn({ appendMessage, gw, sid, sys }: InterruptDeps, opts: { keepBusy?: boolean } = {}) {
this.interrupted = true
gw.request<SessionInterruptResponse>('session.interrupt', { session_id: sid }).catch(() => {})
@ -218,9 +223,17 @@ class TurnController {
sys('interrupted')
}
patchUiState({ status: 'interrupted' })
this.clearStatusTimer()
if (opts.keepBusy) {
// `idle()` already cleared busy; re-assert it so the drain waits for settle.
patchUiState({ busy: true, status: 'interrupting…' })
return
}
patchUiState({ status: 'interrupted' })
this.statusTimer = setTimeout(() => {
this.statusTimer = null
patchUiState({ status: 'ready' })

View file

@ -220,25 +220,28 @@ export function useSubmission(opts: UseSubmissionOptions) {
// - 'steer' : inject into the current turn via session.steer; falls
// back to queue when steer is rejected (no agent / no
// tool window).
// - 'interrupt' (default): cancel the in-flight turn, then send the
// new text as a fresh prompt so it actually moves.
// - 'interrupt' (default): queue the text + interrupt with `keepBusy`; the
// busy→false settle edge drains it once (desktop parity).
// No optimistic send → no duplicate bubble / race note.
//
// `opts.fallbackToFront` controls whether a steer fallback re-inserts
// at the front of the queue (used by the queue-edit path to preserve
// a picked item's position); the mainline submit path always appends.
// `opts.fallbackToFront` re-inserts at the queue head (queue-edit picks keep
// their position); the mainline submit path appends.
const handleBusyInput = useCallback(
(full: string, opts: { fallbackToFront?: boolean } = {}) => {
const live = getUiState()
const mode = live.busyInputMode
const fallback = (note: string) => {
const enqueueText = () => {
if (opts.fallbackToFront) {
composerRefs.queueRef.current.unshift(full)
composerActions.syncQueue()
} else {
composerActions.enqueue(full)
}
}
const fallback = (note: string) => {
enqueueText()
sys(note)
}
@ -260,25 +263,14 @@ export function useSubmission(opts: UseSubmissionOptions) {
return
}
// 'interrupt' (default): tear down the current turn, then send.
// `interruptTurn` fires `session.interrupt` without awaiting; if
// the gateway is still mid-response when `prompt.submit` lands,
// `send()`'s catch path re-queues with a "queued: ..." sys note
// (`isSessionBusyError`) — so a lost race degrades to queue
// semantics, not a dropped message.
// 'interrupt': queue + interrupt(keepBusy); the settle edge drains it once.
enqueueText()
if (live.sid) {
turnController.interruptTurn({ appendMessage, gw, sid: live.sid, sys })
turnController.interruptTurn({ appendMessage, gw, sid: live.sid, sys }, { keepBusy: true })
}
if (hasInterpolation(full)) {
patchUiState({ busy: true })
return interpolate(full, send)
}
send(full)
},
[appendMessage, composerActions, composerRefs, gw, interpolate, send, sys]
[appendMessage, composerActions, composerRefs, gw, sys]
)
const dispatchSubmission = useCallback(
@ -380,7 +372,11 @@ export function useSubmission(opts: UseSubmissionOptions) {
lastEmptyAt.current = now
if (doubleTap && live.busy && live.sid) {
return turnController.interruptTurn({ appendMessage, gw, sid: live.sid, sys })
// Force-send: keep busy when a message is queued so the settle edge
// drains it once (no race). Empty queue = plain Stop → 'ready'.
const hasQueued = composerRefs.queueRef.current.length > 0
return turnController.interruptTurn({ appendMessage, gw, sid: live.sid, sys }, { keepBusy: hasQueued })
}
if (doubleTap && live.sid && composerRefs.queueRef.current.length) {