fix(tui): responsive /compress with live progress + CLI-parity feedback (#17661)

* fix(tui): offload manual compaction RPC

Route TUI session compression through the existing long-handler pool so slow compaction does not block other gateway RPCs.
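
As a sketch of the routing this reuses (names mirror the `_LONG_HANDLERS` hunk in server.py below; the pool size and handler signature here are illustrative assumptions):

```python
# Hypothetical, reduced dispatcher showing the long-handler pool pattern:
# methods named in _LONG_HANDLERS run on a small thread pool so a
# minutes-long session.compress cannot starve fast RPCs.
from concurrent.futures import ThreadPoolExecutor

class Dispatcher:
    _LONG_HANDLERS = frozenset({"session.compress", "session.resume"})

    def __init__(self):
        self._methods: dict = {}
        self._pool = ThreadPoolExecutor(max_workers=4)  # size is an assumption

    def dispatch(self, req: dict):
        handler = self._methods[req["method"]]
        if req["method"] in self._LONG_HANDLERS:
            # Queue the slow handler and return None now; the worker writes
            # the JSON-RPC response later, so approval.respond and
            # session.interrupt keep being read off stdin.
            self._pool.submit(handler, req["id"], req.get("params", {}))
            return None
        # Fast handlers stay on the main thread so response ordering stays sane.
        return handler(req["id"], req.get("params", {}))
```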

* fix(tui): show compaction progress immediately

Print a local status line before the compress RPC starts so slow manual compaction does not look like a no-op.

* feat(tui): rich /compress feedback parity with CLI

Show pre-compaction message count and rough token estimate immediately, emit a status update so the bottom bar reflects ongoing compaction, and report a multi-line summary (headline + token delta + optional note) using the shared summarize_manual_compression helper.
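
For reference, the shape the TUI renders (field names match the `SessionCompressResponse.summary` typing in the last hunk; the concrete values are invented):

```python
# Invented example of the dict returned under result["summary"];
# only the field names are taken from the diff.
summary = {
    "headline": "compressed 968 messages into 14",
    "token_line": "~123,400 tok -> ~9,100 tok",
    "note": None,   # optional extra line, omitted when absent
    "noop": False,  # True when nothing needed compressing
}
```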

* fix(tui): show live compaction estimate in transcript

Mirror compression progress status into the transcript so users see the backend message count and token estimate while /compress is still running.

* fix(tui): single live compaction line with spinner glyph

Drop the redundant local "compressing context..." placeholder and prefix the live backend status line with a braille spinner glyph so /compress reads as a single in-progress row.

* fix(tui): address review nits on /compress feedback

Reuse the precomputed token estimate inside _compress_session_history so the gateway does not redo the O(n) work while holding history_lock, keep the status bar pinned during long manual compactions instead of auto-restoring after 4s, and drop the redundant noop bullet that doubled with the system role glyph.

* fix(tui): release history_lock during compaction LLM call

Move the snapshot/commit pattern into _compress_session_history so the lock is held only across the in-memory bookkeeping, not during agent._compress_context. Also emit a final neutral status update from session.compress so the pinned compressing indicator clears even on errors.
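
Reduced to its core, the pattern looks like this (a sketch; `run_llm_compaction` stands in for the real `agent._compress_context` call):

```python
def commit_compaction(session: dict, run_llm_compaction) -> int:
    # Snapshot under the lock, compact without it, then commit only if
    # history_version is unchanged (matching _compress_session_history).
    with session["history_lock"]:
        before = list(session["history"])        # O(n) copy, lock held briefly
        version = session["history_version"]

    compressed = run_llm_compaction(before)      # slow LLM call, lock NOT held

    with session["history_lock"]:
        if session["history_version"] != version:
            return 0                             # concurrent edit: drop result
        session["history"] = compressed
        session["history_version"] = version + 1
    return len(before) - len(compressed)
```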

* fix(tui): rebuild prompt cleanly + sync session_key after compress

Pass system_message=None so AIAgent._compress_context rebuilds the system prompt without nesting the cached identity block. Reuse the handler's pre-snapshotted history inside _compress_session_history to avoid a second O(n) copy under the lock. After compaction, when AIAgent._compress_context rotates session_id, sync the gateway session_key, migrate approval notify + yolo state, restart the slash worker, and clear the stale pending title. Mirrors HermesCLI._manual_compress.
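
Condensed, the sync looks roughly like this (the `approval` wrapper is hypothetical; the real code imports tools.approval functions directly, as the server.py hunk shows):

```python
def sync_after_compress(sid: str, session: dict, approval) -> None:
    # Re-anchor the gateway session_key when the agent rotated session_id.
    new_id = getattr(session["agent"], "session_id", None) or ""
    old_key = session.get("session_key", "") or ""
    if not new_id or new_id == old_key:
        return                                # no rotation: nothing to do
    approval.unregister_notify(old_key)       # approval routing follows the key
    session["session_key"] = new_id
    if approval.is_yolo_enabled(old_key):     # migrate yolo auto-approve state
        approval.enable_yolo(new_id)
        approval.disable_yolo(old_key)
    approval.register_notify(new_id, sid)
    session["pending_title"] = None           # title belonged to the ended row
    # (the full version also restarts the slash worker on the new session)
```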

* fix(tui): avoid /compress lock re-entry in slash side effects

Stop pre-locking history before _compress_session_history in slash command mirroring, keep session-key sync parity with manual compression, and add a regression test that asserts /compress is invoked without holding history_lock.
Authored by brooklyn! on 2026-04-29 18:01:18 -07:00; committed by GitHub
parent 98f5be13fa
commit fc7f55f490
7 changed files with 309 additions and 29 deletions

View file

@@ -1566,7 +1566,7 @@ def test_session_compress_uses_compress_helper(monkeypatch):
     monkeypatch.setattr(
         server,
         "_compress_session_history",
-        lambda session, focus_topic=None: (2, {"total": 42}),
+        lambda session, focus_topic=None, **_kw: (2, {"total": 42}),
     )
     monkeypatch.setattr(server, "_session_info", lambda _agent: {"model": "x"})
@@ -1577,7 +1577,52 @@ def test_session_compress_uses_compress_helper(monkeypatch):
     assert resp["result"]["removed"] == 2
     assert resp["result"]["usage"]["total"] == 42
-    emit.assert_called_once_with("session.info", "sid", {"model": "x"})
+    emit.assert_any_call("session.info", "sid", {"model": "x"})
+    # Final status.update clears the pinned "compressing" indicator so the
+    # status bar can revert to the neutral state when compaction finishes.
+    emit.assert_any_call(
+        "status.update", "sid", {"kind": "status", "text": "ready"}
+    )
+
+
+def test_session_compress_syncs_session_key_after_rotation(monkeypatch):
+    """When AIAgent._compress_context rotates session_id (compression split),
+    the gateway session_key must follow so subsequent approval routing,
+    DB title/history lookups, and slash worker resume target the new
+    continuation session; mirrors HermesCLI._manual_compress's
+    session_id sync (cli.py).
+    """
+    agent = types.SimpleNamespace(session_id="rotated-id")
+    server._sessions["sid"] = _session(agent=agent)
+    server._sessions["sid"]["session_key"] = "old-key"
+    server._sessions["sid"]["pending_title"] = "stale title"
+    monkeypatch.setattr(
+        server,
+        "_compress_session_history",
+        lambda session, focus_topic=None, **_kw: (2, {"total": 42}),
+    )
+    monkeypatch.setattr(server, "_session_info", lambda _agent: {"model": "x"})
+    restart_calls = []
+    monkeypatch.setattr(
+        server, "_restart_slash_worker", lambda s: restart_calls.append(s)
+    )
+    try:
+        with patch("tui_gateway.server._emit"):
+            server.handle_request(
+                {
+                    "id": "1",
+                    "method": "session.compress",
+                    "params": {"session_id": "sid"},
+                }
+            )
+        assert server._sessions["sid"]["session_key"] == "rotated-id"
+        assert server._sessions["sid"]["pending_title"] is None
+        assert len(restart_calls) == 1
+    finally:
+        server._sessions.pop("sid", None)
+
+
 def test_prompt_submit_sets_approval_session_key(monkeypatch):
@@ -2423,6 +2468,39 @@ def test_mirror_slash_side_effects_allowed_when_idle(monkeypatch):
     assert applied["model"]
+
+
+def test_mirror_slash_compress_does_not_prelock_history(monkeypatch):
+    """Regression guard: /compress side effect must not hold history_lock
+    when calling _compress_session_history (the helper snapshots under
+    the same non-reentrant lock internally)."""
+    import types
+
+    seen = {"compress": False, "sync": False}
+    emitted = []
+
+    def _fake_compress(session, focus_topic=None, **_kw):
+        seen["compress"] = True
+        assert not session["history_lock"].locked()
+        return (0, {"total": 0})
+
+    def _fake_sync(_sid, _session):
+        seen["sync"] = True
+
+    monkeypatch.setattr(server, "_compress_session_history", _fake_compress)
+    monkeypatch.setattr(server, "_sync_session_key_after_compress", _fake_sync)
+    monkeypatch.setattr(server, "_session_info", lambda _agent: {"model": "x"})
+    monkeypatch.setattr(server, "_emit", lambda *args: emitted.append(args))
+
+    session = _session(running=False)
+    session["agent"] = types.SimpleNamespace(model="x")
+    warning = server._mirror_slash_side_effects("sid", session, "/compress")
+
+    assert warning == ""
+    assert seen["compress"]
+    assert seen["sync"]
+    assert ("session.info", "sid", {"model": "x"}) in emitted
+
+
 # ---------------------------------------------------------------------------
 # session.create / session.close race: fast /new churn must not orphan the
 # slash_worker subprocess or the global approval-notify registration.

View file

@@ -298,7 +298,7 @@ def test_session_resume_returns_hydrated_messages(server, monkeypatch):
         def reopen_session(self, _sid):
             return None

-        def get_messages_as_conversation(self, _sid):
+        def get_messages_as_conversation(self, _sid, include_ancestors=False):
             return [
                 {"role": "user", "content": "hello"},
                 {"role": "assistant", "content": "yo"},
@@ -641,6 +641,29 @@ def test_dispatch_long_handler_does_not_block_fast_handler(server):
     released.set()
+
+
+def test_dispatch_session_compress_does_not_block_fast_handler(server):
+    """Manual TUI compaction can take minutes, so it must not block the RPC loop."""
+    released = threading.Event()
+
+    def slow_compress(rid, params):
+        released.wait(timeout=5)
+        return server._ok(rid, {"done": True})
+
+    server._methods["session.compress"] = slow_compress
+    server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
+
+    t0 = time.monotonic()
+    assert server.dispatch({"id": "slow", "method": "session.compress", "params": {}}) is None
+    fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}})
+    fast_elapsed = time.monotonic() - t0
+
+    assert fast_resp["result"] == {"pong": True}
+    assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind session.compress"
+    released.set()
+
+
 def test_dispatch_long_handler_exception_produces_error_response(capture):
     """An exception inside a pool-dispatched handler still yields a JSON-RPC error."""
     server, buf = capture

View file

@@ -134,7 +134,7 @@ _DETAIL_MODES = frozenset({"hidden", "collapsed", "expanded"})
 # ── Async RPC dispatch (#12546) ──────────────────────────────────────
 # A handful of handlers block the dispatcher loop in entry.py for seconds
 # to minutes (slash.exec, cli.exec, shell.exec, session.resume,
-# session.branch, skills.manage). While they're running, inbound RPCs —
+# session.branch, session.compress, skills.manage). While they're running, inbound RPCs —
 # notably approval.respond and session.interrupt — sit unread in the
 # stdin pipe. We route only those slow handlers onto a small thread pool;
 # everything else stays on the main thread so ordering stays sane for the
@@ -145,6 +145,7 @@ _LONG_HANDLERS = frozenset(
         "browser.manage",
         "cli.exec",
         "session.branch",
+        "session.compress",
         "session.resume",
         "shell.exec",
         "skills.manage",
@@ -1101,24 +1102,111 @@ def _apply_model_switch(sid: str, session: dict, raw_input: str) -> dict:
 def _compress_session_history(
-    session: dict, focus_topic: str | None = None
+    session: dict,
+    focus_topic: str | None = None,
+    approx_tokens: int | None = None,
+    before_messages: list | None = None,
+    history_version: int | None = None,
 ) -> tuple[int, dict]:
     from agent.model_metadata import estimate_messages_tokens_rough

     agent = session["agent"]
-    history = list(session.get("history", []))
+    # Snapshot history under the lock so the LLM-bound compression call
+    # below does NOT hold history_lock for the duration of the request —
+    # otherwise other handlers acquiring the lock (prompt.submit etc.)
+    # block on the dispatcher loop while compaction runs.
+    if before_messages is None or history_version is None:
+        with session["history_lock"]:
+            before_messages = list(session.get("history", []))
+            history_version = int(session.get("history_version", 0))
+    history = before_messages
     if len(history) < 4:
-        return 0, _get_usage(agent)
+        usage = _get_usage(agent)
+        return 0, usage
-    approx_tokens = estimate_messages_tokens_rough(history)
+    if approx_tokens is None:
+        approx_tokens = estimate_messages_tokens_rough(history)
+    # Pass system_message=None so AIAgent._compress_context rebuilds the
+    # system prompt cleanly via _build_system_prompt(None). Passing the
+    # cached prompt (which already contains the agent identity block)
+    # makes the rebuild append the identity a second time. Mirrors the
+    # CLI's _manual_compress fix for issue #15281.
     compressed, _ = agent._compress_context(
         history,
-        getattr(agent, "_cached_system_prompt", "") or "",
+        None,
         approx_tokens=approx_tokens,
         focus_topic=focus_topic or None,
     )
-    session["history"] = compressed
-    session["history_version"] = int(session.get("history_version", 0)) + 1
-    return len(history) - len(compressed), _get_usage(agent)
+    with session["history_lock"]:
+        if int(session.get("history_version", 0)) != history_version:
+            # External mutation during compaction — drop the compressed
+            # result so we don't clobber concurrent edits.
+            usage = _get_usage(agent)
+            return 0, usage
+        session["history"] = compressed
+        session["history_version"] = history_version + 1
+    usage = _get_usage(agent)
+    return len(history) - len(compressed), usage
+
+
+def _sync_session_key_after_compress(sid: str, session: dict) -> None:
+    """Re-anchor session_key when AIAgent._compress_context rotates session_id.
+
+    AIAgent._compress_context ends the current SessionDB session and creates
+    a new continuation session, rotating ``agent.session_id``. The TUI
+    gateway keeps the gateway-side ``session_key`` separate (used for
+    approval routing, slash worker init, DB title/history lookups, yolo
+    state). Without this sync, those operations would target the ended
+    parent session while the agent writes to the new continuation session.
+    Mirrors HermesCLI._manual_compress's session_id sync.
+    """
+    agent = session.get("agent")
+    new_session_id = getattr(agent, "session_id", None) or ""
+    old_key = session.get("session_key", "") or ""
+    if not new_session_id or new_session_id == old_key:
+        return
+    try:
+        from tools.approval import (
+            disable_session_yolo,
+            enable_session_yolo,
+            is_session_yolo_enabled,
+            register_gateway_notify,
+            unregister_gateway_notify,
+        )
+
+        try:
+            unregister_gateway_notify(old_key)
+        except Exception:
+            pass
+        session["session_key"] = new_session_id
+        try:
+            yolo_was_on = is_session_yolo_enabled(old_key)
+        except Exception:
+            yolo_was_on = False
+        if yolo_was_on:
+            try:
+                enable_session_yolo(new_session_id)
+                disable_session_yolo(old_key)
+            except Exception:
+                pass
+        try:
+            register_gateway_notify(
+                new_session_id,
+                lambda data: _emit("approval.request", sid, data),
+            )
+        except Exception:
+            pass
+    except Exception:
+        # Even if the approval module fails to import, still anchor the
+        # session_key on the new continuation id so downstream lookups
+        # don't keep targeting the ended row.
+        session["session_key"] = new_session_id
+    session["pending_title"] = None
+    try:
+        _restart_slash_worker(session)
+    except Exception:
+        pass


 def _get_usage(agent) -> dict:
@@ -2137,24 +2225,70 @@ def _(rid, params: dict) -> dict:
         return _err(
             rid, 4009, "session busy — /interrupt the current turn before /compress"
         )
+    sid = params.get("session_id", "")
+    focus_topic = str(params.get("focus_topic", "") or "").strip()
     try:
+        from agent.manual_compression_feedback import summarize_manual_compression
+        from agent.model_metadata import estimate_messages_tokens_rough
+
         with session["history_lock"]:
-            removed, usage = _compress_session_history(
-                session, str(params.get("focus_topic", "") or "").strip()
-            )
-            messages = list(session.get("history", []))
-        info = _session_info(session["agent"])
-        _emit("session.info", params.get("session_id", ""), info)
+            before_messages = list(session.get("history", []))
+            history_version = int(session.get("history_version", 0))
+        before_count = len(before_messages)
+        before_tokens = (
+            estimate_messages_tokens_rough(before_messages) if before_count else 0
+        )
+        if before_count >= 4:
+            focus_suffix = f', focus: "{focus_topic}"' if focus_topic else ""
+            _status_update(
+                sid,
+                "compressing",
+                f"⠋ compressing {before_count} messages "
+                f"(~{before_tokens:,} tok){focus_suffix}",
+            )
+        try:
+            removed, usage = _compress_session_history(
+                session,
+                focus_topic,
+                approx_tokens=before_tokens,
+                before_messages=before_messages,
+                history_version=history_version,
+            )
+            with session["history_lock"]:
+                messages = list(session.get("history", []))
+            after_count = len(messages)
+            after_tokens = (
+                estimate_messages_tokens_rough(messages) if after_count else 0
+            )
+            agent = session["agent"]
+            _sync_session_key_after_compress(sid, session)
+            summary = summarize_manual_compression(
+                before_messages, messages, before_tokens, after_tokens
+            )
+            info = _session_info(agent)
+            _emit("session.info", sid, info)
-        return _ok(
-            rid,
-            {
-                "status": "compressed",
-                "removed": removed,
-                "usage": usage,
-                "info": info,
-                "messages": messages,
-            },
-        )
+            return _ok(
+                rid,
+                {
+                    "status": "compressed",
+                    "removed": removed,
+                    "before_messages": before_count,
+                    "after_messages": after_count,
+                    "before_tokens": before_tokens,
+                    "after_tokens": after_tokens,
+                    "summary": summary,
+                    "usage": usage,
+                    "info": info,
+                    "messages": messages,
+                },
+            )
+        finally:
+            # Always clear the pinned compressing status so the bar
+            # reverts to neutral whether compaction succeeded, was a
+            # no-op, or raised.
+            _status_update(sid, "ready")
     except Exception as e:
         return _err(rid, 5005, str(e))
@@ -4508,8 +4642,8 @@ def _mirror_slash_side_effects(sid: str, session: dict, command: str) -> str:
         agent.ephemeral_system_prompt = new_prompt or None
         agent._cached_system_prompt = None
     elif name == "compress" and agent:
-        with session["history_lock"]:
-            _compress_session_history(session, arg)
+        _compress_session_history(session, arg)
+        _sync_session_key_after_compress(sid, session)
         _emit("session.info", sid, _session_info(agent))
     elif name == "fast" and agent:
         mode = arg.lower()

View file

@@ -119,6 +119,19 @@ describe('createGatewayEventHandler', () => {
     expect(getTurnState().todos).toEqual(todos)
   })

+  it('prints compaction progress status into the transcript', () => {
+    const appended: Msg[] = []
+    const ctx = buildCtx(appended)
+    const onEvent = createGatewayEventHandler(ctx)
+
+    onEvent({
+      payload: { kind: 'compressing', text: 'compressing 968 messages (~123,400 tok)…' },
+      type: 'status.update'
+    } as any)
+
+    expect(ctx.system.sys).toHaveBeenCalledWith('compressing 968 messages (~123,400 tok)…')
+  })
+
   it('clears the visible todo list when the todo tool returns an empty list', () => {
     const appended: Msg[] = []
     const todos = [{ content: 'Boil water', id: 'boil', status: 'in_progress' }]

View file

@@ -282,6 +282,11 @@ export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev:
       setStatus(p.text)
+
+      if (p.kind === 'compressing') {
+        sys(p.text)
+        return
+      }
       if (!p.kind || p.kind === 'status') {
         return
       }

View file

@@ -154,6 +154,22 @@ export const sessionCommands: SlashCommand[] = [
           patchUiState(state => ({ ...state, usage: { ...state.usage, ...r.usage } }))
         }

+        if (r.summary?.headline) {
+          const prefix = r.summary.noop ? '' : '✓ '
+          ctx.transcript.sys(`${prefix}${r.summary.headline}`)
+          if (r.summary.token_line) {
+            ctx.transcript.sys(` ${r.summary.token_line}`)
+          }
+          if (r.summary.note) {
+            ctx.transcript.sys(` ${r.summary.note}`)
+          }
+          return
+        }
+
         if ((r.removed ?? 0) <= 0) {
           return ctx.transcript.sys('nothing to compress')
         }
@@ -163,6 +179,7 @@ export const sessionCommands: SlashCommand[] = [
           )
         })
       )
+        .catch(ctx.guardedErr)
     }
   },

View file

@@ -167,9 +167,19 @@ export interface SessionUsageResponse {
 }

 export interface SessionCompressResponse {
+  after_messages?: number
+  after_tokens?: number
+  before_messages?: number
+  before_tokens?: number
   info?: SessionInfo
   messages?: GatewayTranscriptMessage[]
   removed?: number
+  summary?: {
+    headline?: string
+    noop?: boolean
+    note?: null | string
+    token_line?: string
+  }
   usage?: Usage
 }