From 70319626a9221f85cfa119a3126fd3360d0fc9b2 Mon Sep 17 00:00:00 2001 From: Brooklyn Nicholson Date: Thu, 25 Jun 2026 12:29:49 -0500 Subject: [PATCH] fix(tui_gateway): queue mid-turn prompts instead of dropping them on a busy retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A prompt sent while a turn was in flight got rejected with 4009 "session busy", which pushed clients (the desktop app) into a deadline-bounded busy-retry. When turn teardown outlived that deadline — e.g. the user hits stop while a slow, non-interruptible tool (web_search, read_file, an MCP call) is mid-flight, since the sequential executor only checks the interrupt flag between tools — the resubmitted message was silently dropped: "it just doesn't listen". Wire the previously-dead display.busy_input_mode config into prompt.submit: instead of rejecting, apply the policy and queue the message to run as the next turn (drained in run()'s tail, ahead of goal/notification follow-ups). Modes: interrupt (default) interrupts the live turn so it winds down promptly then runs the queued message; queue runs it after the current turn finishes; steer injects it into the live turn when accepted, else queues. The queued slot pins the sender's transport and losslessly merges a second arrival. No client deadline, no dropped sends. --- tests/test_tui_gateway_queue_on_busy.py | 140 ++++++++++++++++++++++++ tui_gateway/server.py | 94 +++++++++++++++- 2 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 tests/test_tui_gateway_queue_on_busy.py diff --git a/tests/test_tui_gateway_queue_on_busy.py b/tests/test_tui_gateway_queue_on_busy.py new file mode 100644 index 00000000000..e1c5050295d --- /dev/null +++ b/tests/test_tui_gateway_queue_on_busy.py @@ -0,0 +1,140 @@ +"""A prompt that lands mid-turn is interrupted + queued, never dropped. + +Before this, ``prompt.submit`` on a running session returned ``session busy``, +forcing clients into a deadline-bounded busy-retry. When turn teardown outlived +the deadline — e.g. a slow, non-interruptible tool (``web_search``) still +running when the user hit stop — the resubmitted message was silently dropped +("it just doesn't listen"). The gateway now applies the ``busy_input_mode`` +policy: interrupt the live turn (default) and queue the message to run as the +next turn, drained in ``run``'s tail. +""" + +import threading +import types + +from tui_gateway import server + + +def _session(agent=None, **extra): + return { + "agent": agent if agent is not None else types.SimpleNamespace(), + "session_key": "session-key", + "history": [], + "history_lock": threading.Lock(), + "history_version": 0, + "running": False, + "transport": None, + "attached_images": [], + **extra, + } + + +# ── _enqueue_prompt ──────────────────────────────────────────────────────── + +def test_enqueue_pins_text_and_transport(): + session = _session() + server._enqueue_prompt(session, "hello", "ws-1") + assert session["queued_prompt"] == {"text": "hello", "transport": "ws-1"} + + +def test_enqueue_merges_second_arrival_losslessly(): + session = _session() + server._enqueue_prompt(session, "first", "ws-1") + server._enqueue_prompt(session, "second", "ws-2") + assert session["queued_prompt"]["text"] == "first\n\nsecond" + # Latest transport wins so the drain streams to the most recent client. + assert session["queued_prompt"]["transport"] == "ws-2" + + +# ── _handle_busy_submit (policy) ─────────────────────────────────────────── + +def test_busy_interrupt_mode_interrupts_and_queues(monkeypatch): + monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "interrupt") + calls = {"interrupt": 0} + agent = types.SimpleNamespace(interrupt=lambda *a, **k: calls.__setitem__("interrupt", calls["interrupt"] + 1)) + session = _session(agent=agent) + + resp = server._handle_busy_submit("r1", "sid", session, "redirect", "ws-1") + + assert resp["result"]["status"] == "queued" + assert calls["interrupt"] == 1 + assert session["queued_prompt"]["text"] == "redirect" + + +def test_busy_queue_mode_queues_without_interrupting(monkeypatch): + monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "queue") + calls = {"interrupt": 0} + agent = types.SimpleNamespace(interrupt=lambda *a, **k: calls.__setitem__("interrupt", calls["interrupt"] + 1)) + session = _session(agent=agent) + + resp = server._handle_busy_submit("r1", "sid", session, "later", "ws-1") + + assert resp["result"]["status"] == "queued" + assert calls["interrupt"] == 0 + assert session["queued_prompt"]["text"] == "later" + + +def test_busy_steer_mode_injects_when_accepted(monkeypatch): + monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "steer") + agent = types.SimpleNamespace(steer=lambda text: True, interrupt=lambda *a, **k: None) + session = _session(agent=agent) + + resp = server._handle_busy_submit("r1", "sid", session, "nudge", "ws-1") + + assert resp["result"]["status"] == "steered" + assert session.get("queued_prompt") is None + + +def test_busy_steer_mode_falls_back_to_queue_when_rejected(monkeypatch): + monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "steer") + agent = types.SimpleNamespace(steer=lambda text: False, interrupt=lambda *a, **k: None) + session = _session(agent=agent) + + resp = server._handle_busy_submit("r1", "sid", session, "nudge", "ws-1") + + assert resp["result"]["status"] == "queued" + assert session["queued_prompt"]["text"] == "nudge" + + +# ── _drain_queued_prompt ─────────────────────────────────────────────────── + +def test_drain_fires_queued_prompt_and_claims_running(monkeypatch): + fired = {} + monkeypatch.setattr( + server, "_run_prompt_submit", + lambda rid, sid, session, text: fired.update(rid=rid, sid=sid, text=text), + ) + session = _session(queued_prompt={"text": "go", "transport": "ws-9"}) + + assert server._drain_queued_prompt("r1", "sid", session) is True + assert fired == {"rid": "r1", "sid": "sid", "text": "go"} + assert session["running"] is True + assert session["queued_prompt"] is None + assert session["transport"] == "ws-9" + + +def test_drain_noop_when_nothing_queued(monkeypatch): + monkeypatch.setattr(server, "_run_prompt_submit", lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not fire"))) + session = _session() + assert server._drain_queued_prompt("r1", "sid", session) is False + assert session["running"] is False + + +def test_drain_noop_when_session_already_running(monkeypatch): + """A fresh turn that claimed the session beats a stale queued entry — + the drain leaves it for that turn's own tail.""" + monkeypatch.setattr(server, "_run_prompt_submit", lambda *a, **k: (_ for _ in ()).throw(AssertionError("should not fire"))) + session = _session(running=True, queued_prompt={"text": "go", "transport": None}) + assert server._drain_queued_prompt("r1", "sid", session) is False + assert session["queued_prompt"]["text"] == "go" + + +def test_drain_releases_running_on_dispatch_failure(monkeypatch): + def _boom(*a, **k): + raise RuntimeError("dispatch failed") + monkeypatch.setattr(server, "_run_prompt_submit", _boom) + session = _session(queued_prompt={"text": "go", "transport": None}) + + assert server._drain_queued_prompt("r1", "sid", session) is True + # Failure must not leave the session wedged as running. + assert session["running"] is False diff --git a/tui_gateway/server.py b/tui_gateway/server.py index b328efa8d6c..4333a16e575 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -4392,6 +4392,88 @@ def _clear_inflight_turn(session: dict) -> None: session["inflight_turn"] = None +def _enqueue_prompt(session: dict, text: Any, transport: Any) -> None: + """Stash a message to run as the very next turn once the live one ends. + + Used when a prompt arrives mid-turn (see ``_handle_busy_submit``). A single + slot is kept; a second arrival is merged (lossless, mirroring the + consecutive-user merge in ``repair_message_sequence``) so nothing the user + typed is dropped. ``transport`` is pinned so the drained turn streams back to + the client that sent it even if the session transport is rebound meanwhile. + """ + existing = session.get("queued_prompt") + if ( + existing + and isinstance(existing.get("text"), str) + and isinstance(text, str) + ): + prev = existing["text"] + text = f"{prev}\n\n{text}" if prev and text else (prev or text) + session["queued_prompt"] = {"text": text, "transport": transport} + + +def _handle_busy_submit(rid, sid: str, session: dict, text: Any, transport: Any) -> dict: + """Apply the ``display.busy_input_mode`` policy to a prompt that lands while + a turn is in flight, instead of rejecting it with ``session busy``. + + The old rejection forced clients into a deadline-bounded busy-retry that + silently dropped the send when turn teardown outlived the deadline (e.g. a + slow, non-interruptible tool like ``web_search`` running when the user hits + stop). The message is instead queued to run as the next turn — and, for the + default ``interrupt`` policy, the live turn is interrupted so it winds down + promptly. Drained in ``run``'s tail (see ``_run_prompt_submit``). + + Modes: ``interrupt`` (default) → interrupt + queue; ``queue`` → queue + without interrupting; ``steer`` → inject into the live turn if accepted, + else queue. + """ + mode = _load_busy_input_mode() + agent = session.get("agent") + if mode == "steer" and agent is not None and hasattr(agent, "steer"): + try: + if agent.steer(text): + session["last_active"] = time.time() + return _ok(rid, {"status": "steered"}) + except Exception: + pass # fall through to queue + if mode != "queue" and agent is not None and hasattr(agent, "interrupt"): + try: + agent.interrupt() + except Exception: + pass + _enqueue_prompt(session, text, transport) + session["last_active"] = time.time() + return _ok(rid, {"status": "queued"}) + + +def _drain_queued_prompt(rid, sid: str, session: dict) -> bool: + """Fire a queued next-turn prompt if one is waiting and the session is idle. + + Returns True if a queued prompt was dispatched (the caller should then skip + lower-priority follow-ups this cycle — the user's message wins). Mirrors the + claim-under-lock pattern used by the goal-continuation re-fire. + """ + with session["history_lock"]: + queued = session.get("queued_prompt") + if not queued or session.get("running"): + return False + session["queued_prompt"] = None + session["running"] = True + if queued.get("transport") is not None: + session["transport"] = queued["transport"] + try: + _run_prompt_submit(rid, sid, session, queued["text"]) + except Exception as exc: + print( + f"[tui_gateway] queued prompt dispatch failed: " + f"{type(exc).__name__}: {exc}", + file=sys.stderr, + ) + with session["history_lock"]: + session["running"] = False + return True + + def _inflight_snapshot(session: dict) -> dict | None: turn = session.get("inflight_turn") if not isinstance(turn, dict): @@ -7456,7 +7538,11 @@ def _(rid, params: dict) -> dict: session["transport"] = t with session["history_lock"]: if session.get("running"): - return _err(rid, 4009, "session busy") + # Don't reject a mid-turn prompt — queue it (and, by default, + # interrupt the live turn) so it runs as the next turn. See + # _handle_busy_submit for why the old "session busy" rejection + # dropped messages when teardown outlived the client's retry window. + return _handle_busy_submit(rid, sid, session, text, t or session.get("transport")) # A watch session's run lives in the PARENT turn, so its own running # flag is False — without this, typing mid-run builds a second agent # racing the in-flight child on the same stored session (interleaved @@ -8078,6 +8164,12 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None: _clear_inflight_turn(session) _emit("session.info", sid, _session_info(agent, session)) + # A user prompt that arrived mid-turn (interrupt + queue) wins over + # every auto follow-up below — drain it first and skip them this cycle; + # the goal judge / notifications re-evaluate at the end of that turn. + if _drain_queued_prompt(rid, sid, session): + return + # Chain a goal-continuation turn if the judge said so. We do # this AFTER the finally releases session["running"], so the # nested _run_prompt_submit doesn't deadlock on the busy