diff --git a/ui-tui/src/__tests__/gatewayClient.test.ts b/ui-tui/src/__tests__/gatewayClient.test.ts new file mode 100644 index 0000000000..eac96c2078 --- /dev/null +++ b/ui-tui/src/__tests__/gatewayClient.test.ts @@ -0,0 +1,386 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' + +import { GatewayClient } from '../gatewayClient.js' + +interface ListenerEntry { + callback: (event: any) => void + once: boolean +} + +class FakeWebSocket { + static CONNECTING = 0 + static OPEN = 1 + static CLOSING = 2 + static CLOSED = 3 + static instances: FakeWebSocket[] = [] + + readyState = FakeWebSocket.CONNECTING + sent: string[] = [] + readonly url: string + private listeners = new Map() + + constructor(url: string) { + this.url = url + FakeWebSocket.instances.push(this) + } + + static reset() { + FakeWebSocket.instances = [] + } + + addEventListener(type: string, callback: (event: any) => void, options?: unknown) { + const once = + typeof options === 'object' && + options !== null && + 'once' in options && + Boolean((options as { once?: unknown }).once) + const entries = this.listeners.get(type) ?? [] + + entries.push({ callback, once }) + this.listeners.set(type, entries) + } + + removeEventListener(type: string, callback: (event: any) => void) { + const entries = this.listeners.get(type) + + if (!entries) { + return + } + + this.listeners.set( + type, + entries.filter(entry => entry.callback !== callback) + ) + } + + send(payload: string) { + if (this.readyState !== FakeWebSocket.OPEN) { + throw new Error('socket not open') + } + + this.sent.push(payload) + } + + close(code = 1000) { + if (this.readyState === FakeWebSocket.CLOSED) { + return + } + + this.readyState = FakeWebSocket.CLOSED + this.emit('close', { code }) + } + + open() { + this.readyState = FakeWebSocket.OPEN + this.emit('open', {}) + } + + message(data: string) { + this.emit('message', { data }) + } + + private emit(type: string, event: any) { + const entries = [...(this.listeners.get(type) ?? [])] + + for (const entry of entries) { + entry.callback(event) + if (entry.once) { + this.removeEventListener(type, entry.callback) + } + } + } +} + +describe('GatewayClient websocket attach mode', () => { + const originalWebSocket = globalThis.WebSocket + let originalGatewayUrl: string | undefined + let originalSidecarUrl: string | undefined + + beforeEach(() => { + originalGatewayUrl = process.env.HERMES_TUI_GATEWAY_URL + originalSidecarUrl = process.env.HERMES_TUI_SIDECAR_URL + FakeWebSocket.reset() + ;(globalThis as { WebSocket?: unknown }).WebSocket = FakeWebSocket as unknown as typeof WebSocket + }) + + afterEach(() => { + if (originalGatewayUrl === undefined) { + delete process.env.HERMES_TUI_GATEWAY_URL + } else { + process.env.HERMES_TUI_GATEWAY_URL = originalGatewayUrl + } + + if (originalSidecarUrl === undefined) { + delete process.env.HERMES_TUI_SIDECAR_URL + } else { + process.env.HERMES_TUI_SIDECAR_URL = originalSidecarUrl + } + + FakeWebSocket.reset() + + if (originalWebSocket) { + globalThis.WebSocket = originalWebSocket + } else { + delete (globalThis as { WebSocket?: unknown }).WebSocket + } + }) + + it('waits for websocket open and resolves RPC requests', async () => { + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc' + const gw = new GatewayClient() + + gw.start() + const gatewaySocket = FakeWebSocket.instances[0]! + const req = gw.request<{ ok: boolean }>('session.create', { cols: 80 }) + + expect(gatewaySocket.sent).toHaveLength(0) + gatewaySocket.open() + await vi.waitFor(() => expect(gatewaySocket.sent).toHaveLength(1)) + + const frame = JSON.parse(gatewaySocket.sent[0] ?? '{}') as { id: string; method: string } + expect(frame.method).toBe('session.create') + + gatewaySocket.message(JSON.stringify({ id: frame.id, jsonrpc: '2.0', result: { ok: true } })) + await expect(req).resolves.toEqual({ ok: true }) + + gw.kill() + }) + + it('mirrors event frames to sidecar websocket when configured', async () => { + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc' + process.env.HERMES_TUI_SIDECAR_URL = 'ws://gateway.test/api/pub?token=abc&channel=demo' + + const gw = new GatewayClient() + const seen: string[] = [] + + gw.on('event', ev => seen.push(ev.type)) + gw.start() + + const gatewaySocket = FakeWebSocket.instances[0]! + gatewaySocket.open() + await vi.waitFor(() => expect(FakeWebSocket.instances).toHaveLength(2)) + + const sidecarSocket = FakeWebSocket.instances[1]! + + sidecarSocket.open() + gw.drain() + + const eventFrame = JSON.stringify({ + jsonrpc: '2.0', + method: 'event', + params: { type: 'tool.start', payload: { tool_id: 't1' } } + }) + gatewaySocket.message(eventFrame) + + expect(seen).toContain('tool.start') + expect(sidecarSocket.sent).toContain(eventFrame) + + gw.kill() + }) + + it('emits exit when attached websocket closes', () => { + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc' + const gw = new GatewayClient() + const exits: Array = [] + + gw.on('exit', code => exits.push(code)) + gw.start() + + const gatewaySocket = FakeWebSocket.instances[0]! + + gatewaySocket.open() + gw.drain() + gatewaySocket.close(1011) + + expect(exits).toEqual([1011]) + }) + + it('rejects pending RPCs with websocket wording when the attached socket closes', async () => { + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc' + const gw = new GatewayClient() + + gw.start() + const gatewaySocket = FakeWebSocket.instances[0]! + + gatewaySocket.open() + gw.drain() + + const req = gw.request('session.create', {}) + await vi.waitFor(() => expect(gatewaySocket.sent.length).toBeGreaterThan(0)) + + gatewaySocket.close(1011) + + await expect(req).rejects.toThrow(/gateway websocket closed \(1011\)/) + }) + + it('rejects pending RPCs when kill() closes the attached websocket', async () => { + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc' + const gw = new GatewayClient() + + gw.start() + const gatewaySocket = FakeWebSocket.instances[0]! + + gatewaySocket.open() + gw.drain() + + const req = gw.request('session.create', {}) + await vi.waitFor(() => expect(gatewaySocket.sent.length).toBeGreaterThan(0)) + + gw.kill() + + await expect(req).rejects.toThrow(/gateway closed/) + }) + + it('reattaches when HERMES_TUI_GATEWAY_URL rotates between requests', async () => { + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway-old.test/api/ws?token=abc' + const gw = new GatewayClient() + + gw.start() + const firstSocket = FakeWebSocket.instances[0]! + + firstSocket.open() + gw.drain() + + const stale = gw.request('session.create', {}) + await vi.waitFor(() => expect(firstSocket.sent.length).toBeGreaterThan(0)) + + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway-new.test/api/ws?token=xyz' + const next = gw.request('session.create', {}) + + await expect(stale).rejects.toThrow(/gateway attach url changed/) + await vi.waitFor(() => expect(FakeWebSocket.instances).toHaveLength(2)) + + const secondSocket = FakeWebSocket.instances[1]! + expect(secondSocket.url).toContain('gateway-new.test') + + secondSocket.open() + await vi.waitFor(() => expect(secondSocket.sent.length).toBeGreaterThan(0)) + + const frame = JSON.parse(secondSocket.sent[0] ?? '{}') as { id: string } + secondSocket.message(JSON.stringify({ id: frame.id, jsonrpc: '2.0', result: { ok: true } })) + + await expect(next).resolves.toEqual({ ok: true }) + gw.kill() + }) + + it('redacts query string secrets in attach failure logs and events', () => { + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=hunter2&channel=secret' + delete (globalThis as { WebSocket?: unknown }).WebSocket + + const gw = new GatewayClient() + const stderrLines: string[] = [] + + gw.on('event', ev => { + if (ev.type === 'gateway.stderr' && typeof ev.payload?.line === 'string') { + stderrLines.push(ev.payload.line) + } + }) + gw.start() + gw.drain() + + expect(stderrLines.length).toBeGreaterThan(0) + for (const line of stderrLines) { + expect(line).not.toContain('hunter2') + expect(line).not.toContain('channel=secret') + } + + expect(gw.getLogTail(20)).not.toContain('hunter2') + expect(gw.getLogTail(20)).not.toContain('channel=secret') + + gw.kill() + }) + + it('redacts attach URL secrets when the WebSocket constructor throws', () => { + const secretUrl = 'ws://gateway.test/api/ws?token=hunter2&channel=secret' + + process.env.HERMES_TUI_GATEWAY_URL = secretUrl + ;(globalThis as { WebSocket?: unknown }).WebSocket = class ThrowingWebSocket extends FakeWebSocket { + constructor(url: string) { + throw new TypeError(`Invalid URL: ${url}`) + } + } as unknown as typeof WebSocket + + const gw = new GatewayClient() + + gw.start() + gw.drain() + + const tail = gw.getLogTail(20) + expect(tail).not.toContain('hunter2') + expect(tail).not.toContain('channel=secret') + expect(tail).not.toContain(secretUrl) + expect(tail).toContain('ws://gateway.test/api/ws?***') + + gw.kill() + }) + + it('redacts sidecar URL secrets when the WebSocket constructor throws', async () => { + const sidecarUrl = 'ws://gateway.test/api/pub?token=hunter2&channel=secret' + + process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc' + process.env.HERMES_TUI_SIDECAR_URL = sidecarUrl + ;(globalThis as { WebSocket?: unknown }).WebSocket = class ThrowingSidecarWebSocket extends FakeWebSocket { + constructor(url: string) { + if (url.includes('/api/pub')) { + throw new TypeError(`Invalid URL: ${url}`) + } + + super(url) + } + } as unknown as typeof WebSocket + + const gw = new GatewayClient() + + gw.start() + const gatewaySocket = FakeWebSocket.instances[0]! + gatewaySocket.open() + await vi.waitFor(() => expect(gw.getLogTail(20)).toContain('[sidecar] failed to connect')) + + const tail = gw.getLogTail(20) + expect(tail).not.toContain('hunter2') + expect(tail).not.toContain('channel=secret') + expect(tail).not.toContain(sidecarUrl) + expect(tail).toContain('ws://gateway.test/api/pub?***') + + gw.kill() + }) + + it('redacts user-info credentials even on URLs the WHATWG parser rejects', () => { + // Port 99999 is outside the WHATWG URL parser's valid 0–65535 + // range and survives `.trim()`, so the fixture deterministically + // exercises `redactUrl()`'s fallback branch across Node versions. + // (An earlier `%zz` user-info fixture did NOT actually throw in + // recent Node — WHATWG accepts malformed percent escapes there — + // which silently routed the test through the structured-URL path.) + const fixture = 'ws://alice:hunter2@gateway.test:99999/api/ws?token=secret' + expect(() => new URL(fixture)).toThrow() + + process.env.HERMES_TUI_GATEWAY_URL = fixture + delete (globalThis as { WebSocket?: unknown }).WebSocket + + const gw = new GatewayClient() + const stderrLines: string[] = [] + + gw.on('event', ev => { + if (ev.type === 'gateway.stderr' && typeof ev.payload?.line === 'string') { + stderrLines.push(ev.payload.line) + } + }) + gw.start() + gw.drain() + + expect(stderrLines.length).toBeGreaterThan(0) + for (const line of stderrLines) { + expect(line).not.toContain('alice') + expect(line).not.toContain('hunter2') + expect(line).not.toContain('token=secret') + } + + const tail = gw.getLogTail(20) + expect(tail).not.toContain('alice') + expect(tail).not.toContain('hunter2') + expect(tail).not.toContain('token=secret') + + gw.kill() + }) +}) diff --git a/ui-tui/src/gatewayClient.ts b/ui-tui/src/gatewayClient.ts index 838bf31fbc..9590b386aa 100644 --- a/ui-tui/src/gatewayClient.ts +++ b/ui-tui/src/gatewayClient.ts @@ -13,10 +13,26 @@ const MAX_BUFFERED_EVENTS = 2000 const MAX_LOG_PREVIEW = 240 const STARTUP_TIMEOUT_MS = Math.max(5000, parseInt(process.env.HERMES_TUI_STARTUP_TIMEOUT_MS ?? '15000', 10) || 15000) const REQUEST_TIMEOUT_MS = Math.max(30000, parseInt(process.env.HERMES_TUI_RPC_TIMEOUT_MS ?? '120000', 10) || 120000) +const WS_CONNECTING = 0 +const WS_OPEN = 1 +const WS_CLOSING = 2 +const WS_CLOSED = 3 const truncateLine = (line: string) => line.length > MAX_LOG_LINE_BYTES ? `${line.slice(0, MAX_LOG_LINE_BYTES)}… [truncated ${line.length} bytes]` : line +const resolveGatewayAttachUrl = () => { + const raw = process.env.HERMES_TUI_GATEWAY_URL?.trim() + + return raw ? raw : null +} + +const resolveSidecarUrl = () => { + const raw = process.env.HERMES_TUI_SIDECAR_URL?.trim() + + return raw ? raw : null +} + const resolvePython = (root: string) => { const configured = process.env.HERMES_PYTHON?.trim() || process.env.PYTHON?.trim() @@ -43,6 +59,60 @@ const asGatewayEvent = (value: unknown): GatewayEvent | null => ? (value as GatewayEvent) : null +// Hoisted decoder: attach mode can drive high-frequency binary frames +// (tool deltas, reasoning streams) and constructing a fresh TextDecoder +// per message creates avoidable GC pressure. One module-level instance +// is fine because UTF-8 is stateless and we always pass entire frames. +const _wireDecoder = new TextDecoder() + +const asWireText = (raw: unknown): string | null => { + if (typeof raw === 'string') { + return raw + } + + if (raw instanceof ArrayBuffer) { + return _wireDecoder.decode(raw) + } + + if (ArrayBuffer.isView(raw)) { + return _wireDecoder.decode(raw) + } + + return null +} + +// Matches `://user:pass@host…` style user-info segments in +// otherwise-malformed URLs that the WHATWG `URL` parser can't accept. +// Used by the `redactUrl` fallback so embedded credentials are +// scrubbed from log lines even when the URL is unparseable. +const _USERINFO_FALLBACK_RE = /^([a-z][a-z0-9+.\-]*:\/\/)[^/?#@]*@/i + +// Connection URLs (gateway, sidecar) often carry bearer tokens in the query +// string. We surface them in user-facing log lines and the +// `gateway.start_timeout` payload, so always strip the query string and any +// embedded user-info before logging. +const redactUrl = (raw: string): string => { + if (!raw) { + return raw + } + + try { + const url = new URL(raw) + const userInfo = url.username || url.password ? '***@' : '' + const query = url.search ? '?***' : '' + + return `${url.protocol}//${userInfo}${url.host}${url.pathname}${query}` + } catch { + // WHATWG URL rejected the input. Best-effort: strip an embedded + // `user:pass@` segment AND the query string so a malformed token + // bearer can never escape into the log tail. + const noUserInfo = raw.replace(_USERINFO_FALLBACK_RE, '$1***@') + const queryIdx = noUserInfo.indexOf('?') + + return queryIdx >= 0 ? `${noUserInfo.slice(0, queryIdx)}?***` : noUserInfo + } +} + interface Pending { id: string method: string @@ -53,6 +123,11 @@ interface Pending { export class GatewayClient extends EventEmitter { private proc: ChildProcess | null = null + private ws: WebSocket | null = null + private wsConnectPromise: Promise | null = null + private sidecarWs: WebSocket | null = null + private attachUrl: null | string = null + private sidecarUrl: null | string = null private reqId = 0 private logs = new CircularBuffer(MAX_GATEWAY_LOG_LINES) private pending = new Map() @@ -88,14 +163,48 @@ export class GatewayClient extends EventEmitter { this.bufferedEvents.push(ev) } - start() { - const root = process.env.HERMES_PYTHON_SRC_ROOT ?? resolve(import.meta.dirname, '../../') - const python = resolvePython(root) - const cwd = process.env.HERMES_CWD || root - const env = { ...process.env } - const pyPath = env.PYTHONPATH?.trim() - env.PYTHONPATH = pyPath ? `${root}${delimiter}${pyPath}` : root + private clearReadyTimer() { + if (this.readyTimer) { + clearTimeout(this.readyTimer) + this.readyTimer = null + } + } + private closeSidecarSocket() { + try { + this.sidecarWs?.close() + } catch { + // best effort + } finally { + this.sidecarWs = null + } + } + + private closeGatewaySocket() { + // Null the active reference BEFORE invoking close(): real WebSocket + // implementations dispatch the 'close' event after a microtask hop, + // so by the time the handler runs `this.ws` should already be null + // and the identity guard will correctly classify the close as + // belonging to a discarded socket. (Test fakes emit synchronously, + // so doing the swap up front is also what makes the identity guard + // match real timing in tests.) + const ws = this.ws + this.ws = null + this.wsConnectPromise = null + try { + ws?.close() + } catch { + // best effort + } + } + + private resetStartupState() { + // Reject any in-flight RPCs left over from the previous transport + // before we swap. Otherwise the old transport's stale exit/close + // handlers (now identity-gated to ignore unrelated transports) + // never fire `rejectPending`, leaving callers hanging on promises + // attached to a discarded child / socket. + this.rejectPending(new Error('gateway restarting')) this.ready = false this.bufferedEvents.clear() this.pendingExit = undefined @@ -103,15 +212,10 @@ export class GatewayClient extends EventEmitter { this.stderrRl?.close() this.stdoutRl = null this.stderrRl = null + this.clearReadyTimer() + } - if (this.proc && !this.proc.killed && this.proc.exitCode === null) { - this.proc.kill() - } - - if (this.readyTimer) { - clearTimeout(this.readyTimer) - } - + private startReadyTimer(python: string, cwd: string) { this.readyTimer = setTimeout(() => { if (this.ready) { return @@ -130,7 +234,95 @@ export class GatewayClient extends EventEmitter { payload: { cwd, python, stderr_tail: stderrTail } }) }, STARTUP_TIMEOUT_MS) + } + private handleTransportExit(code: null | number, reason?: string) { + this.clearReadyTimer() + this.closeSidecarSocket() + this.rejectPending(new Error(reason || `gateway exited${code === null ? '' : ` (${code})`}`)) + + if (this.subscribed) { + this.emit('exit', code) + } else { + this.pendingExit = code + } + } + + private connectSidecarMirror() { + this.closeSidecarSocket() + + if (!this.sidecarUrl) { + return + } + + if (typeof WebSocket === 'undefined') { + this.pushLog(`[sidecar] WebSocket unavailable; skipping mirror to ${redactUrl(this.sidecarUrl)}`) + return + } + + try { + const ws = new WebSocket(this.sidecarUrl) + + this.sidecarWs = ws + ws.addEventListener('close', () => { + if (this.sidecarWs === ws) { + this.sidecarWs = null + } + }) + ws.addEventListener('error', () => { + this.pushLog('[sidecar] mirror connection error') + }) + } catch (err) { + this.pushLog(`[sidecar] failed to connect ${redactUrl(this.sidecarUrl)} (constructor error)`) + this.sidecarWs = null + } + } + + private mirrorEventToSidecar(rawFrame: string) { + const ws = this.sidecarWs + + if (!ws || ws.readyState !== WS_OPEN) { + return + } + + try { + ws.send(rawFrame) + } catch { + // best effort + } + } + + private handleWebSocketFrame(raw: unknown) { + const text = asWireText(raw) + + if (!text) { + return + } + + try { + const frame = JSON.parse(text) as Record + + if (frame.method === 'event') { + this.mirrorEventToSidecar(text) + } + + this.dispatch(frame) + } catch { + const preview = text.trim().slice(0, MAX_LOG_PREVIEW) || '(empty frame)' + + this.pushLog(`[protocol] malformed websocket frame: ${preview}`) + this.publish({ type: 'gateway.protocol_error', payload: { preview } }) + } + } + + private startSpawnedGateway(root: string) { + const python = resolvePython(root) + const cwd = process.env.HERMES_CWD || root + const env = { ...process.env } + const pyPath = env.PYTHONPATH?.trim() + + env.PYTHONPATH = pyPath ? `${root}${delimiter}${pyPath}` : root + this.startReadyTimer(python, cwd) this.proc = spawn(python, ['-m', 'tui_gateway.entry'], { cwd, env, stdio: ['pipe', 'pipe', 'pipe'] }) this.stdoutRl = createInterface({ input: this.proc.stdout! }) @@ -157,28 +349,154 @@ export class GatewayClient extends EventEmitter { this.publish({ type: 'gateway.stderr', payload: { line } }) }) + const ownedProc = this.proc this.proc.on('error', err => { - this.pushLog(`[spawn] ${err.message}`) - this.rejectPending(new Error(`gateway error: ${err.message}`)) - this.publish({ type: 'gateway.stderr', payload: { line: `[spawn] ${err.message}` } }) - }) + // Skip stale errors on an already-replaced child. + if (this.proc !== ownedProc) { + return + } + const line = `[spawn] ${err.message}` + + this.pushLog(line) + this.publish({ type: 'gateway.stderr', payload: { line } }) + // Detach the reference up front so the late `exit` event for + // this same child is identity-skipped (we don't want to emit + // 'exit' twice). Then run the full teardown — clears the + // startup timer so we don't fire a misleading + // `gateway.start_timeout`, rejects pending RPCs, and emits or + // queues a single `exit`. + this.proc = null + this.handleTransportExit(1, `gateway error: ${err.message}`) + }) this.proc.on('exit', code => { - if (this.readyTimer) { - clearTimeout(this.readyTimer) - this.readyTimer = null + // start() can replace `this.proc` while an old child is still + // tearing down. Skip stale exits so we don't clear the new + // startup timer or reject newly-issued pending requests. + if (this.proc !== ownedProc) { + return } - this.rejectPending(new Error(`gateway exited${code === null ? '' : ` (${code})`}`)) - - if (this.subscribed) { - this.emit('exit', code) - } else { - this.pendingExit = code - } + this.handleTransportExit(code) }) } + private startAttachedGateway(attachUrl: string) { + const safeAttachUrl = redactUrl(attachUrl) + this.startReadyTimer('websocket', safeAttachUrl) + + if (typeof WebSocket === 'undefined') { + const line = `[startup] WebSocket API unavailable; cannot attach to ${safeAttachUrl}` + + this.pushLog(line) + this.publish({ type: 'gateway.stderr', payload: { line } }) + this.handleTransportExit(1, 'gateway websocket unavailable') + + return + } + + try { + const ws = new WebSocket(attachUrl) + let settled = false + + this.ws = ws + const connectPromise = new Promise((resolve, reject) => { + ws.addEventListener( + 'open', + () => { + if (!settled) { + settled = true + resolve() + } + + this.connectSidecarMirror() + }, + { once: true } + ) + + ws.addEventListener( + 'error', + () => { + if (!settled) { + this.pushLog('[startup] gateway websocket connect error') + settled = true + reject(new Error('gateway websocket connection failed')) + } + }, + { once: true } + ) + ws.addEventListener( + 'close', + ev => { + if (!settled) { + settled = true + reject(new Error(`gateway websocket closed (${ev.code}) during connect`)) + } + }, + { once: true } + ) + }) + + // The connect promise is only awaited by RPCs that arrive while + // the socket is still connecting. If no request races the open + // (or a teardown drops the reference before anyone observes it), + // a connect-error / early-close rejection would surface as an + // unhandled promise rejection in Node. Attach a no-op handler to + // ensure the rejection is always observed. + connectPromise.catch(() => {}) + this.wsConnectPromise = connectPromise + + ws.addEventListener('message', ev => this.handleWebSocketFrame(ev.data)) + ws.addEventListener('close', ev => { + // Skip close events from sockets that have already been + // replaced — start() / closeGatewaySocket() can swap `this.ws` + // before an in-flight close lands, and we must not clear the + // new ready timer or reject the new pending requests on behalf + // of a stale socket. + if (this.ws !== ws) { + return + } + + this.ws = null + this.wsConnectPromise = null + this.handleTransportExit(ev.code, `gateway websocket closed${ev.code ? ` (${ev.code})` : ''}`) + }) + ws.addEventListener('error', () => { + const line = '[gateway] websocket transport error' + + this.pushLog(line) + this.publish({ type: 'gateway.stderr', payload: { line } }) + }) + } catch (err) { + this.pushLog(`[startup] failed to connect websocket gateway ${safeAttachUrl} (constructor error)`) + this.handleTransportExit(1, 'gateway websocket startup failed') + } + } + + start() { + const root = process.env.HERMES_PYTHON_SRC_ROOT ?? resolve(import.meta.dirname, '../../') + const attachUrl = resolveGatewayAttachUrl() + const sidecarUrl = resolveSidecarUrl() + + this.attachUrl = attachUrl + this.sidecarUrl = sidecarUrl + this.resetStartupState() + + if (this.proc && !this.proc.killed && this.proc.exitCode === null) { + this.proc.kill() + } + this.proc = null + this.closeGatewaySocket() + this.closeSidecarSocket() + + if (attachUrl) { + this.startAttachedGateway(attachUrl) + return + } + + this.startSpawnedGateway(root) + } + private dispatch(msg: Record) { const id = msg.id as string | undefined const p = id ? this.pending.get(id) : undefined @@ -258,7 +576,78 @@ export class GatewayClient extends EventEmitter { return this.logs.tail(Math.max(1, limit)).join('\n') } + private async ensureAttachedWebSocket(method: string): Promise { + if (!this.attachUrl) { + throw new Error('gateway not running') + } + + if (!this.ws || this.ws.readyState === WS_CLOSED || this.ws.readyState === WS_CLOSING) { + this.start() + } + + if (this.ws?.readyState === WS_CONNECTING) { + try { + await this.wsConnectPromise + } catch (err) { + throw err instanceof Error ? err : new Error(String(err)) + } + } + + if (!this.ws || this.ws.readyState !== WS_OPEN) { + throw new Error(`gateway not connected: ${method}`) + } + + return this.ws + } + + private requestOverWebSocket(method: string, params: Record = {}): Promise { + return this.ensureAttachedWebSocket(method).then( + ws => + new Promise((resolve, reject) => { + const id = `r${++this.reqId}` + const timeout = setTimeout(this.onTimeout, REQUEST_TIMEOUT_MS, id) + + timeout.unref?.() + this.pending.set(id, { + id, + method, + reject, + resolve: v => resolve(v as T), + timeout + }) + + try { + ws.send(JSON.stringify({ id, jsonrpc: '2.0', method, params })) + } catch (e) { + const pending = this.pending.get(id) + + if (pending) { + clearTimeout(pending.timeout) + this.pending.delete(id) + } + + reject(e instanceof Error ? e : new Error(String(e))) + } + }) + ) + } + request(method: string, params: Record = {}): Promise { + const attachUrl = resolveGatewayAttachUrl() + + if (attachUrl) { + if (this.attachUrl !== attachUrl) { + // The env var rotated at runtime — restart the transport so + // switching from spawned-gateway mode to attach mode also + // tears down the old Python child. Merely closing `this.ws` + // would leave a previously spawned gateway process alive. + this.rejectPending(new Error('gateway attach url changed')) + this.start() + } + + return this.requestOverWebSocket(method, params) + } + if (!this.proc?.stdin || this.proc.killed || this.proc.exitCode !== null) { this.start() } @@ -299,5 +688,13 @@ export class GatewayClient extends EventEmitter { kill() { this.proc?.kill() + this.closeGatewaySocket() + this.closeSidecarSocket() + this.clearReadyTimer() + // The ws 'close' handler is identity-gated on `this.ws === ws` + // and we just nulled `this.ws`, so it will short-circuit and + // skip handleTransportExit. Reject pending RPCs explicitly so + // attach-mode promises do not hang after an intentional kill. + this.rejectPending(new Error('gateway closed')) } }