feat(desktop): stream agent terminal output live instead of polling

Replace the 5s output_tail poll (which often showed nothing) with a real push
stream. The process registry gains an on_output sink called from its reader
threads with each chunk; the tui_gateway wires it to emit agent.terminal.output
{process_id, chunk} (write_json is _stdout_lock-guarded, so emitting from the
reader thread is safe). The desktop routes chunks by process id straight into
the read-only agent xterm via a small writer registry, with a capped backlog so
a tab opened mid-stream (or reopened) replays what it missed.

Drops the fragile poll/tail path: no session-key matching, no truncation, no
lag — full-fidelity ANSI, env-agnostic (local/docker/ssh).
This commit is contained in:
Brooklyn Nicholson 2026-06-28 19:33:43 -05:00
parent ad831dd492
commit 520212cc59
9 changed files with 104 additions and 71 deletions

View file

@ -0,0 +1,42 @@
// Live agent-terminal output, pushed from the backend as `agent.terminal.output`
// events (see tui_gateway `_wire_agent_terminal_output`). Chunks route straight
// to the matching read-only xterm, keyed by process id — no polling, no tail
// truncation. A capped per-proc backlog lets a tab opened mid-stream replay what
// it missed, and lets a closed-then-reopened tab restore its history.
type Writer = (chunk: string) => void
const writers = new Map<string, Writer>()
const backlog = new Map<string, string>()
const MAX_BACKLOG = 256_000
/** A live agent terminal registers its xterm write and replays the backlog.
* Returns an idempotent unregister. */
export function registerAgentTerminalWriter(procId: string, write: Writer): () => void {
writers.set(procId, write)
const history = backlog.get(procId)
if (history) {
write(history)
}
return () => {
if (writers.get(procId) === write) {
writers.delete(procId)
}
}
}
/** Append a streamed chunk: buffer it (capped) for future opens and write it to
* the live terminal, if one is mounted. */
export function writeAgentTerminalChunk(procId: string, chunk: string): void {
if (!procId || !chunk) {
return
}
const next = (backlog.get(procId) ?? '') + chunk
backlog.set(procId, next.length > MAX_BACKLOG ? next.slice(-MAX_BACKLOG) : next)
writers.get(procId)?.(chunk)
}

View file

@ -1,13 +1,10 @@
import '@xterm/xterm/css/xterm.css'
import { useStore } from '@nanostores/react'
import { Button } from '@/components/ui/button'
import { KbdCombo } from '@/components/ui/kbd'
import { Loader } from '@/components/ui/loader'
import { useI18n } from '@/i18n'
import { cn } from '@/lib/utils'
import { $backgroundOutputByProc } from '@/store/composer-status'
import { reportTerminalShell } from './terminals'
import { useAgentTerminal } from './use-agent-terminal'
@ -81,11 +78,10 @@ interface AgentTerminalInstanceProps {
procId: string
}
/** Read-only mirror of an agent background process a write-only xterm fed by
* the process's output tail (no PTY, no input). */
/** Read-only mirror of an agent background process a write-only xterm streamed
* live from the backend output (no PTY, no input). */
export function AgentTerminalInstance({ active, procId }: AgentTerminalInstanceProps) {
const output = useStore($backgroundOutputByProc)[procId] ?? ''
const { hostRef } = useAgentTerminal({ active, output })
const { hostRef } = useAgentTerminal({ active, procId })
return (
<div className={cn(INSTANCE_CLASS, active ? 'visible' : 'invisible pointer-events-none')}>

View file

@ -5,18 +5,18 @@ import { useEffect, useRef } from 'react'
import { useTheme } from '@/themes/context'
import { registerAgentTerminalWriter } from './agent-terminal-stream'
import { resolveSurfaceColor, terminalTheme } from './selection'
// Read-only terminal driven by a string (an agent background process's output
// tail), not a PTY — no input, no shell. Shares the user terminal's look so the
// two read as one surface.
export function useAgentTerminal({ active, output }: { active: boolean; output: string }) {
// Read-only terminal for an agent background process: a write-only xterm (no PTY,
// no input) fed live by the backend output stream, keyed by process id. Shares
// the user terminal's look so the two read as one surface.
export function useAgentTerminal({ active, procId }: { active: boolean; procId: string }) {
const { renderedMode, theme, themeName } = useTheme()
const hostRef = useRef<HTMLDivElement | null>(null)
const termRef = useRef<Terminal | null>(null)
const webglRef = useRef<WebglAddon | null>(null)
const fitRef = useRef<(() => void) | null>(null)
const writtenRef = useRef('')
const surfaceTheme = () => {
const ansi = renderedMode === 'dark' ? (theme.darkTerminal ?? theme.terminal) : theme.terminal
@ -76,35 +76,19 @@ export function useAgentTerminal({ active, output }: { active: boolean; output:
const observer = new ResizeObserver(() => fitRef.current?.())
observer.observe(host)
// Stream live output straight into the terminal (replays backlog on attach).
const unregister = registerAgentTerminalWriter(procId, chunk => term.write(chunk))
return () => {
unregister()
observer.disconnect()
term.dispose()
termRef.current = null
webglRef.current = null
writtenRef.current = ''
}
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
// Append the delta when the tail just grew; otherwise (the rolling window slid)
// reset and rewrite. Avoids reflowing the whole buffer on every poll.
useEffect(() => {
const term = termRef.current
if (!term) {
return
}
if (output.startsWith(writtenRef.current)) {
term.write(output.slice(writtenRef.current.length))
} else {
term.reset()
term.write(output)
}
writtenRef.current = output
}, [output])
useEffect(() => {
const term = termRef.current

View file

@ -1,8 +1,7 @@
import { useStore } from '@nanostores/react'
import { useEffect } from 'react'
import { $backgroundStatusBySession, refreshBackgroundProcesses } from '@/store/composer-status'
import { $activeSessionId } from '@/store/session'
import { $backgroundStatusBySession } from '@/store/composer-status'
import { setActiveTerminalId } from './buffer'
import { AgentTerminalInstance, TerminalInstance } from './instance'
@ -12,9 +11,6 @@ interface TerminalWorkspaceProps {
onAddSelectionToChat: (text: string, label?: string) => void
}
// Faster than the 5s status-stack poll so an open agent tab tails near-live.
const AGENT_POLL_MS = 1500
/** The persistent-overlay layer: the stack of live xterm instances (only these
* must stay in the fixed overlay, for the WebGL host). Mount/visibility is owned
* by PersistentTerminal (latched so shells survive hiding); the tab rail and
@ -22,7 +18,6 @@ const AGENT_POLL_MS = 1500
export function TerminalWorkspace({ onAddSelectionToChat }: TerminalWorkspaceProps) {
const terminals = useStore($terminals)
const activeId = useStore($activeTerminalId)
const activeSession = useStore($activeSessionId)
const background = useStore($backgroundStatusBySession)
// Mirror the tab selection into the agent reader (read_terminal reads it).
@ -35,25 +30,15 @@ export function TerminalWorkspace({ onAddSelectionToChat }: TerminalWorkspacePro
}
}, [])
// Surface the agent's background processes as read-only tabs (once each).
// Surface the agent's background processes as read-only tabs (once each); their
// output streams in live via agent.terminal.output, no polling needed.
useEffect(() => {
for (const item of (activeSession && background[activeSession]) || []) {
ensureAgentTerminal(item.id, item.title)
for (const list of Object.values(background)) {
for (const item of list) {
ensureAgentTerminal(item.id, item.title)
}
}
}, [background, activeSession])
// While an agent tab exists, tail its process faster than the status stack.
const hasAgent = terminals.some(term => term.kind === 'agent')
useEffect(() => {
if (!hasAgent || !activeSession) {
return
}
const interval = setInterval(() => void refreshBackgroundProcesses(activeSession), AGENT_POLL_MS)
return () => clearInterval(interval)
}, [hasAgent, activeSession])
}, [background])
return (
<>

View file

@ -1,6 +1,7 @@
import type { QueryClient } from '@tanstack/react-query'
import { type MutableRefObject, useCallback, useEffect, useRef } from 'react'
import { writeAgentTerminalChunk } from '@/app/right-sidebar/terminal/agent-terminal-stream'
import { readActiveTerminal } from '@/app/right-sidebar/terminal/buffer'
import { translateNow } from '@/i18n'
import {
@ -1165,6 +1166,9 @@ export function useMessageStream({
text: result ? JSON.stringify(result) : ''
})
}
} else if (event.type === 'agent.terminal.output') {
// Live chunk from a background process → its read-only agent terminal tab.
writeAgentTerminalChunk(payload?.process_id ?? '', payload?.chunk ?? '')
} else if (event.type === 'status.update') {
if (sessionId && payload?.kind === 'compacting') {
setSessionCompacting(sessionId, true)

View file

@ -52,6 +52,9 @@ export type GatewayEventPayload = {
credential_warning?: string
personality?: string
usage?: Partial<UsageStats>
// agent.terminal.output — live chunk for a read-only agent terminal tab
process_id?: string
chunk?: string
// clarify.request
request_id?: string
question?: string

View file

@ -35,22 +35,6 @@ export interface ComposerStatusItem {
// registry (`terminal(background=true)` spawns) via `process.list`.
export const $backgroundStatusBySession = atom<Record<string, ComposerStatusItem[]>>({})
// Flattened process-id → output tail, for the read-only agent terminal tabs that
// mirror background processes (keyed globally since a tab outlives its session view).
export const $backgroundOutputByProc = computed($backgroundStatusBySession, bySession => {
const out: Record<string, string> = {}
for (const list of Object.values(bySession)) {
for (const item of list) {
if (item.output) {
out[item.id] = item.output
}
}
}
return out
})
// Rows the user X-ed away. The registry keeps finished processes around for a
// while, so without this every refresh would resurrect a dismissed row.
const dismissedBySession = new Map<string, Set<string>>()

View file

@ -196,6 +196,10 @@ class ProcessRegistry:
self._global_watch_window_hits: int = 0
self._global_watch_tripped_until: float = 0.0
self._global_watch_suppressed_during_trip: int = 0
# Live-output sink set by a driver (e.g. the desktop gateway): called from
# reader threads with (session, chunk) to stream output to a UI in
# real time, instead of polling the output tail.
self.on_output = None
@staticmethod
def _clean_shell_noise(text: str) -> str:
@ -205,6 +209,17 @@ class ProcessRegistry:
lines.pop(0)
return "\n".join(lines)
def _emit_output(self, session: ProcessSession, chunk: str) -> None:
"""Forward a freshly-read chunk to the live-output sink, if one is set.
Called from reader threads; never raise into the read loop."""
sink = self.on_output
if sink is None or not chunk:
return
try:
sink(session, chunk)
except Exception:
pass
def _check_watch_patterns(self, session: ProcessSession, new_text: str) -> None:
"""Scan new output for watch patterns and queue notifications.
@ -916,6 +931,7 @@ class ProcessRegistry:
if len(session.output_buffer) > session.max_output_chars:
session.output_buffer = session.output_buffer[-session.max_output_chars:]
self._check_watch_patterns(session, chunk)
self._emit_output(session, chunk)
except Exception as e:
logger.debug("Process stdout reader ended: %s", e)
finally:
@ -954,6 +970,7 @@ class ProcessRegistry:
session.output_buffer = session.output_buffer[-session.max_output_chars:]
if delta:
self._check_watch_patterns(session, delta)
self._emit_output(session, delta)
# Check if process is still running
check = env.execute(
@ -1002,6 +1019,7 @@ class ProcessRegistry:
if len(session.output_buffer) > session.max_output_chars:
session.output_buffer = session.output_buffer[-session.max_output_chars:]
self._check_watch_patterns(session, text)
self._emit_output(session, text)
except EOFError:
break
except Exception:

View file

@ -8311,8 +8311,25 @@ def _notification_poller_loop(
process_registry.completion_queue.put(evt)
def _wire_agent_terminal_output() -> None:
"""Idempotently route background-process output chunks to the desktop as
`agent.terminal.output` events (keyed by process id). Read-only agent
terminal tabs stream these live instead of polling the output tail.
`_emit`/`write_json` is `_stdout_lock`-guarded, so calling it from the
registry's reader threads is safe."""
from tools.process_registry import process_registry
if getattr(process_registry, "on_output", None) is not None:
return
process_registry.on_output = lambda session, chunk: _emit(
"agent.terminal.output", "", {"process_id": session.id, "chunk": chunk}
)
def _start_notification_poller(sid: str, session: dict) -> threading.Event:
"""Start the background notification poller for a TUI session."""
_wire_agent_terminal_output()
stop = threading.Event()
t = threading.Thread(
target=_notification_poller_loop,