mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
A prompt sent while a turn was in flight got rejected with 4009 "session busy", which pushed clients (the desktop app) into a deadline-bounded busy-retry. When turn teardown outlived that deadline — e.g. the user hits stop while a slow, non-interruptible tool (web_search, read_file, an MCP call) is mid-flight, since the sequential executor only checks the interrupt flag between tools — the resubmitted message was silently dropped: "it just doesn't listen". Wire the previously-dead display.busy_input_mode config into prompt.submit: instead of rejecting, apply the policy and queue the message to run as the next turn (drained in run()'s tail, ahead of goal/notification follow-ups). Modes: interrupt (default) interrupts the live turn so it winds down promptly then runs the queued message; queue runs it after the current turn finishes; steer injects it into the live turn when accepted, else queues. The queued slot pins the sender's transport and losslessly merges a second arrival. No client deadline, no dropped sends.
140 lines
5.9 KiB
Python
140 lines
5.9 KiB
Python
"""A prompt that lands mid-turn is interrupted + queued, never dropped.

Before this, ``prompt.submit`` on a running session returned ``session busy``,
forcing clients into a deadline-bounded busy-retry. When turn teardown outlived
the deadline — e.g. a slow, non-interruptible tool (``web_search``) still
running when the user hit stop — the resubmitted message was silently dropped
("it just doesn't listen"). The gateway now applies the ``busy_input_mode``
policy: interrupt the live turn (default) and queue the message to run as the
next turn, drained in ``run``'s tail.
"""
|
|
|
|
import threading
|
|
import types
|
|
|
|
from tui_gateway import server
|
|
|
|
|
|
def _session(agent=None, **extra):
    """Build a minimal, idle session dict for the gateway helpers under test.

    ``agent`` falls back to an empty ``SimpleNamespace`` when not supplied.
    ``extra`` is applied last, so its entries add to or override the defaults.
    A fresh lock and fresh lists are created on every call.
    """
    if agent is None:
        agent = types.SimpleNamespace()
    fields = dict(
        agent=agent,
        session_key="session-key",
        history=[],
        history_lock=threading.Lock(),
        history_version=0,
        running=False,
        transport=None,
        attached_images=[],
    )
    # Overrides keep the position of the key they replace; new keys go last.
    fields.update(extra)
    return fields
|
|
|
|
|
|
# ── _enqueue_prompt ────────────────────────────────────────────────────────
|
|
|
|
def test_enqueue_pins_text_and_transport():
    """The first enqueue stores exactly the text and the sender's transport."""
    sess = _session()

    server._enqueue_prompt(sess, "hello", "ws-1")

    expected = {"text": "hello", "transport": "ws-1"}
    assert sess["queued_prompt"] == expected
|
|
|
|
|
|
def test_enqueue_merges_second_arrival_losslessly():
    """A second enqueue is joined onto the first text instead of replacing it."""
    sess = _session()

    for text, transport in (("first", "ws-1"), ("second", "ws-2")):
        server._enqueue_prompt(sess, text, transport)

    assert sess["queued_prompt"]["text"] == "first\n\nsecond"
    # Latest transport wins so the drain streams to the most recent client.
    assert sess["queued_prompt"]["transport"] == "ws-2"
|
|
|
|
|
|
# ── _handle_busy_submit (policy) ───────────────────────────────────────────
|
|
|
|
def test_busy_interrupt_mode_interrupts_and_queues(monkeypatch):
    """``interrupt`` mode: the agent is interrupted once and the text is queued."""
    interrupt_calls = []

    def _record_interrupt(*args, **kwargs):
        # Only the number of calls matters to the assertions below.
        interrupt_calls.append((args, kwargs))

    monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "interrupt")
    sess = _session(agent=types.SimpleNamespace(interrupt=_record_interrupt))

    resp = server._handle_busy_submit("r1", "sid", sess, "redirect", "ws-1")

    assert resp["result"]["status"] == "queued"
    assert len(interrupt_calls) == 1
    assert sess["queued_prompt"]["text"] == "redirect"
|
|
|
|
|
|
def test_busy_queue_mode_queues_without_interrupting(monkeypatch):
    """``queue`` mode: the text is queued and the agent is never interrupted."""
    interrupt_calls = []

    def _record_interrupt(*args, **kwargs):
        # Any call recorded here fails the "no interrupt" assertion below.
        interrupt_calls.append((args, kwargs))

    monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "queue")
    sess = _session(agent=types.SimpleNamespace(interrupt=_record_interrupt))

    resp = server._handle_busy_submit("r1", "sid", sess, "later", "ws-1")

    assert resp["result"]["status"] == "queued"
    assert len(interrupt_calls) == 0
    assert sess["queued_prompt"]["text"] == "later"
