hermes-agent/tests/tui_gateway/test_protocol.py
brooklyn! d62979a6f3
feat(desktop): composer status stack, live subagent windows, editable prompts (#44630)
* feat(desktop): session-scoped status stack + kill new-window theme flash

Stack subagents, background tasks, and the queue into one collapsible
"sink" above the composer, reusing the queue's chrome so every status
reads as one piece. Extracts shared StatusSection / StatusRow /
TerminalOutput primitives and a unified $statusItemsBySession store
(subagents mirrored, background owned here, merged + grouped for render).
Renames BrailleSpinner → GlyphSpinner now that it drives more than braille.

Separately, fix the white flash on every new/cmd-clicked window: macOS
`vibrancy` paints an NSVisualEffectView that follows the OS appearance and
ignores `backgroundColor`, so a dark app on a light-mode Mac flashed white
until the renderer painted over it. Pin `nativeTheme.themeSource` to the
app theme (persisted to userData so cold launches paint right before the
renderer loads), hold windows with `show:false` until `ready-to-show`, and
pre-paint the themed background via an inline script before the bundle runs.

* feat(desktop): dock the slash popover to the composer via one shared fill var

The slash·@ popover (and ? help) now docks onto the composer's edge with the
same chrome as the queue/status stack — rounded outer corners, fused borderless
edge, no shadow — but keeps its own narrow width.

Surface + drawer paint a single --composer-fill var; the state ladder
(rest / scrolled / focused / drawer-open) lives once in styles.css on
[data-slot='composer-root']. The :has() drawer-open rule is last and forces an
opaque fill, since translucent glass sampling different backdrops (thread vs
fade gradient) can never match. This replaces the focus-within !important
override that repainted the surface behind every previous matching attempt.

Also drop the chevron column from the project file tree — the folder open/closed
icon already carries the expand state.

* feat(desktop): base inset for file tree rows (post-chevron alignment)

* feat(desktop): wire the status stack's background tasks to the real process registry

The background group was UI-only (dev-mock seeded). Now it's live e2e:

- tui_gateway: new session-scoped `process.list` (registry snapshot filtered
  by the session's session_key, plus a 4KB output tail for the inline
  terminal viewer) and `process.kill` (single process, ownership-checked —
  unlike process.stop's kill_all).
- Renderer: `reconcileBackgroundProcesses` syncs snapshots into the store
  layout-stably — rows keep their position when state flips (never re-sort),
  new processes append, unchanged rows keep object identity so memoised rows
  skip re-rendering, and a dismissed-set stops the registry's retained
  finished procs from resurrecting X-ed rows.
- Refresh triggers: session open, terminal/process tool.complete,
  status.update(kind=process) from the gateway's notification poller, and a
  5s poll armed only while a running row is visible (catches silent exits).
- Stop = real `process.kill` + optimistic dismiss; Dismiss = client-side
  with resurrection guard.
- Re-keyed the stack to the RUNTIME session id: it was keyed by the stored
  session id, where neither subagent events nor process.list would ever land.
- Deleted dev-status-mocks.ts (__hermesStatusMocks) — no more seed shit.

Reconcile invariants covered in store/composer-status.test.ts.
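
The reconcile rules in the list above can be sketched in Python. This is only an illustration: the real `reconcileBackgroundProcesses` lives in the renderer, and the `ProcRow` shape, the `reconcile` name, and the choice to drop rows that vanish from the snapshot are all assumptions, not taken from the source.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ProcRow:
    # Hypothetical row shape; the real store's fields are not shown here.
    id: str
    state: str  # e.g. "running" or "exited"


def reconcile(prev, snapshot, dismissed):
    """Merge a registry snapshot into the existing rows without re-sorting."""
    # Dismissed ids never come back, even if the registry still retains them.
    incoming = {row.id: row for row in snapshot if row.id not in dismissed}
    merged = []
    for old in prev:
        new = incoming.pop(old.id, None)
        if new is None:
            continue  # assumption: rows missing from the snapshot are dropped
        # Reuse the old object when nothing changed so identity checks still hit.
        merged.append(old if new == old else new)
    # Whatever is left was not in prev: append it in snapshot order.
    merged.extend(incoming.values())
    return merged
```

Existing rows keep their position when their state flips, unchanged rows are returned as the same object, and new processes land at the end.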

* feat(desktop): todos + openable subagents in the status stack, self-healing file tree

- todo lists move out of the inline chat panel into the composer status stack
  (checklist icon, dashed ring = pending, spinner = in progress, check = done),
  fed live from todo tool events and seeded from history on session open
- subagent rows carry the child's real session id end-to-end
  (delegate_tool → gateway → renderer) so clicking one opens ITS session window
- status stack publishes its measured height so the thread's bottom clearance
  grows with it; card paints the shared --composer-fill so focused/scrolled
  states match the composer exactly
- file tree self-heals: ENOENT roots retry on a 3s cadence + Try again button,
  and the main process expands ~ in IPC paths (gateway cwds arrive as ~/...)
- composer drag-drop of tree entries inserts inline refs instead of attachments

* fix(desktop): file tree falls back to the workspace dir when a session's cwd is gone

Sessions record their launch cwd; deleted worktrees leave that path dead,
so opening such a session swapped the tree from the default workspace to a
directory that ENOENTs forever — the 3s retry just spun on it. On a root
read error the tree now asks main to sanitize the cwd (prefers the
configured default project dir), displays that fallback, and quietly
re-probes the original path so it switches back if the dir reappears.

* feat(desktop): working restore-checkpoint button on past user prompts

The discard icon on hover of a past user bubble was decorative — clicking
did nothing. It's now a real control: a confirmation dialog explains that
everything after the prompt is removed, then the session rewinds to that
turn and reruns the same prompt (prompt.submit with
truncate_before_user_ordinal, the same mechanism the edit composer uses).
Failures rethrow into the dialog's inline error instead of toasting.

* fix(desktop): show the restore-checkpoint button on the latest user prompt too

Restoring the most recent prompt is just 'retry this turn' — no reason to
exclude it. Stop still takes the slot while the turn is running.

* fix(desktop): finished todo lists clear themselves out of the status stack

A list whose every item is completed/cancelled lingers ~4s so the final
checkmark is visible, then the todo group drops out of the stack. A fresh
active list arriving within the linger cancels the scheduled clear.

* chore(desktop): drop dead editableCheckpoint copy, terser restore confirm

* fix(desktop): rewind clears the abandoned timeline's todos + background

Restoring to (or editing) an earlier prompt rewinds the conversation, but
the todos and background processes spawned by the now-discarded turns kept
showing in the status stack — and the real background processes kept
running. Both rewind paths now clear the session's todo rows and kill +
drop its background processes before the fresh run repopulates them. Also
drops the click-to-edit clamp transition, which flashed a half-expanded
bubble on the way into the edit composer.

* feat(desktop): user messages are always editable; edit/restore revert mid-stream

The bubble is now always click-to-edit — even while a turn streams — instead
of going inert during a run. Sending an edit acts like restore: it rewinds to
that prompt and re-runs with the new text. Both edit and restore can fire
mid-stream now; the gateway refuses prompt.submit while a turn runs (4009
"session busy"), so they interrupt the live turn first and retry the submit
until the cooperative interrupt winds it down. Restore (re-run as-is) shows on
every prompt except the latest running one, which keeps the Stop button.
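
A minimal sketch of the interrupt-then-retry flow described above, written in Python for illustration. The `submit` and `interrupt` callables, the attempt count, and the delay are hypothetical stand-ins for the renderer's gateway client; only the 4009 code comes from this message.

```python
import time

SESSION_BUSY = 4009  # "session busy" code named in the commit message


def submit_after_interrupt(submit, interrupt, attempts=20, delay=0.1, sleep=time.sleep):
    """Interrupt the live turn, then retry submit() while the gateway reports busy.

    submit() is assumed to return a JSON-RPC style response dict.
    """
    interrupt()
    resp = submit()
    for _ in range(attempts - 1):
        if resp.get("error", {}).get("code") != SESSION_BUSY:
            break  # accepted, or failed for a reason retrying will not fix
        sleep(delay)
        resp = submit()
    return resp
```

The loop is bounded so a turn that never winds down surfaces the last busy error instead of spinning forever.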

* fix(desktop): label preview-pane ⌘L selections with the filename, not "zsh"

The terminal owns a global ⌘/Ctrl+L "send selection to composer" shortcut, so
selecting text in the file preview pane and hitting it fell through to the
terminal handler — which imported the right text but labelled the composer ref
"zsh:N lines" off the shell name. When the selection isn't an xterm selection,
label it with the previewed file instead.

* fix(desktop): ⌘L on a preview line selection inserts the @line ref, like dragging

The source preview lets you select lines in the gutter and drag them into the
composer as an @line:path:start-end ref. ⌘/Ctrl+L now does the same when a line
selection is active — it drops the identical ref instead of falling through to
the terminal's global handler (which grabbed the native text selection and sent
a bogus terminal block). Capture-phase + stopPropagation so it wins; with a line
selection there's no native selection, so the terminal handler stays out of it.

* chore: gitignore apps/desktop/demo/ scratch output

The desktop demo prompt writes demo/*.txt during recorded walkthroughs; it's
throwaway, never part of the app. Ignore it so it stops cluttering git status.

* feat(desktop): subagent watch windows, hard stop, sidebar hygiene

Child-session mirror for live subagent windows, delegate sessions tagged
and excluded from the sidebar, composer focus/stop polish, and WS stall
resilience on the gateway transport.

* refactor: DRY delegate SQL + trim status-stack noise

Extract shared listable-child and delegate-delete helpers in hermes_state,
collapse cancelRun busy release, and cut comment bloat in resume/status paths.

* fix(desktop): hide orphaned subagent sessions in sidebar

Cascade-delete all ephemeral children on parent delete (not just tagged rows),
run v16 backfill to tag legacy orphans, and record new delegates as source=subagent.

* fix: restore orphan contract for untagged children + lazy session eviction

Cascade-delete only _delegate_from-tagged rows (v16 backfill covers legacy),
walk marker chains recursively with FK-safe orphaning, gate lazy watch
sessions out of the still-starting eviction exemption via an explicit flag,
pass session_id to _make_agent only when resuming, and hide source=subagent
from session search.

* fix(gateway): gate child mirror off upgraded sessions + age out stale run entries

Review findings: the mirror could interleave synthetic events with a real
native stream once a watch window upgrades (prompt.submit builds an agent),
and a lost subagent.complete left _active_child_runs pinning running=true
forever. Mirror now stops when the live session owns an agent; liveness
reads ignore entries older than an hour.
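
The age-out rule can be sketched as below. It assumes `_active_child_runs` maps a stored session key to a start timestamp (the tests in this file seed it that way); the helper name and the constant are hypothetical.

```python
import time

STALE_AFTER_S = 3600  # "older than an hour", per the message above


def child_run_is_live(active_runs, key, now=None):
    """Return True only if key has a run entry that has not aged out."""
    started = active_runs.get(key)
    if started is None:
        return False
    now = time.time() if now is None else now
    # A lost subagent.complete leaves the entry behind; ignore it once stale.
    return (now - started) <= STALE_AFTER_S
```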

* fix(gateway): reject prompt.submit into a watch session while its child runs

A lazy watch session's running flag is False (the run lives in the parent
turn), so typing mid-run sailed past the busy guard and built a second agent
racing the in-flight child on the same stored session. Busy error until the
run completes; afterwards the submit upgrades into a normal conversation.

* refactor(gateway): DRY watch-resume payload + compose listable-child SQL

Fold the duplicated child-run busy overlay into one _reuse_live_payload
helper across both resume reuse paths, collapse the twin mirror early-returns,
and build _LISTABLE_CHILD_SQL from _BRANCH_CHILD_SQL instead of restating it.

* fix(desktop): clip horizontal overflow on sidebar scroll areas

Add overflow-x-hidden alongside overflow-y-auto on session list scrollers
and the shared SidebarContent primitive — vertical scroll unchanged.
2026-06-12 08:30:06 -05:00

1337 lines
45 KiB
Python

"""Tests for tui_gateway JSON-RPC protocol plumbing."""

import io
import json
import sys
import threading
import time
import types
from unittest.mock import MagicMock, patch

import pytest

_original_stdout = sys.stdout


@pytest.fixture(autouse=True)
def _restore_stdout():
    yield
    sys.stdout = _original_stdout


@pytest.fixture()
def server():
    with patch.dict("sys.modules", {
        "hermes_constants": MagicMock(get_hermes_home=MagicMock(return_value="/tmp/hermes_test")),
        "hermes_cli.env_loader": MagicMock(),
        "hermes_cli.banner": MagicMock(),
        "hermes_state": MagicMock(),
    }):
        import importlib
        mod = importlib.import_module("tui_gateway.server")
        yield mod
        # Reset module-level session state without re-importing. importlib.reload
        # would re-register the module's atexit hooks (ThreadPoolExecutor
        # shutdown, _shutdown_sessions); the duplicates race the stderr
        # buffer at interpreter shutdown and surface as Fatal Python error:
        # _enter_buffered_busy. Clearing the per-session dicts gives the
        # next test a clean slate; _methods is NOT cleared because it's
        # populated at module import time and re-registration only happens
        # via reload (which we don't do).
        mod._sessions.clear()
        mod._pending.clear()
        mod._answers.clear()


@pytest.fixture()
def capture(server):
    """Redirect server's real stdout to a StringIO and return (server, buf)."""
    buf = io.StringIO()
    server._real_stdout = buf
    return server, buf


# ── JSON-RPC envelope ────────────────────────────────────────────────


def test_unknown_method(server):
    resp = server.handle_request({"id": "1", "method": "bogus"})
    assert resp["error"]["code"] == -32601


def test_ok_envelope(server):
    assert server._ok("r1", {"x": 1}) == {
        "jsonrpc": "2.0", "id": "r1", "result": {"x": 1},
    }


def test_err_envelope(server):
    assert server._err("r2", 4001, "nope") == {
        "jsonrpc": "2.0", "id": "r2", "error": {"code": 4001, "message": "nope"},
    }


# ── write_json ───────────────────────────────────────────────────────


def test_write_json(capture):
    server, buf = capture
    assert server.write_json({"test": True})
    assert json.loads(buf.getvalue()) == {"test": True}


def test_write_json_broken_pipe(server):
    class _Broken:
        def write(self, _): raise BrokenPipeError
        def flush(self): raise BrokenPipeError

    server._real_stdout = _Broken()
    assert server.write_json({"x": 1}) is False


def test_write_json_closed_stream_returns_false(server):
    """ValueError ('I/O on closed file') used to bubble up; treat as gone."""
    class _Closed:
        def write(self, _): raise ValueError("I/O operation on closed file")
        def flush(self): raise ValueError("I/O operation on closed file")

    server._real_stdout = _Closed()
    assert server.write_json({"x": 1}) is False


def test_write_json_unicode_encode_error_re_raises(server):
    """A non-UTF-8 stdout encoding raises UnicodeEncodeError (a ValueError
    subclass). It must NOT be swallowed as 'peer gone' — that would let
    `entry.py` exit cleanly via the False path and hide the real config
    bug. We re-raise so the existing crash-log infrastructure records it."""
    class _AsciiOnly:
        def write(self, line):
            line.encode("ascii")  # raises UnicodeEncodeError on non-ascii
        def flush(self): pass

    server._real_stdout = _AsciiOnly()
    with pytest.raises(UnicodeEncodeError):
        server.write_json({"msg": "héllo"})


def test_write_json_unrelated_value_error_re_raises(server):
    """Only ValueError('...closed file...') means peer gone. Other
    ValueErrors are programming errors and must surface."""
    class _BadValue:
        def write(self, _): raise ValueError("something else entirely")
        def flush(self): pass

    server._real_stdout = _BadValue()
    with pytest.raises(ValueError, match="something else entirely"):
        server.write_json({"x": 1})


def test_write_json_non_serializable_payload_re_raises(server):
    """Non-JSON-safe payloads are programming errors — they must NOT be
    silently dropped via the False path (which would trigger a clean exit
    in entry.py and mask the real bug)."""
    import io
    server._real_stdout = io.StringIO()
    with pytest.raises(TypeError):
        server.write_json({"obj": object()})


def test_write_json_peer_gone_oserror_on_flush_returns_false(server):
    """A flush that raises a peer-gone OSError (EPIPE) must not strand
    the lock or crash; it returns False so the dispatcher exits cleanly."""
    import errno
    written = []

    class _FlushPeerGone:
        def write(self, line): written.append(line)
        def flush(self): raise OSError(errno.EPIPE, "broken pipe")

    server._real_stdout = _FlushPeerGone()
    assert server.write_json({"x": 1}) is False
    assert written and json.loads(written[0]) == {"x": 1}


def test_write_json_non_peer_gone_oserror_re_raises(server):
    """Host I/O failures (ENOSPC, EACCES, EIO …) are NOT peer-gone — they
    must re-raise so the crash log records them instead of looking like
    a clean disconnect via the False path."""
    import errno

    class _DiskFull:
        def write(self, _): raise OSError(errno.ENOSPC, "no space left")
        def flush(self): pass

    server._real_stdout = _DiskFull()
    with pytest.raises(OSError, match="no space"):
        server.write_json({"x": 1})


def test_write_json_skips_flush_when_disable_flush_true(monkeypatch):
    """`StdioTransport` skips flush when `_DISABLE_FLUSH` is true.
    Tests the runtime *behaviour* via direct module-attr patch. The env
    var → module constant wiring is covered by the dedicated env test
    below; reloading server.py here would re-register atexit hooks and
    recreate the worker pool.
    """
    import importlib
    transport_mod = importlib.import_module("tui_gateway.transport")
    monkeypatch.setattr(transport_mod, "_DISABLE_FLUSH", True)
    flushed = {"count": 0}
    written = []

    class _Stream:
        def write(self, line): written.append(line)
        def flush(self): flushed["count"] += 1

    stream = _Stream()
    transport = transport_mod.StdioTransport(lambda: stream, threading.Lock())
    assert transport.write({"x": 1}) is True
    assert flushed["count"] == 0


def test_disable_flush_env_var_actually_wires_to_module_constant(monkeypatch):
    """End-to-end: setting `HERMES_TUI_GATEWAY_NO_FLUSH=1` and importing
    `tui_gateway.transport` fresh actually flips `_DISABLE_FLUSH` true.
    Reloads only the transport module — server.py is untouched so its
    atexit hooks/worker pool stay intact."""
    import importlib
    monkeypatch.setenv("HERMES_TUI_GATEWAY_NO_FLUSH", "1")
    transport_mod = importlib.reload(importlib.import_module("tui_gateway.transport"))
    try:
        assert transport_mod._DISABLE_FLUSH is True
    finally:
        # Restore the env-disabled state so other tests see the default.
        monkeypatch.delenv("HERMES_TUI_GATEWAY_NO_FLUSH", raising=False)
        importlib.reload(transport_mod)


# ── _emit ────────────────────────────────────────────────────────────


def test_emit_with_payload(capture):
    server, buf = capture
    server._emit("test.event", "s1", {"key": "val"})
    msg = json.loads(buf.getvalue())
    assert msg["method"] == "event"
    assert msg["params"]["type"] == "test.event"
    assert msg["params"]["session_id"] == "s1"
    assert msg["params"]["payload"]["key"] == "val"


def test_emit_without_payload(capture):
    server, buf = capture
    server._emit("ping", "s2")
    assert "payload" not in json.loads(buf.getvalue())["params"]


# ── Blocking prompt round-trip ───────────────────────────────────────


def test_block_and_respond(capture):
    server, _ = capture
    result = [None]
    threading.Thread(
        target=lambda: result.__setitem__(0, server._block("test.prompt", "s1", {"q": "?"}, timeout=5)),
    ).start()
    for _ in range(100):
        if server._pending:
            break
        threading.Event().wait(0.01)
    rid = next(iter(server._pending))
    server._answers[rid] = "my_answer"
    # _pending values are (sid, Event) tuples — unpack to set the Event
    _, ev = server._pending[rid]
    ev.set()
    threading.Event().wait(0.1)
    assert result[0] == "my_answer"


def test_clear_pending(server):
    ev = threading.Event()
    # _pending values are (sid, Event) tuples
    server._pending["r1"] = ("sid-x", ev)
    server._clear_pending()
    assert ev.is_set()
    assert server._answers["r1"] == ""


# ── Session lookup ───────────────────────────────────────────────────


def test_sess_missing(server):
    _, err = server._sess({"session_id": "nope"}, "r1")
    assert err["error"]["code"] == 4001


def test_sess_found(server):
    server._sessions["abc"] = {"agent": MagicMock()}
    s, err = server._sess({"session_id": "abc"}, "r1")
    assert s is not None
    assert err is None


# ── session.resume payload ────────────────────────────────────────────


def test_session_resume_returns_hydrated_messages(server, monkeypatch):
    class _DB:
        def get_session(self, _sid):
            return {"id": "20260409_010101_abc123"}

        def get_session_by_title(self, _title):
            return None

        def reopen_session(self, _sid):
            return None

        def get_messages_as_conversation(self, _sid, include_ancestors=False):
            return [
                {"role": "user", "content": "hello"},
                {"role": "assistant", "content": "yo", "reasoning": "thoughts"},
                {"role": "tool", "content": "searched"},
                {"role": "assistant", "content": " "},
                {"role": "assistant", "content": None},
                {"role": "narrator", "content": "skip"},
            ]

    monkeypatch.setattr(server, "_get_db", lambda: _DB())
    monkeypatch.setattr(server, "_make_agent", lambda sid, key, session_id=None, session_db=None: object())
    monkeypatch.setattr(server, "_init_session", lambda sid, key, agent, history, cols=80: None)
    monkeypatch.setattr(server, "_session_info", lambda _agent, _session=None: {"model": "test/model"})
    resp = server.handle_request(
        {
            "id": "r1",
            "method": "session.resume",
            "params": {"session_id": "20260409_010101_abc123", "cols": 100},
        }
    )
    assert "error" not in resp
    assert resp["result"]["message_count"] == 3
    assert resp["result"]["messages"] == [
        {"role": "user", "text": "hello"},
        {"role": "assistant", "text": "yo", "reasoning": "thoughts"},
        {"role": "tool", "name": "tool", "context": ""},
    ]


def test_session_resume_handles_multimodal_list_content(server, monkeypatch):
    """A user message persisted with list-shaped multimodal content used to
    crash session resume with ``'list' object has no attribute 'strip'``."""
    multimodal_user = {
        "role": "user",
        "content": [
            {"type": "text", "text": "describe this"},
            {
                "type": "image_url",
                "image_url": {"url": "data:image/png;base64,AAAA"},
            },
        ],
    }
    text_only_assistant = {"role": "assistant", "content": "ok"}

    class _DB:
        def get_session(self, _sid):
            return {"id": "20260502_000000_listcontent"}

        def get_session_by_title(self, _title):
            return None

        def reopen_session(self, _sid):
            return None

        def get_messages_as_conversation(self, _sid, include_ancestors=False):
            return [multimodal_user, text_only_assistant]

    monkeypatch.setattr(server, "_get_db", lambda: _DB())
    monkeypatch.setattr(server, "_make_agent", lambda sid, key, session_id=None, session_db=None: object())
    monkeypatch.setattr(server, "_init_session", lambda sid, key, agent, history, cols=80: None)
    monkeypatch.setattr(server, "_session_info", lambda _agent, _session=None: {"model": "test/model"})
    resp = server.handle_request(
        {
            "id": "r1",
            "method": "session.resume",
            "params": {"session_id": "20260502_000000_listcontent", "cols": 100},
        }
    )
    assert "error" not in resp
    assert resp["result"]["message_count"] == 2
    # The image_url part is preserved as a raw data URL inside the text so
    # the desktop renderer (which extracts embedded images) sees the same
    # content the optimistic local cache returns. Otherwise the inline
    # image flashes during initial cache hydration and then vanishes when
    # the resume payload overwrites it with cleaned text.
    assert resp["result"]["messages"] == [
        {
            "role": "user",
            "text": "describe this\ndata:image/png;base64,AAAA",
        },
        {"role": "assistant", "text": "ok"},
    ]


def test_session_resume_lazy_registers_watch_session_without_agent(server, monkeypatch):
    """``lazy: true`` (subagent watch windows) must register the live session
    — keyed for the child mirror, on this transport — WITHOUT building an
    agent. The eager build is what made opening a subagent window contend
    with the already-running parent turn."""
    target = "20260612_000000_child99"

    class _DB:
        def get_session(self, _sid):
            return {"id": target}

        def get_session_by_title(self, _title):
            return None

        def reopen_session(self, _sid):
            return None

        def get_messages_as_conversation(self, _sid, include_ancestors=False):
            return [
                {"role": "user", "content": "delegated goal"},
            ]

    def _boom(*_args, **_kwargs):
        raise AssertionError("lazy resume must not build an agent")

    monkeypatch.setattr(server, "_get_db", lambda: _DB())
    monkeypatch.setattr(server, "_make_agent", _boom)
    resp = server.handle_request(
        {
            "id": "r1",
            "method": "session.resume",
            "params": {"session_id": target, "cols": 100, "lazy": True},
        }
    )
    assert "error" not in resp
    result = resp["result"]
    assert result["resumed"] == target
    assert result["session_key"] == target
    assert result["info"]["lazy"] is True
    assert result["info"]["desktop_contract"] == server.DESKTOP_BACKEND_CONTRACT
    assert result["messages"] == [{"role": "user", "text": "delegated goal"}]
    sid = result["session_id"]
    session = server._sessions[sid]
    assert session["agent"] is None
    # The child mirror finds the watch window by stored key.
    assert server._find_live_session_by_key(target) == (sid, session)
    # A later prompt.submit upgrade must continue THIS stored conversation.
    assert session["resume_session_id"] == target
    # No build started: the idle reaper must still be able to evict it, and
    # the live status must not report a never-ending "starting".
    assert not session["agent_ready"].is_set()
    assert server._session_live_status(sid, session) != "starting"
    session["transport"] = server._detached_ws_transport
    far_future = time.time() + 999999
    assert server._session_is_evictable(sid, session, far_future)
    # Resuming again (window refresh) reuses the same live session.
    resp2 = server.handle_request(
        {
            "id": "r2",
            "method": "session.resume",
            "params": {"session_id": target, "cols": 100, "lazy": True},
        }
    )
    assert "error" not in resp2
    assert resp2["result"]["session_id"] == sid
    assert len(server._sessions) == 1


def test_session_resume_lazy_reports_running_for_inflight_child(server, monkeypatch):
    """A watch window attaching to a child mid-delegation must learn the run is
    live from the resume response itself — the child can sit silent inside a
    long tool call, so waiting for the next stream event leaves the window
    looking dead."""
    target = "20260612_000000_child42"

    class _DB:
        def get_session(self, _sid):
            return {"id": target}

        def get_session_by_title(self, _title):
            return None

        def reopen_session(self, _sid):
            return None

        def get_messages_as_conversation(self, _sid, include_ancestors=False):
            return [{"role": "user", "content": "delegated goal"}]

    monkeypatch.setattr(server, "_get_db", lambda: _DB())
    monkeypatch.setattr(
        server, "_make_agent", lambda *a, **k: (_ for _ in ()).throw(AssertionError("no build"))
    )
    server._active_child_runs[target] = time.time()
    try:
        resp = server.handle_request(
            {
                "id": "r1",
                "method": "session.resume",
                "params": {"session_id": target, "cols": 100, "lazy": True},
            }
        )
    finally:
        server._active_child_runs.pop(target, None)
    assert "error" not in resp
    assert resp["result"]["running"] is True
    assert resp["result"]["status"] == "streaming"


def test_session_resume_reuses_existing_live_session(server, monkeypatch):
    """Repeated resume must not allocate duplicate live agents."""
    target = "20260409_010101_abc123"
    created_sids: list[str] = []
    closed_sids: list[str] = []
    first_agent_started = threading.Event()
    agent_can_finish = threading.Event()

    class _DB:
        def get_session(self, _sid):
            return {"id": target}

        def get_session_by_title(self, _title):
            return None

        def reopen_session(self, _sid):
            return None

        def get_messages_as_conversation(self, _sid, include_ancestors=False):
            return [
                {"role": "user", "content": "hello"},
                {"role": "assistant", "content": "yo"},
            ]

    class _Worker:
        def close(self):
            pass

    class _Agent:
        def __init__(self, sid, session_id):
            self.sid = sid
            self.model = "test/model"
            self.session_id = session_id

        def close(self):
            closed_sids.append(self.sid)

    def make_agent(sid, key, session_id=None, session_db=None):
        created_sids.append(sid)
        first_agent_started.set()
        assert agent_can_finish.wait(timeout=1)
        return _Agent(sid, session_id or key)

    monkeypatch.setattr(server, "_get_db", lambda: _DB())
    monkeypatch.setattr(server, "_make_agent", make_agent)
    monkeypatch.setattr(server, "_SlashWorker", lambda _key, _model: _Worker())
    monkeypatch.setattr(
        server,
        "_start_notification_poller",
        lambda _sid, _session: threading.Event(),
    )
    monkeypatch.setattr(server, "_notify_session_boundary", lambda *_args, **_kwargs: None)
    monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
    monkeypatch.setattr(server, "_emit", lambda *_args, **_kwargs: None)
    monkeypatch.setattr(
        server,
        "_session_info",
        lambda _agent, _session=None: {"model": "test/model"},
    )
    fake_approval = types.SimpleNamespace(
        load_permanent_allowlist=lambda: None,
        register_gateway_notify=lambda *_args, **_kwargs: None,
    )
    with patch.dict(sys.modules, {"tools.approval": fake_approval}):
        first_holder = {}

        def resume_first():
            first_holder["resp"] = server.handle_request(
                {
                    "id": "first",
                    "method": "session.resume",
                    "params": {"session_id": target, "cols": 100},
                }
            )

        first_thread = threading.Thread(target=resume_first)
        first_thread.start()
        assert first_agent_started.wait(timeout=1)
        second_holder = {}

        def resume_second():
            second_holder["resp"] = server.handle_request(
                {
                    "id": "second",
                    "method": "session.resume",
                    "params": {"session_id": target, "cols": 120},
                }
            )

        second_thread = threading.Thread(target=resume_second)
        second_thread.start()
        agent_can_finish.set()
        first_thread.join(timeout=1)
        second_thread.join(timeout=1)
    assert not first_thread.is_alive()
    assert not second_thread.is_alive()
    first = first_holder["resp"]
    second = second_holder["resp"]
    assert "error" not in first
    assert "error" not in second
    # Both resumes resolve to the SAME single live session — the core invariant.
    assert second["result"]["session_id"] == first["result"]["session_id"]
    assert len(server._sessions) == 1
    assert [s.get("session_key") for s in server._sessions.values()].count(target) == 1
    winner = first["result"]["session_id"]
    # The agent build happens outside the resume lock, so a racing resume may
    # build a redundant agent; double-checked locking keeps only one live
    # session and closes any loser's agent (no worker/poller is wired for it).
    assert winner in created_sids
    survivors = [sid for sid in created_sids if sid not in closed_sids]
    assert survivors == [winner]
    assert all(sid == winner for sid in server._sessions)


def test_session_resume_live_payload_uses_current_history_with_ancestors(server, monkeypatch):
    """Live resume should not reuse a stale ancestor-inclusive snapshot."""
    target = "20260409_010101_child"
    ancestor_history = [{"role": "user", "content": "ancestor"}]
    current_history = [
        {"role": "user", "content": "current"},
        {"role": "assistant", "content": "current reply"},
    ]

    class _DB:
        def get_session(self, _sid):
            return {"id": target}

        def get_session_by_title(self, _title):
            return None

        def reopen_session(self, _sid):
            return None

        def get_messages_as_conversation(self, _sid, include_ancestors=False):
            if include_ancestors:
                return ancestor_history + current_history
            return list(current_history)

    class _Worker:
        def close(self):
            pass

    monkeypatch.setattr(server, "_get_db", lambda: _DB())
    monkeypatch.setattr(
        server,
        "_make_agent",
        lambda _sid, key, session_id=None, session_db=None: types.SimpleNamespace(
            model="test/model", session_id=session_id or key
        ),
    )
    monkeypatch.setattr(server, "_SlashWorker", lambda _key, _model: _Worker())
    monkeypatch.setattr(
        server,
        "_start_notification_poller",
        lambda _sid, _session: threading.Event(),
    )
    monkeypatch.setattr(server, "_notify_session_boundary", lambda *_args, **_kwargs: None)
    monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
    monkeypatch.setattr(server, "_emit", lambda *_args, **_kwargs: None)
    monkeypatch.setattr(
        server,
        "_session_info",
        lambda _agent, _session=None: {"model": "test/model"},
    )
    fake_approval = types.SimpleNamespace(
        load_permanent_allowlist=lambda: None,
        register_gateway_notify=lambda *_args, **_kwargs: None,
    )
    with patch.dict(sys.modules, {"tools.approval": fake_approval}):
        first = server.handle_request(
            {
                "id": "first",
                "method": "session.resume",
                "params": {"session_id": target, "cols": 100},
            }
        )
        assert "error" not in first
        sid = first["result"]["session_id"]
        assert first["result"]["messages"] == [
            {"role": "user", "text": "ancestor"},
            {"role": "user", "text": "current"},
            {"role": "assistant", "text": "current reply"},
        ]
        with server._sessions[sid]["history_lock"]:
            server._sessions[sid]["history"] = current_history + [
                {"role": "user", "content": "new live turn"},
                {"role": "assistant", "content": "new live reply"},
            ]
        second = server.handle_request(
            {
                "id": "second",
                "method": "session.resume",
                "params": {"session_id": target, "cols": 120},
            }
        )
        assert "error" not in second
        assert second["result"]["session_id"] == sid
        assert second["result"]["messages"] == [
            {"role": "user", "text": "ancestor"},
            {"role": "user", "text": "current"},
            {"role": "assistant", "text": "current reply"},
            {"role": "user", "text": "new live turn"},
            {"role": "assistant", "text": "new live reply"},
        ]


def test_session_activate_rebinds_orphaned_ws_session_to_current_transport(server, monkeypatch):
    """Reconnect + activate must reattach a parked live session before orphan reap."""
    class _Transport:
        def write(self, _obj):
            return True

    sid = "runtime01"
    old_transport = server._stdio_transport
    new_transport = _Transport()
    server._sessions[sid] = {
        "agent": types.SimpleNamespace(model="test/model"),
        "created_at": 123.0,
        "history": [],
        "history_lock": threading.RLock(),
        "last_active": 123.0,
        "running": False,
        "session_key": "20260409_010101_abc123",
        "transport": old_transport,
    }
    monkeypatch.setattr(server, "current_transport", lambda: new_transport)
    monkeypatch.setattr(server, "_get_db", lambda: None)
    monkeypatch.setattr(
        server,
        "_session_info",
        lambda _agent, _session=None: {"model": "test/model"},
    )
    resp = server.handle_request(
        {"id": "activate", "method": "session.activate", "params": {"session_id": sid}}
    )
    assert "error" not in resp
    assert resp["result"]["session_id"] == sid
    assert server._sessions[sid]["transport"] is new_transport
    assert not server._ws_session_is_orphaned(server._sessions[sid])
def test_session_branch_persists_branched_from_marker(server, monkeypatch):
"""TUI /branch must persist a _branched_from marker so the branch stays
visible in /resume and /sessions.
Regression for issue #20856: the TUI branch leaves the parent live (it
never ends it with end_reason='branched'), so list_sessions_rich's legacy
heuristic never surfaces it — the stable model_config marker is the only
thing that keeps a TUI branch visible.
"""
create_calls = []
class _DB:
def get_session_title(self, _key):
return "parent-title"
def get_next_title_in_lineage(self, base):
return f"{base} 2"
def create_session(self, new_key, **kwargs):
create_calls.append((new_key, kwargs))
return new_key
def append_message(self, **_kwargs):
return None
def set_session_title(self, _key, _title):
return None
monkeypatch.setattr(server, "_get_db", lambda: _DB())
monkeypatch.setattr(server, "_resolve_model", lambda: "test/model")
monkeypatch.setattr(server, "_new_session_key", lambda: "20260101_000001_child0")
monkeypatch.setattr(
server,
"_make_agent",
lambda _sid, key, session_id=None, session_db=None: types.SimpleNamespace(
model="test/model", session_id=session_id or key
),
)
monkeypatch.setattr(server, "_init_session", lambda *_a, **_k: None)
monkeypatch.setattr(server, "_set_session_context", lambda *_a, **_k: [])
monkeypatch.setattr(server, "_clear_session_context", lambda *_a, **_k: None)
monkeypatch.setattr(server, "_session_cwd", lambda _s: "/tmp/branch-cwd")
parent_sid = "parent01"
parent_key = "20260101_000000_parent"
server._sessions[parent_sid] = {
"session_key": parent_key,
"history": [{"role": "user", "content": "hello"}],
"history_lock": threading.Lock(),
"cols": 80,
}
resp = server.handle_request(
{"id": "b1", "method": "session.branch", "params": {"session_id": parent_sid}}
)
assert "error" not in resp, resp
assert len(create_calls) == 1
new_key, kwargs = create_calls[0]
assert new_key == "20260101_000001_child0"
assert kwargs["parent_session_id"] == parent_key
# The marker — without it the branch is invisible in /resume and /sessions.
assert kwargs["model_config"] == {"_branched_from": parent_key}
def test_make_agent_accepts_list_system_prompt(server, monkeypatch):
    """A list-valued agent.system_prompt is joined with newlines into ephemeral_system_prompt."""
captured = {}
class _Agent:
def __init__(self, **kwargs):
captured.update(kwargs)
self.model = kwargs.get("model", "")
monkeypatch.setitem(sys.modules, "run_agent", types.SimpleNamespace(AIAgent=_Agent))
monkeypatch.setitem(
sys.modules,
"hermes_cli.runtime_provider",
types.SimpleNamespace(
resolve_runtime_provider=lambda **_kwargs: {
"provider": "test",
"base_url": None,
"api_key": None,
"api_mode": None,
}
),
)
monkeypatch.setattr(server, "_load_cfg", lambda: {"agent": {"system_prompt": ["one", "two"]}})
monkeypatch.setattr(server, "_resolve_startup_runtime", lambda: ("test/model", "test"))
monkeypatch.setattr(server, "_get_db", lambda: None)
server._make_agent("sid", "session-key", session_id="session-key")
assert captured["ephemeral_system_prompt"] == "one\ntwo"
# ── Config I/O ───────────────────────────────────────────────────────
def test_config_load_missing(server, tmp_path):
server._hermes_home = tmp_path
assert server._load_cfg() == {}
def test_config_roundtrip(server, tmp_path):
server._hermes_home = tmp_path
server._save_cfg({"model": "test/model"})
assert server._load_cfg()["model"] == "test/model"
# ── _cli_exec_blocked ────────────────────────────────────────────────
@pytest.mark.parametrize("argv", [
[],
["setup"],
["gateway"],
["sessions", "browse"],
["config", "edit"],
])
def test_cli_exec_blocked(server, argv):
assert server._cli_exec_blocked(argv) is not None
@pytest.mark.parametrize("argv", [
["version"],
["sessions", "list"],
])
def test_cli_exec_allowed(server, argv):
assert server._cli_exec_blocked(argv) is None
# ── slash.exec skill command interception ────────────────────────────
def test_slash_exec_rejects_skill_commands(server):
"""slash.exec must reject skill commands so the TUI falls through to command.dispatch."""
# Register a mock session
sid = "test-session"
server._sessions[sid] = {"session_key": sid, "agent": None}
    # Mock get_skill_commands to return a known skill
fake_skills = {"/hermes-agent-dev": {"name": "hermes-agent-dev", "description": "Dev workflow"}}
with patch("agent.skill_commands.get_skill_commands", return_value=fake_skills):
resp = server.handle_request({
"id": "r1",
"method": "slash.exec",
"params": {"command": "hermes-agent-dev", "session_id": sid},
})
# Should return an error so the TUI's .catch() fires command.dispatch
assert "error" in resp
assert resp["error"]["code"] == 4018
assert "skill command" in resp["error"]["message"]
def test_slash_exec_handles_plugin_commands_in_live_gateway(server):
"""Plugin slash commands return normal slash.exec output without using the worker."""
sid = "test-session"
class Worker:
def __init__(self):
self.calls = []
def run(self, cmd):
self.calls.append(cmd)
return f"worker:{cmd}"
worker = Worker()
server._sessions[sid] = {"session_key": sid, "agent": None, "slash_worker": worker}
with patch(
"hermes_cli.plugins.get_plugin_command_handler",
lambda name: (lambda arg: f"plugin:{arg}") if name == "plugin-cmd" else None,
):
resp = server.handle_request({
"id": "r-plugin-slash",
"method": "slash.exec",
"params": {"command": "plugin-cmd hello", "session_id": sid},
})
assert "error" not in resp
assert resp["result"] == {"output": "plugin:hello"}
assert worker.calls == []
def test_slash_exec_plugin_lookup_failure_falls_back_to_worker(server):
"""Plugin discovery failures must not break ordinary slash-worker commands."""
sid = "test-session"
class Worker:
def __init__(self):
self.calls = []
def run(self, cmd):
self.calls.append(cmd)
return f"worker:{cmd}"
worker = Worker()
server._sessions[sid] = {"session_key": sid, "agent": None, "slash_worker": worker}
with patch(
"hermes_cli.plugins.get_plugin_command_handler",
side_effect=RuntimeError("discovery boom"),
):
resp = server.handle_request({
"id": "r-plugin-lookup-failure",
"method": "slash.exec",
"params": {"command": "help", "session_id": sid},
})
assert "error" not in resp
assert resp["result"] == {"output": "worker:help"}
assert worker.calls == ["help"]
def test_slash_exec_plugin_handler_error_returns_output(server):
"""Plugin handler failures return slash output so the TUI does not redispatch."""
sid = "test-session"
class Worker:
def __init__(self):
self.calls = []
def run(self, cmd):
self.calls.append(cmd)
return f"worker:{cmd}"
def handler(arg):
raise RuntimeError(f"handler boom: {arg}")
worker = Worker()
server._sessions[sid] = {"session_key": sid, "agent": None, "slash_worker": worker}
with patch(
"hermes_cli.plugins.get_plugin_command_handler",
lambda name: handler if name == "plugin-cmd" else None,
):
resp = server.handle_request({
"id": "r-plugin-handler-error",
"method": "slash.exec",
"params": {"command": "plugin-cmd hello", "session_id": sid},
})
assert "error" not in resp
assert resp["result"] == {"output": "Plugin command error: handler boom: hello"}
assert worker.calls == []
@pytest.mark.parametrize("cmd", ["retry", "queue hello", "q hello", "steer fix the test", "plan"])
def test_slash_exec_rejects_pending_input_commands(server, cmd):
"""slash.exec must reject commands that use _pending_input in the CLI."""
sid = "test-session"
server._sessions[sid] = {"session_key": sid, "agent": None}
resp = server.handle_request({
"id": "r1",
"method": "slash.exec",
"params": {"command": cmd, "session_id": sid},
})
assert "error" in resp
assert resp["error"]["code"] == 4018
assert "pending-input command" in resp["error"]["message"]
def test_command_dispatch_queue_sends_message(server):
"""command.dispatch /queue returns {type: 'send', message: ...} for the TUI."""
sid = "test-session"
server._sessions[sid] = {"session_key": sid}
resp = server.handle_request({
"id": "r1",
"method": "command.dispatch",
"params": {"name": "queue", "arg": "tell me about quantum computing", "session_id": sid},
})
assert "error" not in resp
result = resp["result"]
assert result["type"] == "send"
assert result["message"] == "tell me about quantum computing"
def test_command_dispatch_queue_requires_arg(server):
"""command.dispatch /queue without an argument returns an error."""
sid = "test-session"
server._sessions[sid] = {"session_key": sid}
resp = server.handle_request({
"id": "r2",
"method": "command.dispatch",
"params": {"name": "queue", "arg": "", "session_id": sid},
})
assert "error" in resp
assert resp["error"]["code"] == 4004
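def test_skills_manage_search_uses_tools_hub_sources(server):
    """skills.manage search builds its sources via tools.skills_hub and returns name/description pairs."""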
result = type("Result", (), {
"description": "Build better terminal demos",
"name": "showroom",
})()
auth = MagicMock(return_value="auth")
router = MagicMock(return_value=["source"])
search = MagicMock(return_value=[result])
fake_hub = types.SimpleNamespace(
GitHubAuth=auth,
create_source_router=router,
unified_search=search,
)
with patch.dict(sys.modules, {"tools.skills_hub": fake_hub}):
resp = server.handle_request({
"id": "skills-search",
"method": "skills.manage",
"params": {"action": "search", "query": "showroom"},
})
assert "error" not in resp
assert resp["result"] == {
"results": [{"description": "Build better terminal demos", "name": "showroom"}]
}
auth.assert_called_once_with()
router.assert_called_once_with("auth")
search.assert_called_once_with("showroom", ["source"], source_filter="all", limit=20)
def test_command_dispatch_steer_fallback_sends_message(server):
"""command.dispatch /steer with no active agent falls back to send."""
sid = "test-session"
server._sessions[sid] = {"session_key": sid, "agent": None}
resp = server.handle_request({
"id": "r3",
"method": "command.dispatch",
"params": {"name": "steer", "arg": "focus on testing", "session_id": sid},
})
assert "error" not in resp
result = resp["result"]
assert result["type"] == "send"
assert result["message"] == "focus on testing"
def test_command_dispatch_retry_finds_last_user_message(server):
"""command.dispatch /retry walks session['history'] to find the last user message."""
sid = "test-session"
history = [
{"role": "user", "content": "first question"},
{"role": "assistant", "content": "first answer"},
{"role": "user", "content": "second question"},
{"role": "assistant", "content": "second answer"},
]
server._sessions[sid] = {
"session_key": sid,
"agent": None,
"history": history,
"history_lock": threading.Lock(),
"history_version": 0,
}
resp = server.handle_request({
"id": "r4",
"method": "command.dispatch",
"params": {"name": "retry", "session_id": sid},
})
assert "error" not in resp
result = resp["result"]
assert result["type"] == "send"
assert result["message"] == "second question"
# Verify history was truncated: everything from last user message onward removed
assert len(server._sessions[sid]["history"]) == 2
assert server._sessions[sid]["history"][-1]["role"] == "assistant"
assert server._sessions[sid]["history_version"] == 1
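

# Illustrative sketch only, not the gateway's actual code: one way the /retry
# behaviour asserted above could be implemented. `_sketch_retry_truncate` is a
# hypothetical helper, and it assumes history entries are dicts with "role"
# and "content" keys.
def _sketch_retry_truncate(history):
    """Return (last_user_content, truncated_history), or (None, history) if no user turn exists."""
    for idx in range(len(history) - 1, -1, -1):
        if history[idx].get("role") == "user":
            # Drop the last user message and everything after it.
            return history[idx]["content"], history[:idx]
    return None, history
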
def test_command_dispatch_retry_empty_history(server):
"""command.dispatch /retry with empty history returns error."""
sid = "test-session"
server._sessions[sid] = {
"session_key": sid,
"agent": None,
"history": [],
"history_lock": threading.Lock(),
"history_version": 0,
}
resp = server.handle_request({
"id": "r5",
"method": "command.dispatch",
"params": {"name": "retry", "session_id": sid},
})
assert "error" in resp
assert resp["error"]["code"] == 4018
def test_command_dispatch_retry_handles_multipart_content(server):
"""command.dispatch /retry extracts text from multipart content lists."""
sid = "test-session"
history = [
{"role": "user", "content": [
{"type": "text", "text": "analyze this"},
{"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}}
]},
{"role": "assistant", "content": "I see the image."},
]
server._sessions[sid] = {
"session_key": sid,
"agent": None,
"history": history,
"history_lock": threading.Lock(),
"history_version": 0,
}
resp = server.handle_request({
"id": "r6",
"method": "command.dispatch",
"params": {"name": "retry", "session_id": sid},
})
assert "error" not in resp
result = resp["result"]
assert result["type"] == "send"
assert result["message"] == "analyze this"
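

# Illustrative sketch only: one way to pull the text out of multipart content,
# matching what the test above expects. `_sketch_content_text` is a
# hypothetical helper, not the server's API; joining several text parts with a
# newline is an assumption.
def _sketch_content_text(content):
    """Return plain text from a string or from a list of typed content parts."""
    if isinstance(content, str):
        return content
    parts = [
        part.get("text", "")
        for part in (content or [])
        if isinstance(part, dict) and part.get("type") == "text"
    ]
    # Non-text parts (for example image_url) are ignored.
    return "\n".join(p for p in parts if p)
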
def test_command_dispatch_returns_skill_payload(server):
"""command.dispatch returns structured skill payload for the TUI to send()."""
sid = "test-session"
server._sessions[sid] = {"session_key": sid}
fake_skills = {"/hermes-agent-dev": {"name": "hermes-agent-dev", "description": "Dev workflow"}}
fake_msg = "Loaded skill content here"
with patch("agent.skill_commands.scan_skill_commands", return_value=fake_skills), \
patch("agent.skill_commands.build_skill_invocation_message", return_value=fake_msg):
resp = server.handle_request({
"id": "r2",
"method": "command.dispatch",
"params": {"name": "hermes-agent-dev", "session_id": sid},
})
assert "error" not in resp
result = resp["result"]
assert result["type"] == "skill"
assert result["message"] == fake_msg
assert result["name"] == "hermes-agent-dev"
def test_command_dispatch_awaits_async_plugin_handler(server):
    """command.dispatch awaits a coroutine plugin handler and returns its output."""
async def _handler(arg):
return f"async:{arg}"
with patch(
"hermes_cli.plugins.get_plugin_command_handler",
lambda name: _handler if name == "async-cmd" else None,
):
resp = server.handle_request({
"id": "r-plugin",
"method": "command.dispatch",
"params": {"name": "async-cmd", "arg": "hello"},
})
assert "error" not in resp
assert resp["result"] == {"type": "plugin", "output": "async:hello"}
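

# Illustrative sketch only: one way a synchronous dispatcher could support both
# sync and async plugin handlers, as the test above requires.
# `_sketch_call_plugin_handler` is a hypothetical name; it assumes no event
# loop is already running in the calling thread.
import asyncio
import inspect


def _sketch_call_plugin_handler(handler, arg):
    """Call handler(arg) and, if it returned a coroutine, run it to completion."""
    result = handler(arg)
    if inspect.iscoroutine(result):
        # asyncio.run creates a fresh event loop, runs the coroutine, then closes the loop.
        return asyncio.run(result)
    return result
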
# ── dispatch(): pool routing for long handlers (#12546) ──────────────
def test_dispatch_runs_short_handlers_inline(server):
"""Non-long handlers return their response synchronously from dispatch()."""
server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
resp = server.dispatch({"id": "r1", "method": "fast.ping", "params": {}})
assert resp == {"jsonrpc": "2.0", "id": "r1", "result": {"pong": True}}
def test_dispatch_offloads_long_handlers_and_emits_via_stdout(capture):
"""Long handlers run on the pool and write their response via write_json."""
server, buf = capture
server._methods["slash.exec"] = lambda rid, params: server._ok(rid, {"output": "hi"})
resp = server.dispatch({"id": "r2", "method": "slash.exec", "params": {}})
assert resp is None
for _ in range(50):
if buf.getvalue():
break
time.sleep(0.01)
written = json.loads(buf.getvalue())
assert written == {"jsonrpc": "2.0", "id": "r2", "result": {"output": "hi"}}
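

# Illustrative sketch only: the polling loop above, factored into a helper with
# an explicit deadline. `_sketch_wait_for_output` is a hypothetical helper (it
# is not part of this module's fixtures) and assumes `buf` exposes getvalue(),
# as io.StringIO does.
import time


def _sketch_wait_for_output(buf, timeout=0.5, interval=0.01):
    """Poll buf.getvalue() until it is non-empty or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        value = buf.getvalue()
        if value:
            return value
        time.sleep(interval)
    # One last read so that a write landing right at the deadline is not missed.
    return buf.getvalue()
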
def test_dispatch_long_handler_does_not_block_fast_handler(server):
"""A slow long handler must not prevent a concurrent fast handler from completing."""
released = threading.Event()
    def slow_exec(rid, params):
        released.wait(timeout=5)
        return server._ok(rid, {"done": True})

    server._methods["slash.exec"] = slow_exec
server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
t0 = time.monotonic()
assert server.dispatch({"id": "slow", "method": "slash.exec", "params": {}}) is None
fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}})
fast_elapsed = time.monotonic() - t0
assert fast_resp["result"] == {"pong": True}
assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind slow handler"
released.set()
def test_dispatch_session_compress_does_not_block_fast_handler(server):
"""Manual TUI compaction can take minutes, so it must not block the RPC loop."""
released = threading.Event()
def slow_compress(rid, params):
released.wait(timeout=5)
return server._ok(rid, {"done": True})
server._methods["session.compress"] = slow_compress
server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
t0 = time.monotonic()
assert server.dispatch({"id": "slow", "method": "session.compress", "params": {}}) is None
fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}})
fast_elapsed = time.monotonic() - t0
assert fast_resp["result"] == {"pong": True}
assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind session.compress"
released.set()
def test_dispatch_long_handler_exception_produces_error_response(capture):
"""An exception inside a pool-dispatched handler still yields a JSON-RPC error."""
server, buf = capture
def boom(rid, params):
raise RuntimeError("kaboom")
server._methods["slash.exec"] = boom
server.dispatch({"id": "r3", "method": "slash.exec", "params": {}})
for _ in range(50):
if buf.getvalue():
break
time.sleep(0.01)
written = json.loads(buf.getvalue())
assert written["id"] == "r3"
assert written["error"]["code"] == -32000
assert "kaboom" in written["error"]["message"]
def test_dispatch_unknown_long_method_still_goes_inline(server):
    """A method name not in _LONG_HANDLERS takes the sync path even if its handler is slow."""
server._methods["some.method"] = lambda rid, params: server._ok(rid, {"ok": True})
resp = server.dispatch({"id": "r4", "method": "some.method", "params": {}})
assert resp["result"] == {"ok": True}
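

# Illustrative sketch only: a minimal version of the routing these dispatch
# tests exercise. Short handlers return inline; long handlers run on a pool and
# deliver their response through a writer callback. All names here are
# hypothetical. The long-method set is inferred from the tests above, and
# catching exceptions on the inline path is an assumption.
from concurrent.futures import ThreadPoolExecutor

_SKETCH_LONG_METHODS = frozenset({"slash.exec", "session.compress"})


def _sketch_dispatch(methods, req, pool, write):
    """Return the response for short handlers, or None after offloading a long one."""
    rid, name = req.get("id"), req.get("method")
    handler = methods[name]
    params = req.get("params", {})

    def run():
        try:
            return handler(rid, params)
        except Exception as exc:
            # Mirror the -32000 error shape asserted in the exception test above.
            return {"jsonrpc": "2.0", "id": rid, "error": {"code": -32000, "message": str(exc)}}

    if name not in _SKETCH_LONG_METHODS:
        return run()
    # The caller gets None immediately; the response arrives later via write().
    pool.submit(lambda: write(run()))
    return None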