From ea8a8b4af8612b655a5bbfc74eba21e1e806758d Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sat, 20 Jun 2026 11:27:12 -0700 Subject: [PATCH] =?UTF-8?q?feat(delegation):=20background=20fan-out=20?= =?UTF-8?q?=E2=80=94=20parallel=20subagents,=20one=20consolidated=20return?= =?UTF-8?q?=20(#49734)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(delegation): single-task delegate_task always runs in the background The model no longer decides whether a subagent runs in the background — a single-task delegate_task from the top-level agent is now always dispatched async, so the parent turn returns immediately and the subagent's result re-enters the conversation when it finishes. - run_agent._dispatch_delegate_task (the live model path) forces background=True for top-level single-task calls; the schema-level `background` param is ignored. - A batch (tasks with >1 item) stays synchronous (fan-out can't go async). - A delegation from an orchestrator subagent (depth > 0) stays synchronous — it needs its workers' results within its own turn. - The function-level default is unchanged, so direct Python callers/tests keep the historical synchronous behavior. - On async-pool capacity rejection, single-task now falls through to a synchronous run instead of erroring (the child stays attached for interrupt propagation; detach happens only on a successful dispatch). - Schema `background` param marked deprecated/ignored; tool description updated to state the always-background single-task rule. * feat(delegation): all delegate_task fan-out runs in the background Extend the always-background behavior to the full fan-out. A batch is now dispatched as N independent async subagents (one handle each), instead of running synchronously. Single task and batch both return immediately; each subagent's result re-enters the conversation as its own message when it finishes. 
- delegate_task: when background is set, loop over ALL built children and dispatch each via dispatch_async_delegation; return a combined handle block (count + per-task delegation_ids). Children the async pool rejects (at capacity) run synchronously inline and are reported alongside the dispatched handles, so nothing is silently dropped. - run_agent._dispatch_delegate_task + registry handler: force background for any top-level model delegation (single OR batch); orchestrator subagents (depth > 0) still run synchronously since they need workers' results within their own turn. - Removed the v1 'batch async not supported' rejection. - Tool description updated: BOTH MODES RUN IN THE BACKGROUND. - Tests updated to assert batch fan-out dispatches each task async (verified E2E: 3-task batch -> 3 independent completion-queue events). * fix(delegation): background fan-out joins and returns one consolidated block Correct the fan-out semantics: a backgrounded batch is dispatched as ONE async unit (one handle, one async-pool slot), not N independent dispatches. The unit runs all children in parallel, waits on every one, and emits a SINGLE completion event carrying the consolidated per-task results. The chat is never blocked; when all subagents finish, their full summaries re-enter the conversation together as one message. - async_delegation.dispatch_async_delegation_batch + _finalize_batch: a batch occupies one slot; its runner returns the combined {results:[...]} dict and one event with the full results list is pushed to the completion queue. - delegate_tool: extract the sync execution+aggregation into _execute_and_aggregate(); background dispatches it via the batch unit and returns one handle; on pool-capacity rejection it runs the batch inline. - process_registry._format_async_delegation: render a consolidated multi-task block (TASK i/N + per-task summary) when the event carries is_batch/results. 
- Tests updated; E2E verified: 3-task batch -> immediate return -> one combined completion block with all three summaries. --- run_agent.py | 14 +- tests/tools/test_async_delegation.py | 162 ++++++- tools/async_delegation.py | 170 +++++++ tools/delegate_tool.py | 640 ++++++++++++++------------- tools/process_registry.py | 64 +++ 5 files changed, 719 insertions(+), 331 deletions(-) diff --git a/run_agent.py b/run_agent.py index 87ad09dd915..6f0d9cb1d56 100644 --- a/run_agent.py +++ b/run_agent.py @@ -5197,6 +5197,18 @@ class AIAgent: invocation paths (concurrent, sequential, inline). """ from tools.delegate_tool import delegate_task as _delegate_task + # Delegations from the top-level MODEL always run in the background — + # the model does not get to choose. delegate_task returns immediately + # with ONE handle for the whole call, and the consolidated results + # re-enter the conversation as a single message once every subagent has + # finished. This applies to BOTH a single task and a fan-out batch (the + # batch runs as one background unit). The one exception: + # - A delegation from an ORCHESTRATOR SUBAGENT (depth > 0) stays + # synchronous: the orchestrator needs its workers' results within + # its own turn to compose a summary, and a subagent doesn't own the + # gateway session the async result would route back to. + # The schema-level `background` param is intentionally ignored here. 
+ _is_subagent = getattr(self, "_delegate_depth", 0) > 0 return _delegate_task( goal=function_args.get("goal"), context=function_args.get("context"), @@ -5206,7 +5218,7 @@ class AIAgent: acp_command=function_args.get("acp_command"), acp_args=function_args.get("acp_args"), role=function_args.get("role"), - background=function_args.get("background"), + background=(not _is_subagent), parent_agent=self, ) diff --git a/tests/tools/test_async_delegation.py b/tests/tools/test_async_delegation.py index 5dbecfc4bf5..8c3f2e7c673 100644 --- a/tests/tools/test_async_delegation.py +++ b/tests/tools/test_async_delegation.py @@ -227,7 +227,8 @@ def test_completed_records_pruned_to_cap(): def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch): """delegate_task(background=True) returns a handle without running the - child synchronously, and the child completes on the background thread.""" + child synchronously, and the child completes on the background thread. + A single task is dispatched as a one-item background batch unit.""" from unittest.mock import MagicMock, patch import tools.delegate_tool as dt @@ -235,6 +236,8 @@ def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch): parent._delegate_depth = 0 parent.session_id = "sess" parent._interrupt_requested = False + parent._active_children = [] + parent._active_children_lock = None fake_child = MagicMock() fake_child._delegate_role = "leaf" fake_child._subagent_id = "s1" @@ -253,55 +256,170 @@ def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch): "model": "m", "provider": None, "base_url": None, "api_key": None, "api_mode": None, "command": None, "args": None, } - with patch.object(dt, "_build_child_agent", return_value=fake_child), \ - patch.object(dt, "_run_single_child", side_effect=slow_child), \ - patch.object(dt, "_resolve_delegation_credentials", return_value=creds): - out = dt.delegate_task( - goal="the real task", context="ctx", toolsets=["web"], - 
background=True, parent_agent=parent, - ) + # monkeypatch (not `with`) so patches outlive delegate_task's return and + # remain active while the background worker runs. + monkeypatch.setattr(dt, "_build_child_agent", lambda **kw: fake_child) + monkeypatch.setattr(dt, "_run_single_child", slow_child) + monkeypatch.setattr(dt, "_resolve_delegation_credentials", lambda *a, **k: creds) + out = dt.delegate_task( + goal="the real task", context="ctx", toolsets=["web"], + background=True, parent_agent=parent, + ) import json parsed = json.loads(out) assert parsed["status"] == "dispatched" assert parsed["mode"] == "background" assert parsed["delegation_id"].startswith("deleg_") - # The real non-blocking invariant (environment-independent — no wall-clock - # threshold that flakes on a loaded CI runner): delegate_task returned - # while the child is STILL blocked on the closed gate, so no completion - # event exists yet. A synchronous impl could not have returned here — it - # would still be inside slow_child waiting on the gate. + # Non-blocking invariant: delegate_task returned while the child is STILL + # blocked on the closed gate, so no completion event exists yet. assert process_registry.completion_queue.empty() - assert ad.active_count() == 1 # child running in background, not finished + assert ad.active_count() == 1 # one background batch unit, not finished gate.set() evt = _drain_one() assert evt is not None assert evt["type"] == "async_delegation" - assert evt["summary"] == "done: the real task" + # Single task rides the batch path → carries a 1-item results list. 
+ assert evt.get("is_batch") is True + assert len(evt["results"]) == 1 + assert evt["results"][0]["summary"] == "done: the real task" text = format_process_notification(evt) assert text is not None - assert "the real task" in text and "ctx" in text + assert "the real task" in text -def test_delegate_task_background_rejects_batch(monkeypatch): - """background=True with a multi-item tasks batch is rejected (v1: single-task only).""" +def test_delegate_task_background_batch_runs_as_one_unit(monkeypatch): + """A multi-item batch with background=True dispatches the WHOLE fan-out as + ONE background unit (one handle, one async slot). The children run in + parallel and join; the consolidated results come back as a single + completion event when ALL of them finish.""" import json - from unittest.mock import MagicMock + from unittest.mock import MagicMock, patch import tools.delegate_tool as dt parent = MagicMock() parent._delegate_depth = 0 parent.session_id = "sess" + parent._interrupt_requested = False + parent._active_children = [] + parent._active_children_lock = None + fake_child = MagicMock() + fake_child._delegate_role = "leaf" + + gate = threading.Event() + + def _blocking_child(task_index, goal, child=None, parent_agent=None, **kw): + gate.wait(timeout=5) + return { + "task_index": task_index, "status": "completed", + "summary": f"done: {goal}", "api_calls": 1, + "duration_seconds": 0.1, "model": "m", "exit_reason": "completed", + } + + creds = { + "model": "m", "provider": None, "base_url": None, "api_key": None, + "api_mode": None, "command": None, "args": None, + } + + # Use monkeypatch (not a `with` block) so the patches stay active while the + # background worker thread runs _execute_and_aggregate AFTER delegate_task + # has already returned. 
+ monkeypatch.setattr(dt, "_build_child_agent", lambda **kw: fake_child) + monkeypatch.setattr(dt, "_run_single_child", _blocking_child) + monkeypatch.setattr(dt, "_resolve_delegation_credentials", lambda *a, **k: creds) out = dt.delegate_task( - tasks=[{"goal": "a"}, {"goal": "b"}], + tasks=[{"goal": "a"}, {"goal": "b"}, {"goal": "c"}], background=True, parent_agent=parent, ) + parsed = json.loads(out) - assert "error" in parsed - assert "single-task only" in parsed["error"] + assert parsed["status"] == "dispatched" + assert parsed["mode"] == "background" + assert parsed["count"] == 3 + assert parsed["delegation_id"].startswith("deleg_") + assert parsed["goals"] == ["a", "b", "c"] + # ONE background unit for the whole fan-out (not three), and the call + # returned while all children are still blocked → chat not blocked. + assert process_registry.completion_queue.empty() + assert ad.active_count() == 1 + + # Release the children; the whole batch joins and emits ONE event. + gate.set() + evt = _drain_one() + assert evt is not None + assert evt["type"] == "async_delegation" + assert evt.get("is_batch") is True + assert len(evt["results"]) == 3 + summaries = sorted(r["summary"] for r in evt["results"]) + assert summaries == ["done: a", "done: b", "done: c"] + # The consolidated notification names all three tasks in one block. + text = format_process_notification(evt) + assert text is not None + assert "TASK 1/3" in text and "TASK 2/3" in text and "TASK 3/3" in text + assert "done: a" in text and "done: b" in text and "done: c" in text + # No more events — it's a single combined completion, not N of them. + assert _drain_one() is None + + +def test_model_dispatch_forces_background(): + """The MODEL-facing dispatch path forces background=True for any top-level + delegation (single task OR batch), and keeps it off for an orchestrator + subagent (depth > 0). 
Direct delegate_task() callers are unaffected (they + keep the synchronous default).""" + import tools.delegate_tool as dt + from unittest.mock import MagicMock + + top = MagicMock() + top._delegate_depth = 0 + sub = MagicMock() + sub._delegate_depth = 1 + + # Registry-fallback helper: top-level always background, regardless of + # single vs batch; subagent never. + assert dt._model_background_value({"goal": "x"}, top) is True + assert dt._model_background_value( + {"tasks": [{"goal": "a"}, {"goal": "b"}]}, top + ) is True + assert dt._model_background_value({"tasks": [{"goal": "a"}]}, top) is True + assert dt._model_background_value({"goal": "x"}, sub) is False + assert dt._model_background_value( + {"tasks": [{"goal": "a"}, {"goal": "b"}]}, sub + ) is False + + +def test_run_agent_dispatch_forces_background(): + """run_agent._dispatch_delegate_task — the live model path — forces + background on for any top-level delegation (single OR batch) and off for a + subagent.""" + from unittest.mock import patch + import run_agent + + class _FakeAgent: + _delegate_depth = 0 + + captured = {} + + def _fake_delegate(**kwargs): + captured.update(kwargs) + return "{}" + + with patch("tools.delegate_tool.delegate_task", _fake_delegate): + agent = _FakeAgent() + run_agent.AIAgent._dispatch_delegate_task(agent, {"goal": "x"}) + assert captured["background"] is True + + run_agent.AIAgent._dispatch_delegate_task( + agent, {"tasks": [{"goal": "a"}, {"goal": "b"}]} + ) + assert captured["background"] is True + + sub = _FakeAgent() + sub._delegate_depth = 1 + run_agent.AIAgent._dispatch_delegate_task(sub, {"goal": "x"}) + assert captured["background"] is False def test_delegate_task_background_detaches_child_from_parent(monkeypatch): diff --git a/tools/async_delegation.py b/tools/async_delegation.py index 5975e9b1385..92f58c83afb 100644 --- a/tools/async_delegation.py +++ b/tools/async_delegation.py @@ -334,6 +334,176 @@ def _push_completion_event( ) +def 
dispatch_async_delegation_batch( + *, + goals: List[str], + context: Optional[str], + toolsets: Optional[List[str]], + role: str, + model: Optional[str], + session_key: str, + runner: Callable[[], Dict[str, Any]], + interrupt_fn: Optional[Callable[[], None]] = None, + max_async_children: int = _DEFAULT_MAX_ASYNC_CHILDREN, +) -> Dict[str, Any]: + """Dispatch a WHOLE fan-out batch as ONE background unit. + + Unlike ``dispatch_async_delegation`` (which backs a single subagent), + ``runner`` here runs the entire batch — it builds and joins on every child + in parallel and returns the combined ``{"results": [...], + "total_duration_seconds": N}`` dict that the synchronous path would have + returned. We occupy ONE async slot for the whole batch (the in-batch + parallelism is bounded separately by ``max_concurrent_children``), so a + single ``delegate_task`` fan-out never exhausts the async pool by itself. + + When the batch finishes, a SINGLE completion event is pushed onto the + shared ``process_registry.completion_queue`` carrying the full per-task + ``results`` list, so the consolidated summaries re-enter the conversation + as one message once every child is done — the chat is never blocked while + they run. + + Returns ``{"status": "dispatched", "delegation_id": ...}`` on success or + ``{"status": "rejected", "error": ...}`` when the async pool is at + capacity. + """ + delegation_id = _new_delegation_id() + dispatched_at = time.time() + n = len(goals) + # A combined goal label for status listings / the completion header. 
+ combined_goal = ( + goals[0] if n == 1 else f"{n} parallel subagents: " + "; ".join(g[:40] for g in goals) + ) + record: Dict[str, Any] = { + "delegation_id": delegation_id, + "goal": combined_goal, + "goals": list(goals), + "context": context, + "toolsets": list(toolsets) if toolsets else None, + "role": role, + "model": model, + "session_key": session_key, + "status": "running", + "dispatched_at": dispatched_at, + "completed_at": None, + "interrupt_fn": interrupt_fn, + "is_batch": True, + } + with _records_lock: + running = sum( + 1 for r in _records.values() if r.get("status") == "running" + ) + if running >= max_async_children: + return { + "status": "rejected", + "error": ( + f"Async delegation capacity reached ({max_async_children} " + f"running). Wait for one to finish (its result will re-enter " + f"the chat), or raise delegation.max_async_children in " + f"config.yaml to allow more concurrent background units." + ), + } + _records[delegation_id] = record + + executor = _get_executor(max_async_children) + + def _worker() -> None: + combined: Dict[str, Any] = {} + status = "error" + try: + combined = runner() or {} + # Batch status: completed unless every child errored/was interrupted. 
+ child_results = combined.get("results") or [] + if child_results and all( + (r.get("status") not in ("completed", "success")) + for r in child_results + ): + status = "error" + else: + status = "completed" + except Exception as exc: # noqa: BLE001 — must never crash the worker + logger.exception("Async delegation batch %s crashed", delegation_id) + combined = { + "results": [], + "error": f"{type(exc).__name__}: {exc}", + "total_duration_seconds": round(time.time() - dispatched_at, 2), + } + status = "error" + finally: + _finalize_batch(delegation_id, combined, status) + + try: + executor.submit(_worker) + except Exception as exc: # pragma: no cover + with _records_lock: + _records.pop(delegation_id, None) + return { + "status": "rejected", + "error": f"Failed to schedule async delegation batch: {exc}", + } + + logger.info( + "Dispatched async delegation batch %s (%d task(s), session_key=%s)", + delegation_id, n, session_key or "", + ) + return {"status": "dispatched", "delegation_id": delegation_id} + + +def _finalize_batch( + delegation_id: str, combined: Dict[str, Any], status: str +) -> None: + """Mark a batch record complete and push ONE combined completion event.""" + with _records_lock: + record = _records.get(delegation_id) + if record is None: + return + record["status"] = status + record["completed_at"] = time.time() + record["interrupt_fn"] = None + event_record = dict(record) + _prune_completed_locked() + + try: + from tools.process_registry import process_registry + except Exception as exc: # pragma: no cover + logger.error( + "Async delegation batch %s finished but process_registry import " + "failed; result lost: %s", + delegation_id, exc, + ) + return + + dispatched_at = event_record.get("dispatched_at") or time.time() + completed_at = event_record.get("completed_at") or time.time() + evt = { + "type": "async_delegation", + "delegation_id": delegation_id, + "session_key": event_record.get("session_key", ""), + "goal": event_record.get("goal", ""), 
+ "goals": event_record.get("goals"), + "context": event_record.get("context"), + "toolsets": event_record.get("toolsets"), + "role": event_record.get("role"), + "model": event_record.get("model"), + "status": status, + "is_batch": True, + # The full per-task results list — the formatter renders a + # consolidated multi-task block from this. + "results": combined.get("results") or [], + "error": combined.get("error"), + "total_duration_seconds": combined.get("total_duration_seconds"), + "dispatched_at": dispatched_at, + "completed_at": completed_at, + } + try: + process_registry.completion_queue.put(evt) + except Exception as exc: # pragma: no cover + logger.error( + "Async delegation batch %s: failed to enqueue completion event; " + "result lost: %s", + delegation_id, exc, + ) + + def list_async_delegations() -> List[Dict[str, Any]]: """Snapshot of async delegations (running + recently completed). diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index 2613b13a8db..2160bbc279b 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -2103,18 +2103,12 @@ def delegate_task( # Normalise the top-level role once; per-task overrides re-normalise. top_role = _normalize_role(role) - # Async (background) delegation is single-task only in v1. A batch carries - # fan-out semantics (N handles, partial completion) that double the state - # model — reject early with a clear message rather than silently running - # the batch synchronously. + # Background (async) delegation now applies to BOTH single tasks and + # batches. The whole call is dispatched as ONE background unit: its + # children run in parallel on the daemon executor, join on each other, + # and re-enter the conversation together as a single consolidated + # completion event under one handle. 
background = is_truthy_value(background, default=False) if background is not None else False - if background and tasks and isinstance(tasks, list) and len(tasks) > 1: - return tool_error( - "background=true is single-task only. Dispatch one background " - "subagent per delegate_task call (each returns its own handle and " - "re-enters the conversation independently), or run the batch " - "synchronously with background=false." - ) # Depth limit — configurable via delegation.max_spawn_depth, # default 2 for parity with the original MAX_DEPTH constant. @@ -2250,150 +2244,101 @@ def delegate_task( # Authoritative restore: reset global to parent's tool names after all children built _model_tools._last_resolved_tool_names = _parent_tool_names - if n_tasks == 1: - # Single task -- run directly (no thread pool overhead) - _i, _t, child = children[0] + def _execute_and_aggregate() -> dict: + """Run all built children (1 or N), join on them, aggregate results, + fire subagent_stop hooks + cost rollup, and return the combined result + dict. Used by BOTH the synchronous path and the background runner. In + the background case this whole function runs on the daemon executor, so + the parent turn isn't blocked — but the batch still JOINS on itself + here (all children must finish) before producing ONE consolidated + results block. That is the contract: fan-out runs in the background, + waits on each other, and returns together. + """ + if n_tasks == 1: + # Single task -- run directly (no thread pool overhead) + _i, _t, child = children[0] + result = _run_single_child(_i, _t["goal"], child, parent_agent) + results.append(result) + else: + # Batch -- run in parallel with per-task progress lines + completed_count = 0 + spinner_ref = getattr(parent_agent, "_delegate_spinner", None) - # ----- Async / background dispatch ----- - # When background=true, hand the already-built child to the async - # delegation registry and return a handle immediately. 
The child runs - # on a daemon executor; its result re-enters the conversation as a - # fresh turn via process_registry.completion_queue (see - # tools/async_delegation.py). Batch async is intentionally NOT - # supported in v1 — the rejection is handled before we get here. - if background: - from tools.async_delegation import dispatch_async_delegation - from tools.approval import get_current_session_key + with ThreadPoolExecutor(max_workers=max_children) as executor: + futures = {} + for i, t, child in children: + future = executor.submit( + _run_single_child, + task_index=i, + goal=t["goal"], + child=child, + parent_agent=parent_agent, + ) + futures[future] = i - # Capture the gateway routing key on THIS (parent) thread — the - # daemon worker won't carry the session contextvar. - _session_key = get_current_session_key(default="") + # Poll futures with interrupt checking. as_completed() blocks + # until ALL futures finish — if a child agent gets stuck, + # the parent blocks forever even after interrupt propagation. + # Instead, use wait() with a short timeout so we can bail + # when the parent is interrupted. + # Map task_index -> child agent, so fabricated entries for + # still-pending futures can carry the correct _delegate_role. + _child_by_index = {i: child for (i, _, child) in children} - # Detach the child from the parent's interrupt-propagation list. - # _build_child_agent registered it there (correct for sync - # children, which block the parent's turn), but a BACKGROUND - # child must survive parent-turn interrupts (Ctrl+C, mid-turn - # steering), cache evicts (release_clients), and session close - # (/new) — otherwise the detached subagent dies with whatever - # the parent was doing when it was dispatched. Its lifecycle is - # owned by the async-delegation registry (interrupt_fn below), - # and _run_single_child's finally block closes its resources - # when it finishes. 
- if hasattr(parent_agent, "_active_children"): - try: - _ac_lock = getattr(parent_agent, "_active_children_lock", None) - if _ac_lock: - with _ac_lock: - parent_agent._active_children.remove(child) - else: - parent_agent._active_children.remove(child) - except ValueError: - pass - - def _async_runner(_child=child, _goal=_t["goal"]): - return _run_single_child(0, _goal, _child, parent_agent) - - def _async_interrupt(_child=child): - try: - if hasattr(_child, "interrupt"): - _child.interrupt("Async delegation cancelled") - elif hasattr(_child, "_interrupt_requested"): - _child._interrupt_requested = True - except Exception: - pass - - dispatch = dispatch_async_delegation( - goal=_t["goal"], - context=_t.get("context"), - toolsets=_t.get("toolsets") or toolsets, - role=_normalize_role(_t.get("role") or top_role), - model=creds["model"], - session_key=_session_key, - runner=_async_runner, - interrupt_fn=_async_interrupt, - max_async_children=_get_max_async_children(), - ) - - if dispatch.get("status") == "dispatched": - return json.dumps( - { - "status": "dispatched", - "delegation_id": dispatch["delegation_id"], - "goal": _t["goal"], - "mode": "background", - "note": ( - "Subagent is running in the background. You and the " - "user can keep working; the full task source and " - "result will re-enter the conversation as a new " - "message when it finishes. Do not wait or poll — " - "just continue." - ), - }, - ensure_ascii=False, - ) - # Rejected (at capacity or schedule failure) — surface as a tool - # error so the model can fall back to synchronous delegation. 
- return tool_error( - dispatch.get("error", "Async delegation could not be scheduled.") - ) - - result = _run_single_child(0, _t["goal"], child, parent_agent) - results.append(result) - else: - # Batch -- run in parallel with per-task progress lines - completed_count = 0 - spinner_ref = getattr(parent_agent, "_delegate_spinner", None) - - with ThreadPoolExecutor(max_workers=max_children) as executor: - futures = {} - for i, t, child in children: - future = executor.submit( - _run_single_child, - task_index=i, - goal=t["goal"], - child=child, - parent_agent=parent_agent, - ) - futures[future] = i - - # Poll futures with interrupt checking. as_completed() blocks - # until ALL futures finish — if a child agent gets stuck, - # the parent blocks forever even after interrupt propagation. - # Instead, use wait() with a short timeout so we can bail - # when the parent is interrupted. - # Map task_index -> child agent, so fabricated entries for - # still-pending futures can carry the correct _delegate_role. - _child_by_index = {i: child for (i, _, child) in children} - - pending = set(futures.keys()) - while pending: - if getattr(parent_agent, "_interrupt_requested", False) is True: - # Parent interrupted — collect whatever finished and - # abandon the rest. Children already received the - # interrupt signal; we just can't wait forever. - for f in pending: - idx = futures[f] - if f.done(): - try: - entry = f.result() - except Exception as exc: + pending = set(futures.keys()) + while pending: + if getattr(parent_agent, "_interrupt_requested", False) is True: + # Parent interrupted — collect whatever finished and + # abandon the rest. Children already received the + # interrupt signal; we just can't wait forever. 
+ for f in pending: + idx = futures[f] + if f.done(): + try: + entry = f.result() + except Exception as exc: + entry = { + "task_index": idx, + "status": "error", + "summary": None, + "error": str(exc), + "api_calls": 0, + "duration_seconds": 0, + "_child_role": getattr( + _child_by_index.get(idx), "_delegate_role", None + ), + } + else: entry = { "task_index": idx, - "status": "error", + "status": "interrupted", "summary": None, - "error": str(exc), + "error": "Parent agent interrupted — child did not finish in time", "api_calls": 0, "duration_seconds": 0, "_child_role": getattr( _child_by_index.get(idx), "_delegate_role", None ), } - else: + results.append(entry) + completed_count += 1 + break + + from concurrent.futures import wait as _cf_wait, FIRST_COMPLETED + + done, pending = _cf_wait( + pending, timeout=0.5, return_when=FIRST_COMPLETED + ) + for future in done: + try: + entry = future.result() + except Exception as exc: + idx = futures[future] entry = { "task_index": idx, - "status": "interrupted", + "status": "error", "summary": None, - "error": "Parent agent interrupted — child did not finish in time", + "error": str(exc), "api_calls": 0, "duration_seconds": 0, "_child_role": getattr( @@ -2402,165 +2347,229 @@ def delegate_task( } results.append(entry) completed_count += 1 - break - from concurrent.futures import wait as _cf_wait, FIRST_COMPLETED - - done, pending = _cf_wait( - pending, timeout=0.5, return_when=FIRST_COMPLETED - ) - for future in done: - try: - entry = future.result() - except Exception as exc: - idx = futures[future] - entry = { - "task_index": idx, - "status": "error", - "summary": None, - "error": str(exc), - "api_calls": 0, - "duration_seconds": 0, - "_child_role": getattr( - _child_by_index.get(idx), "_delegate_role", None - ), - } - results.append(entry) - completed_count += 1 - - # Print per-task completion line above the spinner - idx = entry["task_index"] - label = ( - task_labels[idx] if idx < len(task_labels) else f"Task {idx}" 
- ) - dur = entry.get("duration_seconds", 0) - status = entry.get("status", "?") - icon = "✓" if status == "completed" else "✗" - remaining = n_tasks - completed_count - completion_line = f"{icon} [{idx+1}/{n_tasks}] {label} ({dur}s)" - if spinner_ref: - try: - spinner_ref.print_above(completion_line) - except Exception: + # Print per-task completion line above the spinner + idx = entry["task_index"] + label = ( + task_labels[idx] if idx < len(task_labels) else f"Task {idx}" + ) + dur = entry.get("duration_seconds", 0) + status = entry.get("status", "?") + icon = "✓" if status == "completed" else "✗" + remaining = n_tasks - completed_count + completion_line = f"{icon} [{idx+1}/{n_tasks}] {label} ({dur}s)" + if spinner_ref: + try: + spinner_ref.print_above(completion_line) + except Exception: + print(f" {completion_line}") + else: print(f" {completion_line}") - else: - print(f" {completion_line}") - # Update spinner text to show remaining count - if spinner_ref and remaining > 0: - try: - spinner_ref.update_text( - f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining" - ) - except Exception as e: - logger.debug("Spinner update_text failed: %s", e) + # Update spinner text to show remaining count + if spinner_ref and remaining > 0: + try: + spinner_ref.update_text( + f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining" + ) + except Exception as e: + logger.debug("Spinner update_text failed: %s", e) - # Sort by task_index so results match input order - results.sort(key=lambda r: r["task_index"]) + # Sort by task_index so results match input order + results.sort(key=lambda r: r["task_index"]) - # Notify parent's memory provider of delegation outcomes - if ( - parent_agent - and hasattr(parent_agent, "_memory_manager") - and parent_agent._memory_manager - ): - for entry in results: - try: - _task_goal = ( - task_list[entry["task_index"]]["goal"] - if entry["task_index"] < len(task_list) - else "" - ) - parent_agent._memory_manager.on_delegation( - 
task=_task_goal, - result=entry.get("summary", "") or "", - child_session_id=( - getattr(children[entry["task_index"]][2], "session_id", "") - if entry["task_index"] < len(children) + # Notify parent's memory provider of delegation outcomes + if ( + parent_agent + and hasattr(parent_agent, "_memory_manager") + and parent_agent._memory_manager + ): + for entry in results: + try: + _task_goal = ( + task_list[entry["task_index"]]["goal"] + if entry["task_index"] < len(task_list) else "" - ), + ) + parent_agent._memory_manager.on_delegation( + task=_task_goal, + result=entry.get("summary", "") or "", + child_session_id=( + getattr(children[entry["task_index"]][2], "session_id", "") + if entry["task_index"] < len(children) + else "" + ), + ) + except Exception: + pass + + # Fire subagent_stop hooks once per child, serialised on the parent thread. + # This keeps Python-plugin and shell-hook callbacks off of the worker threads + # that ran the children, so hook authors don't need to reason about + # concurrent invocation. Role was captured into the entry dict in + # _run_single_child (or the fabricated-entry branches above) before the + # child was closed. + _parent_session_id = getattr(parent_agent, "session_id", None) + try: + from hermes_cli.plugins import invoke_hook as _invoke_hook + except Exception: + _invoke_hook = None + # Aggregate child spend here so the parent's footer/UI reflect the true + # cost of a subagent-heavy turn. Port of Kilo-Org/kilocode#9448. Each + # child's cost was captured in _run_single_child before its AIAgent was + # closed; we fold them into the parent in one pass alongside the + # subagent_stop hook loop so we don't walk `results` twice. 
+ _children_cost_total = 0.0 + for entry in results: + child_role = entry.pop("_child_role", None) + child_cost = entry.pop("_child_cost_usd", 0.0) + try: + if child_cost: + _children_cost_total += float(child_cost) + except (TypeError, ValueError): + pass + if _invoke_hook is None: + continue + try: + _child_index = entry.get("task_index", -1) + _child_agent = ( + children[_child_index][2] + if isinstance(_child_index, int) and 0 <= _child_index < len(children) + else None + ) + _invoke_hook( + "subagent_stop", + parent_session_id=_parent_session_id, + parent_turn_id=getattr(parent_agent, "_current_turn_id", "") or "", + child_session_id=getattr(_child_agent, "session_id", None), + child_role=child_role, + child_summary=entry.get("summary"), + child_status=entry.get("status"), + duration_ms=int((entry.get("duration_seconds") or 0) * 1000), ) except Exception: - pass + logger.debug("subagent_stop hook invocation failed", exc_info=True) - # Fire subagent_stop hooks once per child, serialised on the parent thread. - # This keeps Python-plugin and shell-hook callbacks off of the worker threads - # that ran the children, so hook authors don't need to reason about - # concurrent invocation. Role was captured into the entry dict in - # _run_single_child (or the fabricated-entry branches above) before the - # child was closed. - _parent_session_id = getattr(parent_agent, "session_id", None) - try: - from hermes_cli.plugins import invoke_hook as _invoke_hook - except Exception: - _invoke_hook = None - # Aggregate child spend here so the parent's footer/UI reflect the true - # cost of a subagent-heavy turn. Port of Kilo-Org/kilocode#9448. Each - # child's cost was captured in _run_single_child before its AIAgent was - # closed; we fold them into the parent in one pass alongside the - # subagent_stop hook loop so we don't walk `results` twice. 
- _children_cost_total = 0.0 - for entry in results: - child_role = entry.pop("_child_role", None) - child_cost = entry.pop("_child_cost_usd", 0.0) - try: - if child_cost: - _children_cost_total += float(child_cost) - except (TypeError, ValueError): - pass - if _invoke_hook is None: - continue - try: - _child_index = entry.get("task_index", -1) - _child_agent = ( - children[_child_index][2] - if isinstance(_child_index, int) and 0 <= _child_index < len(children) - else None - ) - _invoke_hook( - "subagent_stop", - parent_session_id=_parent_session_id, - parent_turn_id=getattr(parent_agent, "_current_turn_id", "") or "", - child_session_id=getattr(_child_agent, "session_id", None), - child_role=child_role, - child_summary=entry.get("summary"), - child_status=entry.get("status"), - duration_ms=int((entry.get("duration_seconds") or 0) * 1000), - ) - except Exception: - logger.debug("subagent_stop hook invocation failed", exc_info=True) + # Fold the aggregated child cost into the parent's session total. This is + # additive — each delegate_task call contributes its own children — so + # nested orchestrator→worker trees roll up naturally: each layer's own + # delegate_task() folds its direct children in, and when the orchestrator + # itself finishes, its parent folds the orchestrator's now-inflated total + # on top. Degrades silently if the parent lacks the counter (older test + # fixtures, etc.). + if _children_cost_total > 0.0: + try: + current = float(getattr(parent_agent, "session_estimated_cost_usd", 0.0) or 0.0) + parent_agent.session_estimated_cost_usd = current + _children_cost_total + # Upgrade the cost_source so the UI doesn't label a partially-real + # total as "none" when the parent itself hadn't billed any calls + # yet (rare but possible when the parent's only action this turn + # was delegate_task). 
+ if getattr(parent_agent, "session_cost_source", "none") in {None, "", "none"}: + parent_agent.session_cost_source = "subagent" + if getattr(parent_agent, "session_cost_status", "unknown") in {None, "", "unknown"}: + parent_agent.session_cost_status = "estimated" + except Exception: + logger.debug("Subagent cost rollup failed", exc_info=True) - # Fold the aggregated child cost into the parent's session total. This is - # additive — each delegate_task call contributes its own children — so - # nested orchestrator→worker trees roll up naturally: each layer's own - # delegate_task() folds its direct children in, and when the orchestrator - # itself finishes, its parent folds the orchestrator's now-inflated total - # on top. Degrades silently if the parent lacks the counter (older test - # fixtures, etc.). - if _children_cost_total > 0.0: - try: - current = float(getattr(parent_agent, "session_estimated_cost_usd", 0.0) or 0.0) - parent_agent.session_estimated_cost_usd = current + _children_cost_total - # Upgrade the cost_source so the UI doesn't label a partially-real - # total as "none" when the parent itself hadn't billed any calls - # yet (rare but possible when the parent's only action this turn - # was delegate_task). 
- if getattr(parent_agent, "session_cost_source", "none") in {None, "", "none"}: - parent_agent.session_cost_source = "subagent" - if getattr(parent_agent, "session_cost_status", "unknown") in {None, "", "unknown"}: - parent_agent.session_cost_status = "estimated" - except Exception: - logger.debug("Subagent cost rollup failed", exc_info=True) + total_duration = round(time.monotonic() - overall_start, 2) - total_duration = round(time.monotonic() - overall_start, 2) - - return json.dumps( - { + return { "results": results, "total_duration_seconds": total_duration, - }, - ensure_ascii=False, - ) + } + + # ----- Background dispatch: run the WHOLE batch as one async unit ----- + # When background is true, the entire fan-out runs on the daemon executor + # via a single async delegation. _execute_and_aggregate() joins on every + # child and produces ONE consolidated results block, which re-enters the + # conversation as a single message when ALL children finish. The chat is + # not blocked in the meantime. This is the contract: dispatch N subagents, + # keep chatting, get the combined summaries back together at the end. + if background: + from tools.async_delegation import dispatch_async_delegation_batch + from tools.approval import get_current_session_key + + _session_key = get_current_session_key(default="") + _child_agents = [c for (_, _, c) in children] + + # Detach every child from the parent's interrupt-propagation list — the + # batch's lifecycle is owned by the async registry now, not the parent + # turn. _build_child_agent attached them (correct for sync runs). 
+ if hasattr(parent_agent, "_active_children"): + _ac_lock = getattr(parent_agent, "_active_children_lock", None) + for _c in _child_agents: + try: + if _ac_lock: + with _ac_lock: + parent_agent._active_children.remove(_c) + else: + parent_agent._active_children.remove(_c) + except ValueError: + pass + + def _batch_runner(): + return _execute_and_aggregate() + + def _batch_interrupt(): + for _c in _child_agents: + try: + if hasattr(_c, "interrupt"): + _c.interrupt("Async delegation cancelled") + elif hasattr(_c, "_interrupt_requested"): + _c._interrupt_requested = True + except Exception: + pass + + _goals = [t["goal"] for t in task_list] + dispatch = dispatch_async_delegation_batch( + goals=_goals, + context=context, + toolsets=toolsets, + role=top_role, + model=creds["model"], + session_key=_session_key, + runner=_batch_runner, + interrupt_fn=_batch_interrupt, + max_async_children=_get_max_async_children(), + ) + + if dispatch.get("status") == "dispatched": + n = len(_goals) + note = ( + "Subagent is running in the background. You and the user can " + "keep working; its full result re-enters the conversation as a " + "new message when it finishes. Do not wait or poll — just " + "continue." + if n == 1 else + f"{n} subagents are running in parallel in the background. You " + f"and the user can keep working; they wait on each other and " + f"their consolidated results re-enter the conversation as a " + f"single message once ALL of them finish. Do not wait or poll " + f"— just continue." + ) + payload = { + "status": "dispatched", + "mode": "background", + "count": n, + "delegation_id": dispatch["delegation_id"], + "goals": _goals, + "note": note, + } + return json.dumps(payload, ensure_ascii=False) + + # Pool at capacity / schedule failure — children are still attached + # (we detach above only on the parent list, but the async unit was + # never accepted, so re-attaching isn't needed: we just run inline). 
+ logger.info( + "delegate_task: async pool at capacity (%s); running the whole " + "batch synchronously instead.", + dispatch.get("error", "rejected"), + ) + return json.dumps(_execute_and_aggregate(), ensure_ascii=False) + + # ----- Synchronous path ----- + return json.dumps(_execute_and_aggregate(), ensure_ascii=False) def _resolve_child_credential_pool( @@ -2842,11 +2851,16 @@ def _build_top_level_description() -> str: "Only the final summary is returned -- intermediate tool results " "never enter your context window.\n\n" "TWO MODES (one of 'goal' or 'tasks' is required):\n" - "1. Single task: provide 'goal' (+ optional context, toolsets)\n" + "1. Single task: provide 'goal' (+ optional context, toolsets).\n" f"2. Batch (parallel): provide 'tasks' array with up to {max_children} " f"items concurrently for this user (configured via " - f"delegation.max_concurrent_children in config.yaml). " - f"All run in parallel and results are returned together. {nesting_clause}\n\n" + f"delegation.max_concurrent_children in config.yaml). {nesting_clause}\n\n" + "BOTH MODES RUN IN THE BACKGROUND. delegate_task returns immediately — " + "you and the user keep working, and each subagent's full result " + "re-enters the conversation as its own new message when it finishes. A " + "batch is just N independent background subagents (N handles, each " + "completes on its own). Do NOT wait or poll; just continue with other " + "work after dispatching.\n\n" "WHEN TO USE delegate_task:\n" "- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n" "- Tasks that would flood your context with intermediate data\n" @@ -2857,11 +2871,10 @@ def _build_top_level_description() -> str: "- Tasks needing user interaction -> subagents cannot use clarify\n" "- Durable long-running work that must outlive the current turn -> " "use cronjob (action='create') or terminal(background=True, " - "notify_on_complete=True) instead. 
delegate_task runs SYNCHRONOUSLY " - "inside the parent turn: if the parent is interrupted (user sends a " - "new message, /stop, /new) the child is cancelled with status=" - "'interrupted' and its work is discarded. Children cannot continue " - "in the background.\n\n" + "notify_on_complete=True) instead. Background delegations are NOT " + "durable: if the parent session is closed (/new) or the process exits " + "before a subagent finishes, that subagent's work is discarded, and " + "/stop cancels every running background subagent.\n\n" "IMPORTANT:\n" "- Subagents have NO memory of your conversation. Pass all relevant " "info (file paths, error messages, constraints) via the 'context' field.\n" @@ -3059,19 +3072,13 @@ DELEGATE_TASK_SCHEMA = { "background": { "type": "boolean", "description": ( - "Run the subagent asynchronously in the BACKGROUND " - "instead of blocking this turn. When true, delegate_task " - "returns immediately with a delegation_id; you and the " - "user keep working while the subagent runs, and its full " - "result re-enters the conversation as a new message when " - "it finishes (similar to terminal background=true + " - "notify_on_complete). The re-injected message includes the " - "original goal/context so you can act on it even after " - "moving on. Single-task only — cannot be combined with the " - "'tasks' batch array. Use for long-running independent work " - "the user shouldn't have to wait on (research, builds, " - "multi-step investigations). Do NOT poll or wait after " - "dispatching — just continue; the result will come to you." + "DEPRECATED / IGNORED. Single-task delegations always run " + "in the background automatically — you do not need to (and " + "cannot) opt in or out. The result re-enters the " + "conversation as a new message when the subagent finishes; " + "just continue working in the meantime. Setting this has no " + "effect; the parameter remains only for backward " + "compatibility." 
), }, "acp_command": { @@ -3105,6 +3112,23 @@ DELEGATE_TASK_SCHEMA = { # --- Registry --- from tools.registry import registry, tool_error + +def _model_background_value(args: dict, parent_agent=None) -> bool: + """Background flag for the MODEL-facing dispatch path (registry fallback). + + Delegations from the top-level agent always run in the background — the + model does not choose. This applies to both a single task and a fan-out + batch (each task becomes its own independent background subagent). The one + exception is a delegation from an orchestrator subagent (depth > 0), which + needs its workers' results within its own turn. The live path is + ``run_agent._dispatch_delegate_task``; this lambda mirrors it for the rare + case the intercept is bypassed. Direct Python callers of ``delegate_task`` + keep the historical synchronous default. + """ + is_subagent = getattr(parent_agent, "_delegate_depth", 0) > 0 + return not is_subagent + + registry.register( name="delegate_task", toolset="delegation", @@ -3118,7 +3142,7 @@ registry.register( acp_command=args.get("acp_command"), acp_args=args.get("acp_args"), role=args.get("role"), - background=args.get("background"), + background=_model_background_value(args, kw.get("parent_agent")), parent_agent=kw.get("parent_agent"), ), check_fn=check_delegate_requirements, diff --git a/tools/process_registry.py b/tools/process_registry.py index e9f3276ffb6..fdda0adc663 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -1572,6 +1572,70 @@ def _format_async_delegation(evt: dict) -> str: dispatched_at = evt.get("dispatched_at") completed_at = evt.get("completed_at") or _time.time() + # ----- Batch (fan-out) completion: consolidated multi-task block ----- + # A whole delegate_task fan-out dispatched as one background unit finishes + # together and carries a per-task `results` list. Render every subagent's + # summary in one block so the model gets the consolidated outcome at once. 
+ batch_results = evt.get("results") + if evt.get("is_batch") or isinstance(batch_results, list): + results = batch_results or [] + goals = evt.get("goals") or [] + n = len(results) if results else len(goals) + total_dur = evt.get("total_duration_seconds", duration) + lines = [ + f"[ASYNC DELEGATION BATCH COMPLETE — {deleg_id}]", + f"A background fan-out of {n} subagent(s) you dispatched earlier " + "has finished. All ran in parallel and waited on each other; their " + "consolidated results are below. You may have moved on since " + "dispatching — act on these or re-dispatch if things have changed.", + "", + ] + if isinstance(dispatched_at, (int, float)): + ts = _time.strftime("%Y-%m-%d %H:%M:%S", _time.localtime(dispatched_at)) + age = f" ({_format_age(completed_at - dispatched_at)} ago)" + lines.append(f"Dispatched: {ts}{age}") + if context: + lines.append(f"Context you provided: {context}") + if toolsets: + lines.append(f"Toolsets: {', '.join(toolsets)}") + lines.append(f"Role: {role} Model: {model} Total duration: {total_dur}s") + if error and not results: + lines.append("--- ERROR ---") + lines.append(f"The batch did not complete successfully: {error}") + return "\n".join(lines) + for r in sorted(results, key=lambda x: x.get("task_index", 0)): + idx = r.get("task_index", 0) + r_status = r.get("status", "?") + r_summary = r.get("summary") + r_error = r.get("error") + r_goal = goals[idx] if idx < len(goals) else r.get("goal", "") + icon = "✓" if r_status in ("completed", "success") else "✗" + lines.append("") + header = f"--- {icon} TASK {idx + 1}/{n}" + if r_goal: + header += f": {r_goal}" + header += f" (status={r_status}" + if r.get("api_calls"): + header += f", api_calls={r['api_calls']}" + if r.get("duration_seconds") is not None: + header += f", {r['duration_seconds']}s" + header += ") ---" + lines.append(header) + if r_status in ("completed", "success") and r_summary: + lines.append(r_summary) + elif r_summary: + if r_error: + lines.append(f"({r_status}: 
{r_error})") + lines.append("Partial output:") + lines.append(r_summary) + else: + lines.append( + f"(no summary — status={r_status}" + + (f": {r_error}" if r_error else "") + + ")" + ) + return "\n".join(lines) + age = "" if isinstance(dispatched_at, (int, float)): age = f" ({_format_age(completed_at - dispatched_at)} ago)"