|
|
|
|
|
|
def test_busy_steer_mode_injects_when_accepted(monkeypatch):
    """``steer`` mode with an accepting agent: status is ``steered``, nothing queued."""

    def _accept_steer(text):
        return True

    def _noop_interrupt(*args, **kwargs):
        return None

    monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "steer")
    stub_agent = types.SimpleNamespace(steer=_accept_steer, interrupt=_noop_interrupt)
    sess = _session(agent=stub_agent)

    resp = server._handle_busy_submit("r1", "sid", sess, "nudge", "ws-1")

    assert resp["result"]["status"] == "steered"
    assert sess.get("queued_prompt") is None
|
|
|
|
|
|
def test_busy_steer_mode_falls_back_to_queue_when_rejected(monkeypatch):
    """``steer`` mode with a rejecting agent: the text is queued instead."""

    def _reject_steer(text):
        return False

    def _noop_interrupt(*args, **kwargs):
        return None

    monkeypatch.setattr(server, "_load_busy_input_mode", lambda: "steer")
    stub_agent = types.SimpleNamespace(steer=_reject_steer, interrupt=_noop_interrupt)
    sess = _session(agent=stub_agent)

    resp = server._handle_busy_submit("r1", "sid", sess, "nudge", "ws-1")

    assert resp["result"]["status"] == "queued"
    assert sess["queued_prompt"]["text"] == "nudge"
|
|
|
|
|
|
# ── _drain_queued_prompt ───────────────────────────────────────────────────
|
|
|
|
def test_drain_fires_queued_prompt_and_claims_running(monkeypatch):
    """Draining a queued prompt dispatches it and updates the session state.

    Checks that the dispatch stub receives the rid, sid and queued text, that
    ``running`` becomes True, that the queued slot is cleared, and that the
    session transport is set to the one stored with the queued prompt.
    """
    fired = {}

    def _capture_dispatch(rid, sid, session, text):
        fired.update(rid=rid, sid=sid, text=text)

    monkeypatch.setattr(server, "_run_prompt_submit", _capture_dispatch)
    sess = _session(queued_prompt={"text": "go", "transport": "ws-9"})

    drained = server._drain_queued_prompt("r1", "sid", sess)

    assert drained is True
    assert fired == {"rid": "r1", "sid": "sid", "text": "go"}
    assert sess["running"] is True
    assert sess["queued_prompt"] is None
    assert sess["transport"] == "ws-9"
|
|
|
|
|
|
def test_drain_noop_when_nothing_queued(monkeypatch):
    """With no queued prompt the drain returns False and leaves ``running`` False."""

    def _must_not_dispatch(*args, **kwargs):
        raise AssertionError("should not fire")

    monkeypatch.setattr(server, "_run_prompt_submit", _must_not_dispatch)
    sess = _session()

    drained = server._drain_queued_prompt("r1", "sid", sess)

    assert drained is False
    assert sess["running"] is False
|
|
|
|
|
|
def test_drain_noop_when_session_already_running(monkeypatch):
    """A session already marked running is not drained.

    A fresh turn that claimed the session takes precedence over a stale queued
    entry: the drain must not dispatch and must leave the entry in place for
    that turn's own tail.
    """

    def _must_not_dispatch(*args, **kwargs):
        raise AssertionError("should not fire")

    monkeypatch.setattr(server, "_run_prompt_submit", _must_not_dispatch)
    sess = _session(running=True, queued_prompt={"text": "go", "transport": None})

    drained = server._drain_queued_prompt("r1", "sid", sess)

    assert drained is False
    assert sess["queued_prompt"]["text"] == "go"
|
|
|
|
|
|
def test_drain_releases_running_on_dispatch_failure(monkeypatch):
    """If dispatch raises, the drain still returns True and ``running`` ends False."""

    def _failing_dispatch(*args, **kwargs):
        raise RuntimeError("dispatch failed")

    monkeypatch.setattr(server, "_run_prompt_submit", _failing_dispatch)
    sess = _session(queued_prompt={"text": "go", "transport": None})

    drained = server._drain_queued_prompt("r1", "sid", sess)

    assert drained is True
    # Failure must not leave the session wedged as running.
    assert sess["running"] is False
|