feat(delegation): background fan-out — parallel subagents, one consolidated return (#49734)

* feat(delegation): single-task delegate_task always runs in the background

The model no longer decides whether a subagent runs in the background — a
single-task delegate_task from the top-level agent is now always dispatched
async, so the parent turn returns immediately and the subagent's result
re-enters the conversation when it finishes.

- run_agent._dispatch_delegate_task (the live model path) forces
  background=True for top-level single-task calls; the schema-level
  `background` param is ignored.
- A batch (tasks with >1 item) stays synchronous (fan-out can't go async).
- A delegation from an orchestrator subagent (depth > 0) stays synchronous —
  it needs its workers' results within its own turn.
- The function-level default is unchanged, so direct Python callers/tests keep
  the historical synchronous behavior.
- On async-pool capacity rejection, single-task now falls through to a
  synchronous run instead of erroring (the child stays attached for interrupt
  propagation; detach happens only on a successful dispatch).
- Schema `background` param marked deprecated/ignored; tool description
  updated to state the always-background single-task rule.

* feat(delegation): all delegate_task fan-out runs in the background

Extend the always-background behavior to the full fan-out. A batch is now
dispatched as N independent async subagents (one handle each), instead of
running synchronously. Single task and batch both return immediately; each
subagent's result re-enters the conversation as its own message when it
finishes.

- delegate_task: when background is set, loop over ALL built children and
  dispatch each via dispatch_async_delegation; return a combined handle block
  (count + per-task delegation_ids). Children the async pool rejects (at
  capacity) run synchronously inline and are reported alongside the dispatched
  handles, so nothing is silently dropped.
- run_agent._dispatch_delegate_task + registry handler: force background for
  any top-level model delegation (single OR batch); orchestrator subagents
  (depth > 0) still run synchronously since they need workers' results within
  their own turn.
- Removed the v1 'batch async not supported' rejection.
- Tool description updated: BOTH MODES RUN IN THE BACKGROUND.
- Tests updated to assert batch fan-out dispatches each task async (verified
  E2E: 3-task batch -> 3 independent completion-queue events).

* fix(delegation): background fan-out joins and returns one consolidated block

Correct the fan-out semantics: a backgrounded batch is dispatched as ONE
async unit (one handle, one async-pool slot), not N independent dispatches.
The unit runs all children in parallel, waits on every one, and emits a
SINGLE completion event carrying the consolidated per-task results. The chat
is never blocked; when all subagents finish, their full summaries re-enter
the conversation together as one message.

- async_delegation.dispatch_async_delegation_batch + _finalize_batch: a batch
  occupies one slot; its runner returns the combined {results:[...]} dict and
  one event with the full results list is pushed to the completion queue.
- delegate_tool: extract the sync execution+aggregation into
  _execute_and_aggregate(); background dispatches it via the batch unit and
  returns one handle; on pool-capacity rejection it runs the batch inline.
- process_registry._format_async_delegation: render a consolidated multi-task
  block (TASK i/N + per-task summary) when the event carries is_batch/results.
- Tests updated; E2E verified: 3-task batch -> immediate return -> one combined
  completion block with all three summaries.
This commit is contained in:
Teknium 2026-06-20 11:27:12 -07:00 committed by GitHub
parent 680732c104
commit ea8a8b4af8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 719 additions and 331 deletions

View file

@ -5197,6 +5197,18 @@ class AIAgent:
invocation paths (concurrent, sequential, inline).
"""
from tools.delegate_tool import delegate_task as _delegate_task
# Delegations from the top-level MODEL always run in the background —
# the model does not get to choose. delegate_task returns immediately
# with a handle (one per task) and each subagent's result re-enters the
# conversation as a new message when it finishes. This applies to BOTH
# a single task and a fan-out batch (each task becomes its own
# independent background subagent). The one exception:
# - A delegation from an ORCHESTRATOR SUBAGENT (depth > 0) stays
# synchronous: the orchestrator needs its workers' results within
# its own turn to compose a summary, and a subagent doesn't own the
# gateway session the async result would route back to.
# The schema-level `background` param is intentionally ignored here.
_is_subagent = getattr(self, "_delegate_depth", 0) > 0
return _delegate_task(
goal=function_args.get("goal"),
context=function_args.get("context"),
@ -5206,7 +5218,7 @@ class AIAgent:
acp_command=function_args.get("acp_command"),
acp_args=function_args.get("acp_args"),
role=function_args.get("role"),
background=function_args.get("background"),
background=(not _is_subagent),
parent_agent=self,
)

View file

@ -227,7 +227,8 @@ def test_completed_records_pruned_to_cap():
def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch):
"""delegate_task(background=True) returns a handle without running the
child synchronously, and the child completes on the background thread."""
child synchronously, and the child completes on the background thread.
A single task is dispatched as a one-item background batch unit."""
from unittest.mock import MagicMock, patch
import tools.delegate_tool as dt
@ -235,6 +236,8 @@ def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch):
parent._delegate_depth = 0
parent.session_id = "sess"
parent._interrupt_requested = False
parent._active_children = []
parent._active_children_lock = None
fake_child = MagicMock()
fake_child._delegate_role = "leaf"
fake_child._subagent_id = "s1"
@ -253,55 +256,170 @@ def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch):
"model": "m", "provider": None, "base_url": None, "api_key": None,
"api_mode": None, "command": None, "args": None,
}
with patch.object(dt, "_build_child_agent", return_value=fake_child), \
patch.object(dt, "_run_single_child", side_effect=slow_child), \
patch.object(dt, "_resolve_delegation_credentials", return_value=creds):
out = dt.delegate_task(
goal="the real task", context="ctx", toolsets=["web"],
background=True, parent_agent=parent,
)
# monkeypatch (not `with`) so patches outlive delegate_task's return and
# remain active while the background worker runs.
monkeypatch.setattr(dt, "_build_child_agent", lambda **kw: fake_child)
monkeypatch.setattr(dt, "_run_single_child", slow_child)
monkeypatch.setattr(dt, "_resolve_delegation_credentials", lambda *a, **k: creds)
out = dt.delegate_task(
goal="the real task", context="ctx", toolsets=["web"],
background=True, parent_agent=parent,
)
import json
parsed = json.loads(out)
assert parsed["status"] == "dispatched"
assert parsed["mode"] == "background"
assert parsed["delegation_id"].startswith("deleg_")
# The real non-blocking invariant (environment-independent — no wall-clock
# threshold that flakes on a loaded CI runner): delegate_task returned
# while the child is STILL blocked on the closed gate, so no completion
# event exists yet. A synchronous impl could not have returned here — it
# would still be inside slow_child waiting on the gate.
# Non-blocking invariant: delegate_task returned while the child is STILL
# blocked on the closed gate, so no completion event exists yet.
assert process_registry.completion_queue.empty()
assert ad.active_count() == 1 # child running in background, not finished
assert ad.active_count() == 1 # one background batch unit, not finished
gate.set()
evt = _drain_one()
assert evt is not None
assert evt["type"] == "async_delegation"
assert evt["summary"] == "done: the real task"
# Single task rides the batch path → carries a 1-item results list.
assert evt.get("is_batch") is True
assert len(evt["results"]) == 1
assert evt["results"][0]["summary"] == "done: the real task"
text = format_process_notification(evt)
assert text is not None
assert "the real task" in text and "ctx" in text
assert "the real task" in text
def test_delegate_task_background_rejects_batch(monkeypatch):
"""background=True with a multi-item tasks batch is rejected (v1: single-task only)."""
def test_delegate_task_background_batch_runs_as_one_unit(monkeypatch):
"""A multi-item batch with background=True dispatches the WHOLE fan-out as
ONE background unit (one handle, one async slot). The children run in
parallel and join; the consolidated results come back as a single
completion event when ALL of them finish."""
import json
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch
import tools.delegate_tool as dt
parent = MagicMock()
parent._delegate_depth = 0
parent.session_id = "sess"
parent._interrupt_requested = False
parent._active_children = []
parent._active_children_lock = None
fake_child = MagicMock()
fake_child._delegate_role = "leaf"
gate = threading.Event()
def _blocking_child(task_index, goal, child=None, parent_agent=None, **kw):
gate.wait(timeout=5)
return {
"task_index": task_index, "status": "completed",
"summary": f"done: {goal}", "api_calls": 1,
"duration_seconds": 0.1, "model": "m", "exit_reason": "completed",
}
creds = {
"model": "m", "provider": None, "base_url": None, "api_key": None,
"api_mode": None, "command": None, "args": None,
}
# Use monkeypatch (not a `with` block) so the patches stay active while the
# background worker thread runs _execute_and_aggregate AFTER delegate_task
# has already returned.
monkeypatch.setattr(dt, "_build_child_agent", lambda **kw: fake_child)
monkeypatch.setattr(dt, "_run_single_child", _blocking_child)
monkeypatch.setattr(dt, "_resolve_delegation_credentials", lambda *a, **k: creds)
out = dt.delegate_task(
tasks=[{"goal": "a"}, {"goal": "b"}],
tasks=[{"goal": "a"}, {"goal": "b"}, {"goal": "c"}],
background=True,
parent_agent=parent,
)
parsed = json.loads(out)
assert "error" in parsed
assert "single-task only" in parsed["error"]
assert parsed["status"] == "dispatched"
assert parsed["mode"] == "background"
assert parsed["count"] == 3
assert parsed["delegation_id"].startswith("deleg_")
assert parsed["goals"] == ["a", "b", "c"]
# ONE background unit for the whole fan-out (not three), and the call
# returned while all children are still blocked → chat not blocked.
assert process_registry.completion_queue.empty()
assert ad.active_count() == 1
# Release the children; the whole batch joins and emits ONE event.
gate.set()
evt = _drain_one()
assert evt is not None
assert evt["type"] == "async_delegation"
assert evt.get("is_batch") is True
assert len(evt["results"]) == 3
summaries = sorted(r["summary"] for r in evt["results"])
assert summaries == ["done: a", "done: b", "done: c"]
# The consolidated notification names all three tasks in one block.
text = format_process_notification(evt)
assert text is not None
assert "TASK 1/3" in text and "TASK 2/3" in text and "TASK 3/3" in text
assert "done: a" in text and "done: b" in text and "done: c" in text
# No more events — it's a single combined completion, not N of them.
assert _drain_one() is None
def test_model_dispatch_forces_background():
"""The MODEL-facing dispatch path forces background=True for any top-level
delegation (single task OR batch), and keeps it off for an orchestrator
subagent (depth > 0). Direct delegate_task() callers are unaffected (they
keep the synchronous default)."""
import tools.delegate_tool as dt
from unittest.mock import MagicMock
top = MagicMock()
top._delegate_depth = 0
sub = MagicMock()
sub._delegate_depth = 1
# Registry-fallback helper: top-level always background, regardless of
# single vs batch; subagent never.
assert dt._model_background_value({"goal": "x"}, top) is True
assert dt._model_background_value(
{"tasks": [{"goal": "a"}, {"goal": "b"}]}, top
) is True
assert dt._model_background_value({"tasks": [{"goal": "a"}]}, top) is True
assert dt._model_background_value({"goal": "x"}, sub) is False
assert dt._model_background_value(
{"tasks": [{"goal": "a"}, {"goal": "b"}]}, sub
) is False
def test_run_agent_dispatch_forces_background():
"""run_agent._dispatch_delegate_task — the live model path — forces
background on for any top-level delegation (single OR batch) and off for a
subagent."""
from unittest.mock import patch
import run_agent
class _FakeAgent:
_delegate_depth = 0
captured = {}
def _fake_delegate(**kwargs):
captured.update(kwargs)
return "{}"
with patch("tools.delegate_tool.delegate_task", _fake_delegate):
agent = _FakeAgent()
run_agent.AIAgent._dispatch_delegate_task(agent, {"goal": "x"})
assert captured["background"] is True
run_agent.AIAgent._dispatch_delegate_task(
agent, {"tasks": [{"goal": "a"}, {"goal": "b"}]}
)
assert captured["background"] is True
sub = _FakeAgent()
sub._delegate_depth = 1
run_agent.AIAgent._dispatch_delegate_task(sub, {"goal": "x"})
assert captured["background"] is False
def test_delegate_task_background_detaches_child_from_parent(monkeypatch):

