Merge pull request #33817 from sweetcornna/fix/28503-busy-input-fifo

fix(gateway): use FIFO queue for busy_input_mode pending messages
This commit is contained in:
kshitij 2026-06-08 02:02:02 -07:00 committed by GitHub
commit 4eb8972390
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 195 additions and 7 deletions

View file

@ -3427,11 +3427,52 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
except Exception:
return False
# Hard cap on per-session pending follow-ups for busy_input_mode=queue
# (and the draining/steer-fallback/subagent-demotion paths that share
# this entry point). Without a cap, a stuck agent + a rapid-fire user
# could grow the overflow list unboundedly. 32 turns of queued
# follow-ups is far beyond any realistic conversational backlog while
# still small enough to never threaten memory.
#
# Enforcement lives in ``_queue_or_replace_pending_event``: once
# ``_queue_depth(...)`` for the session reaches this value, further
# follow-ups are dropped and a warning is logged.
_BUSY_QUEUE_MAX_PENDING = 32
def _queue_or_replace_pending_event(self, session_key: str, event: MessageEvent) -> None:
    """Queue a follow-up that arrived while the session's agent is busy.

    Routing rules:

    * No adapter registered for the event's platform: do nothing.
    * A head-slot event already exists and either it or the incoming event
      is a photo / carries ``media_urls``: merge into the head slot so a
      photo burst stays a single turn (album semantics).
    * Otherwise: append through the FIFO helper so every follow-up gets its
      own turn in arrival order, unless the per-session depth has already
      reached ``_BUSY_QUEUE_MAX_PENDING``, in which case the event is
      dropped and a warning is logged.

    Args:
        session_key: Key identifying the busy session.
        event: The incoming message to queue or merge.
    """
    adapter = self.adapters.get(event.source.platform)
    if not adapter:
        return

    # #28503 — Previously this called ``merge_pending_message_event``
    # with the default ``merge_text=False``, which silently OVERWROTE
    # the single pending slot when consecutive text messages arrived
    # in ``busy_input_mode: queue``. That unconditional call must NOT
    # run here: it would fill the head slot before the FIFO routing
    # below and the same event would then be enqueued a second time.
    pending_slot = getattr(adapter, "_pending_messages", None)
    existing = pending_slot.get(session_key) if isinstance(pending_slot, dict) else None

    involves_media = existing is not None and (
        getattr(existing, "message_type", None) == MessageType.PHOTO
        or event.message_type == MessageType.PHOTO
        or bool(getattr(existing, "media_urls", None))
        or bool(getattr(event, "media_urls", None))
    )
    if involves_media:
        # Preserve photo-burst / media-merge semantics for the head slot.
        # ``existing`` is only non-None when ``pending_slot`` is a dict, so
        # it is the same mapping as ``adapter._pending_messages``.
        merge_pending_message_event(
            pending_slot,
            session_key,
            event,
            merge_text=event.message_type == MessageType.TEXT,
        )
        return

    # Bounded backlog: refuse to grow past the hard cap.
    if self._queue_depth(session_key, adapter=adapter) >= self._BUSY_QUEUE_MAX_PENDING:
        logger.warning(
            "Dropping busy-mode follow-up for session %s — pending queue at cap (%d).",
            session_key,
            self._BUSY_QUEUE_MAX_PENDING,
        )
        return

    # Shared FIFO infrastructure (same path as ``/queue``).
    self._enqueue_fifo(session_key, event, adapter)
async def _handle_active_session_busy_message(self, event: MessageEvent, session_key: str) -> bool:
# --- Authorization gate (#17775) ---
@ -7024,12 +7065,15 @@ class GatewayRunner(GatewayKanbanWatchersMixin, GatewaySlashCommandsMixin):
)
adapter = self.adapters.get(source.platform)
if adapter:
merge_pending_message_event(
adapter._pending_messages,
_quick_key,
event,
merge_text=True,
)
if self._busy_input_mode == "queue":
self._enqueue_fifo(_quick_key, event, adapter)
else:
merge_pending_message_event(
adapter._pending_messages,
_quick_key,
event,
merge_text=True,
)
return None
running_agent = self._running_agents.get(_quick_key)

View file

