diff --git a/apps/desktop/src/app/session/hooks/use-session-actions.test.tsx b/apps/desktop/src/app/session/hooks/use-session-actions.test.tsx index a84a854ded4..31b2cc58afc 100644 --- a/apps/desktop/src/app/session/hooks/use-session-actions.test.tsx +++ b/apps/desktop/src/app/session/hooks/use-session-actions.test.tsx @@ -256,4 +256,29 @@ describe('resumeSession failure recovery', () => { expect($resumeFailedSessionId.get()).toBeNull() }) + + it('asks the backend to DEFER the agent build on a normal cold resume', async () => { + // The switch-latency fix: a non-watch cold resume tells the gateway to + // return the transcript immediately and build the agent in the background, + // rather than blocking the RPC (and the whole switch) on _make_agent. + let resumeParams: Record | undefined + + const requestGateway = vi.fn(async (method: string, params?: Record) => { + if (method === 'session.resume') { + resumeParams = params + + return { session_id: 'runtime-1', resumed: params?.session_id, messages: [], info: {} } as never + } + + return {} as never + }) + + vi.mocked(getSessionMessages).mockResolvedValue({ messages: [] } as never) + + await runResume(requestGateway) + + expect(resumeParams).toMatchObject({ defer_build: true }) + // Watch-window lazy attach is the OTHER mode; a normal resume isn't lazy. + expect(resumeParams).not.toHaveProperty('lazy') + }) }) diff --git a/apps/desktop/src/app/session/hooks/use-session-actions.ts b/apps/desktop/src/app/session/hooks/use-session-actions.ts index 36dfea759f2..5c8be9762e1 100644 --- a/apps/desktop/src/app/session/hooks/use-session-actions.ts +++ b/apps/desktop/src/app/session/hooks/use-session-actions.ts @@ -706,7 +706,12 @@ export function useSessionActions({ const resumePromise = requestGateway('session.resume', { session_id: storedSessionId, cols: 96, - ...(watchWindow ? { lazy: true } : {}), + // Watch windows attach lazily (live mirror); every other cold resume + // asks the backend to DEFER the agent build so the RPC returns the + // transcript immediately instead of blocking the switch on + // _make_agent (MCP discovery / prompt build). The agent pre-warms in + // the background and the prefetch above paints the transcript. + ...(watchWindow ? { lazy: true } : { defer_build: true }), ...(sessionProfile ? { profile: sessionProfile } : {}) }) // The rejection is consumed by the `await` below; this guard only @@ -754,7 +759,18 @@ export function useSessionActions({ return chatMessageArraysEquivalent(currentMessages, resumedMessages) ? currentMessages : resumedMessages })() - const messagesForView = preserveLocalAssistantErrors(preferredMessages, currentMessages) + // When the prefetch already painted these exact messages (the common + // cold-resume path), `preferredMessages` IS the live `$messages` array. + // Re-running preserveLocalAssistantErrors there would build a 1000-entry + // Map and map the whole transcript into a throwaway array on every + // switch — pure main-thread cost on the hot path (the downstream + // sameMessageList guard already drops the publish, so it buys nothing). + // The prefetch branch already merged local assistant errors when it + // built `localSnapshot`, so reuse the ref instead. + const messagesForView = + preferredMessages === currentMessages + ? currentMessages + : preserveLocalAssistantErrors(preferredMessages, currentMessages) setActiveSessionId(resumed.session_id) activeSessionIdRef.current = resumed.session_id diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 06e3ad5d7b9..b64086c91e8 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -901,6 +901,11 @@ DEFAULT_CONFIG = { # Global active chat session cap across CLI, TUI/dashboard, and messaging. # None/0 = unbounded. "max_concurrent_sessions": None, + # Soft LRU cap on in-memory TUI/desktop/dashboard sessions. When more than + # this many are live, the gateway evicts the least-recently-active DETACHED + # sessions (no live client) so accumulated agents don't pile up under memory + # pressure. Reopening one re-resumes it from disk. 0/null disables. + "max_live_sessions": 16, "agent": { "max_turns": 90, # Inactivity timeout for gateway agent execution (seconds). diff --git a/tests/tui_gateway/test_protocol.py b/tests/tui_gateway/test_protocol.py index 3b385bf825a..8b45a672aed 100644 --- a/tests/tui_gateway/test_protocol.py +++ b/tests/tui_gateway/test_protocol.py @@ -336,6 +336,146 @@ def test_session_resume_returns_hydrated_messages(server, monkeypatch): ] +def test_session_resume_defer_build_returns_transcript_without_blocking(server, monkeypatch): + """``defer_build: true`` (desktop cold resume) must return the full display + transcript immediately and register an upgradable live session WITHOUT + building the agent on the response path — that eager build is the + multi-second switch latency.""" + + target = "20260409_010101_abc123" + + class _DB: + def get_session(self, _sid): + return { + "id": target, + "model": "vendor/cool-model", + "model_config": {"provider": "vendor"}, + } + + def get_session_by_title(self, _title): + return None + + def resolve_resume_session_id(self, sid): + return sid + + def reopen_session(self, _sid): + return None + + def get_messages_as_conversation(self, _sid, include_ancestors=False): + return [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "yo"}, + ] + + builds: list = [] + + monkeypatch.setattr(server, "_get_db", lambda: _DB()) + # The response path must never call _make_agent; route the deferred timer + # through a recorder so a 50ms fire can't build (or crash) under the test. + monkeypatch.setattr( + server, "_make_agent", lambda *a, **k: (_ for _ in ()).throw(AssertionError("no eager build")) + ) + monkeypatch.setattr(server, "_start_agent_build", lambda sid, session: builds.append(sid)) + monkeypatch.setattr(server, "_schedule_session_cap_enforcement", lambda: None) + + resp = server.handle_request( + { + "id": "r1", + "method": "session.resume", + "params": {"session_id": target, "cols": 100, "defer_build": True}, + } + ) + + assert "error" not in resp + result = resp["result"] + assert result["resumed"] == target + assert result["session_key"] == target + assert result["message_count"] == 2 + assert result["messages"] == [ + {"role": "user", "text": "hello"}, + {"role": "assistant", "text": "yo"}, + ] + # Lazy info contract (same shape session.create returns), with the session's + # persisted model/provider restored rather than the global default. + assert result["info"]["lazy"] is True + assert result["info"]["model"] == "vendor/cool-model" + assert result["info"]["provider"] == "vendor" + assert result["info"]["desktop_contract"] == server.DESKTOP_BACKEND_CONTRACT + + sid = result["session_id"] + session = server._sessions[sid] + # Registered but not built: agent is None and the resume key is carried so a + # later prompt.submit / _sess() upgrade continues THIS stored conversation. + assert session["agent"] is None + assert session["resume_session_id"] == target + assert not session["agent_ready"].is_set() + # Not a watch spectator: a normal deferred resume is a real session. + assert not session.get("lazy") + # The persisted runtime identity is stashed for the deferred build so it + # can't drop the provider ("No LLM provider configured"). + assert session["resume_runtime_overrides"]["model_override"]["model"] == "vendor/cool-model" + assert server._find_live_session_by_key(target) == (sid, session) + + +def test_enforce_session_cap_evicts_oldest_detached_only(server, monkeypatch): + """The LRU cap frees the least-recently-active DETACHED sessions when over + the limit, and never a live-transport / running / mid-build one.""" + + monkeypatch.setattr(server, "_load_cfg", lambda: {"max_live_sessions": 2}) + evicted: list[str] = [] + monkeypatch.setattr( + server, "_close_session_by_id", lambda sid, end_reason=None: evicted.append(sid) + ) + + def _ready() -> threading.Event: + ev = threading.Event() + ev.set() + return ev + + detached = server._detached_ws_transport + live = object() # no _closed attr -> live transport, never evictable + + server._sessions.clear() + server._sessions.update( + { + "old_detached": {"transport": detached, "last_active": 100.0, "agent_ready": _ready()}, + "new_detached": {"transport": detached, "last_active": 300.0, "agent_ready": _ready()}, + "running_detached": { + "transport": detached, + "last_active": 50.0, + "running": True, + "agent_ready": _ready(), + }, + "focused_live": {"transport": live, "last_active": 200.0, "agent_ready": _ready()}, + } + ) + + server._enforce_session_cap() + + # 4 sessions, cap 2 -> evict 2. Only detached+idle+built are eligible, oldest + # first; the running one and the live-transport one are exempt. + assert evicted == ["old_detached", "new_detached"] + + +def test_enforce_session_cap_disabled_is_noop(server, monkeypatch): + monkeypatch.setattr(server, "_load_cfg", lambda: {"max_live_sessions": 0}) + evicted: list[str] = [] + monkeypatch.setattr( + server, "_close_session_by_id", lambda sid, end_reason=None: evicted.append(sid) + ) + server._sessions.clear() + server._sessions.update( + { + f"s{i}": {"transport": server._detached_ws_transport, "last_active": float(i)} + for i in range(5) + } + ) + + server._enforce_session_cap() + + assert evicted == [] + + def test_session_resume_handles_multimodal_list_content(server, monkeypatch): """A user message persisted with list-shaped multimodal content used to crash session resume with ``'list' object has no attribute 'strip'``.""" diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 2bd20968590..40910e291ba 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -741,6 +741,79 @@ def _reap_idle_sessions() -> None: victims = [sid for sid, s in _sessions.items() if _session_is_evictable(sid, s, now)] for sid in victims: _close_session_by_id(sid, end_reason="idle_timeout") + _enforce_session_cap() + + +# Soft LRU cap on in-memory sessions. The 6h TTL reaper above only frees +# sessions that have been idle for hours; a heavy user who reconnects often +# accumulates detached sessions (the report's ``detached_sessions=5``) whose +# agents sit resident for the full TTL. The cap evicts the least-recently-active +# DETACHED sessions sooner so live agents don't pile up under memory pressure. +# Default-on but provably safe: it only touches sessions with no live client +# (reopening re-resumes them from the DB) and never a running / pending / +# mid-build / live-transport one. 0/null disables. +def _max_live_sessions() -> int: + try: + from hermes_cli.active_sessions import coerce_max_concurrent_sessions + + cfg = _load_cfg() or {} + raw = cfg.get("max_live_sessions") + if raw is None: + gateway_cfg = cfg.get("gateway") + if isinstance(gateway_cfg, dict): + raw = gateway_cfg.get("max_live_sessions") + coerced = coerce_max_concurrent_sessions(raw, key="max_live_sessions") + return int(coerced) if coerced else 0 + except Exception: + return 0 + + +def _session_is_lru_evictable(sid: str, session: dict) -> bool: + # Same hard exemptions as the TTL reaper (never evict a session mid-turn, + # awaiting input, or still building), but WITHOUT the hours-scale age gate: + # a detached session is eligible the moment it loses its client. + if session.get("running") or _session_pending_kind(sid): + return False + ready = session.get("agent_ready") + if ready is not None and not ready.is_set() and not session.get("lazy"): + return False + return _transport_is_dead(session.get("transport")) + + +def _enforce_session_cap() -> None: + cap = _max_live_sessions() + if cap <= 0: + return + with _sessions_lock: + total = len(_sessions) + if total <= cap: + return + evictable = [ + (sid, s) for sid, s in _sessions.items() if _session_is_lru_evictable(sid, s) + ] + # Oldest-touched first; only evict down to the cap (live/focused sessions on + # a live transport are never eligible, so we may stop short of the cap). + evictable.sort(key=lambda kv: float(kv[1].get("last_active") or 0.0)) + overflow = total - cap + for sid, _s in evictable[:overflow]: + _close_session_by_id(sid, end_reason="lru_evict") + + +def _schedule_session_cap_enforcement() -> None: + """Run the LRU sweep off the response path (eviction can call agent.close).""" + try: + timer = threading.Timer(0.1, lambda: _safe_enforce_session_cap()) + timer.daemon = True + timer.start() + except Exception: + pass + + +def _safe_enforce_session_cap() -> None: + try: + _enforce_session_cap() + except Exception: + logger.debug("session cap enforcement failed", exc_info=True) def _start_idle_reaper() -> None: @@ -1111,15 +1184,24 @@ def _start_agent_build(sid: str, session: dict) -> None: kw = {"session_db": session_db} if resume_sid := current.get("resume_session_id"): kw["session_id"] = resume_sid - # Model/effort/fast the desktop picked for a brand-new chat ride - # in as per-session overrides so the first build uses them - # directly (no global config, no build-then-switch). - if override := current.get("model_override"): - kw["model_override"] = override - if (reasoning := current.get("create_reasoning_override")) is not None: - kw["reasoning_config_override"] = reasoning - if (tier := current.get("create_service_tier_override")) is not None: - kw["service_tier_override"] = tier + resume_overrides = current.get("resume_runtime_overrides") + if isinstance(resume_overrides, dict) and resume_overrides: + # Cold deferred resume: restore the full persisted runtime + # identity (model/provider/base_url/api_mode/reasoning/tier) + # exactly as the eager resume path's _stored_session_runtime_ + # overrides splat did, so a deferred build can't drop the + # provider and fail with "No LLM provider configured". + kw.update(resume_overrides) + else: + # Model/effort/fast the desktop picked for a brand-new chat + # ride in as per-session overrides so the first build uses + # them directly (no global config, no build-then-switch). + if override := current.get("model_override"): + kw["model_override"] = override + if (reasoning := current.get("create_reasoning_override")) is not None: + kw["reasoning_config_override"] = reasoning + if (tier := current.get("create_service_tier_override")) is not None: + kw["service_tier_override"] = tier agent = _make_agent(sid, key, **kw) finally: _clear_session_context(tokens) @@ -4602,6 +4684,8 @@ def _(rid, params: dict) -> dict: build_timer = threading.Timer(0.05, _deferred_build) build_timer.daemon = True build_timer.start() + # A new live session just landed; trim detached idle ones over the cap. + _schedule_session_cap_enforcement() return _ok( rid, @@ -4960,6 +5044,135 @@ def _(rid, params: dict) -> dict: }, ) + # Deferred build (desktop cold resume): register the live session and read + # its stored transcript WITHOUT building the agent on the response path. + # _make_agent can block for seconds (MCP discovery, prompt/skill build, + # AIAgent construction), and the desktop awaits this RPC before it paints + # the chat — so the eager build below is the bulk of the multi-second + # "switching sessions is frozen" latency. We return the full display + # transcript immediately and pre-warm the agent on a short timer (the same + # deferred-build contract session.create uses); _sess() also builds on + # demand if the first prompt beats the timer. Distinct from the lazy/watch + # branch above: a normal resume restores the full ancestor history and the + # session's persisted runtime identity, and is a real (upgradable) session, + # not a never-built spectator window. + if is_truthy_value(params.get("defer_build", False)): + sid = uuid.uuid4().hex[:8] + lease, limit_message = _claim_active_session_slot(target, live_session_id=sid) + if limit_message is not None: + return _err(rid, 4090, limit_message) + try: + db.reopen_session(target) + history = db.get_messages_as_conversation(target) + display_history = db.get_messages_as_conversation( + target, include_ancestors=True + ) + except Exception as e: + if lease is not None: + lease.release() + return _err(rid, 5000, f"resume failed: {e}") + display_history_prefix = display_history[ + : max(0, len(display_history) - len(history)) + ] + messages = _history_to_messages(display_history) + # Restore the model/provider/reasoning/tier this chat actually used so + # the deferred build (and the immediate info payload) match the eager + # path — without these a deferred build drops the provider and resume + # fails with "No LLM provider configured". + stored_runtime_overrides = _stored_session_runtime_overrides(found) or {} + cwd = profile_resume_cwd or os.getenv("TERMINAL_CWD", os.getcwd()) + now = time.time() + source = str(params.get("source") or "tui").strip() or "tui" + with _session_resume_lock: + live = _find_live_session_by_key(target) + if live is not None: + if lease is not None: + lease.release() + return _ok(rid, _reuse_live_payload(*live)) + with _sessions_lock: + _sessions[sid] = { + "agent": None, + "agent_error": None, + "agent_ready": threading.Event(), + "attached_images": [], + "close_on_disconnect": is_truthy_value( + params.get("close_on_disconnect", False) + ), + "active_session_lease": lease, + "cols": cols, + "created_at": now, + "display_history_prefix": display_history_prefix, + "edit_snapshots": {}, + "explicit_cwd": False, + "history": history, + "history_lock": threading.Lock(), + "history_version": 0, + "image_counter": 0, + "cwd": cwd, + "inflight_turn": None, + "last_active": now, + "model_override": stored_runtime_overrides.get("model_override"), + "pending_title": None, + "profile_home": str(profile_home) if profile_home is not None else None, + "resume_session_id": target, + "resume_runtime_overrides": stored_runtime_overrides or None, + "running": False, + "session_key": target, + "show_reasoning": _load_show_reasoning(), + "source": source, + "slash_worker": None, + "tool_progress_mode": _load_tool_progress_mode(), + "tool_started_at": {}, + "transport": current_transport() or _stdio_transport, + } + _register_session_cwd(_sessions[sid]) + + def _deferred_build() -> None: + session = _sessions.get(sid) + if session is not None: + _start_agent_build(sid, session) + + build_timer = threading.Timer(0.05, _deferred_build) + build_timer.daemon = True + build_timer.start() + # A new live session just landed; trim detached idle ones over the cap. + _schedule_session_cap_enforcement() + + model_override = stored_runtime_overrides.get("model_override") + resumed_model = "" + if isinstance(model_override, dict): + resumed_model = str(model_override.get("model") or "").strip() + elif isinstance(model_override, str): + resumed_model = model_override.strip() + info = { + "cwd": cwd, + "branch": _git_branch_for_cwd(cwd), + "model": resumed_model or _resolve_model(), + "tools": {}, + "skills": {}, + "lazy": True, + "desktop_contract": DESKTOP_BACKEND_CONTRACT, + "profile_name": _current_profile_name(), + } + provider_override = stored_runtime_overrides.get("provider_override") + if provider_override: + info["provider"] = provider_override + return _ok( + rid, + { + "session_id": sid, + "resumed": target, + "message_count": len(messages), + "messages": messages, + "info": info, + "inflight": None, + "running": False, + "session_key": target, + "started_at": now, + "status": "idle", + }, + ) + # Build the agent OUTSIDE the lock — _make_agent can block for seconds # (MCP discovery, prompt/skill build, AIAgent construction). Holding # _session_resume_lock across it would stall session.close on the main