View file

@ -334,6 +334,176 @@ def _push_completion_event(
)
def dispatch_async_delegation_batch(
*,
goals: List[str],
context: Optional[str],
toolsets: Optional[List[str]],
role: str,
model: Optional[str],
session_key: str,
runner: Callable[[], Dict[str, Any]],
interrupt_fn: Optional[Callable[[], None]] = None,
max_async_children: int = _DEFAULT_MAX_ASYNC_CHILDREN,
) -> Dict[str, Any]:
"""Dispatch a WHOLE fan-out batch as ONE background unit.
Unlike ``dispatch_async_delegation`` (which backs a single subagent),
``runner`` here runs the entire batch it builds and joins on every child
in parallel and returns the combined ``{"results": [...],
"total_duration_seconds": N}`` dict that the synchronous path would have
returned. We occupy ONE async slot for the whole batch (the in-batch
parallelism is bounded separately by ``max_concurrent_children``), so a
single ``delegate_task`` fan-out never exhausts the async pool by itself.
When the batch finishes, a SINGLE completion event is pushed onto the
shared ``process_registry.completion_queue`` carrying the full per-task
``results`` list, so the consolidated summaries re-enter the conversation
as one message once every child is done the chat is never blocked while
they run.
Returns ``{"status": "dispatched", "delegation_id": ...}`` on success or
``{"status": "rejected", "error": ...}`` when the async pool is at
capacity.
"""
delegation_id = _new_delegation_id()
dispatched_at = time.time()
n = len(goals)
# A combined goal label for status listings / the completion header.
combined_goal = (
goals[0] if n == 1 else f"{n} parallel subagents: " + "; ".join(g[:40] for g in goals)
)
record: Dict[str, Any] = {
"delegation_id": delegation_id,
"goal": combined_goal,
"goals": list(goals),
"context": context,
"toolsets": list(toolsets) if toolsets else None,
"role": role,
"model": model,
"session_key": session_key,
"status": "running",
"dispatched_at": dispatched_at,
"completed_at": None,
"interrupt_fn": interrupt_fn,
"is_batch": True,
}
with _records_lock:
running = sum(
1 for r in _records.values() if r.get("status") == "running"
)
if running >= max_async_children:
return {
"status": "rejected",
"error": (
f"Async delegation capacity reached ({max_async_children} "
f"running). Wait for one to finish (its result will re-enter "
f"the chat), or raise delegation.max_async_children in "
f"config.yaml to allow more concurrent background units."
),
}
_records[delegation_id] = record
executor = _get_executor(max_async_children)
def _worker() -> None:
combined: Dict[str, Any] = {}
status = "error"
try:
combined = runner() or {}
# Batch status: completed unless every child errored/was interrupted.
child_results = combined.get("results") or []
if child_results and all(
(r.get("status") not in ("completed", "success"))
for r in child_results
):
status = "error"
else:
status = "completed"
except Exception as exc: # noqa: BLE001 — must never crash the worker
logger.exception("Async delegation batch %s crashed", delegation_id)
combined = {
"results": [],
"error": f"{type(exc).__name__}: {exc}",
"total_duration_seconds": round(time.time() - dispatched_at, 2),
}
status = "error"
finally:
_finalize_batch(delegation_id, combined, status)
try:
executor.submit(_worker)
except Exception as exc: # pragma: no cover
with _records_lock:
_records.pop(delegation_id, None)
return {
"status": "rejected",
"error": f"Failed to schedule async delegation batch: {exc}",
}
logger.info(
"Dispatched async delegation batch %s (%d task(s), session_key=%s)",
delegation_id, n, session_key or "<cli>",
)
return {"status": "dispatched", "delegation_id": delegation_id}
def _finalize_batch(
delegation_id: str, combined: Dict[str, Any], status: str
) -> None:
"""Mark a batch record complete and push ONE combined completion event."""
with _records_lock:
record = _records.get(delegation_id)
if record is None:
return
record["status"] = status
record["completed_at"] = time.time()
record["interrupt_fn"] = None
event_record = dict(record)
_prune_completed_locked()
try:
from tools.process_registry import process_registry
except Exception as exc: # pragma: no cover
logger.error(
"Async delegation batch %s finished but process_registry import "
"failed; result lost: %s",
delegation_id, exc,
)
return
dispatched_at = event_record.get("dispatched_at") or time.time()
completed_at = event_record.get("completed_at") or time.time()
evt = {
"type": "async_delegation",
"delegation_id": delegation_id,
"session_key": event_record.get("session_key", ""),
"goal": event_record.get("goal", ""),
"goals": event_record.get("goals"),
"context": event_record.get("context"),
"toolsets": event_record.get("toolsets"),
"role": event_record.get("role"),
"model": event_record.get("model"),
"status": status,
"is_batch": True,
# The full per-task results list — the formatter renders a
# consolidated multi-task block from this.
"results": combined.get("results") or [],
"error": combined.get("error"),
"total_duration_seconds": combined.get("total_duration_seconds"),
"dispatched_at": dispatched_at,
"completed_at": completed_at,
}
try:
process_registry.completion_queue.put(evt)
except Exception as exc: # pragma: no cover
logger.error(
"Async delegation batch %s: failed to enqueue completion event; "
"result lost: %s",
delegation_id, exc,
)
def list_async_delegations() -> List[Dict[str, Any]]:
"""Snapshot of async delegations (running + recently completed).

View file

@ -2103,18 +2103,12 @@ def delegate_task(
# Normalise the top-level role once; per-task overrides re-normalise.
top_role = _normalize_role(role)
# Async (background) delegation is single-task only in v1. A batch carries
# fan-out semantics (N handles, partial completion) that double the state
# model — reject early with a clear message rather than silently running
# the batch synchronously.
# Background (async) delegation now applies to BOTH single tasks and
# batches. A batch simply becomes N independent async dispatches: each
# child runs on the daemon executor and re-enters the conversation via
# the completion queue on its own, carrying its own handle. There's no
# combined "wait for all" — fan-out is exactly N background subagents.
background = is_truthy_value(background, default=False) if background is not None else False
if background and tasks and isinstance(tasks, list) and len(tasks) > 1:
return tool_error(
"background=true is single-task only. Dispatch one background "
"subagent per delegate_task call (each returns its own handle and "
"re-enters the conversation independently), or run the batch "
"synchronously with background=false."
)
# Depth limit — configurable via delegation.max_spawn_depth,
# default 2 for parity with the original MAX_DEPTH constant.
@ -2250,150 +2244,101 @@ def delegate_task(
# Authoritative restore: reset global to parent's tool names after all children built
_model_tools._last_resolved_tool_names = _parent_tool_names
if n_tasks == 1:
# Single task -- run directly (no thread pool overhead)
_i, _t, child = children[0]
def _execute_and_aggregate() -> dict:
"""Run all built children (1 or N), join on them, aggregate results,
fire subagent_stop hooks + cost rollup, and return the combined result
dict. Used by BOTH the synchronous path and the background runner. In
the background case this whole function runs on the daemon executor, so
the parent turn isn't blocked — but the batch still JOINS on itself
here (all children must finish) before producing ONE consolidated
results block. That is the contract: fan-out runs in the background,
waits on each other, and returns together.
"""
if n_tasks == 1:
# Single task -- run directly (no thread pool overhead)
_i, _t, child = children[0]
result = _run_single_child(_i, _t["goal"], child, parent_agent)
results.append(result)
else:
# Batch -- run in parallel with per-task progress lines
completed_count = 0
spinner_ref = getattr(parent_agent, "_delegate_spinner", None)
# ----- Async / background dispatch -----
# When background=true, hand the already-built child to the async
# delegation registry and return a handle immediately. The child runs
# on a daemon executor; its result re-enters the conversation as a
# fresh turn via process_registry.completion_queue (see
# tools/async_delegation.py). Batch async is intentionally NOT
# supported in v1 — the rejection is handled before we get here.
if background:
from tools.async_delegation import dispatch_async_delegation
from tools.approval import get_current_session_key
with ThreadPoolExecutor(max_workers=max_children) as executor:
futures = {}
for i, t, child in children:
future = executor.submit(
_run_single_child,
task_index=i,
goal=t["goal"],
child=child,
parent_agent=parent_agent,
)
futures[future] = i
# Capture the gateway routing key on THIS (parent) thread — the
# daemon worker won't carry the session contextvar.
_session_key = get_current_session_key(default="")
# Poll futures with interrupt checking. as_completed() blocks
# until ALL futures finish — if a child agent gets stuck,
# the parent blocks forever even after interrupt propagation.
# Instead, use wait() with a short timeout so we can bail
# when the parent is interrupted.
# Map task_index -> child agent, so fabricated entries for
# still-pending futures can carry the correct _delegate_role.
_child_by_index = {i: child for (i, _, child) in children}
# Detach the child from the parent's interrupt-propagation list.
# _build_child_agent registered it there (correct for sync
# children, which block the parent's turn), but a BACKGROUND
# child must survive parent-turn interrupts (Ctrl+C, mid-turn
# steering), cache evicts (release_clients), and session close
# (/new) — otherwise the detached subagent dies with whatever
# the parent was doing when it was dispatched. Its lifecycle is
# owned by the async-delegation registry (interrupt_fn below),
# and _run_single_child's finally block closes its resources
# when it finishes.
if hasattr(parent_agent, "_active_children"):
try:
_ac_lock = getattr(parent_agent, "_active_children_lock", None)
if _ac_lock:
with _ac_lock:
parent_agent._active_children.remove(child)
else:
parent_agent._active_children.remove(child)
except ValueError:
pass
def _async_runner(_child=child, _goal=_t["goal"]):
return _run_single_child(0, _goal, _child, parent_agent)
def _async_interrupt(_child=child):
try:
if hasattr(_child, "interrupt"):
_child.interrupt("Async delegation cancelled")
elif hasattr(_child, "_interrupt_requested"):
_child._interrupt_requested = True
except Exception:
pass
dispatch = dispatch_async_delegation(
goal=_t["goal"],
context=_t.get("context"),
toolsets=_t.get("toolsets") or toolsets,
role=_normalize_role(_t.get("role") or top_role),
model=creds["model"],
session_key=_session_key,
runner=_async_runner,
interrupt_fn=_async_interrupt,
max_async_children=_get_max_async_children(),
)
if dispatch.get("status") == "dispatched":
return json.dumps(
{
"status": "dispatched",
"delegation_id": dispatch["delegation_id"],
"goal": _t["goal"],
"mode": "background",
"note": (
"Subagent is running in the background. You and the "
"user can keep working; the full task source and "
"result will re-enter the conversation as a new "
"message when it finishes. Do not wait or poll — "
"just continue."
),
},
ensure_ascii=False,
)
# Rejected (at capacity or schedule failure) — surface as a tool
# error so the model can fall back to synchronous delegation.
return tool_error(
dispatch.get("error", "Async delegation could not be scheduled.")
)
result = _run_single_child(0, _t["goal"], child, parent_agent)
results.append(result)
else:
# Batch -- run in parallel with per-task progress lines
completed_count = 0
spinner_ref = getattr(parent_agent, "_delegate_spinner", None)
with ThreadPoolExecutor(max_workers=max_children) as executor:
futures = {}
for i, t, child in children:
future = executor.submit(
_run_single_child,
task_index=i,
goal=t["goal"],
child=child,
parent_agent=parent_agent,
)
futures[future] = i
# Poll futures with interrupt checking. as_completed() blocks
# until ALL futures finish — if a child agent gets stuck,
# the parent blocks forever even after interrupt propagation.
# Instead, use wait() with a short timeout so we can bail
# when the parent is interrupted.
# Map task_index -> child agent, so fabricated entries for
# still-pending futures can carry the correct _delegate_role.
_child_by_index = {i: child for (i, _, child) in children}
pending = set(futures.keys())
while pending:
if getattr(parent_agent, "_interrupt_requested", False) is True:
# Parent interrupted — collect whatever finished and
# abandon the rest. Children already received the
# interrupt signal; we just can't wait forever.
for f in pending:
idx = futures[f]
if f.done():
try:
entry = f.result()
except Exception as exc:
pending = set(futures.keys())
while pending:
if getattr(parent_agent, "_interrupt_requested", False) is True:
# Parent interrupted — collect whatever finished and
# abandon the rest. Children already received the
# interrupt signal; we just can't wait forever.
for f in pending:
idx = futures[f]
if f.done():
try:
entry = f.result()
except Exception as exc:
entry = {
"task_index": idx,
"status": "error",
"summary": None,
"error": str(exc),
"api_calls": 0,
"duration_seconds": 0,
"_child_role": getattr(
_child_by_index.get(idx), "_delegate_role", None
),
}
else:
entry = {
"task_index": idx,
"status": "error",
"status": "interrupted",
"summary": None,
"error": str(exc),
"error": "Parent agent interrupted — child did not finish in time",
"api_calls": 0,
"duration_seconds": 0,
"_child_role": getattr(
_child_by_index.get(idx), "_delegate_role", None
),
}
else:
results.append(entry)
completed_count += 1
break
from concurrent.futures import wait as _cf_wait, FIRST_COMPLETED
done, pending = _cf_wait(
pending, timeout=0.5, return_when=FIRST_COMPLETED
)
for future in done:
try:
entry = future.result()
except Exception as exc:
idx = futures[future]
entry = {
"task_index": idx,
"status": "interrupted",
"status": "error",
"summary": None,
"error": "Parent agent interrupted — child did not finish in time",
"error": str(exc),
"api_calls": 0,
"duration_seconds": 0,
"_child_role": getattr(
@ -2402,165 +2347,229 @@ def delegate_task(
}
results.append(entry)
completed_count += 1
break
from concurrent.futures import wait as _cf_wait, FIRST_COMPLETED
done, pending = _cf_wait(
pending, timeout=0.5, return_when=FIRST_COMPLETED
)
for future in done:
try:
entry = future.result()
except Exception as exc:
idx = futures[future]
entry = {
"task_index": idx,
"status": "error",
"summary": None,
"error": str(exc),
"api_calls": 0,
"duration_seconds": 0,
"_child_role": getattr(
_child_by_index.get(idx), "_delegate_role", None
),
}
results.append(entry)
completed_count += 1
# Print per-task completion line above the spinner
idx = entry["task_index"]
label = (
task_labels[idx] if idx < len(task_labels) else f"Task {idx}"
)
dur = entry.get("duration_seconds", 0)
status = entry.get("status", "?")
icon = "" if status == "completed" else ""
remaining = n_tasks - completed_count
completion_line = f"{icon} [{idx+1}/{n_tasks}] {label} ({dur}s)"
if spinner_ref:
try:
spinner_ref.print_above(completion_line)
except Exception:
# Print per-task completion line above the spinner
idx = entry["task_index"]
label = (
task_labels[idx] if idx < len(task_labels) else f"Task {idx}"
)
dur = entry.get("duration_seconds", 0)
status = entry.get("status", "?")
icon = "" if status == "completed" else ""
remaining = n_tasks - completed_count
completion_line = f"{icon} [{idx+1}/{n_tasks}] {label} ({dur}s)"
if spinner_ref:
try:
spinner_ref.print_above(completion_line)
except Exception:
print(f" {completion_line}")
else:
print(f" {completion_line}")
else:
print(f" {completion_line}")
# Update spinner text to show remaining count
if spinner_ref and remaining > 0:
try:
spinner_ref.update_text(
f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining"
)
except Exception as e:
logger.debug("Spinner update_text failed: %s", e)
# Update spinner text to show remaining count
if spinner_ref and remaining > 0:
try:
spinner_ref.update_text(
f"🔀 {remaining} task{'s' if remaining != 1 else ''} remaining"
)
except Exception as e:
logger.debug("Spinner update_text failed: %s", e)
# Sort by task_index so results match input order
results.sort(key=lambda r: r["task_index"])
# Sort by task_index so results match input order
results.sort(key=lambda r: r["task_index"])
# Notify parent's memory provider of delegation outcomes
if (
parent_agent
and hasattr(parent_agent, "_memory_manager")
and parent_agent._memory_manager
):
for entry in results:
try:
_task_goal = (
task_list[entry["task_index"]]["goal"]
if entry["task_index"] < len(task_list)
else ""
)
parent_agent._memory_manager.on_delegation(
task=_task_goal,
result=entry.get("summary", "") or "",
child_session_id=(
getattr(children[entry["task_index"]][2], "session_id", "")
if entry["task_index"] < len(children)
# Notify parent's memory provider of delegation outcomes
if (
parent_agent
and hasattr(parent_agent, "_memory_manager")
and parent_agent._memory_manager
):
for entry in results:
try:
_task_goal = (
task_list[entry["task_index"]]["goal"]
if entry["task_index"] < len(task_list)
else ""
),
)
parent_agent._memory_manager.on_delegation(
task=_task_goal,
result=entry.get("summary", "") or "",
child_session_id=(
getattr(children[entry["task_index"]][2], "session_id", "")
if entry["task_index"] < len(children)
else ""
),
)
except Exception:
pass
# Fire subagent_stop hooks once per child, serialised on the parent thread.
# This keeps Python-plugin and shell-hook callbacks off of the worker threads
# that ran the children, so hook authors don't need to reason about
# concurrent invocation. Role was captured into the entry dict in
# _run_single_child (or the fabricated-entry branches above) before the
# child was closed.
_parent_session_id = getattr(parent_agent, "session_id", None)
try:
from hermes_cli.plugins import invoke_hook as _invoke_hook
except Exception:
_invoke_hook = None
# Aggregate child spend here so the parent's footer/UI reflect the true
# cost of a subagent-heavy turn. Port of Kilo-Org/kilocode#9448. Each
# child's cost was captured in _run_single_child before its AIAgent was
# closed; we fold them into the parent in one pass alongside the
# subagent_stop hook loop so we don't walk `results` twice.
_children_cost_total = 0.0
for entry in results:
child_role = entry.pop("_child_role", None)
child_cost = entry.pop("_child_cost_usd", 0.0)
try:
if child_cost:
_children_cost_total += float(child_cost)
except (TypeError, ValueError):
pass
if _invoke_hook is None:
continue
try:
_child_index = entry.get("task_index", -1)
_child_agent = (
children[_child_index][2]
if isinstance(_child_index, int) and 0 <= _child_index < len(children)
else None
)
_invoke_hook(
"subagent_stop",
parent_session_id=_parent_session_id,
parent_turn_id=getattr(parent_agent, "_current_turn_id", "") or "",
child_session_id=getattr(_child_agent, "session_id", None),
child_role=child_role,
child_summary=entry.get("summary"),
child_status=entry.get("status"),
duration_ms=int((entry.get("duration_seconds") or 0) * 1000),
)
except Exception:
pass
logger.debug("subagent_stop hook invocation failed", exc_info=True)
# Fire subagent_stop hooks once per child, serialised on the parent thread.
# This keeps Python-plugin and shell-hook callbacks off of the worker threads
# that ran the children, so hook authors don't need to reason about
# concurrent invocation. Role was captured into the entry dict in
# _run_single_child (or the fabricated-entry branches above) before the
# child was closed.
_parent_session_id = getattr(parent_agent, "session_id", None)
try:
from hermes_cli.plugins import invoke_hook as _invoke_hook
except Exception:
_invoke_hook = None
# Aggregate child spend here so the parent's footer/UI reflect the true
# cost of a subagent-heavy turn. Port of Kilo-Org/kilocode#9448. Each
# child's cost was captured in _run_single_child before its AIAgent was
# closed; we fold them into the parent in one pass alongside the
# subagent_stop hook loop so we don't walk `results` twice.
_children_cost_total = 0.0
for entry in results:
child_role = entry.pop("_child_role", None)
child_cost = entry.pop("_child_cost_usd", 0.0)
try:
if child_cost:
_children_cost_total += float(child_cost)
except (TypeError, ValueError):
pass
if _invoke_hook is None:
continue
try:
_child_index = entry.get("task_index", -1)
_child_agent = (
children[_child_index][2]
if isinstance(_child_index, int) and 0 <= _child_index < len(children)
else None
)
_invoke_hook(
"subagent_stop",
parent_session_id=_parent_session_id,
parent_turn_id=getattr(parent_agent, "_current_turn_id", "") or "",
child_session_id=getattr(_child_agent, "session_id", None),
child_role=child_role,
child_summary=entry.get("summary"),
child_status=entry.get("status"),
duration_ms=int((entry.get("duration_seconds") or 0) * 1000),
)
except Exception:
logger.debug("subagent_stop hook invocation failed", exc_info=True)
# Fold the aggregated child cost into the parent's session total. This is
# additive — each delegate_task call contributes its own children — so
# nested orchestrator→worker trees roll up naturally: each layer's own
# delegate_task() folds its direct children in, and when the orchestrator
# itself finishes, its parent folds the orchestrator's now-inflated total
# on top. Degrades silently if the parent lacks the counter (older test
# fixtures, etc.).
if _children_cost_total > 0.0:
try:
current = float(getattr(parent_agent, "session_estimated_cost_usd", 0.0) or 0.0)
parent_agent.session_estimated_cost_usd = current + _children_cost_total
# Upgrade the cost_source so the UI doesn't label a partially-real
# total as "none" when the parent itself hadn't billed any calls
# yet (rare but possible when the parent's only action this turn
# was delegate_task).
if getattr(parent_agent, "session_cost_source", "none") in {None, "", "none"}:
parent_agent.session_cost_source = "subagent"
if getattr(parent_agent, "session_cost_status", "unknown") in {None, "", "unknown"}:
parent_agent.session_cost_status = "estimated"
except Exception:
logger.debug("Subagent cost rollup failed", exc_info=True)
# Fold the aggregated child cost into the parent's session total. This is
# additive — each delegate_task call contributes its own children — so
# nested orchestrator→worker trees roll up naturally: each layer's own
# delegate_task() folds its direct children in, and when the orchestrator
# itself finishes, its parent folds the orchestrator's now-inflated total
# on top. Degrades silently if the parent lacks the counter (older test
# fixtures, etc.).
if _children_cost_total > 0.0:
try:
current = float(getattr(parent_agent, "session_estimated_cost_usd", 0.0) or 0.0)
parent_agent.session_estimated_cost_usd = current + _children_cost_total
# Upgrade the cost_source so the UI doesn't label a partially-real
# total as "none" when the parent itself hadn't billed any calls
# yet (rare but possible when the parent's only action this turn
# was delegate_task).
if getattr(parent_agent, "session_cost_source", "none") in {None, "", "none"}:
parent_agent.session_cost_source = "subagent"
if getattr(parent_agent, "session_cost_status", "unknown") in {None, "", "unknown"}:
parent_agent.session_cost_status = "estimated"
except Exception:
logger.debug("Subagent cost rollup failed", exc_info=True)
total_duration = round(time.monotonic() - overall_start, 2)
total_duration = round(time.monotonic() - overall_start, 2)
return json.dumps(
{
return {
"results": results,
"total_duration_seconds": total_duration,
},
ensure_ascii=False,
)
}
# ----- Background dispatch: run the WHOLE batch as one async unit -----
# When background is true, the entire fan-out runs on the daemon executor
# via a single async delegation. _execute_and_aggregate() joins on every
# child and produces ONE consolidated results block, which re-enters the
# conversation as a single message when ALL children finish. The chat is
# not blocked in the meantime. This is the contract: dispatch N subagents,
# keep chatting, get the combined summaries back together at the end.
if background:
from tools.async_delegation import dispatch_async_delegation_batch
from tools.approval import get_current_session_key
_session_key = get_current_session_key(default="")
_child_agents = [c for (_, _, c) in children]
# Detach every child from the parent's interrupt-propagation list — the
# batch's lifecycle is owned by the async registry now, not the parent
# turn. _build_child_agent attached them (correct for sync runs).
if hasattr(parent_agent, "_active_children"):
_ac_lock = getattr(parent_agent, "_active_children_lock", None)
for _c in _child_agents:
try:
if _ac_lock:
with _ac_lock:
parent_agent._active_children.remove(_c)
else:
parent_agent._active_children.remove(_c)
except ValueError:
pass
def _batch_runner():
return _execute_and_aggregate()
def _batch_interrupt():
for _c in _child_agents:
try:
if hasattr(_c, "interrupt"):
_c.interrupt("Async delegation cancelled")
elif hasattr(_c, "_interrupt_requested"):
_c._interrupt_requested = True
except Exception:
pass
_goals = [t["goal"] for t in task_list]
dispatch = dispatch_async_delegation_batch(
goals=_goals,
context=context,
toolsets=toolsets,
role=top_role,
model=creds["model"],
session_key=_session_key,
runner=_batch_runner,
interrupt_fn=_batch_interrupt,
max_async_children=_get_max_async_children(),
)
if dispatch.get("status") == "dispatched":
n = len(_goals)
note = (
"Subagent is running in the background. You and the user can "
"keep working; its full result re-enters the conversation as a "
"new message when it finishes. Do not wait or poll — just "
"continue."
if n == 1 else
f"{n} subagents are running in parallel in the background. You "
f"and the user can keep working; they wait on each other and "
f"their consolidated results re-enter the conversation as a "
f"single message once ALL of them finish. Do not wait or poll "
f"— just continue."
)
payload = {
"status": "dispatched",
"mode": "background",
"count": n,
"delegation_id": dispatch["delegation_id"],
"goals": _goals,
"note": note,
}
return json.dumps(payload, ensure_ascii=False)
# Pool at capacity / schedule failure — children are still attached
# (we detach above only on the parent list, but the async unit was
# never accepted, so re-attaching isn't needed: we just run inline).
logger.info(
"delegate_task: async pool at capacity (%s); running the whole "
"batch synchronously instead.",
dispatch.get("error", "rejected"),
)
return json.dumps(_execute_and_aggregate(), ensure_ascii=False)
# ----- Synchronous path -----
return json.dumps(_execute_and_aggregate(), ensure_ascii=False)
def _resolve_child_credential_pool(
@ -2842,11 +2851,16 @@ def _build_top_level_description() -> str:
"Only the final summary is returned -- intermediate tool results "
"never enter your context window.\n\n"
"TWO MODES (one of 'goal' or 'tasks' is required):\n"
"1. Single task: provide 'goal' (+ optional context, toolsets)\n"
"1. Single task: provide 'goal' (+ optional context, toolsets).\n"
f"2. Batch (parallel): provide 'tasks' array with up to {max_children} "
f"items concurrently for this user (configured via "
f"delegation.max_concurrent_children in config.yaml). "
f"All run in parallel and results are returned together. {nesting_clause}\n\n"
f"delegation.max_concurrent_children in config.yaml). {nesting_clause}\n\n"
"BOTH MODES RUN IN THE BACKGROUND. delegate_task returns immediately — "
"you and the user keep working, and each subagent's full result "
"re-enters the conversation as its own new message when it finishes. A "
"batch is just N independent background subagents (N handles, each "
"completes on its own). Do NOT wait or poll; just continue with other "
"work after dispatching.\n\n"
"WHEN TO USE delegate_task:\n"
"- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n"
"- Tasks that would flood your context with intermediate data\n"
@ -2857,11 +2871,10 @@ def _build_top_level_description() -> str:
"- Tasks needing user interaction -> subagents cannot use clarify\n"
"- Durable long-running work that must outlive the current turn -> "
"use cronjob (action='create') or terminal(background=True, "
"notify_on_complete=True) instead. delegate_task runs SYNCHRONOUSLY "
"inside the parent turn: if the parent is interrupted (user sends a "
"new message, /stop, /new) the child is cancelled with status="
"'interrupted' and its work is discarded. Children cannot continue "
"in the background.\n\n"
"notify_on_complete=True) instead. Background delegations are NOT "
"durable: if the parent session is closed (/new) or the process exits "
"before a subagent finishes, that subagent's work is discarded, and "
"/stop cancels every running background subagent.\n\n"
"IMPORTANT:\n"
"- Subagents have NO memory of your conversation. Pass all relevant "
"info (file paths, error messages, constraints) via the 'context' field.\n"
@ -3059,19 +3072,13 @@ DELEGATE_TASK_SCHEMA = {
"background": {
"type": "boolean",
"description": (
"Run the subagent asynchronously in the BACKGROUND "
"instead of blocking this turn. When true, delegate_task "
"returns immediately with a delegation_id; you and the "
"user keep working while the subagent runs, and its full "
"result re-enters the conversation as a new message when "
"it finishes (similar to terminal background=true + "
"notify_on_complete). The re-injected message includes the "
"original goal/context so you can act on it even after "
"moving on. Single-task only — cannot be combined with the "
"'tasks' batch array. Use for long-running independent work "
"the user shouldn't have to wait on (research, builds, "
"multi-step investigations). Do NOT poll or wait after "
"dispatching — just continue; the result will come to you."
"DEPRECATED / IGNORED. Single-task delegations always run "
"in the background automatically — you do not need to (and "
"cannot) opt in or out. The result re-enters the "
"conversation as a new message when the subagent finishes; "
"just continue working in the meantime. Setting this has no "
"effect; the parameter remains only for backward "
"compatibility."
),
},
"acp_command": {
@ -3105,6 +3112,23 @@ DELEGATE_TASK_SCHEMA = {
# --- Registry ---
from tools.registry import registry, tool_error
def _model_background_value(args: dict, parent_agent=None) -> bool:
"""Background flag for the MODEL-facing dispatch path (registry fallback).
Delegations from the top-level agent always run in the background the
model does not choose. This applies to both a single task and a fan-out
batch (each task becomes its own independent background subagent). The one
exception is a delegation from an orchestrator subagent (depth > 0), which
needs its workers' results within its own turn. The live path is
``run_agent._dispatch_delegate_task``; this lambda mirrors it for the rare
case the intercept is bypassed. Direct Python callers of ``delegate_task``
keep the historical synchronous default.
"""
is_subagent = getattr(parent_agent, "_delegate_depth", 0) > 0
return not is_subagent
registry.register(
name="delegate_task",
toolset="delegation",
@ -3118,7 +3142,7 @@ registry.register(
acp_command=args.get("acp_command"),
acp_args=args.get("acp_args"),
role=args.get("role"),
background=args.get("background"),
background=_model_background_value(args, kw.get("parent_agent")),
parent_agent=kw.get("parent_agent"),
),
check_fn=check_delegate_requirements,

View file

@ -1572,6 +1572,70 @@ def _format_async_delegation(evt: dict) -> str:
dispatched_at = evt.get("dispatched_at")
completed_at = evt.get("completed_at") or _time.time()
# ----- Batch (fan-out) completion: consolidated multi-task block -----
# A whole delegate_task fan-out dispatched as one background unit finishes
# together and carries a per-task `results` list. Render every subagent's
# summary in one block so the model gets the consolidated outcome at once.
batch_results = evt.get("results")
if evt.get("is_batch") or isinstance(batch_results, list):
results = batch_results or []
goals = evt.get("goals") or []
n = len(results) if results else len(goals)
total_dur = evt.get("total_duration_seconds", duration)
lines = [
f"[ASYNC DELEGATION BATCH COMPLETE — {deleg_id}]",
f"A background fan-out of {n} subagent(s) you dispatched earlier "
"has finished. All ran in parallel and waited on each other; their "
"consolidated results are below. You may have moved on since "
"dispatching — act on these or re-dispatch if things have changed.",
"",
]
if isinstance(dispatched_at, (int, float)):
ts = _time.strftime("%Y-%m-%d %H:%M:%S", _time.localtime(dispatched_at))
age = f" ({_format_age(completed_at - dispatched_at)} ago)"
lines.append(f"Dispatched: {ts}{age}")
if context:
lines.append(f"Context you provided: {context}")
if toolsets:
lines.append(f"Toolsets: {', '.join(toolsets)}")
lines.append(f"Role: {role} Model: {model} Total duration: {total_dur}s")
if error and not results:
lines.append("--- ERROR ---")
lines.append(f"The batch did not complete successfully: {error}")
return "\n".join(lines)
for r in sorted(results, key=lambda x: x.get("task_index", 0)):
idx = r.get("task_index", 0)
r_status = r.get("status", "?")
r_summary = r.get("summary")
r_error = r.get("error")
r_goal = goals[idx] if idx < len(goals) else r.get("goal", "")
icon = "" if r_status in ("completed", "success") else ""
lines.append("")
header = f"--- {icon} TASK {idx + 1}/{n}"
if r_goal:
header += f": {r_goal}"
header += f" (status={r_status}"
if r.get("api_calls"):
header += f", api_calls={r['api_calls']}"
if r.get("duration_seconds") is not None:
header += f", {r['duration_seconds']}s"
header += ") ---"
lines.append(header)
if r_status in ("completed", "success") and r_summary:
lines.append(r_summary)
elif r_summary:
if r_error:
lines.append(f"({r_status}: {r_error})")
lines.append("Partial output:")
lines.append(r_summary)
else:
lines.append(
f"(no summary — status={r_status}"
+ (f": {r_error}" if r_error else "")
+ ")"
)
return "\n".join(lines)
age = ""
if isinstance(dispatched_at, (int, float)):
age = f" ({_format_age(completed_at - dispatched_at)} ago)"