hermes-agent/tests/tools/test_async_delegation.py
Teknium c66ecf0bc3
feat(delegation): async background subagents via delegate_task(background=true) (#40946)
* feat(delegation): async background subagents via delegate_task(background=true)

delegate_task(background=true) dispatches a subagent that runs in the
background and returns a handle immediately, so the user and model keep
working while it runs. The full result — plus the original task source —
re-enters the conversation as a new turn when the subagent finishes,
riding the same completion-queue rail as terminal background processes.
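A minimal sketch of the dispatch-then-enqueue pattern described above. It assumes one daemon `threading.Thread` per child and uses a module-level `queue.Queue` as a stand-in for `process_registry.completion_queue`. `dispatch_background` and `completion_queue` are hypothetical names, not the API of tools/async_delegation.py. The real module also tracks records, enforces a capacity cap, and wires interrupts. The `deleg_` handle prefix and the `async-delegate` thread-name prefix mirror what the tests below check.

```python
import queue
import threading
import uuid

# Hypothetical stand-in for process_registry.completion_queue.
completion_queue = queue.Queue()


def dispatch_background(goal, runner):
    """Run `runner` on a daemon thread and return a handle immediately.

    Hypothetical sketch: no capacity cap, record tracking, or interrupts.
    """
    delegation_id = f"deleg_{uuid.uuid4().hex[:12]}"

    def _work():
        try:
            result = runner()
        except Exception as exc:  # a crashed child still reports back
            result = {"status": "error", "error": str(exc)}
        # The result re-enters the conversation through the shared queue,
        # carrying the original task source (here just the goal) with it.
        completion_queue.put({
            "type": "async_delegation",
            "delegation_id": delegation_id,
            "goal": goal,
            **result,
        })

    threading.Thread(
        target=_work, name=f"async-delegate-{delegation_id}", daemon=True,
    ).start()
    return {"status": "dispatched", "delegation_id": delegation_id}
```

A consumer that later drains `completion_queue` gets exactly one event per finished child, including children whose runner raised.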

- tools/async_delegation.py: daemon-executor registry, capacity cap,
  rich self-contained completion event pushed onto the shared
  process_registry.completion_queue (type='async_delegation').
- delegate_tool.py: background param + single-task dispatch branch;
  batch async rejected (v1).
- process_registry.py: format_process_notification renders the rich
  task-source block (goal/context/toolsets/model/status/result).
- gateway/run.py: dedicated _async_delegation_watcher drains + injects
  results into the originating session (idle + post-turn), session_key
  routing enrichment, shutdown interrupt of dangling delegations.
- config: delegation.max_async_children (default 3).
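The session_key routing enrichment in gateway/run.py can be pictured as splitting the key on colons. This sketch assumes an `agent:<name>:<platform>:<chat_type>:<chat_id>[:<thread_id>]` layout. That layout is inferred from the gateway tests in this file, where `agent:main:telegram:dm:12345:678` routes to telegram / 12345 / 678 and an empty CLI-origin key stays unrouted. `enrich_routing` is hypothetical and is not `GatewayRunner._enrich_async_delegation_routing`.

```python
def enrich_routing(evt):
    """Hypothetical: copy platform/chat_id/thread_id out of evt["session_key"].

    Assumes keys shaped like agent:<name>:<platform>:<chat_type>:<chat_id>[:<thread_id>].
    """
    parts = (evt.get("session_key") or "").split(":")
    if len(parts) < 5 or parts[0] != "agent":
        return evt  # e.g. an empty CLI-origin key: leave the event unrouted
    evt["platform"] = parts[2]
    evt["chat_id"] = parts[4]
    if len(parts) > 5:
        evt["thread_id"] = parts[5]
    return evt
```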

Reuses the existing idle-drain wiring rather than mutating a running
agent loop, preserving message-role alternation and prompt-cache
invariants. 13 targeted tests; CLI + gateway paths E2E-verified.
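The gateway test `test_gateway_watch_drain_requeues_async_without_looping` in this file shows one constraint on that idle-drain wiring. A drain that collects watch events must return only those events and leave `async_delegation` events on the queue, exactly once, for the dedicated watcher. The sketch below is a hypothetical drain built to that behavior, not `_drain_gateway_watch_events`. It assumes events are dicts with a `type` field.

```python
import queue


def drain_watch_events(q):
    """Hypothetical drain: return watch_match events, requeue everything else.

    Events that belong to another consumer (e.g. type="async_delegation",
    which the dedicated watcher injects) are put back only after the queue
    has been emptied, so this drain never re-reads its own requeued events.
    """
    watch, deferred = [], []
    while True:
        try:
            evt = q.get_nowait()
        except queue.Empty:
            break
        if evt.get("type") == "watch_match":
            watch.append(evt)
        else:
            deferred.append(evt)
    for evt in deferred:
        q.put(evt)
    return watch
```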

* test(delegation): make async non-blocking tests environment-independent

CI 'test (5)' flaked on a cold, 8-worker runner: the first
delegate_task(background=true) call measured 2.27s of one-time setup
(config load + child-agent construction + imports), tripping the
elapsed < 1.0 wall-clock assertion. That assertion was testing setup
overhead, not blocking.

Replace the wall-clock thresholds with the real invariant: dispatch
returns while the child is still gated (active_count == 1, completion
queue empty), which a synchronous impl could not do. Keep only a loose
4s sanity backstop well under the runner's 5s gate.
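The invariant-based check can be written generically. Gate the child, dispatch it, and ask whether the child had already finished when dispatch returned. A synchronous dispatch can only return after the child finishes, so no elapsed-time bound is needed. `returns_while_gated`, `background`, and `synchronous` are illustrative helpers and are not part of the test suite.

```python
import threading


def returns_while_gated(dispatch, gate_timeout=5.0):
    """Return True if dispatch(runner) came back while runner was still gated.

    No wall-clock threshold: a synchronous dispatch can only return after
    the runner has finished, so `finished` is already set by then.
    """
    gate = threading.Event()
    finished = threading.Event()

    def runner():
        gate.wait(timeout=gate_timeout)
        finished.set()

    dispatch(runner)
    still_running = not finished.is_set()  # the environment-independent check
    gate.set()  # release the child either way
    return still_running


def background(fn):
    threading.Thread(target=fn, daemon=True).start()


def synchronous(fn):
    fn()
```

A slow machine only delays when `dispatch` returns. As long as that delay stays under the gate timeout, it cannot flip the result the way it can flip an elapsed-time bound.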

* fix(delegation): harden async background delegation

Follow-up review fixes:
- Detach background child from parent._active_children at dispatch —
  otherwise parent-turn interrupts (Ctrl+C, mid-turn steering), cache
  evicts (release_clients), and session close (/new) kill/close the
  detached subagent mid-run, defeating the point of background mode.
  Lifecycle is owned by the async registry's interrupt_fn.
- Make the capacity check atomic with the record insert (TOCTOU: two
  concurrent dispatches could both pass active_count() and exceed the cap).
- TUI dedup: key async_delegation events by delegation_id — the
  fallthrough keyed them all as ("", type), suppressing every completion
  after the first in the desktop/TUI status feed.
- CLI /stop now interrupts running background delegations and /agents
  lists them (they live outside the process registry and were invisible).
- Drop stray unbalanced ']' line from the re-injection block and the
  unused _ASYNC_DEFAULT import.
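The TOCTOU fix amounts to performing the capacity check and the record insert inside one critical section. This is a minimal sketch that assumes a dict of active records guarded by a single `threading.Lock`. The `Registry` class and its methods are hypothetical and are not the real async registry.

```python
import threading


class Registry:
    """Hypothetical async-delegation registry with an atomic capacity cap."""

    def __init__(self, cap):
        self.cap = cap
        self._lock = threading.Lock()
        self._active = {}  # delegation_id -> record

    def try_reserve(self, delegation_id):
        # Check and insert in ONE critical section. Calling a separate
        # active_count() helper first and inserting later would let two
        # concurrent dispatches both see room and exceed the cap.
        with self._lock:
            if len(self._active) >= self.cap:
                return False
            self._active[delegation_id] = {"status": "running"}
            return True

    def release(self, delegation_id):
        with self._lock:
            self._active.pop(delegation_id, None)
```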

Tests: detach-at-dispatch + concurrent-capacity race added (15 total in
test_async_delegation.py); 137 delegate + 140 process-registry/notify/watch
+ 7 TUI dedup tests pass.

* fix(delegation): harden async background completion drains
2026-06-15 13:33:12 -07:00


"""Tests for async (background) delegation — tools/async_delegation.py.
Covers the dispatch handle, non-blocking behavior, completion-event delivery
onto the shared process_registry.completion_queue, the rich re-injection block
formatting, capacity rejection, and crash handling.
"""
import queue
import threading
import time
import pytest
from tools import async_delegation as ad
from tools.process_registry import process_registry, format_process_notification


@pytest.fixture(autouse=True)
def _clean_state():
    # Start and finish every test with an empty async registry and an empty
    # shared completion queue, so events never leak between tests.
    ad._reset_for_tests()
    while not process_registry.completion_queue.empty():
        process_registry.completion_queue.get_nowait()
    yield
    ad._reset_for_tests()
    while not process_registry.completion_queue.empty():
        process_registry.completion_queue.get_nowait()


def _drain_one(timeout=5.0):
    """Poll the shared completion queue until one event arrives or *timeout* elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if not process_registry.completion_queue.empty():
            return process_registry.completion_queue.get_nowait()
        time.sleep(0.02)
    return None


def test_dispatch_returns_immediately_without_blocking():
    gate = threading.Event()

    def runner():
        gate.wait(timeout=5)
        return {"status": "completed", "summary": "done", "api_calls": 1,
                "duration_seconds": 0.1, "model": "m"}

    t0 = time.monotonic()
    res = ad.dispatch_async_delegation(
        goal="g", context=None, toolsets=None, role="leaf", model="m",
        session_key="", runner=runner, max_async_children=3,
    )
    elapsed = time.monotonic() - t0
    assert res["status"] == "dispatched"
    assert res["delegation_id"].startswith("deleg_")
    # Non-blocking invariant: dispatch returned while the runner is still
    # gated (active), so it cannot have waited on the gate. The active_count
    # check is the environment-independent proof; the generous wall-clock
    # bound is a loose sanity backstop, not the primary assertion (a loaded
    # CI runner can be slow but never anywhere near the runner's 5s gate).
    assert ad.active_count() == 1
    assert elapsed < 4.0, f"dispatch blocked {elapsed:.2f}s (gate is 5s)"
    gate.set()


def test_async_executor_workers_are_daemon_threads():
    gate = threading.Event()

    def runner():
        gate.wait(timeout=5)
        return {"status": "completed", "summary": "done"}

    res = ad.dispatch_async_delegation(
        goal="daemon check", context=None, toolsets=None, role="leaf", model="m",
        session_key="", runner=runner, max_async_children=1,
    )
    assert res["status"] == "dispatched"
    deadline = time.monotonic() + 2
    worker = None
    while time.monotonic() < deadline:
        worker = next(
            (t for t in threading.enumerate() if t.name.startswith("async-delegate")),
            None,
        )
        if worker is not None:
            break
        time.sleep(0.02)
    assert worker is not None
    assert worker.daemon is True
    gate.set()
    assert _drain_one() is not None


def test_completion_event_lands_on_shared_queue_with_session_key():
    def runner():
        return {"status": "completed", "summary": "the result",
                "api_calls": 3, "duration_seconds": 2.0, "model": "test-model"}

    res = ad.dispatch_async_delegation(
        goal="compute X", context="some context", toolsets=["web", "file"],
        role="leaf", model="test-model", session_key="agent:main:cli:dm:local",
        runner=runner, max_async_children=3,
    )
    assert res["status"] == "dispatched"
    evt = _drain_one()
    assert evt is not None
    assert evt["type"] == "async_delegation"
    assert evt["summary"] == "the result"
    assert evt["session_key"] == "agent:main:cli:dm:local"
    assert evt["delegation_id"] == res["delegation_id"]


def test_rich_reinjection_block_is_self_contained():
    def runner():
        return {"status": "completed", "summary": "The answer is 42.",
                "api_calls": 7, "duration_seconds": 3.5, "model": "test-model"}

    ad.dispatch_async_delegation(
        goal="Compute the meaning of life",
        context="User is a philosopher. Respond tersely.",
        toolsets=["web"], role="leaf", model="test-model",
        session_key="", runner=runner, max_async_children=3,
    )
    evt = _drain_one()
    assert evt is not None
    text = format_process_notification(evt)
    assert text is not None
    for needle in [
        "ASYNC DELEGATION COMPLETE",
        "Compute the meaning of life",
        "User is a philosopher",
        "Toolsets: web",
        "The answer is 42.",
        "Status: completed",
        "API calls: 7",
    ]:
        assert needle in text, f"missing {needle!r}"


def test_dispatch_rejected_at_capacity():
    ev = threading.Event()

    def blocker():
        ev.wait(timeout=5)
        return {"status": "completed", "summary": "x"}

    for i in range(2):
        r = ad.dispatch_async_delegation(
            goal=f"task{i}", context=None, toolsets=None, role="leaf",
            model="m", session_key="", runner=blocker, max_async_children=2,
        )
        assert r["status"] == "dispatched"
    r3 = ad.dispatch_async_delegation(
        goal="task3", context=None, toolsets=None, role="leaf", model="m",
        session_key="", runner=blocker, max_async_children=2,
    )
    assert r3["status"] == "rejected"
    assert "capacity reached" in r3["error"]
    ev.set()


def test_crashed_runner_produces_error_completion():
    def boom():
        raise RuntimeError("subagent exploded")

    r = ad.dispatch_async_delegation(
        goal="risky", context=None, toolsets=None, role="leaf", model="m",
        session_key="", runner=boom, max_async_children=3,
    )
    assert r["status"] == "dispatched"
    evt = _drain_one()
    assert evt is not None
    assert evt["status"] == "error"
    text = format_process_notification(evt)
    assert text is not None
    assert "did not complete successfully" in text
    assert "subagent exploded" in text


def test_interrupt_all_signals_running_children():
    ev = threading.Event()
    interrupted = {"count": 0}

    def blocker():
        ev.wait(timeout=5)
        return {"status": "interrupted", "summary": None,
                "error": "cancelled"}

    def interrupt_fn():
        interrupted["count"] += 1
        ev.set()

    ad.dispatch_async_delegation(
        goal="long task", context=None, toolsets=None, role="leaf",
        model="m", session_key="", runner=blocker,
        interrupt_fn=interrupt_fn, max_async_children=3,
    )
    n = ad.interrupt_all(reason="test")
    assert n == 1
    assert interrupted["count"] == 1
    # child still emits a completion event after interrupt
    evt = _drain_one()
    assert evt is not None
    assert evt["status"] == "interrupted"


def test_completed_records_pruned_to_cap():
    # Run more than the retention cap quickly; ensure list doesn't grow forever.
    for i in range(ad._MAX_RETAINED_COMPLETED + 10):
        ad.dispatch_async_delegation(
            goal=f"t{i}", context=None, toolsets=None, role="leaf", model="m",
            session_key="", runner=lambda: {"status": "completed", "summary": "ok"},
            max_async_children=ad._MAX_RETAINED_COMPLETED + 20,
        )
    # let workers finish
    deadline = time.monotonic() + 10
    while time.monotonic() < deadline and ad.active_count() > 0:
        time.sleep(0.05)
    assert len(ad.list_async_delegations()) <= ad._MAX_RETAINED_COMPLETED

# ---------------------------------------------------------------------------
# Integration: delegate_task(background=True) routing
# ---------------------------------------------------------------------------


def test_delegate_task_background_routes_async_and_does_not_block(monkeypatch):
    """delegate_task(background=True) returns a handle without running the
    child synchronously, and the child completes on the background thread."""
    import json
    from unittest.mock import MagicMock, patch
    import tools.delegate_tool as dt

    parent = MagicMock()
    parent._delegate_depth = 0
    parent.session_id = "sess"
    parent._interrupt_requested = False
    fake_child = MagicMock()
    fake_child._delegate_role = "leaf"
    fake_child._subagent_id = "s1"
    gate = threading.Event()

    def slow_child(task_index, goal, child=None, parent_agent=None, **kw):
        gate.wait(timeout=5)  # a sync impl would hang delegate_task here
        return {
            "task_index": 0, "status": "completed", "summary": f"done: {goal}",
            "api_calls": 1, "duration_seconds": 0.1, "model": "m",
            "exit_reason": "completed",
        }

    creds = {
        "model": "m", "provider": None, "base_url": None, "api_key": None,
        "api_mode": None, "command": None, "args": None,
    }
    # Keep the patches active until the background child finishes; the
    # worker thread may call _run_single_child after delegate_task returns.
    with patch.object(dt, "_build_child_agent", return_value=fake_child), \
            patch.object(dt, "_run_single_child", side_effect=slow_child), \
            patch.object(dt, "_resolve_delegation_credentials", return_value=creds):
        out = dt.delegate_task(
            goal="the real task", context="ctx", toolsets=["web"],
            background=True, parent_agent=parent,
        )
        parsed = json.loads(out)
        assert parsed["status"] == "dispatched"
        assert parsed["mode"] == "background"
        assert parsed["delegation_id"].startswith("deleg_")
        # The real non-blocking invariant (environment-independent, with no
        # wall-clock threshold that flakes on a loaded CI runner): delegate_task
        # returned while the child is STILL blocked on the closed gate, so no
        # completion event exists yet. A synchronous impl could not have
        # returned here; it would still be inside slow_child waiting on the gate.
        assert process_registry.completion_queue.empty()
        assert ad.active_count() == 1  # child running in background, not finished
        gate.set()
        evt = _drain_one()
    assert evt is not None
    assert evt["type"] == "async_delegation"
    assert evt["summary"] == "done: the real task"
    text = format_process_notification(evt)
    assert text is not None
    assert "the real task" in text and "ctx" in text


def test_delegate_task_background_rejects_batch(monkeypatch):
    """background=True with a multi-item tasks batch is rejected (v1: single-task only)."""
    import json
    from unittest.mock import MagicMock
    import tools.delegate_tool as dt

    parent = MagicMock()
    parent._delegate_depth = 0
    parent.session_id = "sess"
    out = dt.delegate_task(
        tasks=[{"goal": "a"}, {"goal": "b"}],
        background=True,
        parent_agent=parent,
    )
    parsed = json.loads(out)
    assert "error" in parsed
    assert "single-task only" in parsed["error"]


def test_delegate_task_background_detaches_child_from_parent(monkeypatch):
    """A background child must NOT remain in parent._active_children —
    otherwise parent-turn interrupts / cache evicts / session close would
    kill the detached subagent mid-run."""
    import json
    from unittest.mock import MagicMock, patch
    import tools.delegate_tool as dt

    parent = MagicMock()
    parent._delegate_depth = 0
    parent.session_id = "sess"
    parent._active_children = []
    parent._active_children_lock = threading.Lock()
    fake_child = MagicMock()
    fake_child._delegate_role = "leaf"
    fake_child._subagent_id = "s1"
    gate = threading.Event()

    def slow_child(task_index, goal, child=None, parent_agent=None, **kw):
        gate.wait(timeout=5)
        return {"task_index": 0, "status": "completed", "summary": "ok"}

    def build_and_register(**kw):
        # Mirror what the real _build_child_agent does: register the child
        # for interrupt propagation.
        parent._active_children.append(fake_child)
        return fake_child

    creds = {
        "model": "m", "provider": None, "base_url": None, "api_key": None,
        "api_mode": None, "command": None, "args": None,
    }
    # Keep the patches active until the background child finishes; the
    # worker thread may call _run_single_child after delegate_task returns.
    with patch.object(dt, "_build_child_agent", side_effect=build_and_register), \
            patch.object(dt, "_run_single_child", side_effect=slow_child), \
            patch.object(dt, "_resolve_delegation_credentials", return_value=creds):
        out = dt.delegate_task(goal="bg task", background=True, parent_agent=parent)
        assert json.loads(out)["status"] == "dispatched"
        # Child detached immediately at dispatch, while it is still running.
        assert fake_child not in parent._active_children
        gate.set()
        assert _drain_one() is not None


def test_concurrent_dispatch_respects_capacity():
    """Two threads racing dispatch with cap=1 must yield exactly one accept
    (capacity check and record insert are atomic under the records lock)."""
    gate = threading.Event()

    def blocker():
        gate.wait(timeout=5)
        return {"status": "completed", "summary": "x"}

    results = []
    barrier = threading.Barrier(2)

    def racer():
        barrier.wait(timeout=5)
        results.append(
            ad.dispatch_async_delegation(
                goal="race", context=None, toolsets=None, role="leaf",
                model="m", session_key="", runner=blocker,
                max_async_children=1,
            )
        )

    threads = [threading.Thread(target=racer) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=10)
    statuses = sorted(r["status"] for r in results)
    assert statuses == ["dispatched", "rejected"]
    gate.set()

# ---------------------------------------------------------------------------
# Gateway routing: session_key -> platform/chat_id, rich formatting, injection
# ---------------------------------------------------------------------------


def _make_async_evt(**over):
    evt = {
        "type": "async_delegation",
        "delegation_id": "deleg_x1",
        "session_key": "agent:main:telegram:dm:12345:678",
        "goal": "Investigate flaky test",
        "context": "repo /tmp/p",
        "toolsets": ["terminal"],
        "role": "leaf",
        "model": "m",
        "status": "completed",
        "summary": "Found the bug in test_foo",
        "api_calls": 4,
        "duration_seconds": 12.0,
        "dispatched_at": 1000.0,
        "completed_at": 1012.0,
    }
    evt.update(over)
    return evt


def test_gateway_enriches_routing_from_session_key():
    from gateway.run import GatewayRunner

    runner = object.__new__(GatewayRunner)
    evt = _make_async_evt()
    runner._enrich_async_delegation_routing(evt)
    assert evt["platform"] == "telegram"
    assert evt["chat_id"] == "12345"
    assert evt["thread_id"] == "678"


def test_gateway_formatter_renders_async_block():
    from gateway.run import _format_gateway_process_notification

    txt = _format_gateway_process_notification(_make_async_evt())
    assert txt is not None
    assert "ASYNC DELEGATION COMPLETE" in txt
    assert "Found the bug in test_foo" in txt
    assert "Investigate flaky test" in txt


def test_gateway_watch_drain_requeues_async_without_looping():
    from gateway.run import _drain_gateway_watch_events

    q = queue.Queue()
    async_evt = _make_async_evt()
    watch_evt = {
        "type": "watch_match",
        "session_id": "proc_1",
        "command": "pytest",
        "pattern": "READY",
        "output": "READY",
    }
    q.put(async_evt)
    q.put(watch_evt)
    watch_events = _drain_gateway_watch_events(q)
    assert watch_events == [watch_evt]
    assert q.qsize() == 1
    assert q.get_nowait() == async_evt


def test_gateway_builds_routable_source_from_enriched_event():
    from gateway.run import GatewayRunner

    runner = object.__new__(GatewayRunner)
    evt = _make_async_evt()
    runner._enrich_async_delegation_routing(evt)
    src = runner._build_process_event_source(evt)
    assert src is not None
    assert src.platform.value == "telegram"
    assert src.chat_id == "12345"


def test_gateway_cli_origin_event_left_unrouted():
    """An empty session_key (CLI origin) is left without routing fields."""
    from gateway.run import GatewayRunner

    runner = object.__new__(GatewayRunner)
    evt = _make_async_evt(session_key="")
    runner._enrich_async_delegation_routing(evt)
    assert "platform" not in evt