From fc7f55f4905863f816ae9e6d220e9e82707c1652 Mon Sep 17 00:00:00 2001
From: brooklyn!
Date: Wed, 29 Apr 2026 18:01:18 -0700
Subject: [PATCH] fix(tui): responsive /compress with live progress + CLI-parity feedback (#17661)

* fix(tui): offload manual compaction RPC

  Route TUI session compression through the existing long-handler pool so
  slow compaction does not block other gateway RPCs.

* fix(tui): show compaction progress immediately

  Print a local status line before the compress RPC starts so slow manual
  compaction does not look like a no-op.

* feat(tui): rich /compress feedback parity with CLI

  Show pre-compaction message count and rough token estimate immediately,
  emit a status update so the bottom bar reflects ongoing compaction, and
  report a multi-line summary (headline + token delta + optional note)
  using the shared summarize_manual_compression helper.

* fix(tui): show live compaction estimate in transcript

  Mirror compression progress status into the transcript so users see the
  backend message count and token estimate while /compress is still
  running.

* fix(tui): single live compaction line with spinner glyph

  Drop the redundant local "compressing context..." placeholder and prefix
  the live backend status line with a braille spinner glyph so /compress
  reads as a single in-progress row.

* fix(tui): address review nits on /compress feedback

  Reuse the precomputed token estimate inside _compress_session_history so
  the gateway does not redo the O(n) work while holding history_lock, keep
  the status bar pinned during long manual compactions instead of
  auto-restoring after 4s, and drop the redundant noop bullet that doubled
  with the system role glyph.

* fix(tui): release history_lock during compaction LLM call

  Move the snapshot/commit pattern into _compress_session_history so the
  lock is held only across the in-memory bookkeeping, not during
  agent._compress_context. Also emit a final neutral status update from
  session.compress so the pinned compressing indicator clears even on
  errors.

* fix(tui): rebuild prompt cleanly + sync session_key after compress

  Pass system_message=None so AIAgent._compress_context rebuilds the
  system prompt without nesting the cached identity block. Reuse the
  handler's pre-snapshotted history inside _compress_session_history to
  avoid a second O(n) copy under the lock. After compaction, when
  AIAgent._compress_context rotates session_id, sync the gateway
  session_key, migrate approval notify + yolo state, restart the slash
  worker, and clear the stale pending title. Mirrors
  HermesCLI._manual_compress.

* Avoid /compress lock re-entry in slash side effects.

  Stop pre-locking history before _compress_session_history in slash
  command mirroring, keep session-key sync parity with manual compression,
  and add a regression test that asserts /compress is invoked without
  holding history_lock.
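Reviewer note (illustration only, not part of the diff): the lock-release
change above amounts to a snapshot, compress-with-the-lock-released,
version-checked-commit shape. The sketch below is a minimal stand-alone
rendering of that shape; compress_session and the toy compress() are
hypothetical stand-ins for _compress_session_history and
agent._compress_context, not the real gateway code.

    import threading

    def compress(messages):
        # Stand-in for the LLM-backed agent._compress_context call, which
        # can take minutes; it returns a shorter history ending in a summary.
        return messages[:1] + [{"role": "system", "content": "<summary>"}]

    def compress_session(session):
        # 1. Snapshot under the lock and remember the version we saw.
        with session["history_lock"]:
            before = list(session["history"])
            seen_version = session["history_version"]

        # 2. Run the slow compaction with the lock released so prompt.submit
        #    and other handlers are not blocked behind it.
        compressed = compress(before)

        # 3. Commit only if nothing mutated history while we were away.
        with session["history_lock"]:
            if session["history_version"] != seen_version:
                return 0  # concurrent edit: drop the compressed result
            session["history"] = compressed
            session["history_version"] = seen_version + 1
            return len(before) - len(compressed)

    if __name__ == "__main__":
        session = {
            "history_lock": threading.Lock(),
            "history": [{"role": "user", "content": str(i)} for i in range(8)],
            "history_version": 0,
        }
        print("removed:", compress_session(session))  # removed: 6

If the version check fails, the compressed result is simply dropped, which
matches the handler's refusal to clobber history that changed while the LLM
call was in flight.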
--- tests/test_tui_gateway_server.py | 82 +++++++- tests/tui_gateway/test_protocol.py | 25 ++- tui_gateway/server.py | 186 +++++++++++++++--- .../createGatewayEventHandler.test.ts | 13 ++ ui-tui/src/app/createGatewayEventHandler.ts | 5 + ui-tui/src/app/slash/commands/session.ts | 17 ++ ui-tui/src/gatewayTypes.ts | 10 + 7 files changed, 309 insertions(+), 29 deletions(-) diff --git a/tests/test_tui_gateway_server.py b/tests/test_tui_gateway_server.py index 24c37c14f5..23332f62ca 100644 --- a/tests/test_tui_gateway_server.py +++ b/tests/test_tui_gateway_server.py @@ -1566,7 +1566,7 @@ def test_session_compress_uses_compress_helper(monkeypatch): monkeypatch.setattr( server, "_compress_session_history", - lambda session, focus_topic=None: (2, {"total": 42}), + lambda session, focus_topic=None, **_kw: (2, {"total": 42}), ) monkeypatch.setattr(server, "_session_info", lambda _agent: {"model": "x"}) @@ -1577,7 +1577,52 @@ def test_session_compress_uses_compress_helper(monkeypatch): assert resp["result"]["removed"] == 2 assert resp["result"]["usage"]["total"] == 42 - emit.assert_called_once_with("session.info", "sid", {"model": "x"}) + emit.assert_any_call("session.info", "sid", {"model": "x"}) + # Final status.update clears the pinned "compressing" indicator so the + # status bar can revert to the neutral state when compaction finishes. + emit.assert_any_call( + "status.update", "sid", {"kind": "status", "text": "ready"} + ) + + +def test_session_compress_syncs_session_key_after_rotation(monkeypatch): + """When AIAgent._compress_context rotates session_id (compression split), + the gateway session_key must follow so subsequent approval routing, + DB title/history lookups, and slash worker resume target the new + continuation session — mirrors HermesCLI._manual_compress's + session_id sync (cli.py). 
+ """ + agent = types.SimpleNamespace(session_id="rotated-id") + server._sessions["sid"] = _session(agent=agent) + server._sessions["sid"]["session_key"] = "old-key" + server._sessions["sid"]["pending_title"] = "stale title" + + monkeypatch.setattr( + server, + "_compress_session_history", + lambda session, focus_topic=None, **_kw: (2, {"total": 42}), + ) + monkeypatch.setattr(server, "_session_info", lambda _agent: {"model": "x"}) + restart_calls = [] + monkeypatch.setattr( + server, "_restart_slash_worker", lambda s: restart_calls.append(s) + ) + + try: + with patch("tui_gateway.server._emit"): + server.handle_request( + { + "id": "1", + "method": "session.compress", + "params": {"session_id": "sid"}, + } + ) + + assert server._sessions["sid"]["session_key"] == "rotated-id" + assert server._sessions["sid"]["pending_title"] is None + assert len(restart_calls) == 1 + finally: + server._sessions.pop("sid", None) def test_prompt_submit_sets_approval_session_key(monkeypatch): @@ -2423,6 +2468,39 @@ def test_mirror_slash_side_effects_allowed_when_idle(monkeypatch): assert applied["model"] +def test_mirror_slash_compress_does_not_prelock_history(monkeypatch): + """Regression guard: /compress side effect must not hold history_lock + when calling _compress_session_history (the helper snapshots under + the same non-reentrant lock internally).""" + import types + + seen = {"compress": False, "sync": False} + emitted = [] + + def _fake_compress(session, focus_topic=None, **_kw): + seen["compress"] = True + assert not session["history_lock"].locked() + return (0, {"total": 0}) + + def _fake_sync(_sid, _session): + seen["sync"] = True + + monkeypatch.setattr(server, "_compress_session_history", _fake_compress) + monkeypatch.setattr(server, "_sync_session_key_after_compress", _fake_sync) + monkeypatch.setattr(server, "_session_info", lambda _agent: {"model": "x"}) + monkeypatch.setattr(server, "_emit", lambda *args: emitted.append(args)) + + session = _session(running=False) + session["agent"] = types.SimpleNamespace(model="x") + + warning = server._mirror_slash_side_effects("sid", session, "/compress") + + assert warning == "" + assert seen["compress"] + assert seen["sync"] + assert ("session.info", "sid", {"model": "x"}) in emitted + + # --------------------------------------------------------------------------- # session.create / session.close race: fast /new churn must not orphan the # slash_worker subprocess or the global approval-notify registration. 
diff --git a/tests/tui_gateway/test_protocol.py b/tests/tui_gateway/test_protocol.py index 6c94ec0710..bd527608a7 100644 --- a/tests/tui_gateway/test_protocol.py +++ b/tests/tui_gateway/test_protocol.py @@ -298,7 +298,7 @@ def test_session_resume_returns_hydrated_messages(server, monkeypatch): def reopen_session(self, _sid): return None - def get_messages_as_conversation(self, _sid): + def get_messages_as_conversation(self, _sid, include_ancestors=False): return [ {"role": "user", "content": "hello"}, {"role": "assistant", "content": "yo"}, @@ -641,6 +641,29 @@ def test_dispatch_long_handler_does_not_block_fast_handler(server): released.set() +def test_dispatch_session_compress_does_not_block_fast_handler(server): + """Manual TUI compaction can take minutes, so it must not block the RPC loop.""" + released = threading.Event() + + def slow_compress(rid, params): + released.wait(timeout=5) + return server._ok(rid, {"done": True}) + + server._methods["session.compress"] = slow_compress + server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True}) + + t0 = time.monotonic() + assert server.dispatch({"id": "slow", "method": "session.compress", "params": {}}) is None + + fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}}) + fast_elapsed = time.monotonic() - t0 + + assert fast_resp["result"] == {"pong": True} + assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind session.compress" + + released.set() + + def test_dispatch_long_handler_exception_produces_error_response(capture): """An exception inside a pool-dispatched handler still yields a JSON-RPC error.""" server, buf = capture diff --git a/tui_gateway/server.py b/tui_gateway/server.py index c3ef5bb574..667980d34b 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -134,7 +134,7 @@ _DETAIL_MODES = frozenset({"hidden", "collapsed", "expanded"}) # ── Async RPC dispatch (#12546) ────────────────────────────────────── # A handful of handlers block the dispatcher loop in entry.py for seconds # to minutes (slash.exec, cli.exec, shell.exec, session.resume, -# session.branch, skills.manage). While they're running, inbound RPCs — +# session.branch, session.compress, skills.manage). While they're running, inbound RPCs — # notably approval.respond and session.interrupt — sit unread in the # stdin pipe. We route only those slow handlers onto a small thread pool; # everything else stays on the main thread so ordering stays sane for the @@ -145,6 +145,7 @@ _LONG_HANDLERS = frozenset( "browser.manage", "cli.exec", "session.branch", + "session.compress", "session.resume", "shell.exec", "skills.manage", @@ -1101,24 +1102,111 @@ def _apply_model_switch(sid: str, session: dict, raw_input: str) -> dict: def _compress_session_history( - session: dict, focus_topic: str | None = None + session: dict, + focus_topic: str | None = None, + approx_tokens: int | None = None, + before_messages: list | None = None, + history_version: int | None = None, ) -> tuple[int, dict]: from agent.model_metadata import estimate_messages_tokens_rough agent = session["agent"] - history = list(session.get("history", [])) + # Snapshot history under the lock so the LLM-bound compression call + # below does NOT hold history_lock for the duration of the request — + # otherwise other handlers acquiring the lock (prompt.submit etc.) + # block on the dispatcher loop while compaction runs. 
+ if before_messages is None or history_version is None: + with session["history_lock"]: + before_messages = list(session.get("history", [])) + history_version = int(session.get("history_version", 0)) + history = before_messages if len(history) < 4: - return 0, _get_usage(agent) - approx_tokens = estimate_messages_tokens_rough(history) + usage = _get_usage(agent) + return 0, usage + if approx_tokens is None: + approx_tokens = estimate_messages_tokens_rough(history) + # Pass system_message=None so AIAgent._compress_context rebuilds the + # system prompt cleanly via _build_system_prompt(None). Passing the + # cached prompt (which already contains the agent identity block) + # makes the rebuild append the identity a second time. Mirrors the + # CLI's _manual_compress fix for issue #15281. compressed, _ = agent._compress_context( history, - getattr(agent, "_cached_system_prompt", "") or "", + None, approx_tokens=approx_tokens, focus_topic=focus_topic or None, ) - session["history"] = compressed - session["history_version"] = int(session.get("history_version", 0)) + 1 - return len(history) - len(compressed), _get_usage(agent) + with session["history_lock"]: + if int(session.get("history_version", 0)) != history_version: + # External mutation during compaction — drop the compressed + # result so we don't clobber concurrent edits. + usage = _get_usage(agent) + return 0, usage + session["history"] = compressed + session["history_version"] = history_version + 1 + usage = _get_usage(agent) + return len(history) - len(compressed), usage + + +def _sync_session_key_after_compress(sid: str, session: dict) -> None: + """Re-anchor session_key when AIAgent._compress_context rotates session_id. + + AIAgent._compress_context ends the current SessionDB session and creates + a new continuation session, rotating ``agent.session_id``. The TUI + gateway keeps the gateway-side ``session_key`` separate (used for + approval routing, slash worker init, DB title/history lookups, yolo + state). Without this sync, those operations would target the ended + parent session while the agent writes to the new continuation session. + Mirrors HermesCLI._manual_compress's session_id sync. + """ + agent = session.get("agent") + new_session_id = getattr(agent, "session_id", None) or "" + old_key = session.get("session_key", "") or "" + if not new_session_id or new_session_id == old_key: + return + + try: + from tools.approval import ( + disable_session_yolo, + enable_session_yolo, + is_session_yolo_enabled, + register_gateway_notify, + unregister_gateway_notify, + ) + + try: + unregister_gateway_notify(old_key) + except Exception: + pass + session["session_key"] = new_session_id + try: + yolo_was_on = is_session_yolo_enabled(old_key) + except Exception: + yolo_was_on = False + if yolo_was_on: + try: + enable_session_yolo(new_session_id) + disable_session_yolo(old_key) + except Exception: + pass + try: + register_gateway_notify( + new_session_id, + lambda data: _emit("approval.request", sid, data), + ) + except Exception: + pass + except Exception: + # Even if the approval module fails to import, still anchor the + # session_key on the new continuation id so downstream lookups + # don't keep targeting the ended row. 
+ session["session_key"] = new_session_id + + session["pending_title"] = None + try: + _restart_slash_worker(session) + except Exception: + pass def _get_usage(agent) -> dict: @@ -2137,24 +2225,70 @@ def _(rid, params: dict) -> dict: return _err( rid, 4009, "session busy — /interrupt the current turn before /compress" ) + sid = params.get("session_id", "") + focus_topic = str(params.get("focus_topic", "") or "").strip() try: + from agent.manual_compression_feedback import summarize_manual_compression + from agent.model_metadata import estimate_messages_tokens_rough + with session["history_lock"]: - removed, usage = _compress_session_history( - session, str(params.get("focus_topic", "") or "").strip() - ) - messages = list(session.get("history", [])) - info = _session_info(session["agent"]) - _emit("session.info", params.get("session_id", ""), info) - return _ok( - rid, - { - "status": "compressed", - "removed": removed, - "usage": usage, - "info": info, - "messages": messages, - }, + before_messages = list(session.get("history", [])) + history_version = int(session.get("history_version", 0)) + before_count = len(before_messages) + before_tokens = ( + estimate_messages_tokens_rough(before_messages) if before_count else 0 ) + + if before_count >= 4: + focus_suffix = f', focus: "{focus_topic}"' if focus_topic else "" + _status_update( + sid, + "compressing", + f"⠋ compressing {before_count} messages " + f"(~{before_tokens:,} tok){focus_suffix}…", + ) + + try: + removed, usage = _compress_session_history( + session, + focus_topic, + approx_tokens=before_tokens, + before_messages=before_messages, + history_version=history_version, + ) + with session["history_lock"]: + messages = list(session.get("history", [])) + after_count = len(messages) + after_tokens = ( + estimate_messages_tokens_rough(messages) if after_count else 0 + ) + agent = session["agent"] + _sync_session_key_after_compress(sid, session) + summary = summarize_manual_compression( + before_messages, messages, before_tokens, after_tokens + ) + info = _session_info(agent) + _emit("session.info", sid, info) + return _ok( + rid, + { + "status": "compressed", + "removed": removed, + "before_messages": before_count, + "after_messages": after_count, + "before_tokens": before_tokens, + "after_tokens": after_tokens, + "summary": summary, + "usage": usage, + "info": info, + "messages": messages, + }, + ) + finally: + # Always clear the pinned compressing status so the bar + # reverts to neutral whether compaction succeeded, was a + # no-op, or raised. 
+ _status_update(sid, "ready") except Exception as e: return _err(rid, 5005, str(e)) @@ -4508,8 +4642,8 @@ def _mirror_slash_side_effects(sid: str, session: dict, command: str) -> str: agent.ephemeral_system_prompt = new_prompt or None agent._cached_system_prompt = None elif name == "compress" and agent: - with session["history_lock"]: - _compress_session_history(session, arg) + _compress_session_history(session, arg) + _sync_session_key_after_compress(sid, session) _emit("session.info", sid, _session_info(agent)) elif name == "fast" and agent: mode = arg.lower() diff --git a/ui-tui/src/__tests__/createGatewayEventHandler.test.ts b/ui-tui/src/__tests__/createGatewayEventHandler.test.ts index 378f873b4b..1729f0c273 100644 --- a/ui-tui/src/__tests__/createGatewayEventHandler.test.ts +++ b/ui-tui/src/__tests__/createGatewayEventHandler.test.ts @@ -119,6 +119,19 @@ describe('createGatewayEventHandler', () => { expect(getTurnState().todos).toEqual(todos) }) + it('prints compaction progress status into the transcript', () => { + const appended: Msg[] = [] + const ctx = buildCtx(appended) + const onEvent = createGatewayEventHandler(ctx) + + onEvent({ + payload: { kind: 'compressing', text: 'compressing 968 messages (~123,400 tok)…' }, + type: 'status.update' + } as any) + + expect(ctx.system.sys).toHaveBeenCalledWith('compressing 968 messages (~123,400 tok)…') + }) + it('clears the visible todo list when the todo tool returns an empty list', () => { const appended: Msg[] = [] const todos = [{ content: 'Boil water', id: 'boil', status: 'in_progress' }] diff --git a/ui-tui/src/app/createGatewayEventHandler.ts b/ui-tui/src/app/createGatewayEventHandler.ts index 0dd190c10e..86295f67d9 100644 --- a/ui-tui/src/app/createGatewayEventHandler.ts +++ b/ui-tui/src/app/createGatewayEventHandler.ts @@ -282,6 +282,11 @@ export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev: setStatus(p.text) + if (p.kind === 'compressing') { + sys(p.text) + return + } + if (!p.kind || p.kind === 'status') { return } diff --git a/ui-tui/src/app/slash/commands/session.ts b/ui-tui/src/app/slash/commands/session.ts index cfe84f9421..0a5324ef55 100644 --- a/ui-tui/src/app/slash/commands/session.ts +++ b/ui-tui/src/app/slash/commands/session.ts @@ -154,6 +154,22 @@ export const sessionCommands: SlashCommand[] = [ patchUiState(state => ({ ...state, usage: { ...state.usage, ...r.usage } })) } + if (r.summary?.headline) { + const prefix = r.summary.noop ? '' : '✓ ' + + ctx.transcript.sys(`${prefix}${r.summary.headline}`) + + if (r.summary.token_line) { + ctx.transcript.sys(` ${r.summary.token_line}`) + } + + if (r.summary.note) { + ctx.transcript.sys(` ${r.summary.note}`) + } + + return + } + if ((r.removed ?? 0) <= 0) { return ctx.transcript.sys('nothing to compress') } @@ -163,6 +179,7 @@ export const sessionCommands: SlashCommand[] = [ ) }) ) + .catch(ctx.guardedErr) } }, diff --git a/ui-tui/src/gatewayTypes.ts b/ui-tui/src/gatewayTypes.ts index 1f43096340..19f36afb2f 100644 --- a/ui-tui/src/gatewayTypes.ts +++ b/ui-tui/src/gatewayTypes.ts @@ -167,9 +167,19 @@ export interface SessionUsageResponse { } export interface SessionCompressResponse { + after_messages?: number + after_tokens?: number + before_messages?: number + before_tokens?: number info?: SessionInfo messages?: GatewayTranscriptMessage[] removed?: number + summary?: { + headline?: string + noop?: boolean + note?: null | string + token_line?: string + } usage?: Usage }
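
Reviewer note (illustration only): session.compress joins _LONG_HANDLERS so
the dispatcher runs it off the main RPC loop, which is what
test_dispatch_session_compress_does_not_block_fast_handler exercises. A
minimal sketch of that routing pattern under assumed names (Dispatcher,
LONG_HANDLERS, and fast.ping are illustrative, not the gateway's real API):

    import threading
    from concurrent.futures import ThreadPoolExecutor

    LONG_HANDLERS = frozenset({"session.compress", "slash.exec", "shell.exec"})

    class Dispatcher:
        def __init__(self, methods):
            self.methods = methods
            self.pool = ThreadPoolExecutor(max_workers=4)
            self.out_lock = threading.Lock()

        def send(self, response):
            # Serialize writes so pool threads don't interleave output.
            with self.out_lock:
                print(response)

        def dispatch(self, request):
            handler = self.methods[request["method"]]
            if request["method"] in LONG_HANDLERS:
                # Slow handler: run it off-thread and reply later; the main
                # loop immediately goes back to reading the next request.
                self.pool.submit(lambda: self.send(handler(request)))
                return None
            # Fast handlers stay synchronous so reply ordering stays simple.
            return handler(request)

    if __name__ == "__main__":
        d = Dispatcher({
            "session.compress": lambda req: {"id": req["id"], "result": "compressed"},
            "fast.ping": lambda req: {"id": req["id"], "result": "pong"},
        })
        d.dispatch({"id": "1", "method": "session.compress"})  # async reply
        print(d.dispatch({"id": "2", "method": "fast.ping"}))  # immediate
        d.pool.shutdown(wait=True)

Only the whitelisted slow methods pay the cost of cross-thread response
ordering; everything else stays on the main thread so fast RPCs such as
approval.respond and session.interrupt keep flowing while compaction runs.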