diff --git a/agent/conversation_compression.py b/agent/conversation_compression.py index a5f0f9e3fba..d5469a1b344 100644 --- a/agent/conversation_compression.py +++ b/agent/conversation_compression.py @@ -40,6 +40,16 @@ from agent.model_metadata import estimate_request_tokens_rough logger = logging.getLogger(__name__) +# Stable marker the gateway matches on to re-tag the auto-compaction lifecycle +# status as ``kind="compacting"`` (tui_gateway/server.py::_status_update), so +# drivers like the desktop app can show an explicit "Summarizing…" indicator +# instead of the transcript appearing to silently reset. Keep the marker phrase +# intact if you reword COMPACTION_STATUS. +COMPACTION_STATUS_MARKER = "Compacting context" +COMPACTION_STATUS = ( + f"πŸ—œοΈ {COMPACTION_STATUS_MARKER} β€” summarizing earlier conversation so I can continue..." +) + def _compression_lock_holder(agent: Any) -> str: """Build a unique holder id for the lock: pid:tid:agent-instance:uuid. @@ -324,9 +334,7 @@ def compress_context( f"{approx_tokens:,}" if approx_tokens else "unknown", agent.model, focus_topic, ) - agent._emit_status( - "πŸ—œοΈ Compacting context β€” summarizing earlier conversation so I can continue..." - ) + agent._emit_status(COMPACTION_STATUS) # ── Compression lock ──────────────────────────────────────────────── # Atomic, state.db-backed lock per session_id. Without this, two @@ -799,6 +807,8 @@ def try_shrink_image_parts_in_messages( __all__ = [ + "COMPACTION_STATUS", + "COMPACTION_STATUS_MARKER", "check_compression_model_feasibility", "replay_compression_warning", "compress_context", diff --git a/apps/desktop/src/app/session/hooks/use-message-stream.ts b/apps/desktop/src/app/session/hooks/use-message-stream.ts index bd2f6b64ccd..c4718b88ac3 100644 --- a/apps/desktop/src/app/session/hooks/use-message-stream.ts +++ b/apps/desktop/src/app/session/hooks/use-message-stream.ts @@ -27,6 +27,7 @@ import { triggerHaptic } from '@/lib/haptics' import { isProviderSetupErrorMessage } from '@/lib/provider-setup-errors' import { parseTodos } from '@/lib/todos' import { setClarifyRequest } from '@/store/clarify' +import { setSessionCompacting } from '@/store/compaction' import { refreshBackgroundProcesses } from '@/store/composer-status' import { $gateway } from '@/store/gateway' import { dispatchNativeNotification } from '@/store/native-notifications' @@ -333,6 +334,8 @@ export function useMessageStream({ const flushHandleRef = useRef(null) const lastFlushAtRef = useRef(0) const nativeSubagentSessionsRef = useRef>(new Set()) + // Turns that auto-compacted: skip post-turn hydrate so live scrollback survives. + const compactedTurnRef = useRef>(new Set()) const flushQueuedDeltas = useCallback( (sessionId?: string) => { @@ -639,6 +642,10 @@ export function useMessageStream({ void refreshSessions().catch(() => undefined) + if (compactedTurnRef.current.delete(sessionId)) { + shouldHydrate = false + } + if (shouldHydrate) { void hydrateFromStoredSession(3, completedState.storedSessionId, sessionId) } @@ -825,6 +832,8 @@ export function useMessageStream({ flushQueuedDeltas(sessionId) clearSessionSubagents(sessionId) + setSessionCompacting(sessionId, false) + compactedTurnRef.current.delete(sessionId) nativeSubagentSessionsRef.current.delete(sessionId) if (isActiveEvent) { @@ -870,6 +879,7 @@ export function useMessageStream({ // session so a background turn finishing can't wipe the active chat's // prompt, and vice versa. clearAllPrompts(sessionId) + setSessionCompacting(sessionId, false) flushQueuedDeltas(sessionId) @@ -904,10 +914,7 @@ export function useMessageStream({ // terminal/process tool calls are the only things that spawn or reap // background processes β€” sync the composer status stack right after. - if ( - !sessionInterrupted(sessionId) && - (payload?.name === 'terminal' || payload?.name === 'process') - ) { + if (!sessionInterrupted(sessionId) && (payload?.name === 'terminal' || payload?.name === 'process')) { void refreshBackgroundProcesses(sessionId) } } @@ -1061,9 +1068,12 @@ export function useMessageStream({ }) } } else if (event.type === 'status.update') { - // The gateway's notification poller announces background process - // completions / watch matches here β€” re-sync the status stack. - if (sessionId && payload?.kind === 'process') { + if (sessionId && payload?.kind === 'compacting') { + setSessionCompacting(sessionId, true) + compactedTurnRef.current.add(sessionId) + } else if (sessionId && payload?.kind === 'process') { + // The gateway's notification poller announces background process + // completions / watch matches here β€” re-sync the status stack. void refreshBackgroundProcesses(sessionId) } } else if (event.type === 'error') { @@ -1075,6 +1085,8 @@ export function useMessageStream({ // the failed turn (same intent as the message.complete clear). if (sessionId) { clearAllPrompts(sessionId) + setSessionCompacting(sessionId, false) + compactedTurnRef.current.delete(sessionId) } dispatchNativeNotification({ diff --git a/apps/desktop/src/components/assistant-ui/thread.tsx b/apps/desktop/src/components/assistant-ui/thread.tsx index 48b6fd0c823..8fbf695e012 100644 --- a/apps/desktop/src/components/assistant-ui/thread.tsx +++ b/apps/desktop/src/components/assistant-ui/thread.tsx @@ -96,6 +96,7 @@ import { extractPreviewTargets } from '@/lib/preview-targets' import { useEnterAnimation } from '@/lib/use-enter-animation' import { cn } from '@/lib/utils' import { playSpeechText, stopVoicePlayback } from '@/lib/voice-playback' +import { $compactionActive } from '@/store/compaction' import type { ComposerAttachment } from '@/store/composer' import { notifyError } from '@/store/notifications' import { $connection } from '@/store/session' @@ -273,10 +274,7 @@ const AssistantMessage: FC<{ onBranchInNewChat?: (messageId: string) => void }> return pickPrimaryPreviewTarget(extractPreviewTargets(completedText)) }, [completedText]) - const getMessageText = useCallback( - () => messageContentText(messageRuntime.getState().content), - [messageRuntime] - ) + const getMessageText = useCallback(() => messageContentText(messageRuntime.getState().content), [messageRuntime]) const enterRef = useEnterAnimation(isRunning, `assistant-message:${messageId}`) @@ -339,13 +337,25 @@ const StatusRow: FC<{ children: ReactNode; label: string } & React.ComponentProp ) +// Fixed label while auto-compaction runs β€” decoupled from backend status text. +const COMPACTION_LABEL = 'Summarizing thread' + +const CompactionHint: FC = () => ( + {COMPACTION_LABEL} +) + const ResponseLoadingIndicator: FC = () => { const { t } = useI18n() const elapsed = useElapsedSeconds() + const compacting = useStore($compactionActive) return ( - + ) @@ -380,6 +390,7 @@ const StreamStallIndicator: FC = () => { }) const [stalled, setStalled] = useState(false) + const compacting = useStore($compactionActive) useEffect(() => { setStalled(false) @@ -388,15 +399,21 @@ const StreamStallIndicator: FC = () => { return () => window.clearTimeout(id) }, [activity]) - const elapsed = useElapsedSeconds(stalled) + const active = stalled || compacting + const elapsed = useElapsedSeconds(active) - if (!stalled) { + if (!active) { return null } return ( - + ) diff --git a/apps/desktop/src/store/compaction.test.ts b/apps/desktop/src/store/compaction.test.ts new file mode 100644 index 00000000000..f8388a3afc0 --- /dev/null +++ b/apps/desktop/src/store/compaction.test.ts @@ -0,0 +1,53 @@ +import { afterEach, beforeEach, describe, expect, it } from 'vitest' + +import { $compactingSessions, $compactionActive, setSessionCompacting } from './compaction' +import { $activeSessionId } from './session' + +describe('compaction store', () => { + beforeEach(() => { + $compactingSessions.set({}) + $activeSessionId.set(null) + }) + + afterEach(() => { + $compactingSessions.set({}) + $activeSessionId.set(null) + }) + + it('tracks compaction per session independently', () => { + setSessionCompacting('session-a', true) + setSessionCompacting('session-b', true) + + expect($compactingSessions.get()).toEqual({ 'session-a': true, 'session-b': true }) + }) + + it('exposes only the active session via the focus-scoped view', () => { + setSessionCompacting('session-a', true) + + expect($compactionActive.get()).toBe(false) + + $activeSessionId.set('session-a') + expect($compactionActive.get()).toBe(true) + + $activeSessionId.set('session-b') + expect($compactionActive.get()).toBe(false) + }) + + it('clears a session without disturbing the others', () => { + setSessionCompacting('session-a', true) + setSessionCompacting('session-b', true) + + setSessionCompacting('session-a', false) + + expect($compactingSessions.get()).toEqual({ 'session-b': true }) + }) + + it('is a no-op when clearing an unknown session', () => { + setSessionCompacting('session-a', true) + const before = $compactingSessions.get() + + setSessionCompacting('session-missing', false) + + expect($compactingSessions.get()).toBe(before) + }) +}) diff --git a/apps/desktop/src/store/compaction.ts b/apps/desktop/src/store/compaction.ts new file mode 100644 index 00000000000..b35e35b4b69 --- /dev/null +++ b/apps/desktop/src/store/compaction.ts @@ -0,0 +1,38 @@ +import { atom, computed } from 'nanostores' + +import { $activeSessionId } from './session' + +// Per-session flag while auto-compaction runs mid-turn. Without it the +// transcript looks like it reset; per-session so a background chat can't +// clobber the foreground view. +const keyFor = (sessionId: string | null | undefined): string => sessionId ?? '' + +export const $compactingSessions = atom>({}) + +export const $compactionActive = computed( + [$compactingSessions, $activeSessionId], + (sessions, activeId) => keyFor(activeId) in sessions +) + +export function setSessionCompacting(sessionId: string | null | undefined, active: boolean): void { + const key = keyFor(sessionId) + const sessions = $compactingSessions.get() + + if (active) { + if (key in sessions) { + return + } + + $compactingSessions.set({ ...sessions, [key]: true }) + + return + } + + if (!(key in sessions)) { + return + } + + const next = { ...sessions } + delete next[key] + $compactingSessions.set(next) +} diff --git a/tests/tui_gateway/test_compaction_status.py b/tests/tui_gateway/test_compaction_status.py new file mode 100644 index 00000000000..0e98bde1854 --- /dev/null +++ b/tests/tui_gateway/test_compaction_status.py @@ -0,0 +1,73 @@ +"""Auto-compaction status re-tagging for the desktop "Summarizing…" indicator. + +Auto-compaction reaches the gateway as a generic ``lifecycle`` status. The +gateway re-tags it as ``kind="compacting"`` so drivers (the desktop app) can +show an explicit summarizing indicator instead of the transcript appearing to +silently reset mid-turn. +""" + +from __future__ import annotations + +import importlib + +from unittest.mock import MagicMock, patch + +import pytest + + +@pytest.fixture() +def server(): + with patch.dict( + "sys.modules", + { + "hermes_constants": MagicMock( + get_hermes_home=MagicMock(return_value="/tmp/hermes_test_compaction") + ), + "hermes_cli.env_loader": MagicMock(), + "hermes_cli.banner": MagicMock(), + "hermes_state": MagicMock(), + }, + ): + yield importlib.import_module("tui_gateway.server") + + +def _capture(server, monkeypatch): + events: list[dict] = [] + monkeypatch.setattr( + server, "_emit", lambda event, sid, payload=None: events.append(payload or {}) + ) + return events + + +def test_compaction_lifecycle_is_retagged(server, monkeypatch): + from agent.conversation_compression import COMPACTION_STATUS + + events = _capture(server, monkeypatch) + server._status_update("sid", "lifecycle", COMPACTION_STATUS) + + assert events == [{"kind": "compacting", "text": COMPACTION_STATUS}] + + +def test_other_lifecycle_status_stays_lifecycle(server, monkeypatch): + events = _capture(server, monkeypatch) + server._status_update("sid", "lifecycle", "❌ Rate limited after 5 retries") + + assert events[0]["kind"] == "lifecycle" + + +def test_manual_compressing_kind_is_preserved(server, monkeypatch): + events = _capture(server, monkeypatch) + server._status_update("sid", "compressing", "β ‹ compressing 40 messages…") + + assert events[0]["kind"] == "compressing" + + +def test_compaction_status_contains_marker(): + # Contract: the gateway matches COMPACTION_STATUS_MARKER inside the emitted + # status text. If the message is reworded, the marker must survive. + from agent.conversation_compression import ( + COMPACTION_STATUS, + COMPACTION_STATUS_MARKER, + ) + + assert COMPACTION_STATUS_MARKER in COMPACTION_STATUS diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 774deb89f6a..58cc406deb5 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -757,11 +757,16 @@ def _status_update(sid: str, kind: str, text: str | None = None): body = (text if text is not None else kind).strip() if not body: return - _emit( - "status.update", - sid, - {"kind": kind if text is not None else "status", "text": body}, - ) + out_kind = kind if text is not None else "status" + # Auto-compaction reaches us as a generic "lifecycle" status. Re-tag it so + # drivers (desktop app) can show an explicit "Summarizing…" indicator β€” + # otherwise a mid-turn compaction looks like the transcript reset itself. + if out_kind == "lifecycle": + from agent.conversation_compression import COMPACTION_STATUS_MARKER + + if COMPACTION_STATUS_MARKER in body: + out_kind = "compacting" + _emit("status.update", sid, {"kind": out_kind, "text": body}) def _estimate_image_tokens(width: int, height: int) -> int: