hermes-agent/ui-tui/src/app/createGatewayEventHandler.ts
Siddharth Balyan fcb1944b4f
feat(credits): usage-aware credits — in-session notices, /usage view, dev readout (#40011)
* feat(tui): HERMES_DEV_CREDITS live-spend dev readout (L0 tracer for usage-aware credits)

L0 of the usage-aware-credits feature: a dev-only, env-gated tracer that
exercises the real header -> CreditsState -> TUI pipe end-to-end behind
HERMES_DEV_CREDITS, de-risking the L1/L5 build before the notice policy exists.

- agent/credits_tracker.py: CreditsState + parse_credits_headers (headers are
  strings -> paid_access via == "true", never bool(); retain-last-known; only
  subscription_micros may be negative; *_usd kept verbatim).
- run_agent.py: _capture_credits / get_credits_state / get_credits_spent_micros,
  session-start baseline latch, + dev-gated "credits" capture log.
- agent/chat_completion_helpers.py: capture on the streaming response.
- agent/agent_init.py: init _credits_state + _credits_session_start_micros.
- tui_gateway/server.py: _get_usage emits dev_credits_spent_micros only when flagged.
- ui-tui appChrome.tsx / types.ts: cents delta status segment + "(dev credits)" banner.

Off by default; silent for normal users. Validated live against staging
(capture log delta matches the TUI segment). Throwaway consumer (readout/log/
banner); credits_tracker + the capture plumbing are the real feature foundation.

* test(credits): lock parser under 9-state matrix + harden validation (L2)

Add tests/agent/test_credits_tracker.py with 92 tests covering the 9-state
matrix (healthy, sub_90pct, grant_exhausted, purchased_only, tool_pool_free,
depleted, debt, missing, no_org) plus validation edge cases: version strict==1
with warn-once latch for v>1, bool-string trap (paid_access/tool_pool_gated_off
== "true"/"false", never bool()), half-pair subscription limit treated as
both-absent while parse succeeds, USD regex ^-?\d+\.\d{2}$, non-int micros
→ None, negative non-subscription micros → None, as_of_ms junk → None, zero
limit ZeroDivision guard.
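The string-level validation rules above can be illustrated with a short TypeScript sketch. The real parser is Python in agent/credits_tracker.py. Every function name below is hypothetical, `null` stands in for the tracker's `None`, and the integer pattern used for micros is an assumption about what counts as well-formed.

```typescript
// Hypothetical helpers illustrating the parser's validation rules.

// Header values are always strings: compare against the literals and never
// coerce with Boolean() (Boolean("false") is true).
function parseBoolHeader(value: string | undefined): boolean | null {
  if (value === 'true') return true
  if (value === 'false') return false
  return null // missing or malformed ("1", "yes", "")
}

// Micros must be an integer string (assumed pattern); only subscription
// micros may be negative.
function parseMicros(value: string | undefined, allowNegative = false): number | null {
  if (value === undefined || !/^-?\d+$/.test(value)) return null
  const n = Number(value)
  if (!Number.isSafeInteger(n)) return null
  if (n < 0 && !allowNegative) return null
  return n
}

// *_usd strings are kept verbatim, but only when they match ^-?\d+\.\d{2}$.
const USD_RE = /^-?\d+\.\d{2}$/

function parseUsd(value: string | undefined): string | null {
  return value !== undefined && USD_RE.test(value) ? value : null
}

// Strict version check: only "1" is accepted; a newer version warns once.
let warnedNewerVersion = false

function acceptVersion(value: string | undefined): boolean {
  if (value === '1') return true
  const v = Number(value)
  if (Number.isInteger(v) && v > 1 && !warnedNewerVersion) {
    warnedNewerVersion = true
    console.warn(`credits headers: unsupported version ${v}`)
  }
  return false
}
```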

Harden agent/credits_tracker.py to match the spec:
- Add tool_pool_micros/tool_pool_gated_off/from_header fields to CreditsState
- Add depleted property (== not paid_access, never remaining==0)
- Change used_fraction guard to key off subscription_limit_micros (the actual
  denominator) not denominator_kind (metadata)
- Replace fail-soft _safe_int with a sentinel-returning variant; full validation
  now returns None on any malformed field rather than silently defaulting
- Add module-level warn-once latch for version > 1
- Add USD regex validation; add denominator_kind allow-list check
- Parse x-nous-tool-pool-* prefix headers (not x-nous-credits-tool-pool-*)
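The two derived properties can be sketched in TypeScript for illustration. The interface is a hypothetical camelCase subset of `CreditsState`; it assumes `subscription_micros` holds the remaining grant, and treating a non-positive cap as "no denominator" is an assumption that extends the zero-limit guard.

```typescript
// Hypothetical subset of CreditsState (camelCase for TypeScript).
interface CreditsStateSubset {
  paidAccess: boolean
  subscriptionMicros: number | null // remaining grant (assumed); may be negative (debt)
  subscriptionLimitMicros: number | null // per-period grant cap: the % denominator
  denominatorKind: string | null // metadata only; never gates the math
}

// Depleted means "no paid access", never "remaining === 0".
function isDepleted(s: CreditsStateSubset): boolean {
  return !s.paidAccess
}

// The guard checks the real denominator, so a missing or zero cap yields
// null instead of dividing by zero. The result is clamped to [0, 1].
function usedFraction(s: CreditsStateSubset): number | null {
  const cap = s.subscriptionLimitMicros
  const remaining = s.subscriptionMicros
  if (cap === null || cap <= 0 || remaining === null) return null
  return Math.min(1, Math.max(0, (cap - remaining) / cap))
}
```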

* feat(credits): notice spine — AgentNotice + notice_callback/notice_clear_callback + TUI binding (L1)

L1 of usage-aware credits: the driver-agnostic notice delivery spine that L4's
policy will fire through and L5's TUI render will consume.

- agent/credits_tracker.py: AgentNotice dataclass (text/level/kind/ttl_ms/key/id;
  kind defaults "sticky", kept TTL-expressive for a future config seam).
- run_agent.py: AIAgent gains notice_callback + notice_clear_callback slots and
  _emit_notice / _emit_notice_clear emitters (swallow all callback errors — a
  notice must never break the agent loop; no-op when unbound).
- agent/agent_init.py: thread both callbacks through init_agent.
- tui_gateway/server.py: bind both in _agent_cbs → notification.show / notification.clear
  WS events (snake_case payload, matching the existing gateway-event convention).
- ui-tui/src/gatewayTypes.ts: notification.show / notification.clear arms on GatewayEvent.
- tests/run_agent/test_notice_spine.py: 15 tests (emitter fire + fail-open + no-op,
  signature threading, TUI binding payload shape).
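The emitter contract (no-op when unbound, callback errors swallowed) can be sketched in TypeScript. Field names follow the `AgentNotice` description; the class, the `makeNotice` helper, and passing only the key to the clear callback are assumptions for illustration, not the Python implementation.

```typescript
type NoticeLevel = 'info' | 'warn' | 'error' | 'success'

// Fields as listed for AgentNotice (snake_case, matching the wire payload).
interface AgentNotice {
  text: string
  level: NoticeLevel
  kind: string // defaults to "sticky"; TTL-capable for a future config seam
  ttl_ms: number | null
  key: string
  id: string
}

// Hypothetical constructor applying the documented default kind.
function makeNotice(
  fields: Omit<AgentNotice, 'kind' | 'ttl_ms'> & { kind?: string; ttl_ms?: number | null }
): AgentNotice {
  return { ...fields, kind: fields.kind ?? 'sticky', ttl_ms: fields.ttl_ms ?? null }
}

class NoticeEmitter {
  noticeCallback: ((notice: AgentNotice) => void) | null = null
  noticeClearCallback: ((key: string) => void) | null = null

  // A notice must never break the agent loop: unbound is a no-op and
  // callback errors are swallowed.
  emitNotice(notice: AgentNotice): void {
    if (this.noticeCallback === null) return
    try {
      this.noticeCallback(notice)
    } catch {
      // swallowed by design
    }
  }

  emitNoticeClear(key: string): void {
    if (this.noticeClearCallback === null) return
    try {
      this.noticeClearCallback(key)
    } catch {
      // swallowed by design
    }
  }
}
```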

Messaging push is out of v1 (binds neither callback). CLI binding + the TUI render/
decode land with L4 (firing) and L5 (render) so turn-end flush is wired correctly.

* feat(credits): threshold reconciliation policy + tests (L4.1)

* feat(credits): wire threshold policy into capture + latch (L4.2)

After a fresh header parse, _capture_credits runs evaluate_credits_notices against
the agent's _credits_latch and emits the result — clears first, then shows (so a
recovered depletion clears before the "restored" success lands, and depleted wins
the latest-wins slot). Gated on a bound notice_callback: messaging (no callbacks)
still caches state for /usage but runs no policy. Parse stays fail-open (miss →
keep last-known); the eval/emit path warns on failure rather than swallowing, so a
depletion-notice bug can't vanish silently.

- run_agent.py: _capture_credits split into parse (swallow→miss) + policy (warn);
  latch lazy-guarded (object.__new__ safety).
- agent/agent_init.py: init agent._credits_latch = {"active": set(), "seen_below_90": False}.
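A sketch of the clear-then-show ordering, assuming the policy returns a list of keys to clear and a list of notices to show. That shape is hypothetical; the real return type of `evaluate_credits_notices` is not described here. Unlike the individual emitters, a failure on this path is logged rather than dropped.

```typescript
// Hypothetical policy output.
interface PolicyResult {
  clears: string[] // notice keys to clear
  shows: Array<{ key: string; text: string }>
}

function emitPolicyResult(
  result: PolicyResult,
  emitShow: (key: string, text: string) => void,
  emitClear: (key: string) => void,
  warn: (msg: string) => void = console.warn
): void {
  try {
    // Clears first: a recovered depletion is cleared before the "restored"
    // notice lands, and the last show wins the latest-wins slot.
    for (const key of result.clears) emitClear(key)
    for (const n of result.shows) emitShow(n.key, n.text)
  } catch (err) {
    // Warn instead of swallowing so a notice bug can't vanish silently.
    warn(`credits notice emit failed: ${String(err)}`)
  }
}
```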

* feat(tui): render credits notices in the status bar (L5, Strategy B)

The TUI now renders the notification.show / notification.clear gateway events the
agent emits — a level-colored notice overrides the status/verb slot when not busy.

- Notice state machine on turnController (pendingNotice + dedicated noticeTimer +
  show/clear/applyNotice/flushPendingNotice/clearNoticeState). createGatewayEventHandler
  decodes the events and delegates.
- Render priority busy > notice > status (appChrome StatusRule); notice text rendered
  verbatim (its glyph comes from the policy), shrinkable so it never clips model│ctx;
  dev-credits banner + Δ segment preserved. UiState.notice is snake_case (matches wire).
- Busy-wins: a notice arriving mid-turn is held and flushed at the THREE turn-end sites
  (recordMessageComplete / interruptTurn / recordError) — never idle(), which reset()
  also calls (would leak across sessions); reset() clears instead.
- Dedicated noticeTimer (never statusTimer); TTL starts on visibility with an id-guard;
  latest-wins cancels the prior timer; clear is key-matched (no-op on mismatch); a sticky
  survives a turn (flush no-ops with no pending); session reset clears (no cross-session leak).
- 20 tests (handler/turnController logic incl. R3-C2 timer isolation + render priority).
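The notice state machine reduces to a small TypeScript sketch. This is not the real `turnController` code: the class and field names are hypothetical, a single `flushPending()` stands in for the three turn-end sites, and dropping a held notice on a matching clear is this sketch's own choice.

```typescript
interface UiNotice {
  id: string
  key: string
  text: string
  level: 'info' | 'warn' | 'error' | 'success'
  ttl_ms: number | null // null = sticky
}

class NoticeStateSketch {
  busy = false
  visible: UiNotice | null = null
  private pending: UiNotice | null = null
  private noticeTimer: ReturnType<typeof setTimeout> | null = null // never the status timer

  show(notice: UiNotice): void {
    if (this.busy) {
      this.pending = notice // busy wins: hold until the turn ends
      return
    }
    this.apply(notice)
  }

  clear(key: string): void {
    if (this.pending !== null && this.pending.key === key) this.pending = null
    if (this.visible === null || this.visible.key !== key) return // mismatch: no-op
    this.cancelTimer()
    this.visible = null
  }

  // Called at turn end (complete / interrupt / error), never from idle().
  flushPending(): void {
    const notice = this.pending
    if (notice === null) return // nothing held: a visible sticky survives the turn
    this.pending = null
    this.apply(notice)
  }

  // Session reset clears everything so nothing leaks across sessions.
  reset(): void {
    this.cancelTimer()
    this.pending = null
    this.visible = null
  }

  private apply(notice: UiNotice): void {
    this.cancelTimer() // latest wins: the previous notice's timer is cancelled
    this.visible = notice
    if (notice.ttl_ms === null) return
    const id = notice.id
    // The TTL starts on visibility; the id guard keeps a stale timer
    // from clearing a newer notice.
    this.noticeTimer = setTimeout(() => {
      this.noticeTimer = null
      if (this.visible !== null && this.visible.id === id) this.visible = null
    }, notice.ttl_ms)
  }

  private cancelTimer(): void {
    if (this.noticeTimer !== null) {
      clearTimeout(this.noticeTimer)
      this.noticeTimer = null
    }
  }
}
```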

* feat(credits): cold-start seed for new Nous sessions (L3)

A genuinely-new Nous session has no inference header yet, so seed credits state from
the authoritative GET /api/oauth/account snapshot at session start (in the new-session
branch of _restore_or_build_system_prompt — inline, since the on_session_start plugin
hook gets no agent reference). The seed runs the shared notice policy, so a session that
opens already depleted warns IMMEDIATELY rather than only after the first turn.

- Maps the nested account fields (paid_service_access → paid_access; total_usable /
  subscription / purchased on paid_service_access_info; rollover on subscription), each
  None-guarded; float dollars → micros via round(d*1e6), *_usd left "" (render formats
  from micros — never synthesize a verbatim usd from a float).
- Magnitudes-only: no monthlyCredits on the endpoint → subscription_limit_* unset →
  used_fraction None → no warn90 from the seed (% only once a header lands, per D-E).
- Provider-guarded to Nous; fail-open (any error leaves _credits_state None, never
  blocks startup); paid_access unknown ⇒ True (never falsely depleted).
- run_agent.py: extracted the warm-path policy/emit block into a shared
  _emit_credits_notices() so capture and the seed fire notices identically.
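A sketch of the account-to-state mapping in TypeScript. The payload shape is assumed from the field names above rather than taken from the portal's schema, and the output field names are hypothetical. Note that `Math.round` rounds halves up, while Python's `round` rounds half to even, so the two can differ by one micro on exact half values.

```typescript
// Assumed payload shape, reconstructed from the field names above.
interface AccountPayload {
  paid_service_access?: boolean | null
  paid_service_access_info?: {
    total_usable?: number | null
    subscription?: number | null
    purchased?: number | null
  } | null
  subscription?: { rollover?: number | null } | null
}

interface SeededCredits {
  paidAccess: boolean
  totalUsableMicros: number | null
  subscriptionMicros: number | null
  purchasedMicros: number | null
  rolloverMicros: number | null
  totalUsableUsd: string // left empty: display formats from micros
}

// Float dollars -> integer micros; anything non-numeric stays unknown.
function dollarsToMicros(d: number | null | undefined): number | null {
  return typeof d === 'number' && Number.isFinite(d) ? Math.round(d * 1e6) : null
}

function seedFromAccount(p: AccountPayload): SeededCredits {
  const info = p.paid_service_access_info
  return {
    // Unknown paid access counts as true: never report a false depletion.
    paidAccess: p.paid_service_access ?? true,
    totalUsableMicros: dollarsToMicros(info?.total_usable),
    subscriptionMicros: dollarsToMicros(info?.subscription),
    purchasedMicros: dollarsToMicros(info?.purchased),
    rolloverMicros: dollarsToMicros(p.subscription?.rollover),
    // Never synthesize a verbatim *_usd string from a float.
    totalUsableUsd: ''
  }
}
```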

* feat(credits): /usage Nous credits magnitudes view + recovery trigger (L6)

Add Nous credit dollar magnitudes to /usage (subscription / top-up / total
+ rollover + renewal + portal CTA), magnitudes-only per v1 (no % until the
account endpoint exposes a denominator). Reuses the existing account-usage
render machinery via a new pure build_nous_credits_snapshot() that maps a
NousPortalAccountInfo to an AccountUsageSnapshot; no nous branch is added to
fetch_account_usage (keeps the per-provider boundary intact).

CLI /usage also doubles as a depletion-recovery trigger: a force_fresh
account fetch, kept in a SEPARATE local so it never clobbers the
header-sourced agent._credits_state (which alone carries used_fraction). If
paid access recovered while credits.depleted is latched and a notice
consumer is bound, it reuses agent._emit_credits_notices() to clear it.
Gateway /usage displays magnitudes only — messaging binds no notice
consumer, so it performs no recovery emit.

Fail-open throughout: any portal hiccup leaves /usage unaffected.

* refactor(credits): dedupe HERMES_DEV_CREDITS flag parse via shared helpers

The dev-flag truthy check was inlined in three places. Replace with the shared
utils.is_truthy_value (run_agent.py, tui_gateway/server.py — also drops a
redundant inline `import os`) and a hoisted DEV_CREDITS_MODE export in
ui-tui/src/config/env.ts (consumed by appChrome, which also stops recomputing the
env check on every render). Behaviour-preserving; identical truthy set.

* fix(credits): cut dead /usage recovery trigger + bound portal fetches (L6 review)

Adversarial review found the /usage depletion-recovery trigger dead AND broken:
the CLI binds no notice_clear_callback, the TUI runs /usage in a separate
slash-worker subprocess (its own agent/latch), and the no-clobber rule made it
evaluate stale paid_access anyway. Recovery already happens on the next inference
(warm path), so the trigger was redundant — remove it and stop the depleted
notice over-promising.

- cli.py: remove the dead recovery block; bound the /usage portal fetch with a
  10s wall-clock timeout (ThreadPoolExecutor) like the per-provider fetch —
  urllib's per-socket timeout is not a wall-clock guarantee.
- agent/credits_tracker.py: reword the depleted CTA to "run /usage for balance"
  (no false recovery promise; /usage shows fresh magnitudes, sticky clears next turn).
- agent/conversation_loop.py: same wall-clock timeout on the cold-start seed fetch
  so a stalled portal can't hang session startup; tidy its time import.

* chore(credits): dev notice-state fixtures (HERMES_DEV_CREDITS_FIXTURE)

Throwaway dev scaffolding to exercise the notice pipeline without real spend or
Redis seeding. Set HERMES_DEV_CREDITS_FIXTURE to a state name (healthy / sub_90pct
/ grant_exhausted / depleted / clear) or a file path whose contents name a state
(re-read each turn → flip states live for recovery testing). _capture_credits
injects the chosen CreditsState instead of parsing real headers and runs the
shared notice policy. Deletable with the rest of the HERMES_DEV_CREDITS scaffolding.
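A sketch of the name-or-path resolution, with the file reader injected so it stays testable. The state list mirrors the one above; treating any non-state value as a path and returning `null` for an unreadable file or unknown state are assumptions.

```typescript
const FIXTURE_STATES = ['healthy', 'sub_90pct', 'grant_exhausted', 'depleted', 'clear'] as const
type FixtureState = (typeof FIXTURE_STATES)[number]

function isFixtureState(s: string): s is FixtureState {
  return (FIXTURE_STATES as readonly string[]).includes(s)
}

// A value is either a state name or a path whose contents name a state.
// Calling this every turn re-reads the file, so editing it flips the state
// live. readFile returns null when the path cannot be read.
function resolveFixture(
  value: string | undefined,
  readFile: (path: string) => string | null
): FixtureState | null {
  if (!value) return null
  const name = value.trim()
  if (isFixtureState(name)) return name
  const contents = readFile(name)
  if (contents === null) return null
  const fromFile = contents.trim()
  return isFixtureState(fromFile) ? fromFile : null
}
```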

* feat(credits): /usage monthly-grant % gauge

The portal /api/oauth/account subscription block now carries monthly_credits
(the per-period grant allowance, the % denominator). The consumer parsed
monthly_charge but dropped monthly_credits, so /usage stayed magnitudes-only.

Capture monthly_credits into NousPortalSubscriptionInfo + _subscription_from_payload.
build_nous_credits_snapshot emits a Subscription usage window (real % used, routed
through the existing render machinery) when monthly_credits is a finite positive
denominator and credits_remaining is finite and <= cap; otherwise it degrades to
magnitudes-only (older portals, rollover-over-cap, or non-finite payloads).

Guards (adversarial-review-driven): reject non-finite operands (json.loads parses
bare NaN/Infinity by default → would render $nan + a false 100% used), reject
bools, guard div-by-zero (cap>0), and suppress the gauge when remaining > cap
(rollover spanning the period makes the cap a nonsensical denominator → the
$X-of-$Y detail would read as a contradiction). Debt (remaining<0) clamps to 100%.

Money rule preserved: the ratio + magnitudes are computed from numeric float
account fields via display formatting, never by parsing a server *_usd string
(there are none on these dataclasses).

13 gauge tests added (tests/agent/test_nous_credits_gauge.py).

* fix(credits): show /usage Nous block whenever a Nous account is present

/usage runs in a slash-worker subprocess whose resolved inference provider is
often not "nous" even when the user has a Nous account, so gating the Nous
credits block on (provider == "nous") hid it entirely — the account data was
fully available but never rendered.

Gate instead on "a Nous account is logged in": a cheap local auth-state lookup
(get_provider_auth_state('nous') has an access_token) decides whether to attempt
the portal fetch, regardless of which provider inference runs on. In the gateway
the block is also lifted out of the 'if provider:' scope so a Nous-credentialled
user with another (or no) resident inference provider still sees their balance.
Fail-open and the per-fetch wall-clock timeout are preserved.

* fix(credits): show /usage Nous block when there's no live agent (TUI slash-worker)

In the TUI, /usage runs in a slash-worker subprocess that resumes the session
WITHOUT building an agent (self.agent is None), so _show_usage early-returned
"(._.) No active agent" before ever reaching the Nous credits block — which is
agent-independent (a portal fetch gated on Nous auth-state). Extract the block
into _print_nous_credits_block() and run it at the no-agent / no-calls
early-returns too (returns True if it printed, so the fallback message only
shows when there's genuinely nothing).

Verified live against staging: the block + monthly-grant gauge now render in the
slash-worker /usage path (previously hidden). The plain CLI REPL + messaging
paths are unchanged (they have a live agent).

* feat(credits): escalating 50/75/90 usage bands (single status line)

Replace the lone 90%-used warning with three escalating bands (50 info, 75 warn,
90 warn) shown as ONE status-bar line: it displays the highest band the
subscription grant has crossed, replaces the line as usage climbs, steps back
down on recovery, and clears below 50%. No stacking, no per-turn churn.

Bands live in a tunable CREDITS_USAGE_BANDS list; the policy derives everything
from it. Single notice key (credits.usage) with a usage_band latch field so the
notice only re-emits when the band actually changes. The crossing gate
(seen_below_90) is preserved so a fresh live session that opens mid-range stays
quiet until it has been observed below the lowest band (cold-start primes it when
it wants an open-high warning). Denominator math unchanged: % = subscription
grant burn (cap - grant_remaining)/cap, clamped [0,1]; top-up never moves the %.
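A sketch of the band policy under stated assumptions: the latch holds only the two fields named above, the crossing gate is primed by any observation below the lowest band, and the return value is a hypothetical action type rather than real notices. Cold-start priming would amount to setting `seen_below_90` to true before the first evaluation.

```typescript
interface UsageBand {
  pct: number
  level: 'info' | 'warn'
}

// Tunable, ascending.
const CREDITS_USAGE_BANDS: UsageBand[] = [
  { pct: 50, level: 'info' },
  { pct: 75, level: 'warn' },
  { pct: 90, level: 'warn' }
]

interface BandLatch {
  usage_band: number | null // band currently shown, null when none
  seen_below_90: boolean // crossing gate (name kept from the latch)
}

type BandAction =
  | { type: 'none' }
  | { type: 'show'; key: 'credits.usage'; band: UsageBand }
  | { type: 'clear'; key: 'credits.usage' }

// Highest band crossed; boundaries are inclusive.
function highestBand(usedFraction: number): UsageBand | null {
  let hit: UsageBand | null = null
  for (const b of CREDITS_USAGE_BANDS) {
    if (usedFraction >= b.pct / 100) hit = b
  }
  return hit
}

function evaluateUsageBand(latch: BandLatch, usedFraction: number | null): BandAction {
  if (usedFraction === null) return { type: 'none' }
  const band = highestBand(usedFraction)
  if (band === null) {
    latch.seen_below_90 = true // observed below the lowest band
    if (latch.usage_band === null) return { type: 'none' }
    latch.usage_band = null
    return { type: 'clear', key: 'credits.usage' }
  }
  if (!latch.seen_below_90) return { type: 'none' } // opened mid-range: stay quiet
  if (latch.usage_band === band.pct) return { type: 'none' } // same band: no re-emit
  latch.usage_band = band.pct // climb or step down
  return { type: 'show', key: 'credits.usage', band }
}
```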

Migrated test_credits_policy.py to the new key + added TestUsageBands (climb,
step-down, recovery-clear, idempotent, inclusive boundaries).

* feat(credits): hydrate notices at session OPEN via shared seed (TUI + first-turn)

Notices previously only fired inside a conversation turn (first message), so a
session that opened already depleted / past a usage band showed nothing at
'ready'. Extract the cold-start seed into a shared seed_credits_at_session_start()
and call it (a) in the TUI/desktop agent build right after the notice callback is
wired (fires at 'ready', before any message) and (b) as the first-turn fallback in
conversation_loop. Idempotent (skips once _credits_state exists) and fail-open.

The seed now maps monthly_credits -> subscription_limit_micros +
denominator_kind='subscription_cap', so used_fraction is computable at seed time
and usage-band warnings (not just depletion) hydrate on open. Primes the crossing
latch so a session opening already in a band warns immediately. Degrades to
depletion-only when monthly_credits is absent (older portals).

Adds test_credits_cold_start.py covering open-at-band, depletion, debt, no-cap
degradation, and the shared seed (fires/idempotent/skips-non-nous).

* feat(credits): /usage monthly-grant % gauge + fixture support + TUI surfacing

agent/account_usage.py: build_nous_credits_snapshot emits a subscription % gauge
when the portal supplies a positive, finite monthly_credits denominator with
remaining <= cap (guards reject NaN/Infinity and rollover-over-cap, which would
render $nan or a contradictory $X-of-$Y); degrades to magnitudes-only otherwise.
Adds shared nous_credits_lines() (auth-gated, wall-clock-bounded portal fetch) so
the CLI and TUI /usage render the same block, and _snapshot_from_credits_state()
so HERMES_DEV_CREDITS_FIXTURE drives /usage offline too.

TUI: session.usage RPC carries credits_lines (agent-independent) and the /usage
panel renders them regardless of API-call count or resume state — previously the
TUI's separate /usage implementation only showed token counts.

Money rule preserved: % and magnitudes come from numeric float account fields via
display formatting, never by parsing a server *_usd string.

* feat(credits): CLI REPL inline notices (parity with TUI)

The plain CLI agent bound no notice callbacks, so credit notices were TUI-only.
Bind notice_callback/notice_clear_callback on the CLI AIAgent; _on_notice renders
a single level-colored line above the prompt (error red / warn yellow / success
green / info dim) via _cprint, and seed credits at session open so a depletion or
usage-band warning shows before the first message — the same hydration the TUI
got. _on_notice_clear is a no-op (the REPL prints lines, no persistent slot).

* test(credits): add sub_50pct + sub_75pct dev fixtures for the new usage bands

The fixture set jumped 10% -> 90%; add sub_50pct (uf 0.5 -> band 50 info) and
sub_75pct (uf 0.75 -> band 75 warn) so the new escalating bands are exercisable
via HERMES_DEV_CREDITS_FIXTURE across all three surfaces (notice, session-open
seed, /usage gauge).

* fix(credits): usage-band notice clears on next prompt (not sticky-forever)

A 50/75/90 usage heads-up was sticky and camped the status bar indefinitely. Clear
the visible credits.usage notice when a new turn starts (startMessage), so it shows
until your next prompt then yields. The server latch is unchanged, so it won't
re-nag at the same band — it only re-shows when the band actually changes (climb)
or clears when usage drops below the lowest band. Depletion stays sticky.

* refactor(credits): consolidate the /usage credits block behind nous_credits_lines()

The CLI (_print_nous_credits_block) and the messaging gateway (_handle_usage_command)
each re-implemented the auth-gate + portal fetch + render, and both bypassed the
dev-fixture short-circuit that only the TUI honored — so /usage ignored
HERMES_DEV_CREDITS_FIXTURE on the CLI and in chat. Route both through the shared
agent.account_usage.nous_credits_lines() helper: one fetch/render path, one auth
gate, and the fixture works on every surface (~60 fewer duplicated lines).

The gateway usage test recorded only the last asyncio.to_thread call; /usage now
dispatches both the account fetch and the credits fetch, so it records every call
and matches the account fetch by its provider arg.

* fix(credits): keep the /usage gauge type-safe and log its fail-open path

_is_finite_num is now a TypeGuard[float], so the type checker narrows the gauge
operands (monthly_credits / credits_remaining) and the magnitudes passed to
_fmt_usd through it — no more None-operand warnings on the arithmetic. Add a debug
breadcrumb on the nous_credits_lines portal-fetch fail-open so a dead /usage block
is diagnosable in agent.log without a dev flag.

* fix(credits): harden the header tracker — prod-leak gate, hot-path probe, fire-and-forget seed

- Prod-leak guard: dev fixtures (HERMES_DEV_CREDITS_FIXTURE) now also require
  HERMES_DEV_CREDITS, so a stray fixture var can't surface fabricated balances on a
  real account. Matches the documented run workflow (both vars set together).
- Hot-path probe: parse_credits_headers checks for the version sentinel header
  before allocating a lowercased copy of the response headers — skips that work on
  every non-Nous API call. Behaviour-identical and still case-insensitive.
- Fire-and-forget seed: the real portal fetch in seed_credits_at_session_start now
  runs in a daemon thread, so a slow/unreachable portal never delays session "ready"
  (previously blocked up to 10s). The dev-fixture path stays synchronous; the thread
  re-checks idempotency before hydrating (a live header may land first).
- Diagnostics: debug breadcrumbs on the parse and seed fail-open paths so a crashed
  parser / dead seed is distinguishable from a legitimate no-headers miss.
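The prod-leak gate and the hot-path probe can be sketched as follows. The sentinel header name and the truthy set are placeholders (neither is given in this log), and the real probe lives in `parse_credits_headers`; this only shows the shape of "check first, copy second".

```typescript
type RawHeaders = Record<string, string>

// Placeholder name: the real version sentinel header is not named here.
const VERSION_SENTINEL = 'x-credits-sentinel'

// Case-insensitive probe that avoids building a lowercased copy first.
function hasHeader(headers: RawHeaders, lowerName: string): boolean {
  for (const name of Object.keys(headers)) {
    if (name.length === lowerName.length && name.toLowerCase() === lowerName) return true
  }
  return false
}

// Only calls carrying the sentinel pay for the lowercased copy.
function lowercasedCreditsHeaders(headers: RawHeaders): RawHeaders | null {
  if (!hasHeader(headers, VERSION_SENTINEL)) return null // non-Nous call: skip
  const lowered: RawHeaders = {}
  for (const name of Object.keys(headers)) lowered[name.toLowerCase()] = headers[name]
  return lowered
}

// Assumed truthy set; the real is_truthy_value helper may accept other spellings.
const isTruthy = (v: string | undefined): boolean =>
  ['1', 'true', 'yes', 'on'].includes((v ?? '').trim().toLowerCase())

// Prod-leak gate: a fixture is honoured only when HERMES_DEV_CREDITS is also set.
function fixtureEnabled(env: Record<string, string | undefined>): boolean {
  return isTruthy(env.HERMES_DEV_CREDITS) && Boolean(env.HERMES_DEV_CREDITS_FIXTURE)
}
```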

Cold-start tests set HERMES_DEV_CREDITS alongside the fixture to match the gate.

* test(tui): fix env-timing in the StatusRule dev-credits assertion

DEV_CREDITS_MODE is read once at module load (config/env), so mutating
process.env.HERMES_DEV_CREDITS inside the test couldn't flip it — the dev-banner
assertion only passed if the env was exported before vitest started, and failed in a
normal run. Move that assertion to a sibling file that mocks config/env with
DEV_CREDITS_MODE: true (scoped, no module-reset / React-identity hazard).

* test(credits): cover the dev-fixture /usage render and usage-band clear-on-prompt

- _snapshot_from_credits_state (the offline /usage renderer) had no direct test:
  lock the gauge math, the verbatim *_usd magnitudes, the depletion line and the
  fixture marker, plus the no-cap (no gauge) and None-state cases.
- turnController.startMessage had no test for clearing the credits.usage notice on
  the next prompt while leaving credits.depleted sticky.

* feat(credits): deliver credit notices over messaging gateways

Bind notice_callback/notice_clear_callback on the per-turn gateway agent
so usage-band / depletion / restored notices reach Telegram/Discord/Slack/
etc. Previously the messaging gateway bound neither callback, so the agent's
_emit_credits_notices early-returned and a chat user crossing a band got
nothing unless they ran /usage manually.

- render_notice_line(): AgentNotice -> single plaintext line (level glyph +
  text), plaintext-only so it renders uniformly without per-platform escaping.
  Fail-soft on malformed/empty notices.
- Standalone push for every notice (messaging has no persistent status bar):
  route through the shared _deliver_platform_notice rail (honors private/
  public delivery + thread metadata), scheduled onto the gateway loop via
  safe_schedule_threadsafe from the agent's sync worker thread — same pattern
  as _status_callback_sync.
- The fired-once latch lives on the cached (reused-in-place) agent and
  persists across turns, so a band crosses once -> one push, no per-turn
  re-nag. Re-fires only after idle-eviction rebuilds the agent (a reminder).
- Recovery ('Credit access restored') rides the show path (emitted as a
  success notice, not a clear). notice_clear_callback is a no-op: a sent
  platform message can't be cleanly retracted.

Tests: render glyph/levels/fail-soft + public/private delivery seam through
_deliver_platform_notice + no-adapter no-op.

* fix(credits): don't double the glyph on messaging notices

render_notice_line prepended a per-level glyph, but the notice policy already
bakes the glyph into the text (and the TUI + CLI render it verbatim) — so every
credit notice over messaging came out doubled ("⚠ ⚠ Credits 90% used",
"✕ ✕ Credit access paused"). Emit the text verbatim instead; drop the now-dead
level→glyph map.

The render tests fed glyph-less text (and the success case only checked
startswith), so the doubling slipped through. Rework them around the verbatim
contract and add an end-to-end regression that runs real evaluate_credits_notices
output through render_notice_line and asserts the line is returned unchanged.
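The verbatim contract is small enough to state in a few lines. This TypeScript version only illustrates the behaviour described for `render_notice_line` (pass the text through unchanged, fail soft on malformed or empty notices); it is not the gateway's implementation.

```typescript
// Returns the notice text unchanged (the policy already baked in the glyph),
// or null when there is nothing sensible to send.
function renderNoticeLine(notice: unknown): string | null {
  if (typeof notice !== 'object' || notice === null) return null
  const text = (notice as { text?: unknown }).text
  if (typeof text !== 'string' || text.trim() === '') return null
  return text
}
```

Because the function returns its input unchanged, a glyph-bearing line such as `⚠ Credits 90% used` goes out exactly once, which is the property the regression test pins down.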
2026-06-06 13:18:18 +05:30


import { STARTUP_IMAGE, STARTUP_QUERY } from '../config/env.js'
import { STREAM_BATCH_MS } from '../config/timing.js'
import { buildSetupRequiredSections, SETUP_REQUIRED_TITLE } from '../content/setup.js'
import type {
  CommandsCatalogResponse,
  ConfigFullResponse,
  DelegationStatusResponse,
  GatewayEvent,
  GatewaySkin,
  SessionMostRecentResponse
} from '../gatewayTypes.js'
import { rpcErrorMessage } from '../lib/rpc.js'
import { topLevelSubagents } from '../lib/subagentTree.js'
import { formatAbandonedClarify, formatToolCall, stripAnsi } from '../lib/text.js'
import { fromSkin } from '../theme.js'
import type { Msg, SubagentProgress, SubagentStatus } from '../types.js'
import { applyDelegationStatus, getDelegationState } from './delegationStore.js'
import type { GatewayEventHandlerContext } from './interfaces.js'
import { getOverlayState, patchOverlayState } from './overlayStore.js'
import { turnController } from './turnController.js'
import { getUiState, patchUiState } from './uiStore.js'

const NO_PROVIDER_RE = /\bNo (?:LLM|inference) provider configured\b/i

const statusFromBusy = () => (getUiState().busy ? 'running…' : 'ready')

const applySkin = (s: GatewaySkin) =>
  patchUiState({
    theme: fromSkin(
      s.colors ?? {},
      s.branding ?? {},
      s.banner_logo ?? '',
      s.banner_hero ?? '',
      s.tool_prefix ?? '',
      s.help_header ?? ''
    )
  })

const dropBgTask = (taskId: string) =>
  patchUiState(state => {
    const next = new Set(state.bgTasks)
    next.delete(taskId)

    return { ...state, bgTasks: next }
  })

const pushUnique =
  (max: number) =>
  <T>(xs: T[], x: T): T[] =>
    xs.at(-1) === x ? xs : [...xs, x].slice(-max)

const pushThinking = pushUnique(6)
const pushNote = pushUnique(6)
const pushTool = pushUnique(8)

const KNOWN_SUBAGENT_STATUSES = new Set<SubagentStatus>([
  'completed',
  'error',
  'failed',
  'interrupted',
  'queued',
  'running',
  'timeout'
])

const normalizeSubagentStatus = (status: unknown, fallback: SubagentStatus): SubagentStatus => {
  if (typeof status !== 'string') {
    return fallback
  }

  const normalized = status.toLowerCase() as SubagentStatus

  return KNOWN_SUBAGENT_STATUSES.has(normalized) ? normalized : fallback
}

export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev: GatewayEvent) => void {
const { rpc } = ctx.gateway
const { STARTUP_RESUME_ID, newSession, recoverSidRef, resumeById, setCatalog } = ctx.session
const { bellOnComplete, stdout, sys } = ctx.system
const { appendMessage, panel, setHistoryItems } = ctx.transcript
const { setInput } = ctx.composer
const { submitRef } = ctx.submission
const { setProcessing: setVoiceProcessing, setRecording: setVoiceRecording, setVoiceEnabled } = ctx.voice
let pendingThinkingStatus = ''
let thinkingStatusTimer: null | ReturnType<typeof setTimeout> = null
let startupPromptSubmitted = false
// Request IDs of clarify prompts we've already flushed to the transcript as
// an abandoned-prompt record, so the tool.complete and message.complete
// paths can't both persist the same prompt twice.
const persistedAbandonedClarify = new Set<string>()
// When a clarify prompt is dismissed without an answer (the backend _block
// timed out and returned an empty string), the live ClarifyPrompt overlay is
// left set until the next turn's idle() silently nulls it — so the question
// and options vanish from the screen while the agent's follow-up still refers
// to them. The reliable signal is the clarify tool's own tool.complete (and,
// as a backstop, message.complete): at those points the overlay is provably
// still set on a timeout, but already cleared by answerClarify() on a real
// answer (so this no-ops there). Flush the question + options into the
// transcript as a persistent system line, then clear the overlay.
const flushAbandonedClarify = () => {
const { clarify } = getOverlayState()
if (!clarify || persistedAbandonedClarify.has(clarify.requestId)) {
return
}
persistedAbandonedClarify.add(clarify.requestId)
appendMessage({
role: 'system',
text: formatAbandonedClarify(clarify.question, clarify.choices, 'timed out')
})
patchOverlayState({ clarify: null })
}
// Inject the disk-save callback into turnController so recordMessageComplete
// can fire-and-forget a persist without having to plumb a gateway ref around.
turnController.persistSpawnTree = async (subagents, sessionId) => {
try {
const startedAt = subagents.reduce<number>((min, s) => {
if (!s.startedAt) {
return min
}
return min === 0 ? s.startedAt : Math.min(min, s.startedAt)
}, 0)
const top = topLevelSubagents(subagents)
.map(s => s.goal)
.filter(Boolean)
.slice(0, 2)
const label = top.length ? top.join(' · ') : `${subagents.length} subagents`
await rpc('spawn_tree.save', {
finished_at: Date.now() / 1000,
label: label.slice(0, 120),
session_id: sessionId ?? 'default',
started_at: startedAt ? startedAt / 1000 : null,
subagents
})
} catch {
// Persistence is best-effort; in-memory history is the authoritative
// same-session source. A write failure doesn't block the turn.
}
}
// Refresh delegation caps at most every 5s so the status bar HUD can
// render a /warning close to the configured cap without spamming the RPC.
let lastDelegationFetchAt = 0
// ── Shared full-config read ──────────────────────────────────────────
//
// Several concerns need `display.*` flags at startup (the /agents nudge
// gate below, the auto-resume check in the `gateway.ready` handler).
// Memoize the `config.get full` RPC so we make exactly one round-trip
// instead of one per concern. Resolves to null on RPC failure; callers
// treat null as "use defaults".
let fullConfigPromise: null | Promise<ConfigFullResponse | null> = null
const getFullConfigOnce = (): Promise<ConfigFullResponse | null> => {
fullConfigPromise ??= rpc<ConfigFullResponse>('config.get', { key: 'full' }).catch(() => null)
return fullConfigPromise
}
// ── Nudge toward /agents on delegation ───────────────────────────────
//
// When `display.tui_agents_nudge` is enabled (default true), the first
// time a turn starts delegating we drop a single transient activity hint
// ("subagents working · /agents to watch live") so the user discovers the
// spawn-tree dashboard instead of staring at a quiet transcript — without
// hijacking the screen by force-opening an overlay. Guards:
// • fires at most once per turn (`agentsNudgedThisTurn`)
// • silent if the overlay is already open (nothing to advertise)
// Reset on `message.start`. The config flag is fetched once, lazily;
// until it resolves we assume the default (on).
let agentsNudgeEnabled = true
let agentsNudgeConfigFetched = false
let agentsNudgedThisTurn = false
const ensureAgentsNudgeConfig = () => {
if (agentsNudgeConfigFetched) {
return
}
agentsNudgeConfigFetched = true
getFullConfigOnce().then(cfg => {
// Only an explicit `false` disables it; absent/unknown keeps default on.
if (cfg?.config?.display?.tui_agents_nudge === false) {
agentsNudgeEnabled = false
}
})
}
const maybeNudgeAgents = () => {
ensureAgentsNudgeConfig()
if (!agentsNudgeEnabled || agentsNudgedThisTurn) {
return
}
// Already watching → no point advertising the dashboard. Don't burn the
// turn's nudge credit here: if the user closes the overlay later in the
// same turn while delegation is still ongoing, a subsequent event should
// still be allowed to nudge. The flag is only set once we actually push.
if (getOverlayState().agents) {
return
}
agentsNudgedThisTurn = true
turnController.pushActivity('subagents working · /agents to watch live', 'info')
}
const resetAgentsNudgeTurnState = () => {
agentsNudgedThisTurn = false
}
// Kick off the config fetch eagerly at handler creation so the flag is
// resolved well before the first delegation of any real session (which
// only happens after gateway.ready + a user turn).
ensureAgentsNudgeConfig()
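// Throttled refresh of delegation caps: at most one `delegation.status`
// RPC per 5 s unless `force` is set. Illustrative trace with calls at
// t = 0 s, 2 s and 6 s (relative to the first fetch): RPCs go out at 0 s
// and 6 s, and the 2 s call is skipped. Failures are swallowed silently, so
// the HUD keeps whatever caps it last applied.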
const refreshDelegationStatus = (force = false) => {
const now = Date.now()
if (!force && now - lastDelegationFetchAt < 5000) {
return
}
lastDelegationFetchAt = now
rpc<DelegationStatusResponse>('delegation.status', {})
.then(r => applyDelegationStatus(r))
.catch(() => {})
}
const setStatus = (status: string) => {
pendingThinkingStatus = ''
if (thinkingStatusTimer) {
clearTimeout(thinkingStatusTimer)
thinkingStatusTimer = null
}
patchUiState({ status })
}
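// Coalesces bursts of thinking updates into one status repaint per
// STREAM_BATCH_MS window, always showing the latest text. Illustrative
// trace: values "x", "y", "z" arriving inside one window start a single
// timer, which then patches status to "z". An empty pending value falls
// back to statusFromBusy(), and a setStatus() call in the meantime cancels
// the pending repaint.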
const scheduleThinkingStatus = (status: string) => {
pendingThinkingStatus = status
if (thinkingStatusTimer) {
return
}
thinkingStatusTimer = setTimeout(() => {
thinkingStatusTimer = null
patchUiState({ status: pendingThinkingStatus || statusFromBusy() })
}, STREAM_BATCH_MS)
}
const restoreStatusAfter = (ms: number) => {
turnController.clearStatusTimer()
turnController.statusTimer = setTimeout(() => {
turnController.statusTimer = null
patchUiState({ status: statusFromBusy() })
}, ms)
}
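// Submits the STARTUP_QUERY / STARTUP_IMAGE prompt at most once. The session
// id may not exist yet, so it polls every 100 ms for up to 40 tries (about
// 4 s in total) before giving up with a system note. An image-attach failure
// is reported but not fatal. With only an image and no query, the prompt
// defaults to "What do you see in this image?".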
const scheduleStartupPrompt = () => {
if (startupPromptSubmitted || (!STARTUP_QUERY && !STARTUP_IMAGE)) {
return
}
startupPromptSubmitted = true
setTimeout(async () => {
let sid = getUiState().sid
for (let i = 0; !sid && i < 40; i += 1) {
await new Promise(resolve => setTimeout(resolve, 100))
sid = getUiState().sid
}
if (!sid) {
return sys('startup query skipped: no active session')
}
if (STARTUP_IMAGE) {
try {
await rpc('image.attach', { path: STARTUP_IMAGE, session_id: sid })
} catch (e) {
sys(`startup image attach failed: ${rpcErrorMessage(e)}`)
}
}
submitRef.current(STARTUP_QUERY || 'What do you see in this image?')
}, 0)
}
// Terminal statuses are never overwritten by late-arriving live events;
// otherwise a stale `subagent.start` / `spawn_requested` could clobber the
// terminal state set by `subagent.complete` (completed/failed/interrupted/timeout/error).
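// e.g. keepTerminalElseRunning('timeout') → 'timeout',
//      keepTerminalElseRunning('queued') → 'running'.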
const isTerminalStatus = (s: SubagentProgress['status']) =>
s === 'completed' || s === 'error' || s === 'failed' || s === 'interrupted' || s === 'timeout'
const keepTerminalElseRunning = (s: SubagentProgress['status']) => (isTerminalStatus(s) ? s : 'running')
const handleReady = (skin?: GatewaySkin) => {
if (skin) {
applySkin(skin)
}
rpc<CommandsCatalogResponse>('commands.catalog', {})
.then(r => {
if (!r?.pairs) {
return
}
setCatalog({
canon: (r.canon ?? {}) as Record<string, string>,
categories: r.categories ?? [],
pairs: r.pairs as [string, string][],
skillCount: (r.skill_count ?? 0) as number,
sub: (r.sub ?? {}) as Record<string, string[]>
})
if (r.warning) {
turnController.pushActivity(String(r.warning), 'warn')
}
})
.catch((e: unknown) => turnController.pushActivity(`command catalog unavailable: ${rpcErrorMessage(e)}`, 'info'))
// Crash recovery: a respawn triggered by an unexpected gateway death
// resumes the session that was live, not a brand-new one. One-shot — the
// ref is cleared so an ordinary later restart still forges/resumes per
// config. No startup prompt here (this is mid-session, not a cold boot).
const recoverSid = recoverSidRef?.current
if (recoverSidRef && recoverSid) {
recoverSidRef.current = null
resumeById(recoverSid)
// After resumeById: it synchronously sets status to 'resuming…' on entry,
// so override it here to keep the distinct "recovering" label visible for
// the duration of the resume RPC (which later flips status to 'ready').
patchUiState({ status: 'recovering session…' })
return
}
if (STARTUP_RESUME_ID) {
patchUiState({ status: 'resuming…' })
resumeById(STARTUP_RESUME_ID)
scheduleStartupPrompt()
return
}
// Opt-in: when `display.tui_auto_resume_recent` is true, look up
// the most recent human-facing session and resume it instead of
// forging a brand-new one. Mirrors classic CLI's `hermes -c` /
// `hermes --tui` muscle memory and addresses the audit's "session
// unrecoverable after disconnection" gap. Default off so existing
// users aren't surprised. (Shares the memoized full-config read.)
const forgeSession = () => {
patchUiState({ status: 'forging session…' })
newSession()
scheduleStartupPrompt()
}
getFullConfigOnce()
.then(cfg => {
if (!cfg?.config?.display?.tui_auto_resume_recent) {
forgeSession()
return
}
return rpc<SessionMostRecentResponse>('session.most_recent', {}).then(r => {
const target = r?.session_id
if (!target) {
forgeSession()
return
}
patchUiState({ status: 'resuming most recent…' })
resumeById(target)
scheduleStartupPrompt()
})
})
// getFullConfigOnce() never rejects, so this covers a failed `session.most_recent`.
.catch(() => forgeSession())
}
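// Events tagged with another session's id are dropped. Untagged events,
// events that arrive before any session exists, and `gateway.*` events
// always pass. e.g. with sid 'a': { type: 'message.delta', session_id: 'b' }
// is ignored, while { type: 'gateway.stderr', session_id: 'b' } is handled.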
return (ev: GatewayEvent) => {
const sid = getUiState().sid
if (ev.session_id && sid && ev.session_id !== sid && !ev.type.startsWith('gateway.')) {
return
}
switch (ev.type) {
case 'gateway.ready':
handleReady(ev.payload?.skin)
return
case 'skin.changed':
if (ev.payload) {
applySkin(ev.payload)
}
return
case 'session.info': {
const info = ev.payload
patchUiState(state => ({
...state,
info,
status: state.status === 'starting agent…' ? 'ready' : state.status,
usage: info.usage ? { ...state.usage, ...info.usage } : state.usage
}))
setHistoryItems(prev => prev.map(m => (m.kind === 'intro' ? { ...m, info } : m)))
return
}
case 'thinking.delta': {
if (!getUiState().busy) {
return
}
const text = ev.payload?.text
if (text !== undefined) {
const value = String(text)
scheduleThinkingStatus(value || statusFromBusy())
if (value) {
turnController.recordReasoningDelta(value)
}
}
return
}
case 'message.start':
resetAgentsNudgeTurnState()
turnController.startMessage()
return
case 'status.update': {
const p = ev.payload
if (!p?.text) {
return
}
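// Goal updates: the full text goes to the transcript, and the status bar shows
// a brief label keyed on the leading glyph ('✓…' → '✓ goal complete',
// '↻…' → '↻ goal continuing', '⏸…' → '⏸ goal paused', anything else →
// 'ready') for 6 s before reverting via statusFromBusy().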
if (p.kind === 'goal') {
sys(p.text)
const brief = p.text.startsWith('✓')
? '✓ goal complete'
: p.text.startsWith('↻')
? '↻ goal continuing'
: p.text.startsWith('⏸')
? '⏸ goal paused'
: 'ready'
setStatus(brief)
restoreStatusAfter(6000)
return
}
setStatus(p.text)
if (p.kind === 'compressing') {
sys(p.text)
return
}
if (!p.kind || p.kind === 'status') {
return
}
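// Remaining kinds become activity entries, deduplicated against the last
// note: 'error' → error, 'warn' / 'approval' → warn, any other kind → info.
// The status bar reverts after 4 s whether or not a new entry was pushed.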
if (turnController.lastStatusNote !== p.text) {
turnController.lastStatusNote = p.text
turnController.pushActivity(
p.text,
p.kind === 'error' ? 'error' : p.kind === 'warn' || p.kind === 'approval' ? 'warn' : 'info'
)
}
restoreStatusAfter(4000)
return
}
case 'notification.show': {
// Credits/usage notice from the gateway. Payload is snake_case on the
// wire and stays snake_case in UiState.notice (no mapping layer). The
// text already carries its own glyph; turnController decides whether to
// show now or hold until turn end (FaceTicker wins while busy).
const p = ev.payload
if (!p?.text) {
return
}
turnController.showNotice({
id: p.id,
key: p.key,
kind: p.kind ?? 'sticky',
level: p.level ?? 'info',
text: p.text,
ttl_ms: p.ttl_ms ?? null
})
return
}
case 'notification.clear':
// Key-matched clear only — a stale/late clear must not wipe a newer
// notice (turnController guards the key match).
turnController.clearNotice(ev.payload?.key)
return
case 'gateway.stderr': {
const line = String(ev.payload.line).slice(0, 120)
turnController.pushActivity(line, 'info')
return
}
case 'browser.progress': {
const message = String(ev.payload?.message ?? '').trim()
if (message) {
sys(message)
}
return
}
case 'voice.status': {
// Continuous VAD loop reports its internal state so the status bar
// can show listening / transcribing / idle without polling.
const state = String(ev.payload?.state ?? '')
if (state === 'listening') {
setVoiceRecording(true)
setVoiceProcessing(false)
} else if (state === 'transcribing') {
setVoiceRecording(false)
setVoiceProcessing(true)
} else {
setVoiceRecording(false)
setVoiceProcessing(false)
}
return
}
case 'voice.transcript': {
// CLI parity: after three no-speech detections the backend turns continuous
// mode off by itself. Mirror that on the UI side and tell the user why.
if (ev.payload?.no_speech_limit) {
setVoiceEnabled(false)
setVoiceRecording(false)
setVoiceProcessing(false)
sys('voice: no speech detected 3 times, continuous mode stopped')
return
}
const text = String(ev.payload?.text ?? '').trim()
if (!text) {
return
}
// CLI parity: _pending_input.put(transcript) unconditionally feeds
// the transcript to the agent as its next turn — draft handling
// doesn't apply because voice-mode users are speaking, not typing.
//
// We can't branch on composer input from inside a setInput updater
// (React strict mode double-invokes it, duplicating the submit).
// Just clear + defer submit so the cleared input is committed before
// submit reads it.
setInput('')
setTimeout(() => submitRef.current(text), 0)
return
}
case 'gateway.start_timeout': {
const { cwd, python, stderr_tail: stderrTail } = ev.payload ?? {}
// Optional ` · <python> <cwd>` suffix, skipping whichever part is missing.
const where = [python, cwd].filter(Boolean).map(String).join(' ')
const trace = where ? ` · ${where}` : ''
setStatus('gateway startup timeout')
turnController.pushActivity(`gateway startup timed out${trace} · /logs to inspect`, 'error')
// Surface the most useful stderr lines inline so users can tell
// "wrong python", "missing dep", and "config parse failure"
// apart without leaving the TUI. Filter blank rows BEFORE
// taking the last N so trailing empty lines in the buffer
// don't crowd out actual content; truncate to match the
// 120-char clip used for `gateway.stderr` activity entries.
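// e.g. a tail of "Traceback (most recent call last):\n\nModuleNotFoundError: foo\n\n"
// yields exactly two entries; the blank rows never reach the activity feed.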
const STDERR_LINE_CAP = 120
const STDERR_LINES_MAX = 8
const tailLines = (stderrTail ?? '')
.split('\n')
.map(l => l.trim())
.filter(Boolean)
.slice(-STDERR_LINES_MAX)
for (const line of tailLines) {
turnController.pushActivity(line.slice(0, STDERR_LINE_CAP), 'error')
}
return
}
case 'gateway.protocol_error':
setStatus('protocol warning')
restoreStatusAfter(4000)
if (!turnController.protocolWarned) {
turnController.protocolWarned = true
turnController.pushActivity('protocol noise detected · /logs to inspect', 'info')
}
if (ev.payload?.preview) {
turnController.pushActivity(`protocol noise: ${String(ev.payload.preview).slice(0, 120)}`, 'info')
}
return
case 'reasoning.delta':
if (ev.payload?.text) {
turnController.recordReasoningDelta(ev.payload.text, Boolean(ev.payload.verbose))
}
return
case 'reasoning.available':
turnController.recordReasoningAvailable(String(ev.payload?.text ?? ''), Boolean(ev.payload?.verbose))
return
case 'tool.progress':
if (ev.payload?.preview && ev.payload.name) {
turnController.recordToolProgress(ev.payload.name, ev.payload.preview)
}
return
case 'tool.generating':
if (ev.payload?.name) {
turnController.pushTrail(`drafting ${ev.payload.name}`)
}
return
case 'tool.start':
turnController.recordTodos(ev.payload.todos)
turnController.recordToolStart(
ev.payload.tool_id,
ev.payload.name ?? 'tool',
ev.payload.context ?? '',
ev.payload.args_text ? stripAnsi(String(ev.payload.args_text)) : undefined
)
return
case 'tool.complete': {
// The clarify tool finishing with its overlay still live means it was
// abandoned (backend _block timed out, empty answer). A real answer
// clears the overlay in answerClarify() before this fires, so this
// no-ops there. Persist the question + options so they don't vanish.
if (ev.payload.name === 'clarify') {
flushAbandonedClarify()
}
const inlineDiffText =
ev.payload.inline_diff && getUiState().inlineDiffs ? stripAnsi(String(ev.payload.inline_diff)).trim() : ''
const resultText = ev.payload.result_text ? stripAnsi(String(ev.payload.result_text)) : undefined
if (inlineDiffText) {
turnController.recordInlineDiffToolComplete(
inlineDiffText,
ev.payload.tool_id,
ev.payload.name,
ev.payload.error,
ev.payload.duration_s,
resultText
)
} else {
turnController.recordToolComplete(
ev.payload.tool_id,
ev.payload.name,
ev.payload.error,
ev.payload.summary,
ev.payload.duration_s,
ev.payload.todos,
resultText
)
}
return
}
case 'clarify.request':
patchOverlayState({
clarify: { choices: ev.payload.choices, question: ev.payload.question, requestId: ev.payload.request_id }
})
setStatus('waiting for input…')
return
case 'approval.request': {
const description = String(ev.payload.description ?? 'dangerous command')
patchOverlayState({ approval: { command: String(ev.payload.command ?? ''), description } })
setStatus('approval needed')
return
}
case 'sudo.request':
patchOverlayState({ sudo: { requestId: ev.payload.request_id } })
setStatus('sudo password needed')
return
case 'secret.request':
patchOverlayState({
secret: { envVar: ev.payload.env_var, prompt: ev.payload.prompt, requestId: ev.payload.request_id }
})
setStatus('secret input needed')
return
case 'background.complete':
dropBgTask(ev.payload.task_id)
sys(`[bg ${ev.payload.task_id}] ${ev.payload.text}`)
return
case 'review.summary': {
// Self-improvement background review emitted a persistent summary
// of what it saved to memory/skills. Surface it as a system line
// in the transcript so it never gets lost to a transient status
// flash. Python-side already formats it as "💾 Self-improvement
// review: …".
const text = String(ev.payload?.text ?? '').trim()
if (text) {
sys(text)
}
return
}
case 'subagent.spawn_requested':
// Child built but not yet running (waiting on a ThreadPoolExecutor slot).
// Keep any terminal state if a later event (e.g. `subagent.complete`) raced in ahead of this one.
turnController.upsertSubagent(ev.payload, c => (isTerminalStatus(c.status) ? {} : { status: 'queued' }))
// First sign of delegation this turn → nudge toward /agents.
maybeNudgeAgents()
// Prime the status-bar HUD so it can warn as depth/concurrency approaches
// the configured ceiling: force a fetch while the caps are still unknown,
// otherwise refresh at most once every 5 s.
refreshDelegationStatus(getDelegationState().maxSpawnDepth === null)
return
case 'subagent.start':
turnController.upsertSubagent(ev.payload, c => (isTerminalStatus(c.status) ? {} : { status: 'running' }))
// `subagent.start` is the first delegation event the TUI reliably
// receives (the delegate callback drops `spawn_requested` in the
// CLI→gateway path), so nudge here too. Once-per-turn guarded, so
// hooking both events is safe.
maybeNudgeAgents()
return
case 'subagent.thinking': {
const text = String(ev.payload.text ?? '').trim()
if (!text) {
return
}
// Update-only: never resurrect subagents whose spawn_requested/start
// we missed or that already flushed via message.complete.
turnController.upsertSubagent(
ev.payload,
c => ({
status: keepTerminalElseRunning(c.status),
thinking: pushThinking(c.thinking, text)
}),
{ createIfMissing: false }
)
return
}
case 'subagent.tool': {
const line = formatToolCall(
ev.payload.tool_name ?? 'delegate_task',
ev.payload.tool_preview ?? ev.payload.text ?? ''
)
turnController.upsertSubagent(
ev.payload,
c => ({
status: keepTerminalElseRunning(c.status),
tools: pushTool(c.tools, line)
}),
{ createIfMissing: false }
)
return
}
case 'subagent.progress': {
const text = String(ev.payload.text ?? '').trim()
if (!text) {
return
}
turnController.upsertSubagent(
ev.payload,
c => ({
notes: pushNote(c.notes, text),
status: keepTerminalElseRunning(c.status)
}),
{ createIfMissing: false }
)
return
}
case 'subagent.complete':
turnController.upsertSubagent(
ev.payload,
c => ({
durationSeconds: ev.payload.duration_seconds ?? c.durationSeconds,
status: normalizeSubagentStatus(ev.payload.status, 'completed'),
summary: ev.payload.summary || ev.payload.text || c.summary
}),
{ createIfMissing: false }
)
return
case 'message.delta':
turnController.recordMessageDelta(ev.payload ?? {})
return
case 'message.complete': {
const { finalMessages, finalText, wasInterrupted } = turnController.recordMessageComplete(ev.payload ?? {})
if (!wasInterrupted) {
const msgs: Msg[] = finalMessages.length ? finalMessages : [{ role: 'assistant', text: finalText }]
msgs.forEach(appendMessage)
if (bellOnComplete && stdout?.isTTY) {
stdout.write('\x07')
}
}
setStatus('ready')
const usage = ev.payload?.usage
if (usage) {
patchUiState(state => ({ ...state, usage: { ...state.usage, ...usage } }))
}
return
}
case 'error': {
turnController.recordError()
const message = String(ev.payload?.message || 'unknown error')
turnController.pushActivity(message, 'error')
if (NO_PROVIDER_RE.test(message)) {
panel(SETUP_REQUIRED_TITLE, buildSetupRequiredSections())
setStatus('setup required')
return
}
sys(`error: ${message}`)
setStatus('ready')
return
}
}
}
}