From 680732c104a80504e95085b4272794792bb89721 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sat, 20 Jun 2026 10:57:41 -0700 Subject: [PATCH] fix(gateway): never interrupt a busy session with an internal completion event (#49738) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Async-delegation completions (delegate_task(background=true)) and background-process completions (terminal notify_on_complete) re-enter the originating session as internal MessageEvents. When the session was busy, _handle_active_session_busy_message treated them like a user TEXT message and the default busy_input_mode='interrupt' aborted the active turn (and sent a 'Interrupting current task' ack) — the opposite of the design invariant that a completion surfaces as a new turn only when idle. Short-circuit internal events to return False so the base adapter queues them silently (it already excludes internal events from debounce), cascading them as the next turn after the current one finishes. --- gateway/run.py | 14 ++ ...nal_event_never_interrupts_busy_session.py | 151 ++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 tests/gateway/test_internal_event_never_interrupts_busy_session.py diff --git a/gateway/run.py b/gateway/run.py index 9c280f3dc12..f5a411244aa 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4169,6 +4169,20 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if not adapter: return False # let default path handle it + # --- Internal synthetic events must never interrupt/steer --- + # Async-delegation completions (delegate_task(background=true)) and + # background-process completions (terminal notify_on_complete) re-enter + # the originating session as internal MessageEvents. When the session + # is busy, treating them like a user TEXT message means interrupt-mode + # (the default busy_text_mode) aborts the active turn AND sends a "⚡ + # Interrupting current task" ack — exactly the opposite of the design + # invariant that a completion surfaces as a NEW turn only when idle and + # never splices into a running turn. Fall through to the base adapter, + # which queues internal events silently (no interrupt, no ack) so they + # cascade after the current turn finishes. + if getattr(event, "internal", False): + return False + running_agent = self._running_agents.get(session_key) effective_mode = self._busy_input_mode diff --git a/tests/gateway/test_internal_event_never_interrupts_busy_session.py b/tests/gateway/test_internal_event_never_interrupts_busy_session.py new file mode 100644 index 00000000000..5b8467e5b48 --- /dev/null +++ b/tests/gateway/test_internal_event_never_interrupts_busy_session.py @@ -0,0 +1,151 @@ +"""Regression test: internal synthetic events must never interrupt a busy session. + +Reported by @Heeervas (June 2026): an ``async_delegation`` completion from a +``delegate_task(background=true)`` subagent re-enters the originating gateway +session as an internal ``MessageEvent``. When that session was busy running a +turn, the completion was treated exactly like a user TEXT message and hit the +default ``busy_input_mode='interrupt'`` path — calling +``running_agent.interrupt()`` and aborting the active turn, plus sending a +"⚡ Interrupting current task" ack. The same shape affects background-process +completions (terminal ``notify_on_complete``), which also re-enter as internal +events. + +The fix: ``_handle_active_session_busy_message`` returns ``False`` early for any +event with ``internal=True``, so the base adapter queues it silently (no +interrupt, no ack) and it cascades as a new turn after the current one finishes. +This preserves strict message-role alternation and the design invariant that a +completion surfaces as a NEW turn only when idle, never spliced into a running +turn. +""" + +from __future__ import annotations + +import sys +import threading +import types +from unittest.mock import AsyncMock, MagicMock + +import pytest + +# Minimal telegram stubs so gateway imports cleanly (mirrors sibling tests). +_tg = types.ModuleType("telegram") +_tg.constants = types.ModuleType("telegram.constants") +_ct = MagicMock() +_ct.SUPERGROUP = "supergroup" +_ct.GROUP = "group" +_ct.PRIVATE = "private" +_tg.constants.ChatType = _ct +sys.modules.setdefault("telegram", _tg) +sys.modules.setdefault("telegram.constants", _tg.constants) +sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext")) + +from gateway.platforms.base import ( # noqa: E402 + MessageEvent, + MessageType, + SessionSource, + build_session_key, +) +from gateway.run import GatewayRunner # noqa: E402 + + +def _make_internal_event(text: str = "[async delegation completed]") -> MessageEvent: + source = SessionSource( + platform=MagicMock(value="telegram"), + chat_id="123", + chat_type="private", + user_id="user1", + ) + return MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + message_id="msg1", + internal=True, + ) + + +def _make_runner() -> GatewayRunner: + runner = object.__new__(GatewayRunner) + runner._running_agents = {} + runner._running_agents_ts = {} + runner._pending_messages = {} + runner._busy_ack_ts = {} + runner._draining = False + runner.adapters = {} + runner.config = MagicMock() + runner.session_store = None + runner.hooks = MagicMock() + runner.hooks.emit = AsyncMock() + runner.pairing_store = MagicMock() + runner.pairing_store.is_approved.return_value = True + runner._is_user_authorized = lambda _source: True + return runner + + +def _make_adapter() -> MagicMock: + adapter = MagicMock() + adapter._pending_messages = {} + adapter._send_with_retry = AsyncMock() + adapter.config = MagicMock() + adapter.config.extra = {} + adapter.platform = MagicMock(value="telegram") + return adapter + + +def _make_running_parent() -> MagicMock: + parent = MagicMock() + parent._active_children = [] # no active subagents at completion time + parent._active_children_lock = threading.Lock() + parent.get_activity_summary.return_value = { + "api_call_count": 4, + "max_iterations": 60, + "current_tool": "terminal", + } + return parent + + +@pytest.mark.asyncio +async def test_internal_event_does_not_interrupt_busy_session() -> None: + """The async-delegation completion must not abort the active turn.""" + runner = _make_runner() + runner._busy_input_mode = "interrupt" # the default that caused the bug + adapter = _make_adapter() + event = _make_internal_event() + sk = build_session_key(event.source) + parent = _make_running_parent() + runner._running_agents[sk] = parent + runner.adapters[event.source.platform] = adapter + + handled = await runner._handle_active_session_busy_message(event, sk) + + # Returns False so the base adapter silently queues the internal event + # as a cascading next turn — it must NOT be handled-with-interrupt here. + assert handled is False + # The active turn must survive. + parent.interrupt.assert_not_called() + # No "⚡ Interrupting current task" (or any) ack for a synthetic event. + adapter._send_with_retry.assert_not_called() + + +@pytest.mark.asyncio +async def test_non_internal_event_still_interrupts() -> None: + """Regression-guard the other direction: a real user message in interrupt + mode with no subagents still interrupts (behaviour unchanged).""" + runner = _make_runner() + runner._busy_input_mode = "interrupt" + adapter = _make_adapter() + event = _make_internal_event(text="please stop") + # Flip to a real user message. + object.__setattr__(event, "internal", False) + sk = build_session_key(event.source) + parent = _make_running_parent() + runner._running_agents[sk] = parent + runner.adapters[event.source.platform] = adapter + + from unittest.mock import patch + + with patch("gateway.run.merge_pending_message_event"): + handled = await runner._handle_active_session_busy_message(event, sk) + + assert handled is True + parent.interrupt.assert_called_once_with("please stop")