From 1ca1f9f2c7a19e92da5e5f13f4880f409b99635b Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Thu, 25 Jun 2026 13:54:53 -0500 Subject: [PATCH] refactor(tui_gateway): DRY the deferred-session paths Collapse the duplicated cold-resume / lazy-watch / create scaffolding into shared helpers: _deferred_session_record (the live-session dict minus the agent), _lazy_resume_info (the not-yet-built session.info), _claim_or_reuse_live (lock + double-checked register-or-reuse), and _schedule_agent_build (the pre-warm timer). Net -12 lines, three copies of the ~30-key session dict and the lazy-info block down to one each. No intended behavior change, with two caveats visible in the diff: the cold-resume info payload now reads the stored `model_override` as a dict only (the old `str` branch and the `.strip()` of the model name are gone), and _schedule_session_cap_enforcement no longer swallows errors raised while creating or starting its timer. --- .../app/session/hooks/use-session-actions.ts | 11 +- tui_gateway/server.py | 337 +++++++++--------- 2 files changed, 168 insertions(+), 180 deletions(-) diff --git a/apps/desktop/src/app/session/hooks/use-session-actions.ts b/apps/desktop/src/app/session/hooks/use-session-actions.ts index efc99d5b84c..fb06c5a6048 100644 --- a/apps/desktop/src/app/session/hooks/use-session-actions.ts +++ b/apps/desktop/src/app/session/hooks/use-session-actions.ts @@ -759,14 +759,9 @@ export function useSessionActions({ return chatMessageArraysEquivalent(currentMessages, resumedMessages) ? currentMessages : resumedMessages })() - // When the prefetch already painted these exact messages (the common - // cold-resume path), `preferredMessages` IS the live `$messages` array. - // Re-running preserveLocalAssistantErrors there would build a 1000-entry - // Map and map the whole transcript into a throwaway array on every - // switch — pure main-thread cost on the hot path (the downstream - // sameMessageList guard already drops the publish, so it buys nothing). - // The prefetch branch already merged local assistant errors when it - // built `localSnapshot`, so reuse the ref instead. 
+ // Prefetch-hit fast path: `preferredMessages` IS the live `$messages` + // array (already error-merged when `localSnapshot` was built), so reuse + // the ref instead of rebuilding a throwaway transcript+Map every switch. const messagesForView = preferredMessages === currentMessages ? currentMessages diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 6891c49eceb..624ce6b7be8 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -801,19 +801,16 @@ def _enforce_session_cap() -> None: def _schedule_session_cap_enforcement() -> None: """Run the LRU sweep off the response path (eviction can call agent.close).""" - try: - timer = threading.Timer(0.1, lambda: _safe_enforce_session_cap()) - timer.daemon = True - timer.start() - except Exception: - pass + def _run(): + try: + _enforce_session_cap() + except Exception: + logger.debug("session cap enforcement failed", exc_info=True) -def _safe_enforce_session_cap() -> None: - try: - _enforce_session_cap() - except Exception: - logger.debug("session cap enforcement failed", exc_info=True) + timer = threading.Timer(0.1, _run) + timer.daemon = True + timer.start() def _start_idle_reaper() -> None: @@ -4689,16 +4686,8 @@ def _(rid, params: dict) -> dict: # + skeleton panel, then build the real AIAgent just after this response is # flushed. This keeps startup responsive while still hydrating tools/skills # without requiring the user to submit a first prompt. - def _deferred_build() -> None: - session = _sessions.get(sid) - if session is not None: - _start_agent_build(sid, session) - - build_timer = threading.Timer(0.05, _deferred_build) - build_timer.daemon = True - build_timer.start() - # A new live session just landed; trim detached idle ones over the cap. 
- _schedule_session_cap_enforcement() + _schedule_agent_build(sid) + _schedule_session_cap_enforcement() # trim detached idle sessions over the cap return _ok( rid, @@ -4866,6 +4855,110 @@ def _(rid, params: dict) -> dict: return _ok(rid, {"verification": {"status": "unknown", "evidence": None}}) +def _lazy_resume_info(cwd: str, *, model: str = "", provider: str = "") -> dict: + """session.info for a not-yet-built session (the shape session.create + returns). tools/skills land later when the deferred build emits session.info.""" + info = { + "cwd": cwd, + "branch": _git_branch_for_cwd(cwd), + "model": model or _resolve_model(), + "tools": {}, + "skills": {}, + "lazy": True, + "desktop_contract": DESKTOP_BACKEND_CONTRACT, + "profile_name": _current_profile_name(), + } + if provider: + info["provider"] = provider + return info + + +def _deferred_session_record( + session_key: str, + *, + cols: int, + cwd: str, + history: list, + lease, + source: str = "tui", + close_on_disconnect: bool = False, + display_history_prefix: list | None = None, + profile_home: Path | None = None, + lazy: bool = False, + model_override=None, + resume_runtime_overrides: dict | None = None, +) -> dict: + """A live-session record whose AIAgent is built later (lazy watch / cold + resume) — _init_session's shape minus the agent.""" + now = time.time() + return { + "agent": None, + "agent_error": None, + "agent_ready": threading.Event(), + "attached_images": [], + "close_on_disconnect": close_on_disconnect, + "active_session_lease": lease, + "cols": cols, + "created_at": now, + "cwd": cwd, + "display_history_prefix": display_history_prefix or [], + "edit_snapshots": {}, + "explicit_cwd": False, + "history": history, + "history_lock": threading.Lock(), + "history_version": 0, + "image_counter": 0, + "inflight_turn": None, + "last_active": now, + "lazy": lazy, + "model_override": model_override, + "pending_title": None, + "profile_home": str(profile_home) if profile_home is not None else None, + 
"resume_runtime_overrides": resume_runtime_overrides, + "resume_session_id": session_key, + "running": False, + "session_key": session_key, + "show_reasoning": _load_show_reasoning(), + "slash_worker": None, + "source": source, + "tool_progress_mode": _load_tool_progress_mode(), + "tool_started_at": {}, + "transport": current_transport() or _stdio_transport, + } + + +def _claim_or_reuse_live( + sid: str, session_key: str, record: dict, lease +) -> tuple[str, dict] | None: + """Register ``record`` as the live session for ``session_key`` under the + resume lock, or — if a concurrent resume already won — release ``lease`` and + return the winner for the caller to reuse.""" + with _session_resume_lock: + live = _find_live_session_by_key(session_key) + if live is not None: + if lease is not None: + lease.release() + return live + with _sessions_lock: + _sessions[sid] = record + _register_session_cwd(_sessions[sid]) + return None + + +def _schedule_agent_build(sid: str, delay: float = 0.05) -> None: + """Pre-warm a deferred session's agent off the response path (session.create + and cold resume both build through here; _sess() also builds on demand).""" + + def _run(): + session = _sessions.get(sid) + if session is not None: + _start_agent_build(sid, session) + + timer = threading.Timer(delay, _run) + timer.daemon = True + timer.start() + + @method("session.resume") def _(rid, params: dict) -> dict: target = params.get("session_id", "") @@ -4973,65 +5066,31 @@ def _(rid, params: dict) -> dict: return _err(rid, 4090, limit_message) try: db.reopen_session(target) - # The child's OWN conversation only. Delegation children are - # parent-linked rows, so include_ancestors would prepend the - # parent's entire transcript — a watch window opened on a subagent - # must show the subagent's branch, not the parent's prompt. + # The child's OWN conversation only — include_ancestors would prepend + # the parent's transcript onto the subagent's branch. 
history = db.get_messages_as_conversation(target) except Exception as e: if lease is not None: lease.release() return _err(rid, 5000, f"resume failed: {e}") - messages = _history_to_messages(history) cwd = profile_resume_cwd or os.getenv("TERMINAL_CWD", os.getcwd()) - now = time.time() - # A delegated child mid-run emits no native session events of its own — - # report its liveness from the relay registry so the window paints a - # busy indicator instead of a dead idle transcript. + record = _deferred_session_record( + target, + cols=cols, + cwd=cwd, + history=history, + lease=lease, + source=str(params.get("source") or "tui").strip() or "tui", + close_on_disconnect=is_truthy_value(params.get("close_on_disconnect", False)), + profile_home=profile_home, + lazy=True, + ) + if (live := _claim_or_reuse_live(sid, target, record, lease)) is not None: + return _ok(rid, _reuse_live_payload(*live)) + # A delegated child mid-run emits no session events of its own — report + # its liveness from the relay registry so the window shows a busy turn. 
child_running = _child_run_active(target) - source = str(params.get("source") or "tui").strip() or "tui" - with _session_resume_lock: - live = _find_live_session_by_key(target) - if live is not None: - if lease is not None: - lease.release() - return _ok(rid, _reuse_live_payload(*live)) - with _sessions_lock: - _sessions[sid] = { - "agent": None, - "agent_error": None, - "agent_ready": threading.Event(), - "attached_images": [], - "close_on_disconnect": is_truthy_value( - params.get("close_on_disconnect", False) - ), - "active_session_lease": lease, - "cols": cols, - "created_at": now, - "display_history_prefix": [], - "edit_snapshots": {}, - "explicit_cwd": False, - "history": history, - "history_lock": threading.Lock(), - "history_version": 0, - "image_counter": 0, - "cwd": cwd, - "inflight_turn": None, - "last_active": now, - "lazy": True, - "pending_title": None, - "profile_home": str(profile_home) if profile_home is not None else None, - "resume_session_id": target, - "running": False, - "session_key": target, - "show_reasoning": _load_show_reasoning(), - "source": source, - "slash_worker": None, - "tool_progress_mode": _load_tool_progress_mode(), - "tool_started_at": {}, - "transport": current_transport() or _stdio_transport, - } - _register_session_cwd(_sessions[sid]) + messages = _history_to_messages(history) return _ok( rid, { @@ -5039,20 +5098,11 @@ def _(rid, params: dict) -> dict: "resumed": target, "message_count": len(messages), "messages": messages, - "info": { - "cwd": cwd, - "branch": _git_branch_for_cwd(cwd), - "model": _resolve_model(), - "tools": {}, - "skills": {}, - "lazy": True, - "desktop_contract": DESKTOP_BACKEND_CONTRACT, - "profile_name": _current_profile_name(), - }, + "info": _lazy_resume_info(cwd), "inflight": None, "running": child_running, "session_key": target, - "started_at": now, + "started_at": record["created_at"], "status": "streaming" if child_running else "idle", }, ) @@ -5081,99 +5131,38 @@ def _(rid, params: dict) -> dict: 
try: db.reopen_session(target) history = db.get_messages_as_conversation(target) - display_history = db.get_messages_as_conversation( - target, include_ancestors=True - ) + display_history = db.get_messages_as_conversation(target, include_ancestors=True) except Exception as e: if lease is not None: lease.release() return _err(rid, 5000, f"resume failed: {e}") - display_history_prefix = display_history[ - : max(0, len(display_history) - len(history)) - ] - messages = _history_to_messages(display_history) - # Restore the model/provider/reasoning/tier this chat actually used so - # the deferred build (and the immediate info payload) match the eager - # path — without these a deferred build drops the provider and resume - # fails with "No LLM provider configured". - stored_runtime_overrides = _stored_session_runtime_overrides(found) or {} + prefix = display_history[: max(0, len(display_history) - len(history))] + # Restore the model/provider/reasoning/tier this chat last used so the + # deferred build (and the info below) match the eager path — without them + # the build drops the provider ("No LLM provider configured"). 
+ overrides = _stored_session_runtime_overrides(found) or {} + model_override = overrides.get("model_override") or {} cwd = profile_resume_cwd or os.getenv("TERMINAL_CWD", os.getcwd()) - now = time.time() - source = str(params.get("source") or "tui").strip() or "tui" - with _session_resume_lock: - live = _find_live_session_by_key(target) - if live is not None: - if lease is not None: - lease.release() - return _ok(rid, _reuse_live_payload(*live)) - with _sessions_lock: - _sessions[sid] = { - "agent": None, - "agent_error": None, - "agent_ready": threading.Event(), - "attached_images": [], - "close_on_disconnect": is_truthy_value( - params.get("close_on_disconnect", False) - ), - "active_session_lease": lease, - "cols": cols, - "created_at": now, - "display_history_prefix": display_history_prefix, - "edit_snapshots": {}, - "explicit_cwd": False, - "history": history, - "history_lock": threading.Lock(), - "history_version": 0, - "image_counter": 0, - "cwd": cwd, - "inflight_turn": None, - "last_active": now, - "model_override": stored_runtime_overrides.get("model_override"), - "pending_title": None, - "profile_home": str(profile_home) if profile_home is not None else None, - "resume_session_id": target, - "resume_runtime_overrides": stored_runtime_overrides or None, - "running": False, - "session_key": target, - "show_reasoning": _load_show_reasoning(), - "source": source, - "slash_worker": None, - "tool_progress_mode": _load_tool_progress_mode(), - "tool_started_at": {}, - "transport": current_transport() or _stdio_transport, - } - _register_session_cwd(_sessions[sid]) + record = _deferred_session_record( + target, + cols=cols, + cwd=cwd, + history=history, + lease=lease, + source=str(params.get("source") or "tui").strip() or "tui", + close_on_disconnect=is_truthy_value(params.get("close_on_disconnect", False)), + display_history_prefix=prefix, + profile_home=profile_home, + model_override=overrides.get("model_override"), + resume_runtime_overrides=overrides or 
None, + ) + if (live := _claim_or_reuse_live(sid, target, record, lease)) is not None: + return _ok(rid, _reuse_live_payload(*live)) - def _deferred_build() -> None: - session = _sessions.get(sid) - if session is not None: - _start_agent_build(sid, session) + _schedule_agent_build(sid) + _schedule_session_cap_enforcement() # trim detached idle sessions over the cap - build_timer = threading.Timer(0.05, _deferred_build) - build_timer.daemon = True - build_timer.start() - # A new live session just landed; trim detached idle ones over the cap. - _schedule_session_cap_enforcement() - - model_override = stored_runtime_overrides.get("model_override") - resumed_model = "" - if isinstance(model_override, dict): - resumed_model = str(model_override.get("model") or "").strip() - elif isinstance(model_override, str): - resumed_model = model_override.strip() - info = { - "cwd": cwd, - "branch": _git_branch_for_cwd(cwd), - "model": resumed_model or _resolve_model(), - "tools": {}, - "skills": {}, - "lazy": True, - "desktop_contract": DESKTOP_BACKEND_CONTRACT, - "profile_name": _current_profile_name(), - } - provider_override = stored_runtime_overrides.get("provider_override") - if provider_override: - info["provider"] = provider_override + messages = _history_to_messages(display_history) return _ok( rid, { @@ -5181,11 +5170,15 @@ def _(rid, params: dict) -> dict: "resumed": target, "message_count": len(messages), "messages": messages, - "info": info, + "info": _lazy_resume_info( + cwd, + model=model_override.get("model") or "", + provider=overrides.get("provider_override") or "", + ), "inflight": None, "running": False, "session_key": target, - "started_at": now, + "started_at": record["created_at"], "status": "idle", }, )