Merge pull request #52594 from NousResearch/bb/queue-resubmit-on-busy

fix(tui_gateway): queue mid-turn prompts instead of dropping them on a busy retry
This commit is contained in:
brooklyn! 2026-06-25 12:50:18 -05:00 committed by GitHub
commit a53fc78c02
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 233 additions and 1 deletions

View file

@ -0,0 +1,140 @@
"""A prompt that lands mid-turn is interrupted + queued, never dropped.
Before this, ``prompt.submit`` on a running session returned ``session busy``,
forcing clients into a deadline-bounded busy-retry. When turn teardown outlived
the deadline (e.g. a slow, non-interruptible tool such as ``web_search`` was
still running when the user hit stop), the resubmitted message was silently
dropped ("it just doesn't listen"). The gateway now applies the
``busy_input_mode`` policy: interrupt the live turn (default) and queue the
message to run as the next turn, drained in ``run``'s tail.
"""
import threading
import types
from tui_gateway import server
def _session(agent=None, **extra):
    """Build a minimal idle session dict for the busy-submit tests.

    ``agent`` defaults to an empty ``SimpleNamespace``. Keyword ``extra``
    entries are applied last, so they override a default or add a new key
    (for example ``running=True`` or ``queued_prompt={...}``).
    """
    if agent is None:
        agent = types.SimpleNamespace()
    fields = {
        "agent": agent,
        "session_key": "session-key",
        "history": [],
        "history_lock": threading.Lock(),
        "history_version": 0,
        "running": False,
        "transport": None,
        "attached_images": [],
    }
    # Caller-supplied overrides win over the defaults above.
    fields.update(extra)
    return fields
# ── _enqueue_prompt ────────────────────────────────────────────────────────
def test_enqueue_pins_text_and_transport():
    """The first queued prompt is stored verbatim together with its transport."""
    sess = _session()
    server._enqueue_prompt(sess, "hello", "ws-1")
    expected_slot = {"text": "hello", "transport": "ws-1"}
    assert sess["queued_prompt"] == expected_slot
def test_enqueue_merges_second_arrival_losslessly():
    """A second arrival is appended to the queued text instead of replacing it."""
    sess = _session()
    for body, origin in (("first", "ws-1"), ("second", "ws-2")):
        server._enqueue_prompt(sess, body, origin)
    slot = sess["queued_prompt"]
    assert slot["text"] == "first\n\nsecond"
    # Latest transport wins so the drain streams to the most recent client.
    assert slot["transport"] == "ws-2"
# ── _handle_busy_submit (policy) ───────────────────────────────────────────
def test_busy_interrupt_mode_interrupts_and_queues(monkeypatch):
    """``interrupt`` mode interrupts the live turn once and queues the prompt."""
    monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "interrupt")
    interrupts = []

    def _record_interrupt(*args, **kwargs):
        interrupts.append((args, kwargs))

    sess = _session(agent=types.SimpleNamespace(interrupt=_record_interrupt))
    reply = server._handle_busy_submit("r1", "sid", sess, "redirect", "ws-1")
    assert reply["result"]["status"] == "queued"
    assert len(interrupts) == 1
    assert sess["queued_prompt"]["text"] == "redirect"
def test_busy_queue_mode_queues_without_interrupting(monkeypatch):
    """``queue`` mode stores the prompt and leaves the live turn untouched."""
    monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "queue")
    interrupts = []

    def _record_interrupt(*args, **kwargs):
        interrupts.append((args, kwargs))

    sess = _session(agent=types.SimpleNamespace(interrupt=_record_interrupt))
    reply = server._handle_busy_submit("r1", "sid", sess, "later", "ws-1")
    assert reply["result"]["status"] == "queued"
    assert len(interrupts) == 0
    assert sess["queued_prompt"]["text"] == "later"
def test_busy_steer_mode_injects_when_accepted(monkeypatch):
    """An accepted steer reports ``steered`` and leaves nothing queued."""
    monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "steer")

    def _accept(text):
        return True

    def _ignore_interrupt(*args, **kwargs):
        return None

    steerable = types.SimpleNamespace(steer=_accept, interrupt=_ignore_interrupt)
    sess = _session(agent=steerable)
    reply = server._handle_busy_submit("r1", "sid", sess, "nudge", "ws-1")
    assert reply["result"]["status"] == "steered"
    assert sess.get("queued_prompt") is None
