From 520212cc593dd8bc0472002877e31d7310bd295b Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Sun, 28 Jun 2026 19:33:43 -0500 Subject: [PATCH] feat(desktop): stream agent terminal output live instead of polling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the 5s output_tail poll (which often showed nothing) with a real push stream. The process registry gains an on_output sink called from its reader threads with each chunk; the tui_gateway wires it to emit agent.terminal.output {process_id, chunk} (write_json is _stdout_lock-guarded, so emitting from the reader thread is safe). The desktop routes chunks by process id straight into the read-only agent xterm via a small writer registry, with a capped backlog so a tab opened mid-stream (or reopened) replays what it missed. Drops the fragile poll/tail path: no session-key matching, no truncation, no lag — full-fidelity ANSI, env-agnostic (local/docker/ssh). --- .../terminal/agent-terminal-stream.ts | 42 +++++++++++++++++++ .../app/right-sidebar/terminal/instance.tsx | 10 ++--- .../terminal/use-agent-terminal.ts | 34 ++++----------- .../app/right-sidebar/terminal/workspace.tsx | 31 ++++---------- .../app/session/hooks/use-message-stream.ts | 4 ++ apps/desktop/src/lib/chat-messages.ts | 3 ++ apps/desktop/src/store/composer-status.ts | 16 ------- tools/process_registry.py | 18 ++++++++ tui_gateway/server.py | 17 ++++++++ 9 files changed, 104 insertions(+), 71 deletions(-) create mode 100644 apps/desktop/src/app/right-sidebar/terminal/agent-terminal-stream.ts diff --git a/apps/desktop/src/app/right-sidebar/terminal/agent-terminal-stream.ts b/apps/desktop/src/app/right-sidebar/terminal/agent-terminal-stream.ts new file mode 100644 index 00000000000..e22cea5b903 --- /dev/null +++ b/apps/desktop/src/app/right-sidebar/terminal/agent-terminal-stream.ts @@ -0,0 +1,42 @@ +// Live agent-terminal output, pushed from the backend as `agent.terminal.output` +// events (see tui_gateway `_wire_agent_terminal_output`). Chunks route straight +// to the matching read-only xterm, keyed by process id — no polling, no tail +// truncation. A capped per-proc backlog lets a tab opened mid-stream replay what +// it missed, and lets a closed-then-reopened tab restore its history. + +type Writer = (chunk: string) => void + +const writers = new Map() +const backlog = new Map() + +const MAX_BACKLOG = 256_000 + +/** A live agent terminal registers its xterm write and replays the backlog. + * Returns an idempotent unregister. */ +export function registerAgentTerminalWriter(procId: string, write: Writer): () => void { + writers.set(procId, write) + + const history = backlog.get(procId) + + if (history) { + write(history) + } + + return () => { + if (writers.get(procId) === write) { + writers.delete(procId) + } + } +} + +/** Append a streamed chunk: buffer it (capped) for future opens and write it to + * the live terminal, if one is mounted. */ +export function writeAgentTerminalChunk(procId: string, chunk: string): void { + if (!procId || !chunk) { + return + } + + const next = (backlog.get(procId) ?? '') + chunk + backlog.set(procId, next.length > MAX_BACKLOG ? next.slice(-MAX_BACKLOG) : next) + writers.get(procId)?.(chunk) +} diff --git a/apps/desktop/src/app/right-sidebar/terminal/instance.tsx b/apps/desktop/src/app/right-sidebar/terminal/instance.tsx index 4e881ff85b6..23d905f0503 100644 --- a/apps/desktop/src/app/right-sidebar/terminal/instance.tsx +++ b/apps/desktop/src/app/right-sidebar/terminal/instance.tsx @@ -1,13 +1,10 @@ import '@xterm/xterm/css/xterm.css' -import { useStore } from '@nanostores/react' - import { Button } from '@/components/ui/button' import { KbdCombo } from '@/components/ui/kbd' import { Loader } from '@/components/ui/loader' import { useI18n } from '@/i18n' import { cn } from '@/lib/utils' -import { $backgroundOutputByProc } from '@/store/composer-status' import { reportTerminalShell } from './terminals' import { useAgentTerminal } from './use-agent-terminal' @@ -81,11 +78,10 @@ interface AgentTerminalInstanceProps { procId: string } -/** Read-only mirror of an agent background process — a write-only xterm fed by - * the process's output tail (no PTY, no input). */ +/** Read-only mirror of an agent background process — a write-only xterm streamed + * live from the backend output (no PTY, no input). */ export function AgentTerminalInstance({ active, procId }: AgentTerminalInstanceProps) { - const output = useStore($backgroundOutputByProc)[procId] ?? '' - const { hostRef } = useAgentTerminal({ active, output }) + const { hostRef } = useAgentTerminal({ active, procId }) return (
diff --git a/apps/desktop/src/app/right-sidebar/terminal/use-agent-terminal.ts b/apps/desktop/src/app/right-sidebar/terminal/use-agent-terminal.ts index aab7c1428a0..8e47a20a6c9 100644 --- a/apps/desktop/src/app/right-sidebar/terminal/use-agent-terminal.ts +++ b/apps/desktop/src/app/right-sidebar/terminal/use-agent-terminal.ts @@ -5,18 +5,18 @@ import { useEffect, useRef } from 'react' import { useTheme } from '@/themes/context' +import { registerAgentTerminalWriter } from './agent-terminal-stream' import { resolveSurfaceColor, terminalTheme } from './selection' -// Read-only terminal driven by a string (an agent background process's output -// tail), not a PTY — no input, no shell. Shares the user terminal's look so the -// two read as one surface. -export function useAgentTerminal({ active, output }: { active: boolean; output: string }) { +// Read-only terminal for an agent background process: a write-only xterm (no PTY, +// no input) fed live by the backend output stream, keyed by process id. Shares +// the user terminal's look so the two read as one surface. +export function useAgentTerminal({ active, procId }: { active: boolean; procId: string }) { const { renderedMode, theme, themeName } = useTheme() const hostRef = useRef(null) const termRef = useRef(null) const webglRef = useRef(null) const fitRef = useRef<(() => void) | null>(null) - const writtenRef = useRef('') const surfaceTheme = () => { const ansi = renderedMode === 'dark' ? (theme.darkTerminal ?? theme.terminal) : theme.terminal @@ -76,35 +76,19 @@ export function useAgentTerminal({ active, output }: { active: boolean; output: const observer = new ResizeObserver(() => fitRef.current?.()) observer.observe(host) + // Stream live output straight into the terminal (replays backlog on attach). + const unregister = registerAgentTerminalWriter(procId, chunk => term.write(chunk)) + return () => { + unregister() observer.disconnect() term.dispose() termRef.current = null webglRef.current = null - writtenRef.current = '' } // eslint-disable-next-line react-hooks/exhaustive-deps }, []) - // Append the delta when the tail just grew; otherwise (the rolling window slid) - // reset and rewrite. Avoids reflowing the whole buffer on every poll. - useEffect(() => { - const term = termRef.current - - if (!term) { - return - } - - if (output.startsWith(writtenRef.current)) { - term.write(output.slice(writtenRef.current.length)) - } else { - term.reset() - term.write(output) - } - - writtenRef.current = output - }, [output]) - useEffect(() => { const term = termRef.current diff --git a/apps/desktop/src/app/right-sidebar/terminal/workspace.tsx b/apps/desktop/src/app/right-sidebar/terminal/workspace.tsx index 48389e0b6c7..e63d8e2bad1 100644 --- a/apps/desktop/src/app/right-sidebar/terminal/workspace.tsx +++ b/apps/desktop/src/app/right-sidebar/terminal/workspace.tsx @@ -1,8 +1,7 @@ import { useStore } from '@nanostores/react' import { useEffect } from 'react' -import { $backgroundStatusBySession, refreshBackgroundProcesses } from '@/store/composer-status' -import { $activeSessionId } from '@/store/session' +import { $backgroundStatusBySession } from '@/store/composer-status' import { setActiveTerminalId } from './buffer' import { AgentTerminalInstance, TerminalInstance } from './instance' @@ -12,9 +11,6 @@ interface TerminalWorkspaceProps { onAddSelectionToChat: (text: string, label?: string) => void } -// Faster than the 5s status-stack poll so an open agent tab tails near-live. -const AGENT_POLL_MS = 1500 - /** The persistent-overlay layer: the stack of live xterm instances (only these * must stay in the fixed overlay, for the WebGL host). Mount/visibility is owned * by PersistentTerminal (latched so shells survive hiding); the tab rail and @@ -22,7 +18,6 @@ const AGENT_POLL_MS = 1500 export function TerminalWorkspace({ onAddSelectionToChat }: TerminalWorkspaceProps) { const terminals = useStore($terminals) const activeId = useStore($activeTerminalId) - const activeSession = useStore($activeSessionId) const background = useStore($backgroundStatusBySession) // Mirror the tab selection into the agent reader (read_terminal reads it). @@ -35,25 +30,15 @@ export function TerminalWorkspace({ onAddSelectionToChat }: TerminalWorkspacePro } }, []) - // Surface the agent's background processes as read-only tabs (once each). + // Surface the agent's background processes as read-only tabs (once each); their + // output streams in live via agent.terminal.output, no polling needed. useEffect(() => { - for (const item of (activeSession && background[activeSession]) || []) { - ensureAgentTerminal(item.id, item.title) + for (const list of Object.values(background)) { + for (const item of list) { + ensureAgentTerminal(item.id, item.title) + } } - }, [background, activeSession]) - - // While an agent tab exists, tail its process faster than the status stack. - const hasAgent = terminals.some(term => term.kind === 'agent') - - useEffect(() => { - if (!hasAgent || !activeSession) { - return - } - - const interval = setInterval(() => void refreshBackgroundProcesses(activeSession), AGENT_POLL_MS) - - return () => clearInterval(interval) - }, [hasAgent, activeSession]) + }, [background]) return ( <> diff --git a/apps/desktop/src/app/session/hooks/use-message-stream.ts b/apps/desktop/src/app/session/hooks/use-message-stream.ts index aef87783864..b5a6b8017be 100644 --- a/apps/desktop/src/app/session/hooks/use-message-stream.ts +++ b/apps/desktop/src/app/session/hooks/use-message-stream.ts @@ -1,6 +1,7 @@ import type { QueryClient } from '@tanstack/react-query' import { type MutableRefObject, useCallback, useEffect, useRef } from 'react' +import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream' import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer' import { translateNow } from '@/i18n' import { @@ -1165,6 +1166,9 @@ export function useMessageStream({ text: result ? JSON.stringify(result) : '' }) } + } else if (event.type === 'agent.terminal.output') { + // Live chunk from a background process → its read-only agent terminal tab. + writeAgentTerminalChunk(payload?.process_id ?? '', payload?.chunk ?? '') } else if (event.type === 'status.update') { if (sessionId && payload?.kind === 'compacting') { setSessionCompacting(sessionId, true) diff --git a/apps/desktop/src/lib/chat-messages.ts b/apps/desktop/src/lib/chat-messages.ts index 72997c2ddd0..317108a9955 100644 --- a/apps/desktop/src/lib/chat-messages.ts +++ b/apps/desktop/src/lib/chat-messages.ts @@ -52,6 +52,9 @@ export type GatewayEventPayload = { credential_warning?: string personality?: string usage?: Partial + // agent.terminal.output — live chunk for a read-only agent terminal tab + process_id?: string + chunk?: string // clarify.request request_id?: string question?: string diff --git a/apps/desktop/src/store/composer-status.ts b/apps/desktop/src/store/composer-status.ts index fe6f4262972..4d20b476b74 100644 --- a/apps/desktop/src/store/composer-status.ts +++ b/apps/desktop/src/store/composer-status.ts @@ -35,22 +35,6 @@ export interface ComposerStatusItem { // registry (`terminal(background=true)` spawns) via `process.list`. export const $backgroundStatusBySession = atom>({}) -// Flattened process-id → output tail, for the read-only agent terminal tabs that -// mirror background processes (keyed globally since a tab outlives its session view). -export const $backgroundOutputByProc = computed($backgroundStatusBySession, bySession => { - const out: Record = {} - - for (const list of Object.values(bySession)) { - for (const item of list) { - if (item.output) { - out[item.id] = item.output - } - } - } - - return out -}) - // Rows the user X-ed away. The registry keeps finished processes around for a // while, so without this every refresh would resurrect a dismissed row. const dismissedBySession = new Map>() diff --git a/tools/process_registry.py b/tools/process_registry.py index f364c00ebe6..b6aa38fed9a 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -196,6 +196,10 @@ class ProcessRegistry: self._global_watch_window_hits: int = 0 self._global_watch_tripped_until: float = 0.0 self._global_watch_suppressed_during_trip: int = 0 + # Live-output sink set by a driver (e.g. the desktop gateway): called from + # reader threads with (session, chunk) to stream output to a UI in + # real time, instead of polling the output tail. + self.on_output = None @staticmethod def _clean_shell_noise(text: str) -> str: @@ -205,6 +209,17 @@ class ProcessRegistry: lines.pop(0) return "\n".join(lines) + def _emit_output(self, session: ProcessSession, chunk: str) -> None: + """Forward a freshly-read chunk to the live-output sink, if one is set. + Called from reader threads; never raise into the read loop.""" + sink = self.on_output + if sink is None or not chunk: + return + try: + sink(session, chunk) + except Exception: + pass + def _check_watch_patterns(self, session: ProcessSession, new_text: str) -> None: """Scan new output for watch patterns and queue notifications. @@ -916,6 +931,7 @@ class ProcessRegistry: if len(session.output_buffer) > session.max_output_chars: session.output_buffer = session.output_buffer[-session.max_output_chars:] self._check_watch_patterns(session, chunk) + self._emit_output(session, chunk) except Exception as e: logger.debug("Process stdout reader ended: %s", e) finally: @@ -954,6 +970,7 @@ class ProcessRegistry: session.output_buffer = session.output_buffer[-session.max_output_chars:] if delta: self._check_watch_patterns(session, delta) + self._emit_output(session, delta) # Check if process is still running check = env.execute( @@ -1002,6 +1019,7 @@ class ProcessRegistry: if len(session.output_buffer) > session.max_output_chars: session.output_buffer = session.output_buffer[-session.max_output_chars:] self._check_watch_patterns(session, text) + self._emit_output(session, text) except EOFError: break except Exception: diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 4eb4a1b2ff9..71a7e6718f4 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -8311,8 +8311,25 @@ def _notification_poller_loop( process_registry.completion_queue.put(evt) +def _wire_agent_terminal_output() -> None: + """Idempotently route background-process output chunks to the desktop as + `agent.terminal.output` events (keyed by process id). Read-only agent + terminal tabs stream these live instead of polling the output tail. + `_emit`/`write_json` is `_stdout_lock`-guarded, so calling it from the + registry's reader threads is safe.""" + from tools.process_registry import process_registry + + if getattr(process_registry, "on_output", None) is not None: + return + + process_registry.on_output = lambda session, chunk: _emit( + "agent.terminal.output", "", {"process_id": session.id, "chunk": chunk} + ) + + def _start_notification_poller(sid: str, session: dict) -> threading.Event: """Start the background notification poller for a TUI session.""" + _wire_agent_terminal_output() stop = threading.Event() t = threading.Thread( target=_notification_poller_loop,