@ -27,6 +27,7 @@ sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext"))
from gateway.platforms.base import (
MessageEvent,
MessageType,
Platform,
SessionSource,
build_session_key,
)
@ -66,6 +67,8 @@ def _make_runner():
runner._busy_text_mode = "interrupt"
runner.adapters = {}
runner.config = MagicMock()
runner.config.group_sessions_per_user = True
runner.config.thread_sessions_per_user = False
runner.session_store = None
runner.hooks = MagicMock()
runner.hooks.emit = AsyncMock()
@ -119,6 +122,55 @@ class TestBusySessionAck:
assert sk not in runner._pending_messages
running_agent.interrupt.assert_not_called()
@pytest.mark.asyncio
async def test_telegram_grace_followups_respect_queue_fifo(self, monkeypatch):
    """Queue mode keeps rapid Telegram texts as separate turns, in order."""
    from gateway.run import GatewayRunner

    # Enable the Telegram follow-up grace window for this test.
    monkeypatch.setenv("HERMES_TELEGRAM_FOLLOWUP_GRACE_SECONDS", "3.0")

    runner, _ = _make_runner()
    runner._busy_input_mode = "queue"
    runner._queued_events = {}

    adapter = _make_adapter()
    source = SessionSource(
        platform=Platform.TELEGRAM,
        chat_id="123",
        chat_type="dm",
        user_id="user1",
    )
    session_key = build_session_key(source)
    runner.adapters[source.platform] = adapter

    # A running agent that reports activity just now.
    busy_agent = MagicMock()
    busy_agent.get_activity_summary.return_value = {
        "seconds_since_activity": 0.0,
    }
    runner._running_agents[session_key] = busy_agent
    runner._running_agents_ts[session_key] = time.time()

    texts = ("first", "second", "third")
    followups = [
        MessageEvent(
            text=body,
            message_type=MessageType.TEXT,
            source=source,
            message_id=f"m-{position}",
        )
        for position, body in enumerate(texts, start=1)
    ]

    for followup in followups:
        outcome = await GatewayRunner._handle_message(runner, followup)
        assert outcome is None

    # Head slot holds the first message; the rest wait in the overflow FIFO.
    assert adapter._pending_messages[session_key].text == "first"
    overflow_texts = [queued.text for queued in runner._queued_events[session_key]]
    assert overflow_texts == ["second", "third"]
    busy_agent.interrupt.assert_not_called()
@pytest.mark.asyncio
async def test_sends_ack_when_agent_running(self):
"""First message during busy session should get a status ack."""

View file

@ -360,3 +360,95 @@ class TestQueueConsumptionAfterCompletion:
e.text for e in runner._queued_events[session_key]
]
assert collected == texts
class TestBusyInputModeQueueFifo:
    """Regression tests for issue #28503.

    In ``busy_input_mode: queue`` a burst of follow-ups used to overwrite
    one shared pending slot, so only the last message survived. The
    runner's busy/queue/steer-fallback entry point now goes through the
    FIFO infrastructure that ``/queue`` uses, giving every follow-up its
    own turn in arrival order.
    """

    def _make_runner_and_adapter(self):
        """Build a bare runner (no ``__init__``) wired to one stub adapter."""
        from gateway.run import GatewayRunner

        stub = _StubAdapter()
        bare_runner = GatewayRunner.__new__(GatewayRunner)
        bare_runner._queued_events = {}
        bare_runner.adapters = {Platform.TELEGRAM: stub}
        return bare_runner, stub

    def _text_event(self, text: str) -> MessageEvent:
        """Return a TEXT event on the Telegram platform carrying *text*."""
        return MessageEvent(
            text=text,
            message_type=MessageType.TEXT,
            source=MagicMock(chat_id="c1", platform=Platform.TELEGRAM),
            message_id=f"m-{text}",
        )

    def test_rapid_text_followups_are_queued_in_fifo_order(self):
        """Five rapid texts in queue mode all survive, in arrival order."""
        runner, adapter = self._make_runner_and_adapter()
        session_key = "telegram:user:fifo"
        texts = ["one", "two", "three", "four", "five"]

        for body in texts:
            runner._queue_or_replace_pending_event(session_key, self._text_event(body))

        # First message sits in the head slot; the remainder is the overflow.
        first, *remainder = texts
        assert adapter._pending_messages[session_key].text == first
        overflow_texts = [queued.text for queued in runner._queued_events[session_key]]
        assert overflow_texts == remainder
        assert runner._queue_depth(session_key, adapter=adapter) == len(texts)

    def test_queue_respects_bounded_cap(self):
        """Follow-ups beyond the per-session cap are dropped."""
        from gateway.run import GatewayRunner

        runner, adapter = self._make_runner_and_adapter()
        session_key = "telegram:user:cap"
        cap = GatewayRunner._BUSY_QUEUE_MAX_PENDING

        attempts = cap + 5
        for index in range(attempts):
            overflow_candidate = self._text_event(f"msg-{index:03d}")
            runner._queue_or_replace_pending_event(session_key, overflow_candidate)

        # Depth stops at ``cap``: one head event plus ``cap - 1`` overflow events.
        assert runner._queue_depth(session_key, adapter=adapter) == cap
        assert adapter._pending_messages[session_key].text == "msg-000"
        # The newest retained overflow entry is msg-{cap-1}.
        newest_kept = runner._queued_events[session_key][-1]
        assert newest_kept.text == f"msg-{cap - 1:03d}"

    def test_photo_burst_still_merges_in_head_slot(self):
        """A photo burst stays one merged head event instead of N turns."""
        runner, adapter = self._make_runner_and_adapter()
        session_key = "telegram:user:burst"
        source = MagicMock(chat_id="c1", platform=Platform.TELEGRAM)

        for index in range(3):
            photo = MessageEvent(
                text="",
                message_type=MessageType.PHOTO,
                source=source,
                message_id=f"p-{index}",
                media_urls=[f"http://example.com/{index}.jpg"],
                media_types=["image/jpeg"],
            )
            runner._queue_or_replace_pending_event(session_key, photo)

        # Nothing spilled into the overflow list; the head carries all three URLs.
        assert not runner._queued_events.get(session_key)
        merged_head = adapter._pending_messages[session_key]
        assert merged_head.message_type == MessageType.PHOTO
        assert len(merged_head.media_urls) == 3