mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-09 03:11:58 +00:00
feat(tui): support attaching to an existing gateway (#21978)
* feat(tui): support attaching to an existing gateway Allow the TUI gateway client to connect via HERMES_TUI_GATEWAY_URL while preserving spawned gateway fallback, and mirror event frames to sidecar feeds so dashboard tool activity remains visible. * review(copilot): redact attach URLs and gate stale transport exits Strip query strings (and any user info) from gateway / sidecar URLs before logging or surfacing them in `gateway.start_timeout`, so attach tokens never leak into the TUI log tail or activity feed. Also gate the spawned-proc and websocket close handlers on transport identity so a stale child or socket cannot clear a freshly-started ready timer or reject newly-issued pending requests during reconnect. * review(copilot): tighten transport restart and shutdown lifecycle Reject any in-flight RPCs in resetStartupState so callers do not hang on promises issued to the previous transport when start() swaps a child or socket. Have kill() explicitly reject pending so attach-mode promises drain after an intentional shutdown, and reattach when HERMES_TUI_GATEWAY_URL rotates between requests instead of silently keeping the old session. Fold the spawned child error path through handleTransportExit so a failed spawn clears the startup timer and emits a single exit event. Also null the websocket reference before calling close so the identity guard correctly tags stale close events on real WebSocket timing. Locks the new behaviors in with regression tests for kill, URL rotation, and stale-pending cleanup. * review(copilot): swallow stray ws connect rejection and isolate test env Attach a no-op catch handler on the websocket connect promise so an unobserved connect-error / early-close rejection cannot surface as an unhandled promise rejection in Node when no request is currently racing the open. Snapshot HERMES_TUI_GATEWAY_URL / HERMES_TUI_SIDECAR_URL in beforeEach and restore them in afterEach so vitest runs that set those env vars beforehand do not get permanently cleared. * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * review(copilot): hoist wire decoder and harden redact fallback Reuse a single module-level TextDecoder for binary websocket frames so high-frequency attach-mode traffic does not allocate one per message. Strengthen the redactUrl fallback so embedded user:pass@ credentials are also masked when the WHATWG URL parser rejects the input, and pin the new behavior with a regression test that drives a malformed bearer URL through the gateway-stderr publish path. * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> * review(copilot): force redact fallback path with deterministic fixture Replace the "%zz" user-info fixture, which WHATWG URL actually accepts in recent Node and silently routed the test back through the structured-URL branch, with a port-99999 fixture that the parser rejects across Node versions. Add a pre-flight `expect(() => new URL(fixture)).toThrow()` assertion so a future URL-parser change can never silently bypass `redactUrl()`'s fallback again. * review(copilot): sanitize websocket constructor failures Avoid logging raw WebSocket constructor error messages because some implementations include the full input URL, including token-bearing query strings. Log the redacted gateway or sidecar URL with the error class instead, and add regression coverage for constructor-throw paths on both attach and sidecar sockets. * review(self): restart transport on attach-mode transition Route runtime HERMES_TUI_GATEWAY_URL changes through start() so switching from spawned-gateway mode to attach mode also tears down the previously spawned Python child instead of leaving it alive. Keep the existing fast-fail behavior for pending RPCs. Also make constructor-failure logging fully generic after the redacted URL, avoiding even implementation-specific error class text in the log tail. * review(copilot): use websocket wording for attach close errors When the attached websocket closes, reject pending RPCs with an explicit websocket-closed reason instead of the spawned-process oriented `gateway exited` wording. Add coverage to ensure close code 1011 surfaces as `gateway websocket closed (1011)`. --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
parent
9680827078
commit
1997b3baf8
2 changed files with 812 additions and 29 deletions
386
ui-tui/src/__tests__/gatewayClient.test.ts
Normal file
386
ui-tui/src/__tests__/gatewayClient.test.ts
Normal file
|
|
@ -0,0 +1,386 @@
|
|||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
import { GatewayClient } from '../gatewayClient.js'
|
||||
|
||||
interface ListenerEntry {
|
||||
callback: (event: any) => void
|
||||
once: boolean
|
||||
}
|
||||
|
||||
class FakeWebSocket {
|
||||
static CONNECTING = 0
|
||||
static OPEN = 1
|
||||
static CLOSING = 2
|
||||
static CLOSED = 3
|
||||
static instances: FakeWebSocket[] = []
|
||||
|
||||
readyState = FakeWebSocket.CONNECTING
|
||||
sent: string[] = []
|
||||
readonly url: string
|
||||
private listeners = new Map<string, ListenerEntry[]>()
|
||||
|
||||
constructor(url: string) {
|
||||
this.url = url
|
||||
FakeWebSocket.instances.push(this)
|
||||
}
|
||||
|
||||
static reset() {
|
||||
FakeWebSocket.instances = []
|
||||
}
|
||||
|
||||
addEventListener(type: string, callback: (event: any) => void, options?: unknown) {
|
||||
const once =
|
||||
typeof options === 'object' &&
|
||||
options !== null &&
|
||||
'once' in options &&
|
||||
Boolean((options as { once?: unknown }).once)
|
||||
const entries = this.listeners.get(type) ?? []
|
||||
|
||||
entries.push({ callback, once })
|
||||
this.listeners.set(type, entries)
|
||||
}
|
||||
|
||||
removeEventListener(type: string, callback: (event: any) => void) {
|
||||
const entries = this.listeners.get(type)
|
||||
|
||||
if (!entries) {
|
||||
return
|
||||
}
|
||||
|
||||
this.listeners.set(
|
||||
type,
|
||||
entries.filter(entry => entry.callback !== callback)
|
||||
)
|
||||
}
|
||||
|
||||
send(payload: string) {
|
||||
if (this.readyState !== FakeWebSocket.OPEN) {
|
||||
throw new Error('socket not open')
|
||||
}
|
||||
|
||||
this.sent.push(payload)
|
||||
}
|
||||
|
||||
close(code = 1000) {
|
||||
if (this.readyState === FakeWebSocket.CLOSED) {
|
||||
return
|
||||
}
|
||||
|
||||
this.readyState = FakeWebSocket.CLOSED
|
||||
this.emit('close', { code })
|
||||
}
|
||||
|
||||
open() {
|
||||
this.readyState = FakeWebSocket.OPEN
|
||||
this.emit('open', {})
|
||||
}
|
||||
|
||||
message(data: string) {
|
||||
this.emit('message', { data })
|
||||
}
|
||||
|
||||
private emit(type: string, event: any) {
|
||||
const entries = [...(this.listeners.get(type) ?? [])]
|
||||
|
||||
for (const entry of entries) {
|
||||
entry.callback(event)
|
||||
if (entry.once) {
|
||||
this.removeEventListener(type, entry.callback)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
describe('GatewayClient websocket attach mode', () => {
|
||||
const originalWebSocket = globalThis.WebSocket
|
||||
let originalGatewayUrl: string | undefined
|
||||
let originalSidecarUrl: string | undefined
|
||||
|
||||
beforeEach(() => {
|
||||
originalGatewayUrl = process.env.HERMES_TUI_GATEWAY_URL
|
||||
originalSidecarUrl = process.env.HERMES_TUI_SIDECAR_URL
|
||||
FakeWebSocket.reset()
|
||||
;(globalThis as { WebSocket?: unknown }).WebSocket = FakeWebSocket as unknown as typeof WebSocket
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
if (originalGatewayUrl === undefined) {
|
||||
delete process.env.HERMES_TUI_GATEWAY_URL
|
||||
} else {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = originalGatewayUrl
|
||||
}
|
||||
|
||||
if (originalSidecarUrl === undefined) {
|
||||
delete process.env.HERMES_TUI_SIDECAR_URL
|
||||
} else {
|
||||
process.env.HERMES_TUI_SIDECAR_URL = originalSidecarUrl
|
||||
}
|
||||
|
||||
FakeWebSocket.reset()
|
||||
|
||||
if (originalWebSocket) {
|
||||
globalThis.WebSocket = originalWebSocket
|
||||
} else {
|
||||
delete (globalThis as { WebSocket?: unknown }).WebSocket
|
||||
}
|
||||
})
|
||||
|
||||
it('waits for websocket open and resolves RPC requests', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
const req = gw.request<{ ok: boolean }>('session.create', { cols: 80 })
|
||||
|
||||
expect(gatewaySocket.sent).toHaveLength(0)
|
||||
gatewaySocket.open()
|
||||
await vi.waitFor(() => expect(gatewaySocket.sent).toHaveLength(1))
|
||||
|
||||
const frame = JSON.parse(gatewaySocket.sent[0] ?? '{}') as { id: string; method: string }
|
||||
expect(frame.method).toBe('session.create')
|
||||
|
||||
gatewaySocket.message(JSON.stringify({ id: frame.id, jsonrpc: '2.0', result: { ok: true } }))
|
||||
await expect(req).resolves.toEqual({ ok: true })
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('mirrors event frames to sidecar websocket when configured', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
process.env.HERMES_TUI_SIDECAR_URL = 'ws://gateway.test/api/pub?token=abc&channel=demo'
|
||||
|
||||
const gw = new GatewayClient()
|
||||
const seen: string[] = []
|
||||
|
||||
gw.on('event', ev => seen.push(ev.type))
|
||||
gw.start()
|
||||
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
gatewaySocket.open()
|
||||
await vi.waitFor(() => expect(FakeWebSocket.instances).toHaveLength(2))
|
||||
|
||||
const sidecarSocket = FakeWebSocket.instances[1]!
|
||||
|
||||
sidecarSocket.open()
|
||||
gw.drain()
|
||||
|
||||
const eventFrame = JSON.stringify({
|
||||
jsonrpc: '2.0',
|
||||
method: 'event',
|
||||
params: { type: 'tool.start', payload: { tool_id: 't1' } }
|
||||
})
|
||||
gatewaySocket.message(eventFrame)
|
||||
|
||||
expect(seen).toContain('tool.start')
|
||||
expect(sidecarSocket.sent).toContain(eventFrame)
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('emits exit when attached websocket closes', () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
const exits: Array<null | number> = []
|
||||
|
||||
gw.on('exit', code => exits.push(code))
|
||||
gw.start()
|
||||
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
|
||||
gatewaySocket.open()
|
||||
gw.drain()
|
||||
gatewaySocket.close(1011)
|
||||
|
||||
expect(exits).toEqual([1011])
|
||||
})
|
||||
|
||||
it('rejects pending RPCs with websocket wording when the attached socket closes', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
|
||||
gatewaySocket.open()
|
||||
gw.drain()
|
||||
|
||||
const req = gw.request('session.create', {})
|
||||
await vi.waitFor(() => expect(gatewaySocket.sent.length).toBeGreaterThan(0))
|
||||
|
||||
gatewaySocket.close(1011)
|
||||
|
||||
await expect(req).rejects.toThrow(/gateway websocket closed \(1011\)/)
|
||||
})
|
||||
|
||||
it('rejects pending RPCs when kill() closes the attached websocket', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
|
||||
gatewaySocket.open()
|
||||
gw.drain()
|
||||
|
||||
const req = gw.request('session.create', {})
|
||||
await vi.waitFor(() => expect(gatewaySocket.sent.length).toBeGreaterThan(0))
|
||||
|
||||
gw.kill()
|
||||
|
||||
await expect(req).rejects.toThrow(/gateway closed/)
|
||||
})
|
||||
|
||||
it('reattaches when HERMES_TUI_GATEWAY_URL rotates between requests', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway-old.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
const firstSocket = FakeWebSocket.instances[0]!
|
||||
|
||||
firstSocket.open()
|
||||
gw.drain()
|
||||
|
||||
const stale = gw.request('session.create', {})
|
||||
await vi.waitFor(() => expect(firstSocket.sent.length).toBeGreaterThan(0))
|
||||
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway-new.test/api/ws?token=xyz'
|
||||
const next = gw.request('session.create', {})
|
||||
|
||||
await expect(stale).rejects.toThrow(/gateway attach url changed/)
|
||||
await vi.waitFor(() => expect(FakeWebSocket.instances).toHaveLength(2))
|
||||
|
||||
const secondSocket = FakeWebSocket.instances[1]!
|
||||
expect(secondSocket.url).toContain('gateway-new.test')
|
||||
|
||||
secondSocket.open()
|
||||
await vi.waitFor(() => expect(secondSocket.sent.length).toBeGreaterThan(0))
|
||||
|
||||
const frame = JSON.parse(secondSocket.sent[0] ?? '{}') as { id: string }
|
||||
secondSocket.message(JSON.stringify({ id: frame.id, jsonrpc: '2.0', result: { ok: true } }))
|
||||
|
||||
await expect(next).resolves.toEqual({ ok: true })
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('redacts query string secrets in attach failure logs and events', () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=hunter2&channel=secret'
|
||||
delete (globalThis as { WebSocket?: unknown }).WebSocket
|
||||
|
||||
const gw = new GatewayClient()
|
||||
const stderrLines: string[] = []
|
||||
|
||||
gw.on('event', ev => {
|
||||
if (ev.type === 'gateway.stderr' && typeof ev.payload?.line === 'string') {
|
||||
stderrLines.push(ev.payload.line)
|
||||
}
|
||||
})
|
||||
gw.start()
|
||||
gw.drain()
|
||||
|
||||
expect(stderrLines.length).toBeGreaterThan(0)
|
||||
for (const line of stderrLines) {
|
||||
expect(line).not.toContain('hunter2')
|
||||
expect(line).not.toContain('channel=secret')
|
||||
}
|
||||
|
||||
expect(gw.getLogTail(20)).not.toContain('hunter2')
|
||||
expect(gw.getLogTail(20)).not.toContain('channel=secret')
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('redacts attach URL secrets when the WebSocket constructor throws', () => {
|
||||
const secretUrl = 'ws://gateway.test/api/ws?token=hunter2&channel=secret'
|
||||
|
||||
process.env.HERMES_TUI_GATEWAY_URL = secretUrl
|
||||
;(globalThis as { WebSocket?: unknown }).WebSocket = class ThrowingWebSocket extends FakeWebSocket {
|
||||
constructor(url: string) {
|
||||
throw new TypeError(`Invalid URL: ${url}`)
|
||||
}
|
||||
} as unknown as typeof WebSocket
|
||||
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
gw.drain()
|
||||
|
||||
const tail = gw.getLogTail(20)
|
||||
expect(tail).not.toContain('hunter2')
|
||||
expect(tail).not.toContain('channel=secret')
|
||||
expect(tail).not.toContain(secretUrl)
|
||||
expect(tail).toContain('ws://gateway.test/api/ws?***')
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('redacts sidecar URL secrets when the WebSocket constructor throws', async () => {
|
||||
const sidecarUrl = 'ws://gateway.test/api/pub?token=hunter2&channel=secret'
|
||||
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
process.env.HERMES_TUI_SIDECAR_URL = sidecarUrl
|
||||
;(globalThis as { WebSocket?: unknown }).WebSocket = class ThrowingSidecarWebSocket extends FakeWebSocket {
|
||||
constructor(url: string) {
|
||||
if (url.includes('/api/pub')) {
|
||||
throw new TypeError(`Invalid URL: ${url}`)
|
||||
}
|
||||
|
||||
super(url)
|
||||
}
|
||||
} as unknown as typeof WebSocket
|
||||
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
gatewaySocket.open()
|
||||
await vi.waitFor(() => expect(gw.getLogTail(20)).toContain('[sidecar] failed to connect'))
|
||||
|
||||
const tail = gw.getLogTail(20)
|
||||
expect(tail).not.toContain('hunter2')
|
||||
expect(tail).not.toContain('channel=secret')
|
||||
expect(tail).not.toContain(sidecarUrl)
|
||||
expect(tail).toContain('ws://gateway.test/api/pub?***')
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('redacts user-info credentials even on URLs the WHATWG parser rejects', () => {
|
||||
// Port 99999 is outside the WHATWG URL parser's valid 0–65535
|
||||
// range and survives `.trim()`, so the fixture deterministically
|
||||
// exercises `redactUrl()`'s fallback branch across Node versions.
|
||||
// (An earlier `%zz` user-info fixture did NOT actually throw in
|
||||
// recent Node — WHATWG accepts malformed percent escapes there —
|
||||
// which silently routed the test through the structured-URL path.)
|
||||
const fixture = 'ws://alice:hunter2@gateway.test:99999/api/ws?token=secret'
|
||||
expect(() => new URL(fixture)).toThrow()
|
||||
|
||||
process.env.HERMES_TUI_GATEWAY_URL = fixture
|
||||
delete (globalThis as { WebSocket?: unknown }).WebSocket
|
||||
|
||||
const gw = new GatewayClient()
|
||||
const stderrLines: string[] = []
|
||||
|
||||
gw.on('event', ev => {
|
||||
if (ev.type === 'gateway.stderr' && typeof ev.payload?.line === 'string') {
|
||||
stderrLines.push(ev.payload.line)
|
||||
}
|
||||
})
|
||||
gw.start()
|
||||
gw.drain()
|
||||
|
||||
expect(stderrLines.length).toBeGreaterThan(0)
|
||||
for (const line of stderrLines) {
|
||||
expect(line).not.toContain('alice')
|
||||
expect(line).not.toContain('hunter2')
|
||||
expect(line).not.toContain('token=secret')
|
||||
}
|
||||
|
||||
const tail = gw.getLogTail(20)
|
||||
expect(tail).not.toContain('alice')
|
||||
expect(tail).not.toContain('hunter2')
|
||||
expect(tail).not.toContain('token=secret')
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
})
|
||||
|
|
@ -13,10 +13,26 @@ const MAX_BUFFERED_EVENTS = 2000
|
|||
const MAX_LOG_PREVIEW = 240
|
||||
const STARTUP_TIMEOUT_MS = Math.max(5000, parseInt(process.env.HERMES_TUI_STARTUP_TIMEOUT_MS ?? '15000', 10) || 15000)
|
||||
const REQUEST_TIMEOUT_MS = Math.max(30000, parseInt(process.env.HERMES_TUI_RPC_TIMEOUT_MS ?? '120000', 10) || 120000)
|
||||
const WS_CONNECTING = 0
|
||||
const WS_OPEN = 1
|
||||
const WS_CLOSING = 2
|
||||
const WS_CLOSED = 3
|
||||
|
||||
const truncateLine = (line: string) =>
|
||||
line.length > MAX_LOG_LINE_BYTES ? `${line.slice(0, MAX_LOG_LINE_BYTES)}… [truncated ${line.length} bytes]` : line
|
||||
|
||||
const resolveGatewayAttachUrl = () => {
|
||||
const raw = process.env.HERMES_TUI_GATEWAY_URL?.trim()
|
||||
|
||||
return raw ? raw : null
|
||||
}
|
||||
|
||||
const resolveSidecarUrl = () => {
|
||||
const raw = process.env.HERMES_TUI_SIDECAR_URL?.trim()
|
||||
|
||||
return raw ? raw : null
|
||||
}
|
||||
|
||||
const resolvePython = (root: string) => {
|
||||
const configured = process.env.HERMES_PYTHON?.trim() || process.env.PYTHON?.trim()
|
||||
|
||||
|
|
@ -43,6 +59,60 @@ const asGatewayEvent = (value: unknown): GatewayEvent | null =>
|
|||
? (value as GatewayEvent)
|
||||
: null
|
||||
|
||||
// Hoisted decoder: attach mode can drive high-frequency binary frames
|
||||
// (tool deltas, reasoning streams) and constructing a fresh TextDecoder
|
||||
// per message creates avoidable GC pressure. One module-level instance
|
||||
// is fine because UTF-8 is stateless and we always pass entire frames.
|
||||
const _wireDecoder = new TextDecoder()
|
||||
|
||||
const asWireText = (raw: unknown): string | null => {
|
||||
if (typeof raw === 'string') {
|
||||
return raw
|
||||
}
|
||||
|
||||
if (raw instanceof ArrayBuffer) {
|
||||
return _wireDecoder.decode(raw)
|
||||
}
|
||||
|
||||
if (ArrayBuffer.isView(raw)) {
|
||||
return _wireDecoder.decode(raw)
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
// Matches `<scheme>://user:pass@host…` style user-info segments in
|
||||
// otherwise-malformed URLs that the WHATWG `URL` parser can't accept.
|
||||
// Used by the `redactUrl` fallback so embedded credentials are
|
||||
// scrubbed from log lines even when the URL is unparseable.
|
||||
const _USERINFO_FALLBACK_RE = /^([a-z][a-z0-9+.\-]*:\/\/)[^/?#@]*@/i
|
||||
|
||||
// Connection URLs (gateway, sidecar) often carry bearer tokens in the query
|
||||
// string. We surface them in user-facing log lines and the
|
||||
// `gateway.start_timeout` payload, so always strip the query string and any
|
||||
// embedded user-info before logging.
|
||||
const redactUrl = (raw: string): string => {
|
||||
if (!raw) {
|
||||
return raw
|
||||
}
|
||||
|
||||
try {
|
||||
const url = new URL(raw)
|
||||
const userInfo = url.username || url.password ? '***@' : ''
|
||||
const query = url.search ? '?***' : ''
|
||||
|
||||
return `${url.protocol}//${userInfo}${url.host}${url.pathname}${query}`
|
||||
} catch {
|
||||
// WHATWG URL rejected the input. Best-effort: strip an embedded
|
||||
// `user:pass@` segment AND the query string so a malformed token
|
||||
// bearer can never escape into the log tail.
|
||||
const noUserInfo = raw.replace(_USERINFO_FALLBACK_RE, '$1***@')
|
||||
const queryIdx = noUserInfo.indexOf('?')
|
||||
|
||||
return queryIdx >= 0 ? `${noUserInfo.slice(0, queryIdx)}?***` : noUserInfo
|
||||
}
|
||||
}
|
||||
|
||||
interface Pending {
|
||||
id: string
|
||||
method: string
|
||||
|
|
@ -53,6 +123,11 @@ interface Pending {
|
|||
|
||||
export class GatewayClient extends EventEmitter {
|
||||
private proc: ChildProcess | null = null
|
||||
private ws: WebSocket | null = null
|
||||
private wsConnectPromise: Promise<void> | null = null
|
||||
private sidecarWs: WebSocket | null = null
|
||||
private attachUrl: null | string = null
|
||||
private sidecarUrl: null | string = null
|
||||
private reqId = 0
|
||||
private logs = new CircularBuffer<string>(MAX_GATEWAY_LOG_LINES)
|
||||
private pending = new Map<string, Pending>()
|
||||
|
|
@ -88,14 +163,48 @@ export class GatewayClient extends EventEmitter {
|
|||
this.bufferedEvents.push(ev)
|
||||
}
|
||||
|
||||
start() {
|
||||
const root = process.env.HERMES_PYTHON_SRC_ROOT ?? resolve(import.meta.dirname, '../../')
|
||||
const python = resolvePython(root)
|
||||
const cwd = process.env.HERMES_CWD || root
|
||||
const env = { ...process.env }
|
||||
const pyPath = env.PYTHONPATH?.trim()
|
||||
env.PYTHONPATH = pyPath ? `${root}${delimiter}${pyPath}` : root
|
||||
private clearReadyTimer() {
|
||||
if (this.readyTimer) {
|
||||
clearTimeout(this.readyTimer)
|
||||
this.readyTimer = null
|
||||
}
|
||||
}
|
||||
|
||||
private closeSidecarSocket() {
|
||||
try {
|
||||
this.sidecarWs?.close()
|
||||
} catch {
|
||||
// best effort
|
||||
} finally {
|
||||
this.sidecarWs = null
|
||||
}
|
||||
}
|
||||
|
||||
private closeGatewaySocket() {
|
||||
// Null the active reference BEFORE invoking close(): real WebSocket
|
||||
// implementations dispatch the 'close' event after a microtask hop,
|
||||
// so by the time the handler runs `this.ws` should already be null
|
||||
// and the identity guard will correctly classify the close as
|
||||
// belonging to a discarded socket. (Test fakes emit synchronously,
|
||||
// so doing the swap up front is also what makes the identity guard
|
||||
// match real timing in tests.)
|
||||
const ws = this.ws
|
||||
this.ws = null
|
||||
this.wsConnectPromise = null
|
||||
try {
|
||||
ws?.close()
|
||||
} catch {
|
||||
// best effort
|
||||
}
|
||||
}
|
||||
|
||||
private resetStartupState() {
|
||||
// Reject any in-flight RPCs left over from the previous transport
|
||||
// before we swap. Otherwise the old transport's stale exit/close
|
||||
// handlers (now identity-gated to ignore unrelated transports)
|
||||
// never fire `rejectPending`, leaving callers hanging on promises
|
||||
// attached to a discarded child / socket.
|
||||
this.rejectPending(new Error('gateway restarting'))
|
||||
this.ready = false
|
||||
this.bufferedEvents.clear()
|
||||
this.pendingExit = undefined
|
||||
|
|
@ -103,15 +212,10 @@ export class GatewayClient extends EventEmitter {
|
|||
this.stderrRl?.close()
|
||||
this.stdoutRl = null
|
||||
this.stderrRl = null
|
||||
this.clearReadyTimer()
|
||||
}
|
||||
|
||||
if (this.proc && !this.proc.killed && this.proc.exitCode === null) {
|
||||
this.proc.kill()
|
||||
}
|
||||
|
||||
if (this.readyTimer) {
|
||||
clearTimeout(this.readyTimer)
|
||||
}
|
||||
|
||||
private startReadyTimer(python: string, cwd: string) {
|
||||
this.readyTimer = setTimeout(() => {
|
||||
if (this.ready) {
|
||||
return
|
||||
|
|
@ -130,7 +234,95 @@ export class GatewayClient extends EventEmitter {
|
|||
payload: { cwd, python, stderr_tail: stderrTail }
|
||||
})
|
||||
}, STARTUP_TIMEOUT_MS)
|
||||
}
|
||||
|
||||
private handleTransportExit(code: null | number, reason?: string) {
|
||||
this.clearReadyTimer()
|
||||
this.closeSidecarSocket()
|
||||
this.rejectPending(new Error(reason || `gateway exited${code === null ? '' : ` (${code})`}`))
|
||||
|
||||
if (this.subscribed) {
|
||||
this.emit('exit', code)
|
||||
} else {
|
||||
this.pendingExit = code
|
||||
}
|
||||
}
|
||||
|
||||
private connectSidecarMirror() {
|
||||
this.closeSidecarSocket()
|
||||
|
||||
if (!this.sidecarUrl) {
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof WebSocket === 'undefined') {
|
||||
this.pushLog(`[sidecar] WebSocket unavailable; skipping mirror to ${redactUrl(this.sidecarUrl)}`)
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const ws = new WebSocket(this.sidecarUrl)
|
||||
|
||||
this.sidecarWs = ws
|
||||
ws.addEventListener('close', () => {
|
||||
if (this.sidecarWs === ws) {
|
||||
this.sidecarWs = null
|
||||
}
|
||||
})
|
||||
ws.addEventListener('error', () => {
|
||||
this.pushLog('[sidecar] mirror connection error')
|
||||
})
|
||||
} catch (err) {
|
||||
this.pushLog(`[sidecar] failed to connect ${redactUrl(this.sidecarUrl)} (constructor error)`)
|
||||
this.sidecarWs = null
|
||||
}
|
||||
}
|
||||
|
||||
private mirrorEventToSidecar(rawFrame: string) {
|
||||
const ws = this.sidecarWs
|
||||
|
||||
if (!ws || ws.readyState !== WS_OPEN) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(rawFrame)
|
||||
} catch {
|
||||
// best effort
|
||||
}
|
||||
}
|
||||
|
||||
private handleWebSocketFrame(raw: unknown) {
|
||||
const text = asWireText(raw)
|
||||
|
||||
if (!text) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const frame = JSON.parse(text) as Record<string, unknown>
|
||||
|
||||
if (frame.method === 'event') {
|
||||
this.mirrorEventToSidecar(text)
|
||||
}
|
||||
|
||||
this.dispatch(frame)
|
||||
} catch {
|
||||
const preview = text.trim().slice(0, MAX_LOG_PREVIEW) || '(empty frame)'
|
||||
|
||||
this.pushLog(`[protocol] malformed websocket frame: ${preview}`)
|
||||
this.publish({ type: 'gateway.protocol_error', payload: { preview } })
|
||||
}
|
||||
}
|
||||
|
||||
private startSpawnedGateway(root: string) {
|
||||
const python = resolvePython(root)
|
||||
const cwd = process.env.HERMES_CWD || root
|
||||
const env = { ...process.env }
|
||||
const pyPath = env.PYTHONPATH?.trim()
|
||||
|
||||
env.PYTHONPATH = pyPath ? `${root}${delimiter}${pyPath}` : root
|
||||
this.startReadyTimer(python, cwd)
|
||||
this.proc = spawn(python, ['-m', 'tui_gateway.entry'], { cwd, env, stdio: ['pipe', 'pipe', 'pipe'] })
|
||||
|
||||
this.stdoutRl = createInterface({ input: this.proc.stdout! })
|
||||
|
|
@ -157,28 +349,154 @@ export class GatewayClient extends EventEmitter {
|
|||
this.publish({ type: 'gateway.stderr', payload: { line } })
|
||||
})
|
||||
|
||||
const ownedProc = this.proc
|
||||
this.proc.on('error', err => {
|
||||
this.pushLog(`[spawn] ${err.message}`)
|
||||
this.rejectPending(new Error(`gateway error: ${err.message}`))
|
||||
this.publish({ type: 'gateway.stderr', payload: { line: `[spawn] ${err.message}` } })
|
||||
})
|
||||
// Skip stale errors on an already-replaced child.
|
||||
if (this.proc !== ownedProc) {
|
||||
return
|
||||
}
|
||||
|
||||
const line = `[spawn] ${err.message}`
|
||||
|
||||
this.pushLog(line)
|
||||
this.publish({ type: 'gateway.stderr', payload: { line } })
|
||||
// Detach the reference up front so the late `exit` event for
|
||||
// this same child is identity-skipped (we don't want to emit
|
||||
// 'exit' twice). Then run the full teardown — clears the
|
||||
// startup timer so we don't fire a misleading
|
||||
// `gateway.start_timeout`, rejects pending RPCs, and emits or
|
||||
// queues a single `exit`.
|
||||
this.proc = null
|
||||
this.handleTransportExit(1, `gateway error: ${err.message}`)
|
||||
})
|
||||
this.proc.on('exit', code => {
|
||||
if (this.readyTimer) {
|
||||
clearTimeout(this.readyTimer)
|
||||
this.readyTimer = null
|
||||
// start() can replace `this.proc` while an old child is still
|
||||
// tearing down. Skip stale exits so we don't clear the new
|
||||
// startup timer or reject newly-issued pending requests.
|
||||
if (this.proc !== ownedProc) {
|
||||
return
|
||||
}
|
||||
|
||||
this.rejectPending(new Error(`gateway exited${code === null ? '' : ` (${code})`}`))
|
||||
|
||||
if (this.subscribed) {
|
||||
this.emit('exit', code)
|
||||
} else {
|
||||
this.pendingExit = code
|
||||
}
|
||||
this.handleTransportExit(code)
|
||||
})
|
||||
}
|
||||
|
||||
private startAttachedGateway(attachUrl: string) {
|
||||
const safeAttachUrl = redactUrl(attachUrl)
|
||||
this.startReadyTimer('websocket', safeAttachUrl)
|
||||
|
||||
if (typeof WebSocket === 'undefined') {
|
||||
const line = `[startup] WebSocket API unavailable; cannot attach to ${safeAttachUrl}`
|
||||
|
||||
this.pushLog(line)
|
||||
this.publish({ type: 'gateway.stderr', payload: { line } })
|
||||
this.handleTransportExit(1, 'gateway websocket unavailable')
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const ws = new WebSocket(attachUrl)
|
||||
let settled = false
|
||||
|
||||
this.ws = ws
|
||||
const connectPromise = new Promise<void>((resolve, reject) => {
|
||||
ws.addEventListener(
|
||||
'open',
|
||||
() => {
|
||||
if (!settled) {
|
||||
settled = true
|
||||
resolve()
|
||||
}
|
||||
|
||||
this.connectSidecarMirror()
|
||||
},
|
||||
{ once: true }
|
||||
)
|
||||
|
||||
ws.addEventListener(
|
||||
'error',
|
||||
() => {
|
||||
if (!settled) {
|
||||
this.pushLog('[startup] gateway websocket connect error')
|
||||
settled = true
|
||||
reject(new Error('gateway websocket connection failed'))
|
||||
}
|
||||
},
|
||||
{ once: true }
|
||||
)
|
||||
ws.addEventListener(
|
||||
'close',
|
||||
ev => {
|
||||
if (!settled) {
|
||||
settled = true
|
||||
reject(new Error(`gateway websocket closed (${ev.code}) during connect`))
|
||||
}
|
||||
},
|
||||
{ once: true }
|
||||
)
|
||||
})
|
||||
|
||||
// The connect promise is only awaited by RPCs that arrive while
|
||||
// the socket is still connecting. If no request races the open
|
||||
// (or a teardown drops the reference before anyone observes it),
|
||||
// a connect-error / early-close rejection would surface as an
|
||||
// unhandled promise rejection in Node. Attach a no-op handler to
|
||||
// ensure the rejection is always observed.
|
||||
connectPromise.catch(() => {})
|
||||
this.wsConnectPromise = connectPromise
|
||||
|
||||
ws.addEventListener('message', ev => this.handleWebSocketFrame(ev.data))
|
||||
ws.addEventListener('close', ev => {
|
||||
// Skip close events from sockets that have already been
|
||||
// replaced — start() / closeGatewaySocket() can swap `this.ws`
|
||||
// before an in-flight close lands, and we must not clear the
|
||||
// new ready timer or reject the new pending requests on behalf
|
||||
// of a stale socket.
|
||||
if (this.ws !== ws) {
|
||||
return
|
||||
}
|
||||
|
||||
this.ws = null
|
||||
this.wsConnectPromise = null
|
||||
this.handleTransportExit(ev.code, `gateway websocket closed${ev.code ? ` (${ev.code})` : ''}`)
|
||||
})
|
||||
ws.addEventListener('error', () => {
|
||||
const line = '[gateway] websocket transport error'
|
||||
|
||||
this.pushLog(line)
|
||||
this.publish({ type: 'gateway.stderr', payload: { line } })
|
||||
})
|
||||
} catch (err) {
|
||||
this.pushLog(`[startup] failed to connect websocket gateway ${safeAttachUrl} (constructor error)`)
|
||||
this.handleTransportExit(1, 'gateway websocket startup failed')
|
||||
}
|
||||
}
|
||||
|
||||
start() {
|
||||
const root = process.env.HERMES_PYTHON_SRC_ROOT ?? resolve(import.meta.dirname, '../../')
|
||||
const attachUrl = resolveGatewayAttachUrl()
|
||||
const sidecarUrl = resolveSidecarUrl()
|
||||
|
||||
this.attachUrl = attachUrl
|
||||
this.sidecarUrl = sidecarUrl
|
||||
this.resetStartupState()
|
||||
|
||||
if (this.proc && !this.proc.killed && this.proc.exitCode === null) {
|
||||
this.proc.kill()
|
||||
}
|
||||
this.proc = null
|
||||
this.closeGatewaySocket()
|
||||
this.closeSidecarSocket()
|
||||
|
||||
if (attachUrl) {
|
||||
this.startAttachedGateway(attachUrl)
|
||||
return
|
||||
}
|
||||
|
||||
this.startSpawnedGateway(root)
|
||||
}
|
||||
|
||||
private dispatch(msg: Record<string, unknown>) {
|
||||
const id = msg.id as string | undefined
|
||||
const p = id ? this.pending.get(id) : undefined
|
||||
|
|
@ -258,7 +576,78 @@ export class GatewayClient extends EventEmitter {
|
|||
return this.logs.tail(Math.max(1, limit)).join('\n')
|
||||
}
|
||||
|
||||
private async ensureAttachedWebSocket(method: string): Promise<WebSocket> {
|
||||
if (!this.attachUrl) {
|
||||
throw new Error('gateway not running')
|
||||
}
|
||||
|
||||
if (!this.ws || this.ws.readyState === WS_CLOSED || this.ws.readyState === WS_CLOSING) {
|
||||
this.start()
|
||||
}
|
||||
|
||||
if (this.ws?.readyState === WS_CONNECTING) {
|
||||
try {
|
||||
await this.wsConnectPromise
|
||||
} catch (err) {
|
||||
throw err instanceof Error ? err : new Error(String(err))
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.ws || this.ws.readyState !== WS_OPEN) {
|
||||
throw new Error(`gateway not connected: ${method}`)
|
||||
}
|
||||
|
||||
return this.ws
|
||||
}
|
||||
|
||||
private requestOverWebSocket<T = unknown>(method: string, params: Record<string, unknown> = {}): Promise<T> {
|
||||
return this.ensureAttachedWebSocket(method).then(
|
||||
ws =>
|
||||
new Promise<T>((resolve, reject) => {
|
||||
const id = `r${++this.reqId}`
|
||||
const timeout = setTimeout(this.onTimeout, REQUEST_TIMEOUT_MS, id)
|
||||
|
||||
timeout.unref?.()
|
||||
this.pending.set(id, {
|
||||
id,
|
||||
method,
|
||||
reject,
|
||||
resolve: v => resolve(v as T),
|
||||
timeout
|
||||
})
|
||||
|
||||
try {
|
||||
ws.send(JSON.stringify({ id, jsonrpc: '2.0', method, params }))
|
||||
} catch (e) {
|
||||
const pending = this.pending.get(id)
|
||||
|
||||
if (pending) {
|
||||
clearTimeout(pending.timeout)
|
||||
this.pending.delete(id)
|
||||
}
|
||||
|
||||
reject(e instanceof Error ? e : new Error(String(e)))
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
request<T = unknown>(method: string, params: Record<string, unknown> = {}): Promise<T> {
|
||||
const attachUrl = resolveGatewayAttachUrl()
|
||||
|
||||
if (attachUrl) {
|
||||
if (this.attachUrl !== attachUrl) {
|
||||
// The env var rotated at runtime — restart the transport so
|
||||
// switching from spawned-gateway mode to attach mode also
|
||||
// tears down the old Python child. Merely closing `this.ws`
|
||||
// would leave a previously spawned gateway process alive.
|
||||
this.rejectPending(new Error('gateway attach url changed'))
|
||||
this.start()
|
||||
}
|
||||
|
||||
return this.requestOverWebSocket<T>(method, params)
|
||||
}
|
||||
|
||||
if (!this.proc?.stdin || this.proc.killed || this.proc.exitCode !== null) {
|
||||
this.start()
|
||||
}
|
||||
|
|
@ -299,5 +688,13 @@ export class GatewayClient extends EventEmitter {
|
|||
|
||||
kill() {
|
||||
this.proc?.kill()
|
||||
this.closeGatewaySocket()
|
||||
this.closeSidecarSocket()
|
||||
this.clearReadyTimer()
|
||||
// The ws 'close' handler is identity-gated on `this.ws === ws`
|
||||
// and we just nulled `this.ws`, so it will short-circuit and
|
||||
// skip handleTransportExit. Reject pending RPCs explicitly so
|
||||
// attach-mode promises do not hang after an intentional kill.
|
||||
this.rejectPending(new Error('gateway closed'))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue