mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-07-01 12:02:05 +00:00
Merge pull request #54116 from kshitijk4poor/fix/36658-gateway-drain-microtask
fix(tui): defer buffered gateway events to stop dashboard chat #301 (#36658)
This commit is contained in:
commit
f3d8f20a59
2 changed files with 127 additions and 10 deletions
|
|
@ -154,6 +154,79 @@ describe('GatewayClient websocket attach mode', () => {
|
|||
gw.kill()
|
||||
})
|
||||
|
||||
it('drains buffered events on a later microtask, not synchronously inside drain()', async () => {
|
||||
// Regression for #36658: in attach mode the already-running gateway
|
||||
// replays `gateway.ready` the instant the socket connects, so it lands in
|
||||
// bufferedEvents BEFORE the consumer's mount-time subscribe effect runs.
|
||||
// If drain() emitted those synchronously, the gateway.ready handler's
|
||||
// setState cascade would run inside React's first commit -> "Too many
|
||||
// re-renders" (#301). drain() must defer the buffered flush so the first
|
||||
// commit settles first.
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
|
||||
gatewaySocket.open()
|
||||
// Server replays ready BEFORE the consumer subscribes (attach-mode timing):
|
||||
gatewaySocket.message(
|
||||
JSON.stringify({ jsonrpc: '2.0', method: 'event', params: { type: 'gateway.ready', payload: {} } })
|
||||
)
|
||||
|
||||
const order: string[] = []
|
||||
|
||||
gw.on('event', ev => order.push(`event:${ev.type}`))
|
||||
gw.drain()
|
||||
order.push('after-drain')
|
||||
|
||||
// Buffered event must NOT have fired synchronously inside drain():
|
||||
expect(order).toEqual(['after-drain'])
|
||||
|
||||
// ...and must arrive on the next microtask.
|
||||
await vi.waitFor(() => expect(order).toContain('event:gateway.ready'))
|
||||
expect(order).toEqual(['after-drain', 'event:gateway.ready'])
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('preserves FIFO order when a live event arrives before the deferred flush', async () => {
|
||||
// #36658 hardening: `subscribed` must NOT flip synchronously in drain().
|
||||
// A live event delivered in the window between drain() returning and the
|
||||
// deferred microtask running must still queue BEHIND the chronologically
|
||||
// earlier buffered events, not jump ahead of them.
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
|
||||
gatewaySocket.open()
|
||||
// Buffered first (replayed on connect, before subscribe):
|
||||
gatewaySocket.message(
|
||||
JSON.stringify({ jsonrpc: '2.0', method: 'event', params: { type: 'gateway.ready', payload: {} } })
|
||||
)
|
||||
|
||||
const order: string[] = []
|
||||
|
||||
gw.on('event', ev => order.push(ev.type))
|
||||
gw.drain()
|
||||
|
||||
// A LIVE event arrives synchronously in the post-drain / pre-microtask gap:
|
||||
gatewaySocket.message(
|
||||
JSON.stringify({ jsonrpc: '2.0', method: 'event', params: { type: 'session.info', payload: {} } })
|
||||
)
|
||||
|
||||
// Nothing emitted yet (subscribed stays false until the microtask):
|
||||
expect(order).toEqual([])
|
||||
|
||||
await vi.waitFor(() => expect(order.length).toBe(2))
|
||||
// FIFO preserved: the earlier-buffered gateway.ready precedes the live one.
|
||||
expect(order).toEqual(['gateway.ready', 'session.info'])
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('mirrors event frames to sidecar websocket when configured', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
process.env.HERMES_TUI_SIDECAR_URL = 'ws://gateway.test/api/pub?token=abc&channel=demo'
|
||||
|
|
@ -172,6 +245,9 @@ describe('GatewayClient websocket attach mode', () => {
|
|||
|
||||
sidecarSocket.open()
|
||||
gw.drain()
|
||||
// drain() flips `subscribed` on a microtask now (#36658); let it settle so
|
||||
// the subsequent live event takes the synchronous publish path.
|
||||
await Promise.resolve()
|
||||
|
||||
const eventFrame = JSON.stringify({
|
||||
jsonrpc: '2.0',
|
||||
|
|
@ -206,6 +282,8 @@ describe('GatewayClient websocket attach mode', () => {
|
|||
|
||||
sidecarSocket.open()
|
||||
gw.drain()
|
||||
// drain() flips `subscribed` on a microtask now (#36658); let it settle.
|
||||
await Promise.resolve()
|
||||
|
||||
gw.publishLocalEvent({
|
||||
payload: { reason: 'idle_exit_hotkey' },
|
||||
|
|
@ -227,7 +305,7 @@ describe('GatewayClient websocket attach mode', () => {
|
|||
gw.kill()
|
||||
})
|
||||
|
||||
it('emits exit when attached websocket closes', () => {
|
||||
it('emits exit when attached websocket closes', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
const exits: Array<null | number> = []
|
||||
|
|
@ -239,6 +317,9 @@ describe('GatewayClient websocket attach mode', () => {
|
|||
|
||||
gatewaySocket.open()
|
||||
gw.drain()
|
||||
// drain() flips `subscribed` on a microtask now (#36658); let it settle so
|
||||
// the close below takes the synchronous exit path.
|
||||
await Promise.resolve()
|
||||
gatewaySocket.close(1011)
|
||||
|
||||
expect(exits).toEqual([1011])
|
||||
|
|
|
|||
|
|
@ -146,6 +146,7 @@ export class GatewayClient extends EventEmitter {
|
|||
private ready = false
|
||||
private readyTimer: ReturnType<typeof setTimeout> | null = null
|
||||
private subscribed = false
|
||||
private drainGeneration = 0
|
||||
private stdoutRl: ReturnType<typeof createInterface> | null = null
|
||||
private stderrRl: ReturnType<typeof createInterface> | null = null
|
||||
|
||||
|
|
@ -217,6 +218,10 @@ export class GatewayClient extends EventEmitter {
|
|||
// attached to a discarded child / socket.
|
||||
this.rejectPending(new Error('gateway restarting'))
|
||||
this.ready = false
|
||||
this.subscribed = false
|
||||
// Invalidate any pending deferred drain() flush from a prior transport so
|
||||
// its queued microtask becomes a no-op (it captured the old generation).
|
||||
this.drainGeneration += 1
|
||||
this.bufferedEvents.clear()
|
||||
this.pendingExit = undefined
|
||||
this.stdoutRl?.close()
|
||||
|
|
@ -611,18 +616,49 @@ export class GatewayClient extends EventEmitter {
|
|||
}
|
||||
|
||||
drain() {
|
||||
this.subscribed = true
|
||||
// Defer the buffered-event replay to the next microtask, and DO NOT flip
|
||||
// `subscribed` until that microtask runs.
|
||||
//
|
||||
// `drain()` is called from the consumer's mount-time subscribe effect
|
||||
// (ui-tui/src/app/useMainApp.ts). In *attach* mode the gateway is already
|
||||
// running, so it replays `gateway.ready` / `session.info` the instant the
|
||||
// socket connects — those land in `bufferedEvents` *before* the consumer
|
||||
// subscribes. If we emitted them synchronously here, the `gateway.ready`
|
||||
// handler's `patchUiState` / `setHistoryItems` cascade would run while
|
||||
// React is still inside the first commit, tripping "Too many re-renders"
|
||||
// (Minified React error #301) — issue #36658. Spawn/inline/sidecar modes
|
||||
// don't hit this because `gateway.ready` only arrives after the Python
|
||||
// child boots, i.e. on a later async tick.
|
||||
//
|
||||
// Crucially, `subscribed` stays false until the flush so any LIVE event
|
||||
// arriving in the gap between here and the microtask keeps buffering
|
||||
// (publish() pushes when !subscribed) instead of emitting synchronously
|
||||
// and jumping ahead of the chronologically-earlier replayed events. The
|
||||
// flush re-drains the buffer right after flipping `subscribed`, so any
|
||||
// in-window arrivals are delivered in FIFO order. A generation token makes
|
||||
// the queued microtask a no-op if the transport was reset/killed meanwhile.
|
||||
const generation = this.drainGeneration
|
||||
|
||||
for (const ev of this.bufferedEvents.drain()) {
|
||||
this.emit('event', ev)
|
||||
}
|
||||
queueMicrotask(() => {
|
||||
if (this.drainGeneration !== generation) {
|
||||
return
|
||||
}
|
||||
|
||||
if (this.pendingExit !== undefined) {
|
||||
const code = this.pendingExit
|
||||
this.subscribed = true
|
||||
|
||||
this.pendingExit = undefined
|
||||
this.emit('exit', code)
|
||||
}
|
||||
// Replay everything buffered up to now, then any events that arrived in
|
||||
// the gap before this microtask ran — all in chronological order.
|
||||
for (const ev of this.bufferedEvents.drain()) {
|
||||
this.emit('event', ev)
|
||||
}
|
||||
|
||||
if (this.pendingExit !== undefined) {
|
||||
const code = this.pendingExit
|
||||
|
||||
this.pendingExit = undefined
|
||||
this.emit('exit', code)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
getLogTail(limit = 20): string {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue