From 99d62f6ba1fe7d59542cc4350d90c46bada108b2 Mon Sep 17 00:00:00 2001 From: xxxigm Date: Fri, 22 May 2026 09:39:40 +0700 Subject: [PATCH] fix(gateway): protect in-flight subagents from busy-mode interrupts (#30170) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user sends a conversational follow-up while delegate_task is running, gateway/run.py calls running_agent.interrupt(event.text) on the PARENT agent. AIAgent.interrupt() then cascades synchronously through self._active_children and calls interrupt() on every child subagent, aborting in-flight delegate_task work. The user sees the fallback cascade with no root-cause in the gateway log, and minutes of subagent progress are destroyed — the exact failure mode reported in Add GatewayRunner._agent_has_active_subagents(running_agent) — a static helper that returns True iff the parent is currently driving subagents via delegate_task. The helper is type-defensive: it ignores truthy MagicMock auto-attributes (so this doesn't accidentally fire in every test mock that hits the busy path), the _AGENT_PENDING_SENTINEL placeholder, and missing locks. Wire the helper into both interrupt branches: 1. _handle_active_session_busy_message — the adapter-level busy handler. When busy_input_mode == 'interrupt' AND the parent has active subagents, demote to 'queue' semantics: skip the parent.interrupt() call, merge the message into the pending queue, and surface a dedicated ack ("⏳ Subagent working — your message is queued for when it finishes (use /stop to cancel everything).") so the operator knows the message wasn't lost and discovers the explicit escape hatch. 2. The PRIORITY interrupt branch inside _handle_message — the non-command fast path. Same rationale, same demotion. Routes through _queue_or_replace_pending_event so the next-turn pickup stays unchanged. Explicit /stop and /new commands take a completely different path (_interrupt_and_clear_session in the slash-command dispatch at line ~6771) and are NOT affected by this guard — the operator still has a way to force-cancel everything when they actually mean it. Configured 'queue' and 'steer' modes are also untouched: 'queue' already does the right thing, and 'steer' goes through running_agent.steer() which does NOT cascade to children (so subagents survive a steer too). This is Phase 1 of the fix outlined in #30170 — the minimum viable change that stops subagent loss. Phase 2 (delegation-aware steer forwarding to active children) and Phase 3 (async delegation, #11508) are intentionally out of scope. Refs #30170. --- gateway/run.py | 81 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/gateway/run.py b/gateway/run.py index 696f9b29b81..469db48e5b1 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3034,6 +3034,44 @@ class GatewayRunner: if agent is not _AGENT_PENDING_SENTINEL } + @staticmethod + def _agent_has_active_subagents(running_agent: Any) -> bool: + """Return True when *running_agent* is currently driving subagents + via the ``delegate_task`` tool. + + Background (#30170): ``AIAgent.interrupt()`` cascades through the + parent's ``_active_children`` list and calls ``interrupt()`` on + every child synchronously, which aborts in-flight subagent work + and produces a fallback cascade with no actionable signal. + Demoting ``busy_input_mode='interrupt'`` to ``queue`` semantics + whenever this helper returns True protects subagent work from + conversational follow-ups while leaving the explicit ``/stop`` + path (which goes through ``_interrupt_and_clear_session``) + untouched. Safe-by-default: returns False on any attribute or + lock error so a missing/broken parent never blocks the existing + interrupt path. + """ + if running_agent is None or running_agent is _AGENT_PENDING_SENTINEL: + return False + children = getattr(running_agent, "_active_children", None) + # AIAgent always initialises this as a concrete list (see + # agent/agent_init.py). Reject anything that isn't a real + # collection — this guards against ``MagicMock()._active_children`` + # auto-creating a truthy stub in tests and triggering the demotion + # against an agent that doesn't actually have subagents. + if not isinstance(children, (list, tuple, set)): + return False + if not children: + return False + lock = getattr(running_agent, "_active_children_lock", None) + try: + if lock is not None: + with lock: + return bool(children) + return bool(children) + except Exception: + return False + def _queue_or_replace_pending_event(self, session_key: str, event: MessageEvent) -> None: adapter = self.adapters.get(event.source.platform) if not adapter: @@ -3105,6 +3143,25 @@ class GatewayRunner: # queueing + interrupting. If the agent isn't running yet # (sentinel) or lacks steer(), or the payload is empty, fall back # to queue semantics so nothing is lost. + # #30170 — Subagent protection. ``AIAgent.interrupt()`` cascades + # to every entry in the parent's ``_active_children`` list and + # aborts in-flight ``delegate_task`` work. Demote ``interrupt`` + # to ``queue`` when the parent is currently driving subagents so + # a conversational follow-up doesn't destroy minutes of subagent + # work. Explicit ``/stop`` and ``/new`` slash commands go through + # ``_interrupt_and_clear_session`` and are unaffected — the + # operator still has a way to force-cancel everything. + demoted_for_subagents = ( + effective_mode == "interrupt" + and self._agent_has_active_subagents(running_agent) + ) + if demoted_for_subagents: + logger.info( + "Demoting busy_input_mode 'interrupt' to 'queue' for session %s " + "because the running agent has active subagents (#30170)", + session_key, + ) + effective_mode = "queue" steered = False if effective_mode == "steer": steer_text = (event.text or "").strip() @@ -3192,6 +3249,14 @@ class GatewayRunner: f"⏩ Steered into current run{status_detail}. " f"Your message arrives after the next tool call." ) + elif is_queue_mode and demoted_for_subagents: + # #30170 — explain the demotion so the user knows their + # follow-up didn't accidentally kill the subagent and + # discovers `/stop` as the explicit escape hatch. + message = ( + f"⏳ Subagent working{status_detail} — your message is queued for " + f"when it finishes (use /stop to cancel everything)." + ) elif is_queue_mode: message = ( f"⏳ Queued for the next turn{status_detail}. " @@ -7246,6 +7311,22 @@ class GatewayRunner: logger.debug("PRIORITY steer-fallback-to-queue for session %s", _quick_key) self._queue_or_replace_pending_event(_quick_key, event) return None + # #30170 — Subagent protection (PRIORITY path). Same rationale + # as ``_handle_active_session_busy_message``: an interrupt + # cascades through ``_active_children`` and aborts in-flight + # delegate_task work. Demote to queue semantics when the + # parent is currently driving subagents so a conversational + # follow-up doesn't destroy minutes of subagent progress. + # /stop reaches its dedicated handler above (line ~6771), so + # the operator still has a clean escape hatch. + if self._agent_has_active_subagents(running_agent): + logger.info( + "PRIORITY interrupt demoted to queue for session %s " + "because the running agent has active subagents (#30170)", + _quick_key, + ) + self._queue_or_replace_pending_event(_quick_key, event) + return None logger.debug("PRIORITY interrupt for session %s", _quick_key) running_agent.interrupt(event.text) # NOTE: self._pending_messages was write-only (never consumed).