@staticmethod
def _agent_has_active_subagents(running_agent: Any) -> bool:
    """Return True when *running_agent* currently has active subagents.

    Background (#30170): ``AIAgent.interrupt()`` cascades through the
    parent's ``_active_children`` list and calls ``interrupt()`` on every
    child synchronously, which aborts in-flight ``delegate_task`` work.
    Callers use this helper to demote ``busy_input_mode='interrupt'`` to
    ``queue`` semantics while subagents are running; the explicit
    ``/stop`` path (``_interrupt_and_clear_session``) does not consult
    this helper and is therefore unaffected.

    Args:
        running_agent: The agent object registered for the session. May be
            ``None`` or ``_AGENT_PENDING_SENTINEL`` when no real agent is
            running yet.

    Returns:
        ``True`` only when ``running_agent._active_children`` is a
        non-empty ``list``/``tuple``/``set``. ``False`` in every other
        case, including when reading the attributes or acquiring
        ``_active_children_lock`` raises.

    Safe-by-default: this helper never raises, so a missing or broken
    parent agent can never block the existing interrupt path.
    """
    if running_agent is None or running_agent is _AGENT_PENDING_SENTINEL:
        return False
    try:
        # The attribute reads live inside the ``try`` on purpose:
        # ``getattr`` with a default only swallows AttributeError, so a
        # property or ``__getattr__`` raising anything else would
        # otherwise escape and break the documented "returns False on
        # any attribute or lock error" contract.
        children = getattr(running_agent, "_active_children", None)
        # Only trust real collections. This guards against
        # ``MagicMock()._active_children`` auto-creating a truthy stub in
        # tests and triggering the demotion for an agent that has no
        # subagents at all.
        if not isinstance(children, (list, tuple, set)) or not children:
            return False
        lock = getattr(running_agent, "_active_children_lock", None)
        if lock is None:
            return bool(children)
        # Re-check under the lock so the answer reflects a state that is
        # consistent with whoever mutates the children collection.
        with lock:
            return bool(children)
    except Exception:
        # Deliberate best-effort: any failure means "no subagents known",
        # which leaves the caller's normal interrupt behaviour in place.
        return False
+ message = ( + f"⏳ Subagent working{status_detail} — your message is queued for " + f"when it finishes (use /stop to cancel everything)." + ) elif is_queue_mode: message = ( f"⏳ Queued for the next turn{status_detail}. " @@ -7246,6 +7311,22 @@ class GatewayRunner: logger.debug("PRIORITY steer-fallback-to-queue for session %s", _quick_key) self._queue_or_replace_pending_event(_quick_key, event) return None + # #30170 — Subagent protection (PRIORITY path). Same rationale + # as ``_handle_active_session_busy_message``: an interrupt + # cascades through ``_active_children`` and aborts in-flight + # delegate_task work. Demote to queue semantics when the + # parent is currently driving subagents so a conversational + # follow-up doesn't destroy minutes of subagent progress. + # /stop reaches its dedicated handler above (line ~6771), so + # the operator still has a clean escape hatch. + if self._agent_has_active_subagents(running_agent): + logger.info( + "PRIORITY interrupt demoted to queue for session %s " + "because the running agent has active subagents (#30170)", + _quick_key, + ) + self._queue_or_replace_pending_event(_quick_key, event) + return None logger.debug("PRIORITY interrupt for session %s", _quick_key) running_agent.interrupt(event.text) # NOTE: self._pending_messages was write-only (never consumed).