def test_busy_steer_mode_falls_back_to_queue_when_rejected(monkeypatch):
    """A steer the agent declines ends up queued for the next turn."""
    monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "steer")

    def _decline(text):
        return False

    def _ignore_interrupt(*args, **kwargs):
        return None

    reluctant = types.SimpleNamespace(steer=_decline, interrupt=_ignore_interrupt)
    sess = _session(agent=reluctant)
    reply = server._handle_busy_submit("r1", "sid", sess, "nudge", "ws-1")
    assert reply["result"]["status"] == "queued"
    assert sess["queued_prompt"]["text"] == "nudge"
# ── _drain_queued_prompt ───────────────────────────────────────────────────
def test_drain_fires_queued_prompt_and_claims_running(monkeypatch):
    """Draining dispatches the queued text, claims the session and rebinds transport."""
    fired = {}

    def _capture(rid, sid, session, text):
        fired.update(rid=rid, sid=sid, text=text)

    monkeypatch.setattr(server, "_run_prompt_submit", _capture)
    sess = _session(queued_prompt={"text": "go", "transport": "ws-9"})
    dispatched = server._drain_queued_prompt("r1", "sid", sess)
    assert dispatched is True
    assert fired == {"rid": "r1", "sid": "sid", "text": "go"}
    assert sess["running"] is True
    assert sess["queued_prompt"] is None
    assert sess["transport"] == "ws-9"
def test_drain_noop_when_nothing_queued(monkeypatch):
    """With an empty queue the drain reports False and dispatches nothing."""

    def _must_not_fire(*args, **kwargs):
        raise AssertionError("should not fire")

    monkeypatch.setattr(server, "_run_prompt_submit", _must_not_fire)
    sess = _session()
    dispatched = server._drain_queued_prompt("r1", "sid", sess)
    assert dispatched is False
    assert sess["running"] is False
def test_drain_noop_when_session_already_running(monkeypatch):
    """A fresh turn that already claimed the session beats a stale queued
    entry; the drain leaves the entry in place for that turn's own tail."""

    def _must_not_fire(*args, **kwargs):
        raise AssertionError("should not fire")

    monkeypatch.setattr(server, "_run_prompt_submit", _must_not_fire)
    stale = {"text": "go", "transport": None}
    sess = _session(running=True, queued_prompt=stale)
    dispatched = server._drain_queued_prompt("r1", "sid", sess)
    assert dispatched is False
    assert sess["queued_prompt"]["text"] == "go"
def test_drain_releases_running_on_dispatch_failure(monkeypatch):
    """A dispatch that raises still counts as drained but frees the session."""

    def _explode(*args, **kwargs):
        raise RuntimeError("dispatch failed")

    monkeypatch.setattr(server, "_run_prompt_submit", _explode)
    sess = _session(queued_prompt={"text": "go", "transport": None})
    dispatched = server._drain_queued_prompt("r1", "sid", sess)
    assert dispatched is True
    # Failure must not leave the session wedged as running.
    assert sess["running"] is False

View file

@ -4392,6 +4392,88 @@ def _clear_inflight_turn(session: dict) -> None:
session["inflight_turn"] = None
def _enqueue_prompt(session: dict, text: Any, transport: Any) -> None:
    """Park a message in the session's single ``queued_prompt`` slot.

    Used when a prompt arrives mid-turn (see ``_handle_busy_submit``). Only
    one slot exists. When both the already-queued text and the new ``text``
    are strings they are joined with a blank line (an empty side contributes
    nothing), so nothing typed is lost. For any other pairing the new
    ``text`` replaces the earlier entry.

    ``transport`` is stored alongside the text and always reflects the most
    recent arrival, so the drained turn streams to the client that sent last.
    """
    pending = session.get("queued_prompt")
    earlier = pending.get("text") if pending else None
    if isinstance(earlier, str) and isinstance(text, str):
        if earlier and text:
            text = f"{earlier}\n\n{text}"
        else:
            # At least one side is empty: keep whichever carries content.
            text = earlier or text
    session["queued_prompt"] = {"text": text, "transport": transport}
def _handle_busy_submit(rid, sid: str, session: dict, text: Any, transport: Any) -> dict:
    """Apply the ``display.busy_input_mode`` policy to a prompt that lands while
    a turn is in flight, instead of rejecting it with ``session busy``.

    The old rejection forced clients into a deadline-bounded busy-retry that
    silently dropped the send when turn teardown outlived the deadline (e.g. a
    slow, non-interruptible tool like ``web_search`` running when the user hits
    stop). The message is instead queued to run as the next turn and, unless
    the mode is ``queue``, the live turn is interrupted so it winds down
    promptly. Drained in ``run``'s tail (see ``_run_prompt_submit``).

    Modes:
        ``interrupt`` (default): interrupt the live turn, then queue.
        ``queue``: queue without interrupting.
        ``steer``: inject into the live turn via ``agent.steer``. If the
            agent has no ``steer``, declines, or raises, the prompt takes the
            interrupt + queue path.

    Args:
        rid: Request id echoed back through ``_ok``.
        sid: Session id (not used by the policy itself).
        session: Session dict; ``last_active`` and ``queued_prompt`` are
            updated in place.
        text: The prompt payload to steer or queue.
        transport: Transport to pin on the queued entry.

    Returns:
        The ``_ok`` response with ``{"status": "steered"}`` or
        ``{"status": "queued"}``.
    """
    mode = _load_busy_input_mode()
    agent = session.get("agent")

    if mode == "steer" and agent is not None and hasattr(agent, "steer"):
        try:
            if agent.steer(text):
                session["last_active"] = time.time()
                return _ok(rid, {"status": "steered"})
        except Exception as exc:
            # Best effort: a failing steer must not lose the message, so log
            # it and fall through to the queue path below.
            print(
                f"[tui_gateway] steer failed, queueing instead: "
                f"{type(exc).__name__}: {exc}",
                file=sys.stderr,
            )

    if mode != "queue" and agent is not None and hasattr(agent, "interrupt"):
        try:
            agent.interrupt()
        except Exception as exc:
            # Best effort: the prompt is queued regardless, but leave a trace
            # because the live turn may now run to completion first.
            print(
                f"[tui_gateway] interrupt before queue failed: "
                f"{type(exc).__name__}: {exc}",
                file=sys.stderr,
            )

    _enqueue_prompt(session, text, transport)
    session["last_active"] = time.time()
    return _ok(rid, {"status": "queued"})
def _drain_queued_prompt(rid, sid: str, session: dict) -> bool:
    """Dispatch the queued next-turn prompt when one is waiting and the session is idle.

    Under ``history_lock`` the queued entry is taken out of the slot, the
    session is marked running, and a pinned transport (if any) replaces the
    session transport. The prompt is then dispatched outside the lock via
    ``_run_prompt_submit``. If dispatch raises, the error is written to
    stderr and ``running`` is released so the session is not left wedged.

    Returns:
        ``True`` when a queued prompt was claimed, even if its dispatch
        failed, so the caller should skip lower-priority follow-ups this
        cycle because the user's message wins. ``False`` when nothing was
        queued or the session was already running; the slot is then left
        untouched.
    """
    with session["history_lock"]:
        pending = session.get("queued_prompt")
        if session.get("running") or not pending:
            return False
        # Claim the turn and empty the slot in one critical section.
        session["running"] = True
        session["queued_prompt"] = None
        pinned = pending.get("transport")
        if pinned is not None:
            session["transport"] = pinned

    try:
        _run_prompt_submit(rid, sid, session, pending["text"])
    except Exception as exc:
        detail = f"{type(exc).__name__}: {exc}"
        print(
            f"[tui_gateway] queued prompt dispatch failed: {detail}",
            file=sys.stderr,
        )
        with session["history_lock"]:
            session["running"] = False
    return True
def _inflight_snapshot(session: dict) -> dict | None:
turn = session.get("inflight_turn")
if not isinstance(turn, dict):
@ -7456,7 +7538,11 @@ def _(rid, params: dict) -> dict:
session["transport"] = t
with session["history_lock"]:
if session.get("running"):
return _err(rid, 4009, "session busy")
# Don't reject a mid-turn prompt — queue it (and, by default,
# interrupt the live turn) so it runs as the next turn. See
# _handle_busy_submit for why the old "session busy" rejection
# dropped messages when teardown outlived the client's retry window.
return _handle_busy_submit(rid, sid, session, text, t or session.get("transport"))
# A watch session's run lives in the PARENT turn, so its own running
# flag is False — without this, typing mid-run builds a second agent
# racing the in-flight child on the same stored session (interleaved
@ -8078,6 +8164,12 @@ def _run_prompt_submit(rid, sid: str, session: dict, text: Any) -> None:
_clear_inflight_turn(session)
_emit("session.info", sid, _session_info(agent, session))
# A user prompt that arrived mid-turn (interrupt + queue) wins over
# every auto follow-up below — drain it first and skip them this cycle;
# the goal judge / notifications re-evaluate at the end of that turn.
if _drain_queued_prompt(rid, sid, session):
return
# Chain a goal-continuation turn if the judge said so. We do
# this AFTER the finally releases session["running"], so the
# nested _run_prompt_submit doesn't deadlock on the busy