hermes-agent/tests/tools/test_delegate.py
pefontana 48ecb98f8a feat(delegate): orchestrator role and configurable spawn depth (default flat)
Adds role='leaf'|'orchestrator' to delegate_task. With max_spawn_depth>=2,
an orchestrator child retains the 'delegation' toolset and can spawn its
own workers; leaf children cannot delegate further (identical to today).

Default posture is flat — max_spawn_depth=1 means a depth-0 parent's
children land at the depth-1 floor and orchestrator role silently
degrades to leaf. Users opt into nested delegation by raising
max_spawn_depth to 2 or 3 in config.yaml.

Also threads acp_command/acp_args through the main agent loop's delegate
dispatch (previously silently dropped in the schema) via a new
_dispatch_delegate_task helper, and adds a DelegateEvent enum with
legacy-string back-compat for gateway/ACP/CLI progress consumers.

Config (hermes_cli/config.py defaults):
  delegation.max_concurrent_children: 3   # floor-only, no upper cap
  delegation.max_spawn_depth: 1           # 1=flat (default), 2-3 unlock nested
  delegation.orchestrator_enabled: true   # global kill switch

Salvaged from @pefontana's PR #11215. Overrides vs. the original PR:
concurrency stays at 3 (PR bumped to 5 + cap 8 — we keep the floor only,
no hard ceiling); max_spawn_depth defaults to 1 (PR defaulted to 2 which
silently enabled one level of orchestration for every user).

Co-authored-by: pefontana <fontana.pedro93@gmail.com>
2026-04-21 14:23:45 -07:00

1970 lines
83 KiB
Python

#!/usr/bin/env python3
"""
Tests for the subagent delegation tool.
Uses mock AIAgent instances to test the delegation logic without
requiring API keys or real LLM calls.
Run with: python -m pytest tests/test_delegate.py -v
or: python tests/test_delegate.py
"""
import json
import os
import sys
import threading
import time
import unittest
from unittest.mock import MagicMock, patch
from tools.delegate_tool import (
DELEGATE_BLOCKED_TOOLS,
DELEGATE_TASK_SCHEMA,
DelegateEvent,
_get_max_concurrent_children,
_LEGACY_EVENT_MAP,
MAX_DEPTH,
check_delegate_requirements,
delegate_task,
_build_child_agent,
_build_child_progress_callback,
_build_child_system_prompt,
_strip_blocked_tools,
_resolve_child_credential_pool,
_resolve_delegation_credentials,
)
def _make_mock_parent(depth=0):
"""Create a mock parent agent with the fields delegate_task expects."""
parent = MagicMock()
parent.base_url = "https://openrouter.ai/api/v1"
parent.api_key="***"
parent.provider = "openrouter"
parent.api_mode = "chat_completions"
parent.model = "anthropic/claude-sonnet-4"
parent.platform = "cli"
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent._session_db = None
parent._delegate_depth = depth
parent._active_children = []
parent._active_children_lock = threading.Lock()
parent._print_fn = None
parent.tool_progress_callback = None
parent.thinking_callback = None
return parent
class TestDelegateRequirements(unittest.TestCase):
def test_always_available(self):
self.assertTrue(check_delegate_requirements())
def test_schema_valid(self):
self.assertEqual(DELEGATE_TASK_SCHEMA["name"], "delegate_task")
props = DELEGATE_TASK_SCHEMA["parameters"]["properties"]
self.assertIn("goal", props)
self.assertIn("tasks", props)
self.assertIn("context", props)
self.assertIn("toolsets", props)
self.assertIn("max_iterations", props)
self.assertNotIn("maxItems", props["tasks"]) # removed — limit is now runtime-configurable
class TestChildSystemPrompt(unittest.TestCase):
def test_goal_only(self):
prompt = _build_child_system_prompt("Fix the tests")
self.assertIn("Fix the tests", prompt)
self.assertIn("YOUR TASK", prompt)
self.assertNotIn("CONTEXT", prompt)
def test_goal_with_context(self):
prompt = _build_child_system_prompt("Fix the tests", "Error: assertion failed in test_foo.py line 42")
self.assertIn("Fix the tests", prompt)
self.assertIn("CONTEXT", prompt)
self.assertIn("assertion failed", prompt)
def test_empty_context_ignored(self):
prompt = _build_child_system_prompt("Do something", " ")
self.assertNotIn("CONTEXT", prompt)
class TestStripBlockedTools(unittest.TestCase):
def test_removes_blocked_toolsets(self):
result = _strip_blocked_tools(["terminal", "file", "delegation", "clarify", "memory", "code_execution"])
self.assertEqual(sorted(result), ["file", "terminal"])
def test_preserves_allowed_toolsets(self):
result = _strip_blocked_tools(["terminal", "file", "web", "browser"])
self.assertEqual(sorted(result), ["browser", "file", "terminal", "web"])
def test_empty_input(self):
result = _strip_blocked_tools([])
self.assertEqual(result, [])
class TestDelegateTask(unittest.TestCase):
def test_no_parent_agent(self):
result = json.loads(delegate_task(goal="test"))
self.assertIn("error", result)
self.assertIn("parent agent", result["error"])
def test_depth_limit(self):
parent = _make_mock_parent(depth=2)
result = json.loads(delegate_task(goal="test", parent_agent=parent))
self.assertIn("error", result)
self.assertIn("depth limit", result["error"].lower())
def test_no_goal_or_tasks(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(parent_agent=parent))
self.assertIn("error", result)
def test_empty_goal(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(goal=" ", parent_agent=parent))
self.assertIn("error", result)
def test_task_missing_goal(self):
parent = _make_mock_parent()
result = json.loads(delegate_task(tasks=[{"context": "no goal here"}], parent_agent=parent))
self.assertIn("error", result)
@patch("tools.delegate_tool._run_single_child")
def test_single_task_mode(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done!", "api_calls": 3, "duration_seconds": 5.0
}
parent = _make_mock_parent()
result = json.loads(delegate_task(goal="Fix tests", context="error log...", parent_agent=parent))
self.assertIn("results", result)
self.assertEqual(len(result["results"]), 1)
self.assertEqual(result["results"][0]["status"], "completed")
self.assertEqual(result["results"][0]["summary"], "Done!")
mock_run.assert_called_once()
@patch("tools.delegate_tool._run_single_child")
def test_batch_mode(self, mock_run):
mock_run.side_effect = [
{"task_index": 0, "status": "completed", "summary": "Result A", "api_calls": 2, "duration_seconds": 3.0},
{"task_index": 1, "status": "completed", "summary": "Result B", "api_calls": 4, "duration_seconds": 6.0},
]
parent = _make_mock_parent()
tasks = [
{"goal": "Research topic A"},
{"goal": "Research topic B"},
]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
self.assertIn("results", result)
self.assertEqual(len(result["results"]), 2)
self.assertEqual(result["results"][0]["summary"], "Result A")
self.assertEqual(result["results"][1]["summary"], "Result B")
self.assertIn("total_duration_seconds", result)
@patch("tools.delegate_tool._run_single_child")
def test_batch_capped_at_3(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
limit = _get_max_concurrent_children()
tasks = [{"goal": f"Task {i}"} for i in range(limit + 2)]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
# Should return an error instead of silently truncating
self.assertIn("error", result)
self.assertIn("Too many tasks", result["error"])
mock_run.assert_not_called()
@patch("tools.delegate_tool._run_single_child")
def test_batch_ignores_toplevel_goal(self, mock_run):
"""When tasks array is provided, top-level goal/context/toolsets are ignored."""
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
result = json.loads(delegate_task(
goal="This should be ignored",
tasks=[{"goal": "Actual task"}],
parent_agent=parent,
))
# The mock was called with the tasks array item, not the top-level goal
call_args = mock_run.call_args
self.assertEqual(call_args.kwargs.get("goal") or call_args[1].get("goal", call_args[0][1] if len(call_args[0]) > 1 else None), "Actual task")
@patch("tools.delegate_tool._run_single_child")
def test_failed_child_included_in_results(self, mock_run):
mock_run.return_value = {
"task_index": 0, "status": "error",
"summary": None, "error": "Something broke",
"api_calls": 0, "duration_seconds": 0.5
}
parent = _make_mock_parent()
result = json.loads(delegate_task(goal="Break things", parent_agent=parent))
self.assertEqual(result["results"][0]["status"], "error")
self.assertIn("Something broke", result["results"][0]["error"])
def test_depth_increments(self):
"""Verify child gets parent's depth + 1."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test depth", parent_agent=parent)
self.assertEqual(mock_child._delegate_depth, 1)
def test_active_children_tracking(self):
"""Verify children are registered/unregistered for interrupt propagation."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test tracking", parent_agent=parent)
self.assertEqual(len(parent._active_children), 0)
def test_child_inherits_runtime_credentials(self):
parent = _make_mock_parent(depth=0)
parent.base_url = "https://chatgpt.com/backend-api/codex"
parent.api_key="***"
parent.provider = "openai-codex"
parent.api_mode = "codex_responses"
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "ok",
"completed": True,
"api_calls": 1,
}
MockAgent.return_value = mock_child
delegate_task(goal="Test runtime inheritance", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["base_url"], parent.base_url)
self.assertEqual(kwargs["api_key"], parent.api_key)
self.assertEqual(kwargs["provider"], parent.provider)
self.assertEqual(kwargs["api_mode"], parent.api_mode)
def test_child_inherits_parent_print_fn(self):
parent = _make_mock_parent(depth=0)
sink = MagicMock()
parent._print_fn = sink
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
MockAgent.return_value = mock_child
_build_child_agent(
task_index=0,
goal="Keep stdout clean",
context=None,
toolsets=None,
model=None,
max_iterations=10,
parent_agent=parent,
task_count=1,
)
self.assertIs(mock_child._print_fn, sink)
def test_child_uses_thinking_callback_when_progress_callback_available(self):
parent = _make_mock_parent(depth=0)
parent.tool_progress_callback = MagicMock()
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
MockAgent.return_value = mock_child
_build_child_agent(
task_index=0,
goal="Avoid raw child spinners",
context=None,
toolsets=None,
model=None,
max_iterations=10,
parent_agent=parent,
task_count=1,
)
self.assertTrue(callable(mock_child.thinking_callback))
mock_child.thinking_callback("deliberating...")
parent.tool_progress_callback.assert_not_called()
class TestToolNamePreservation(unittest.TestCase):
"""Verify _last_resolved_tool_names is restored after subagent runs."""
def test_global_tool_names_restored_after_delegation(self):
"""The process-global _last_resolved_tool_names must be restored
after a subagent completes so the parent's execute_code sandbox
generates correct imports."""
import model_tools
parent = _make_mock_parent(depth=0)
original_tools = ["terminal", "read_file", "web_search", "execute_code", "delegate_task"]
model_tools._last_resolved_tool_names = list(original_tools)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1,
}
MockAgent.return_value = mock_child
delegate_task(goal="Test tool preservation", parent_agent=parent)
self.assertEqual(model_tools._last_resolved_tool_names, original_tools)
def test_global_tool_names_restored_after_child_failure(self):
"""Even when the child agent raises, the global must be restored."""
import model_tools
parent = _make_mock_parent(depth=0)
original_tools = ["terminal", "read_file", "web_search"]
model_tools._last_resolved_tool_names = list(original_tools)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.side_effect = RuntimeError("boom")
MockAgent.return_value = mock_child
result = json.loads(delegate_task(goal="Crash test", parent_agent=parent))
self.assertEqual(result["results"][0]["status"], "error")
self.assertEqual(model_tools._last_resolved_tool_names, original_tools)
def test_build_child_agent_does_not_raise_name_error(self):
"""Regression: _build_child_agent must not reference _saved_tool_names.
The bug introduced by the e7844e9c merge conflict: line 235 inside
_build_child_agent read `list(_saved_tool_names)` where that variable
is only defined later in _run_single_child. Calling _build_child_agent
standalone (without _run_single_child's scope) must never raise NameError.
"""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent"):
try:
_build_child_agent(
task_index=0,
goal="regression check",
context=None,
toolsets=None,
model=None,
max_iterations=10,
parent_agent=parent,
task_count=1,
)
except NameError as exc:
self.fail(
f"_build_child_agent raised NameError — "
f"_saved_tool_names leaked back into wrong scope: {exc}"
)
def test_saved_tool_names_set_on_child_before_run(self):
"""_run_single_child must set _delegate_saved_tool_names on the child
from model_tools._last_resolved_tool_names before run_conversation."""
import model_tools
parent = _make_mock_parent(depth=0)
expected_tools = ["read_file", "web_search", "execute_code"]
model_tools._last_resolved_tool_names = list(expected_tools)
captured = {}
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
def capture_and_return(user_message):
captured["saved"] = list(mock_child._delegate_saved_tool_names)
return {"final_response": "ok", "completed": True, "api_calls": 1}
mock_child.run_conversation.side_effect = capture_and_return
MockAgent.return_value = mock_child
delegate_task(goal="capture test", parent_agent=parent)
self.assertEqual(captured["saved"], expected_tools)
class TestDelegateObservability(unittest.TestCase):
"""Tests for enriched metadata returned by _run_single_child."""
def test_observability_fields_present(self):
"""Completed child should return tool_trace, tokens, model, exit_reason."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.model = "claude-sonnet-4-6"
mock_child.session_prompt_tokens = 5000
mock_child.session_completion_tokens = 1200
mock_child.run_conversation.return_value = {
"final_response": "done",
"completed": True,
"interrupted": False,
"api_calls": 3,
"messages": [
{"role": "user", "content": "do something"},
{"role": "assistant", "tool_calls": [
{"id": "tc_1", "function": {"name": "web_search", "arguments": '{"query": "test"}'}}
]},
{"role": "tool", "tool_call_id": "tc_1", "content": '{"results": [1,2,3]}'},
{"role": "assistant", "content": "done"},
],
}
MockAgent.return_value = mock_child
result = json.loads(delegate_task(goal="Test observability", parent_agent=parent))
entry = result["results"][0]
# Core observability fields
self.assertEqual(entry["model"], "claude-sonnet-4-6")
self.assertEqual(entry["exit_reason"], "completed")
self.assertEqual(entry["tokens"]["input"], 5000)
self.assertEqual(entry["tokens"]["output"], 1200)
# Tool trace
self.assertEqual(len(entry["tool_trace"]), 1)
self.assertEqual(entry["tool_trace"][0]["tool"], "web_search")
self.assertIn("args_bytes", entry["tool_trace"][0])
self.assertIn("result_bytes", entry["tool_trace"][0])
self.assertEqual(entry["tool_trace"][0]["status"], "ok")
def test_tool_trace_detects_error(self):
"""Tool results containing 'error' should be marked as error status."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.model = "claude-sonnet-4-6"
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.run_conversation.return_value = {
"final_response": "failed",
"completed": True,
"interrupted": False,
"api_calls": 1,
"messages": [
{"role": "assistant", "tool_calls": [
{"id": "tc_1", "function": {"name": "terminal", "arguments": '{"cmd": "ls"}'}}
]},
{"role": "tool", "tool_call_id": "tc_1", "content": "Error: command not found"},
],
}
MockAgent.return_value = mock_child
result = json.loads(delegate_task(goal="Test error trace", parent_agent=parent))
trace = result["results"][0]["tool_trace"]
self.assertEqual(trace[0]["status"], "error")
def test_parallel_tool_calls_paired_correctly(self):
"""Parallel tool calls should each get their own result via tool_call_id matching."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.model = "claude-sonnet-4-6"
mock_child.session_prompt_tokens = 3000
mock_child.session_completion_tokens = 800
mock_child.run_conversation.return_value = {
"final_response": "done",
"completed": True,
"interrupted": False,
"api_calls": 1,
"messages": [
{"role": "assistant", "tool_calls": [
{"id": "tc_a", "function": {"name": "web_search", "arguments": '{"q": "a"}'}},
{"id": "tc_b", "function": {"name": "web_search", "arguments": '{"q": "b"}'}},
{"id": "tc_c", "function": {"name": "terminal", "arguments": '{"cmd": "ls"}'}},
]},
{"role": "tool", "tool_call_id": "tc_a", "content": '{"ok": true}'},
{"role": "tool", "tool_call_id": "tc_b", "content": "Error: rate limited"},
{"role": "tool", "tool_call_id": "tc_c", "content": "file1.txt\nfile2.txt"},
{"role": "assistant", "content": "done"},
],
}
MockAgent.return_value = mock_child
result = json.loads(delegate_task(goal="Test parallel", parent_agent=parent))
trace = result["results"][0]["tool_trace"]
# All three tool calls should have results
self.assertEqual(len(trace), 3)
# First: web_search → ok
self.assertEqual(trace[0]["tool"], "web_search")
self.assertEqual(trace[0]["status"], "ok")
self.assertIn("result_bytes", trace[0])
# Second: web_search → error
self.assertEqual(trace[1]["tool"], "web_search")
self.assertEqual(trace[1]["status"], "error")
self.assertIn("result_bytes", trace[1])
# Third: terminal → ok
self.assertEqual(trace[2]["tool"], "terminal")
self.assertEqual(trace[2]["status"], "ok")
self.assertIn("result_bytes", trace[2])
def test_exit_reason_interrupted(self):
"""Interrupted child should report exit_reason='interrupted'."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.model = "claude-sonnet-4-6"
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.run_conversation.return_value = {
"final_response": "",
"completed": False,
"interrupted": True,
"api_calls": 2,
"messages": [],
}
MockAgent.return_value = mock_child
result = json.loads(delegate_task(goal="Test interrupt", parent_agent=parent))
self.assertEqual(result["results"][0]["exit_reason"], "interrupted")
def test_exit_reason_max_iterations(self):
"""Child that didn't complete and wasn't interrupted hit max_iterations."""
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.model = "claude-sonnet-4-6"
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.run_conversation.return_value = {
"final_response": "",
"completed": False,
"interrupted": False,
"api_calls": 50,
"messages": [],
}
MockAgent.return_value = mock_child
result = json.loads(delegate_task(goal="Test max iter", parent_agent=parent))
self.assertEqual(result["results"][0]["exit_reason"], "max_iterations")
class TestBlockedTools(unittest.TestCase):
def test_blocked_tools_constant(self):
for tool in ["delegate_task", "clarify", "memory", "send_message", "execute_code"]:
self.assertIn(tool, DELEGATE_BLOCKED_TOOLS)
def test_constants(self):
from tools.delegate_tool import (
_get_max_spawn_depth, _get_orchestrator_enabled,
_MIN_SPAWN_DEPTH, _MAX_SPAWN_DEPTH_CAP,
)
self.assertEqual(_get_max_concurrent_children(), 3)
self.assertEqual(MAX_DEPTH, 1)
self.assertEqual(_get_max_spawn_depth(), 1) # default: flat
self.assertTrue(_get_orchestrator_enabled()) # default
self.assertEqual(_MIN_SPAWN_DEPTH, 1)
self.assertEqual(_MAX_SPAWN_DEPTH_CAP, 3)
class TestDelegationCredentialResolution(unittest.TestCase):
"""Tests for provider:model credential resolution in delegation config."""
def test_no_provider_returns_none_credentials(self):
"""When delegation.provider is empty, all credentials are None (inherit parent)."""
parent = _make_mock_parent(depth=0)
cfg = {"model": "", "provider": ""}
creds = _resolve_delegation_credentials(cfg, parent)
self.assertIsNone(creds["provider"])
self.assertIsNone(creds["base_url"])
self.assertIsNone(creds["api_key"])
self.assertIsNone(creds["api_mode"])
self.assertIsNone(creds["model"])
def test_model_only_no_provider(self):
"""When only model is set (no provider), model is returned but credentials are None."""
parent = _make_mock_parent(depth=0)
cfg = {"model": "google/gemini-3-flash-preview", "provider": ""}
creds = _resolve_delegation_credentials(cfg, parent)
self.assertEqual(creds["model"], "google/gemini-3-flash-preview")
self.assertIsNone(creds["provider"])
self.assertIsNone(creds["base_url"])
self.assertIsNone(creds["api_key"])
@patch("hermes_cli.runtime_provider.resolve_runtime_provider")
def test_provider_resolves_full_credentials(self, mock_resolve):
"""When delegation.provider is set, full credentials are resolved."""
mock_resolve.return_value = {
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "sk-or-test-key",
"api_mode": "chat_completions",
}
parent = _make_mock_parent(depth=0)
cfg = {"model": "google/gemini-3-flash-preview", "provider": "openrouter"}
creds = _resolve_delegation_credentials(cfg, parent)
self.assertEqual(creds["model"], "google/gemini-3-flash-preview")
self.assertEqual(creds["provider"], "openrouter")
self.assertEqual(creds["base_url"], "https://openrouter.ai/api/v1")
self.assertEqual(creds["api_key"], "sk-or-test-key")
self.assertEqual(creds["api_mode"], "chat_completions")
mock_resolve.assert_called_once_with(requested="openrouter")
def test_direct_endpoint_uses_configured_base_url_and_api_key(self):
parent = _make_mock_parent(depth=0)
cfg = {
"model": "qwen2.5-coder",
"provider": "openrouter",
"base_url": "http://localhost:1234/v1",
"api_key": "local-key",
}
creds = _resolve_delegation_credentials(cfg, parent)
self.assertEqual(creds["model"], "qwen2.5-coder")
self.assertEqual(creds["provider"], "custom")
self.assertEqual(creds["base_url"], "http://localhost:1234/v1")
self.assertEqual(creds["api_key"], "local-key")
self.assertEqual(creds["api_mode"], "chat_completions")
def test_direct_endpoint_falls_back_to_openai_api_key_env(self):
parent = _make_mock_parent(depth=0)
cfg = {
"model": "qwen2.5-coder",
"base_url": "http://localhost:1234/v1",
}
with patch.dict(os.environ, {"OPENAI_API_KEY": "env-openai-key"}, clear=False):
creds = _resolve_delegation_credentials(cfg, parent)
self.assertEqual(creds["api_key"], "env-openai-key")
self.assertEqual(creds["provider"], "custom")
def test_direct_endpoint_does_not_fall_back_to_openrouter_api_key_env(self):
parent = _make_mock_parent(depth=0)
cfg = {
"model": "qwen2.5-coder",
"base_url": "http://localhost:1234/v1",
}
with patch.dict(
os.environ,
{
"OPENROUTER_API_KEY": "env-openrouter-key",
"OPENAI_API_KEY": "",
},
clear=False,
):
with self.assertRaises(ValueError) as ctx:
_resolve_delegation_credentials(cfg, parent)
self.assertIn("OPENAI_API_KEY", str(ctx.exception))
@patch("hermes_cli.runtime_provider.resolve_runtime_provider")
def test_nous_provider_resolves_nous_credentials(self, mock_resolve):
"""Nous provider resolves Nous Portal base_url and api_key."""
mock_resolve.return_value = {
"provider": "nous",
"base_url": "https://inference-api.nousresearch.com/v1",
"api_key": "nous-agent-key-xyz",
"api_mode": "chat_completions",
}
parent = _make_mock_parent(depth=0)
cfg = {"model": "hermes-3-llama-3.1-8b", "provider": "nous"}
creds = _resolve_delegation_credentials(cfg, parent)
self.assertEqual(creds["provider"], "nous")
self.assertEqual(creds["base_url"], "https://inference-api.nousresearch.com/v1")
self.assertEqual(creds["api_key"], "nous-agent-key-xyz")
mock_resolve.assert_called_once_with(requested="nous")
@patch("hermes_cli.runtime_provider.resolve_runtime_provider")
def test_provider_resolution_failure_raises_valueerror(self, mock_resolve):
"""When provider resolution fails, ValueError is raised with helpful message."""
mock_resolve.side_effect = RuntimeError("OPENROUTER_API_KEY not set")
parent = _make_mock_parent(depth=0)
cfg = {"model": "some-model", "provider": "openrouter"}
with self.assertRaises(ValueError) as ctx:
_resolve_delegation_credentials(cfg, parent)
self.assertIn("openrouter", str(ctx.exception).lower())
self.assertIn("Cannot resolve", str(ctx.exception))
@patch("hermes_cli.runtime_provider.resolve_runtime_provider")
def test_provider_resolves_but_no_api_key_raises(self, mock_resolve):
"""When provider resolves but has no API key, ValueError is raised."""
mock_resolve.return_value = {
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "",
"api_mode": "chat_completions",
}
parent = _make_mock_parent(depth=0)
cfg = {"model": "some-model", "provider": "openrouter"}
with self.assertRaises(ValueError) as ctx:
_resolve_delegation_credentials(cfg, parent)
self.assertIn("no API key", str(ctx.exception))
def test_missing_config_keys_inherit_parent(self):
"""When config dict has no model/provider keys at all, inherits parent."""
parent = _make_mock_parent(depth=0)
cfg = {"max_iterations": 45}
creds = _resolve_delegation_credentials(cfg, parent)
self.assertIsNone(creds["model"])
self.assertIsNone(creds["provider"])
class TestDelegationProviderIntegration(unittest.TestCase):
"""Integration tests: delegation config → _run_single_child → AIAgent construction."""
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_config_provider_credentials_reach_child_agent(self, mock_creds, mock_cfg):
"""When delegation.provider is configured, child agent gets resolved credentials."""
mock_cfg.return_value = {
"max_iterations": 45,
"model": "google/gemini-3-flash-preview",
"provider": "openrouter",
}
mock_creds.return_value = {
"model": "google/gemini-3-flash-preview",
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "sk-or-delegation-key",
"api_mode": "chat_completions",
}
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test provider routing", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["model"], "google/gemini-3-flash-preview")
self.assertEqual(kwargs["provider"], "openrouter")
self.assertEqual(kwargs["base_url"], "https://openrouter.ai/api/v1")
self.assertEqual(kwargs["api_key"], "sk-or-delegation-key")
self.assertEqual(kwargs["api_mode"], "chat_completions")
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_cross_provider_delegation(self, mock_creds, mock_cfg):
"""Parent on Nous, subagent on OpenRouter — full credential switch."""
mock_cfg.return_value = {
"max_iterations": 45,
"model": "google/gemini-3-flash-preview",
"provider": "openrouter",
}
mock_creds.return_value = {
"model": "google/gemini-3-flash-preview",
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "sk-or-key",
"api_mode": "chat_completions",
}
parent = _make_mock_parent(depth=0)
parent.provider = "nous"
parent.base_url = "https://inference-api.nousresearch.com/v1"
parent.api_key = "nous-key-abc"
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Cross-provider test", parent_agent=parent)
_, kwargs = MockAgent.call_args
# Child should use OpenRouter, NOT Nous
self.assertEqual(kwargs["provider"], "openrouter")
self.assertEqual(kwargs["base_url"], "https://openrouter.ai/api/v1")
self.assertEqual(kwargs["api_key"], "sk-or-key")
self.assertNotEqual(kwargs["base_url"], parent.base_url)
self.assertNotEqual(kwargs["api_key"], parent.api_key)
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_direct_endpoint_credentials_reach_child_agent(self, mock_creds, mock_cfg):
mock_cfg.return_value = {
"max_iterations": 45,
"model": "qwen2.5-coder",
"base_url": "http://localhost:1234/v1",
"api_key": "local-key",
}
mock_creds.return_value = {
"model": "qwen2.5-coder",
"provider": "custom",
"base_url": "http://localhost:1234/v1",
"api_key": "local-key",
"api_mode": "chat_completions",
}
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Direct endpoint test", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["model"], "qwen2.5-coder")
self.assertEqual(kwargs["provider"], "custom")
self.assertEqual(kwargs["base_url"], "http://localhost:1234/v1")
self.assertEqual(kwargs["api_key"], "local-key")
self.assertEqual(kwargs["api_mode"], "chat_completions")
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_empty_config_inherits_parent(self, mock_creds, mock_cfg):
"""When delegation config is empty, child inherits parent credentials."""
mock_cfg.return_value = {"max_iterations": 45, "model": "", "provider": ""}
mock_creds.return_value = {
"model": None,
"provider": None,
"base_url": None,
"api_key": None,
"api_mode": None,
}
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Test inherit", parent_agent=parent)
_, kwargs = MockAgent.call_args
self.assertEqual(kwargs["model"], parent.model)
self.assertEqual(kwargs["provider"], parent.provider)
self.assertEqual(kwargs["base_url"], parent.base_url)
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_credential_error_returns_json_error(self, mock_creds, mock_cfg):
"""When credential resolution fails, delegate_task returns a JSON error."""
mock_cfg.return_value = {"model": "bad-model", "provider": "nonexistent"}
mock_creds.side_effect = ValueError(
"Cannot resolve delegation provider 'nonexistent': Unknown provider"
)
parent = _make_mock_parent(depth=0)
result = json.loads(delegate_task(goal="Should fail", parent_agent=parent))
self.assertIn("error", result)
self.assertIn("Cannot resolve", result["error"])
self.assertIn("nonexistent", result["error"])
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_batch_mode_all_children_get_credentials(self, mock_creds, mock_cfg):
"""In batch mode, all children receive the resolved credentials."""
mock_cfg.return_value = {
"max_iterations": 45,
"model": "meta-llama/llama-4-scout",
"provider": "openrouter",
}
mock_creds.return_value = {
"model": "meta-llama/llama-4-scout",
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "sk-or-batch",
"api_mode": "chat_completions",
}
parent = _make_mock_parent(depth=0)
# Patch _build_child_agent since credentials are now passed there
# (agents are built in the main thread before being handed to workers)
with patch("tools.delegate_tool._build_child_agent") as mock_build, \
patch("tools.delegate_tool._run_single_child") as mock_run:
mock_child = MagicMock()
mock_build.return_value = mock_child
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
tasks = [{"goal": "Task A"}, {"goal": "Task B"}]
delegate_task(tasks=tasks, parent_agent=parent)
self.assertEqual(mock_build.call_count, 2)
for call in mock_build.call_args_list:
self.assertEqual(call.kwargs.get("model"), "meta-llama/llama-4-scout")
self.assertEqual(call.kwargs.get("override_provider"), "openrouter")
self.assertEqual(call.kwargs.get("override_base_url"), "https://openrouter.ai/api/v1")
self.assertEqual(call.kwargs.get("override_api_key"), "sk-or-batch")
self.assertEqual(call.kwargs.get("override_api_mode"), "chat_completions")
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_delegation_acp_runtime_reaches_child_agent(self, mock_creds, mock_cfg):
"""Resolved ACP runtime command/args must be forwarded to child agents."""
mock_cfg.return_value = {
"max_iterations": 45,
"model": "copilot-model",
"provider": "copilot-acp",
}
mock_creds.return_value = {
"model": "copilot-model",
"provider": "copilot-acp",
"base_url": "acp://copilot",
"api_key": "copilot-acp",
"api_mode": "chat_completions",
"command": "custom-copilot",
"args": ["--stdio-custom"],
}
parent = _make_mock_parent(depth=0)
with patch("tools.delegate_tool._build_child_agent") as mock_build, \
patch("tools.delegate_tool._run_single_child") as mock_run:
mock_child = MagicMock()
mock_build.return_value = mock_child
mock_run.return_value = {
"task_index": 0, "status": "completed",
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
delegate_task(goal="ACP delegation test", parent_agent=parent)
_, kwargs = mock_build.call_args
self.assertEqual(kwargs.get("override_provider"), "copilot-acp")
self.assertEqual(kwargs.get("override_base_url"), "acp://copilot")
self.assertEqual(kwargs.get("override_api_key"), "copilot-acp")
self.assertEqual(kwargs.get("override_api_mode"), "chat_completions")
self.assertEqual(kwargs.get("override_acp_command"), "custom-copilot")
self.assertEqual(kwargs.get("override_acp_args"), ["--stdio-custom"])
@patch("tools.delegate_tool._load_config")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_model_only_no_provider_inherits_parent_credentials(self, mock_creds, mock_cfg):
"""Setting only model (no provider) changes model but keeps parent credentials."""
mock_cfg.return_value = {
"max_iterations": 45,
"model": "google/gemini-3-flash-preview",
"provider": "",
}
mock_creds.return_value = {
"model": "google/gemini-3-flash-preview",
"provider": None,
"base_url": None,
"api_key": None,
"api_mode": None,
}
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1
}
MockAgent.return_value = mock_child
delegate_task(goal="Model only test", parent_agent=parent)
_, kwargs = MockAgent.call_args
# Model should be overridden
self.assertEqual(kwargs["model"], "google/gemini-3-flash-preview")
# But provider/base_url/api_key should inherit from parent
self.assertEqual(kwargs["provider"], parent.provider)
self.assertEqual(kwargs["base_url"], parent.base_url)
class TestChildCredentialPoolResolution(unittest.TestCase):
def test_same_provider_shares_parent_pool(self):
parent = _make_mock_parent()
mock_pool = MagicMock()
parent._credential_pool = mock_pool
result = _resolve_child_credential_pool("openrouter", parent)
self.assertIs(result, mock_pool)
def test_no_provider_inherits_parent_pool(self):
parent = _make_mock_parent()
mock_pool = MagicMock()
parent._credential_pool = mock_pool
result = _resolve_child_credential_pool(None, parent)
self.assertIs(result, mock_pool)
def test_different_provider_loads_own_pool(self):
parent = _make_mock_parent()
parent._credential_pool = MagicMock()
mock_pool = MagicMock()
mock_pool.has_credentials.return_value = True
with patch("agent.credential_pool.load_pool", return_value=mock_pool):
result = _resolve_child_credential_pool("anthropic", parent)
self.assertIs(result, mock_pool)
def test_different_provider_empty_pool_returns_none(self):
parent = _make_mock_parent()
parent._credential_pool = MagicMock()
mock_pool = MagicMock()
mock_pool.has_credentials.return_value = False
with patch("agent.credential_pool.load_pool", return_value=mock_pool):
result = _resolve_child_credential_pool("anthropic", parent)
self.assertIsNone(result)
def test_different_provider_load_failure_returns_none(self):
parent = _make_mock_parent()
parent._credential_pool = MagicMock()
with patch("agent.credential_pool.load_pool", side_effect=Exception("disk error")):
result = _resolve_child_credential_pool("anthropic", parent)
self.assertIsNone(result)
def test_build_child_agent_assigns_parent_pool_when_shared(self):
parent = _make_mock_parent()
mock_pool = MagicMock()
parent._credential_pool = mock_pool
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
MockAgent.return_value = mock_child
_build_child_agent(
task_index=0,
goal="Test pool assignment",
context=None,
toolsets=["terminal"],
model=None,
max_iterations=10,
parent_agent=parent,
task_count=1,
)
self.assertEqual(mock_child._credential_pool, mock_pool)
class TestChildCredentialLeasing(unittest.TestCase):
def test_run_single_child_acquires_and_releases_lease(self):
from tools.delegate_tool import _run_single_child
leased_entry = MagicMock()
leased_entry.id = "cred-b"
child = MagicMock()
child._credential_pool = MagicMock()
child._credential_pool.acquire_lease.return_value = "cred-b"
child._credential_pool.current.return_value = leased_entry
child.run_conversation.return_value = {
"final_response": "done",
"completed": True,
"interrupted": False,
"api_calls": 1,
"messages": [],
}
result = _run_single_child(
task_index=0,
goal="Investigate rate limits",
child=child,
parent_agent=_make_mock_parent(),
)
self.assertEqual(result["status"], "completed")
child._credential_pool.acquire_lease.assert_called_once_with()
child._swap_credential.assert_called_once_with(leased_entry)
child._credential_pool.release_lease.assert_called_once_with("cred-b")
def test_run_single_child_releases_lease_after_failure(self):
from tools.delegate_tool import _run_single_child
child = MagicMock()
child._credential_pool = MagicMock()
child._credential_pool.acquire_lease.return_value = "cred-a"
child._credential_pool.current.return_value = MagicMock(id="cred-a")
child.run_conversation.side_effect = RuntimeError("boom")
result = _run_single_child(
task_index=1,
goal="Trigger failure",
child=child,
parent_agent=_make_mock_parent(),
)
self.assertEqual(result["status"], "error")
child._credential_pool.release_lease.assert_called_once_with("cred-a")
class TestDelegateHeartbeat(unittest.TestCase):
"""Heartbeat propagates child activity to parent during delegation.
Without the heartbeat, the gateway inactivity timeout fires because the
parent's _last_activity_ts freezes when delegate_task starts.
"""
def test_heartbeat_touches_parent_activity_during_child_run(self):
"""Parent's _touch_activity is called while child.run_conversation blocks."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": "terminal",
"api_call_count": 3,
"max_iterations": 50,
"last_activity_desc": "executing tool: terminal",
}
# Make run_conversation block long enough for heartbeats to fire
def slow_run(**kwargs):
time.sleep(0.25)
return {"final_response": "done", "completed": True, "api_calls": 3}
child.run_conversation.side_effect = slow_run
# Patch the heartbeat interval to fire quickly
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test heartbeat",
child=child,
parent_agent=parent,
)
# Heartbeat should have fired at least once during the 0.25s sleep
self.assertGreater(len(touch_calls), 0,
"Heartbeat did not propagate activity to parent")
# Verify the description includes child's current tool detail
self.assertTrue(
any("terminal" in desc for desc in touch_calls),
f"Heartbeat descriptions should include child tool info: {touch_calls}")
def test_heartbeat_stops_after_child_completes(self):
"""Heartbeat thread is cleaned up when the child finishes."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": None,
"api_call_count": 1,
"max_iterations": 50,
"last_activity_desc": "done",
}
child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1,
}
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test cleanup",
child=child,
parent_agent=parent,
)
# Record count after completion, wait, and verify no more calls
count_after = len(touch_calls)
time.sleep(0.15)
self.assertEqual(len(touch_calls), count_after,
"Heartbeat continued firing after child completed")
def test_heartbeat_stops_after_child_error(self):
"""Heartbeat thread is cleaned up even when the child raises."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": "web_search",
"api_call_count": 2,
"max_iterations": 50,
"last_activity_desc": "executing tool: web_search",
}
def slow_fail(**kwargs):
time.sleep(0.15)
raise RuntimeError("network timeout")
child.run_conversation.side_effect = slow_fail
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
result = _run_single_child(
task_index=0,
goal="Test error cleanup",
child=child,
parent_agent=parent,
)
self.assertEqual(result["status"], "error")
# Verify heartbeat stopped
count_after = len(touch_calls)
time.sleep(0.15)
self.assertEqual(len(touch_calls), count_after,
"Heartbeat continued firing after child error")
def test_heartbeat_includes_child_activity_desc_when_no_tool(self):
"""When child has no current_tool, heartbeat uses last_activity_desc."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": None,
"api_call_count": 5,
"max_iterations": 90,
"last_activity_desc": "API call #5 completed",
}
def slow_run(**kwargs):
time.sleep(0.15)
return {"final_response": "done", "completed": True, "api_calls": 5}
child.run_conversation.side_effect = slow_run
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test desc fallback",
child=child,
parent_agent=parent,
)
self.assertGreater(len(touch_calls), 0)
self.assertTrue(
any("API call #5 completed" in desc for desc in touch_calls),
f"Heartbeat should include last_activity_desc: {touch_calls}")
class TestDelegationReasoningEffort(unittest.TestCase):
"""Tests for delegation.reasoning_effort config override."""
@patch("tools.delegate_tool._load_config")
@patch("run_agent.AIAgent")
def test_inherits_parent_reasoning_when_no_override(self, MockAgent, mock_cfg):
"""With no delegation.reasoning_effort, child inherits parent's config."""
mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": ""}
MockAgent.return_value = MagicMock()
parent = _make_mock_parent()
parent.reasoning_config = {"enabled": True, "effort": "xhigh"}
_build_child_agent(
task_index=0, goal="test", context=None, toolsets=None,
model=None, max_iterations=50, parent_agent=parent,
task_count=1,
)
call_kwargs = MockAgent.call_args[1]
self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "xhigh"})
@patch("tools.delegate_tool._load_config")
@patch("run_agent.AIAgent")
def test_override_reasoning_effort_from_config(self, MockAgent, mock_cfg):
"""delegation.reasoning_effort overrides the parent's level."""
mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "low"}
MockAgent.return_value = MagicMock()
parent = _make_mock_parent()
parent.reasoning_config = {"enabled": True, "effort": "xhigh"}
_build_child_agent(
task_index=0, goal="test", context=None, toolsets=None,
model=None, max_iterations=50, parent_agent=parent,
task_count=1,
)
call_kwargs = MockAgent.call_args[1]
self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "low"})
@patch("tools.delegate_tool._load_config")
@patch("run_agent.AIAgent")
def test_override_reasoning_effort_none_disables(self, MockAgent, mock_cfg):
"""delegation.reasoning_effort: 'none' disables thinking for subagents."""
mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "none"}
MockAgent.return_value = MagicMock()
parent = _make_mock_parent()
parent.reasoning_config = {"enabled": True, "effort": "high"}
_build_child_agent(
task_index=0, goal="test", context=None, toolsets=None,
model=None, max_iterations=50, parent_agent=parent,
task_count=1,
)
call_kwargs = MockAgent.call_args[1]
self.assertEqual(call_kwargs["reasoning_config"], {"enabled": False})
@patch("tools.delegate_tool._load_config")
@patch("run_agent.AIAgent")
def test_invalid_reasoning_effort_falls_back_to_parent(self, MockAgent, mock_cfg):
"""Invalid delegation.reasoning_effort falls back to parent's config."""
mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "banana"}
MockAgent.return_value = MagicMock()
parent = _make_mock_parent()
parent.reasoning_config = {"enabled": True, "effort": "medium"}
_build_child_agent(
task_index=0, goal="test", context=None, toolsets=None,
model=None, max_iterations=50, parent_agent=parent,
task_count=1,
)
call_kwargs = MockAgent.call_args[1]
self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "medium"})
# =========================================================================
# Dispatch helper, progress events, concurrency
# =========================================================================
class TestDispatchDelegateTask(unittest.TestCase):
"""Tests for the _dispatch_delegate_task helper and full param forwarding."""
@patch("tools.delegate_tool._load_config", return_value={})
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_acp_args_forwarded(self, mock_creds, mock_cfg):
"""Both acp_command and acp_args reach delegate_task via the helper."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
with patch("tools.delegate_tool._build_child_agent") as mock_build:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True,
"api_calls": 1, "messages": [],
}
mock_child._delegate_saved_tool_names = []
mock_child._credential_pool = None
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.model = "test"
mock_build.return_value = mock_child
delegate_task(
goal="test",
acp_command="claude",
acp_args=["--acp", "--stdio"],
parent_agent=parent,
)
_, kwargs = mock_build.call_args
self.assertEqual(kwargs["override_acp_command"], "claude")
self.assertEqual(kwargs["override_acp_args"], ["--acp", "--stdio"])
class TestDelegateEventEnum(unittest.TestCase):
"""Tests for DelegateEvent enum and back-compat aliases."""
def test_enum_values_are_strings(self):
for event in DelegateEvent:
self.assertIsInstance(event.value, str)
self.assertTrue(event.value.startswith("delegate."))
def test_legacy_map_covers_all_old_names(self):
expected_legacy = {"_thinking", "reasoning.available",
"tool.started", "tool.completed", "subagent_progress"}
self.assertEqual(set(_LEGACY_EVENT_MAP.keys()), expected_legacy)
def test_legacy_map_values_are_delegate_events(self):
for old_name, event in _LEGACY_EVENT_MAP.items():
self.assertIsInstance(event, DelegateEvent)
def test_progress_callback_normalises_tool_started(self):
"""_build_child_progress_callback handles tool.started via enum."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = MagicMock()
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
self.assertIsNotNone(cb)
cb("tool.started", tool_name="terminal", preview="ls")
parent._delegate_spinner.print_above.assert_called()
def test_progress_callback_normalises_thinking(self):
"""Both _thinking and reasoning.available route to TASK_THINKING."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb("_thinking", tool_name=None, preview="pondering...")
assert any("💭" in str(c) for c in parent._delegate_spinner.print_above.call_args_list)
parent._delegate_spinner.print_above.reset_mock()
cb("reasoning.available", tool_name=None, preview="hmm")
assert any("💭" in str(c) for c in parent._delegate_spinner.print_above.call_args_list)
def test_progress_callback_tool_completed_is_noop(self):
"""tool.completed is normalised but produces no display output."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb("tool.completed", tool_name="terminal")
parent._delegate_spinner.print_above.assert_not_called()
def test_progress_callback_ignores_unknown_events(self):
"""Unknown event types are silently ignored."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
# Should not raise
cb("some.unknown.event", tool_name="x")
parent._delegate_spinner.print_above.assert_not_called()
def test_progress_callback_accepts_enum_value_directly(self):
"""cb(DelegateEvent.TASK_THINKING, ...) must route to the thinking
branch. Pre-fix the callback only handled legacy strings via
_LEGACY_EVENT_MAP.get and silently dropped enum-typed callers."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb(DelegateEvent.TASK_THINKING, preview="pondering")
# If the enum was accepted, the thinking emoji got printed.
assert any(
"💭" in str(c)
for c in parent._delegate_spinner.print_above.call_args_list
)
def test_progress_callback_accepts_new_style_string(self):
"""cb('delegate.task_thinking', ...) — the string form of the
enum value — must route to the thinking branch too, so new-style
emitters don't have to import DelegateEvent."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb("delegate.task_thinking", preview="hmm")
assert any(
"💭" in str(c)
for c in parent._delegate_spinner.print_above.call_args_list
)
def test_progress_callback_task_progress_not_misrendered(self):
"""'subagent_progress' (legacy name for TASK_PROGRESS) carries a
pre-batched summary in the tool_name slot. Before the fix, this
fell through to the TASK_TOOL_STARTED rendering path, treating
the summary string as a tool name. After the fix: distinct
render (no tool-start emoji lookup) and pass-through relay
upward (no re-batching).
Regression path only reachable once nested orchestration is
enabled: nested orchestrators relay subagent_progress from
grandchildren upward through this callback.
"""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = MagicMock()
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb("subagent_progress", tool_name="🔀 [1] terminal, file")
# Spinner gets a distinct 🔀-prefixed line, NOT a tool emoji
# followed by the summary string as if it were a tool name.
calls = parent._delegate_spinner.print_above.call_args_list
self.assertTrue(any("🔀 🔀 [1] terminal, file" in str(c) for c in calls))
# Parent callback receives the relay (pass-through, no re-batching).
parent.tool_progress_callback.assert_called_once()
# No '⚡' tool-start emoji should appear — that's the pre-fix bug.
self.assertFalse(any("" in str(c) for c in calls))
class TestConcurrencyDefaults(unittest.TestCase):
"""Tests for the concurrency default and no hard ceiling."""
@patch("tools.delegate_tool._load_config", return_value={})
def test_default_is_three(self, mock_cfg):
# Clear env var if set
with patch.dict(os.environ, {}, clear=True):
self.assertEqual(_get_max_concurrent_children(), 3)
@patch("tools.delegate_tool._load_config",
return_value={"max_concurrent_children": 10})
def test_no_upper_ceiling(self, mock_cfg):
"""Users can raise concurrency as high as they want — no hard cap."""
self.assertEqual(_get_max_concurrent_children(), 10)
@patch("tools.delegate_tool._load_config",
return_value={"max_concurrent_children": 100})
def test_very_high_values_honored(self, mock_cfg):
self.assertEqual(_get_max_concurrent_children(), 100)
@patch("tools.delegate_tool._load_config",
return_value={"max_concurrent_children": 0})
def test_zero_clamped_to_one(self, mock_cfg):
"""Floor of 1 is enforced; zero or negative values raise to 1."""
self.assertEqual(_get_max_concurrent_children(), 1)
@patch("tools.delegate_tool._load_config", return_value={})
def test_env_var_honored_uncapped(self, mock_cfg):
with patch.dict(os.environ, {"DELEGATION_MAX_CONCURRENT_CHILDREN": "12"}):
self.assertEqual(_get_max_concurrent_children(), 12)
@patch("tools.delegate_tool._load_config",
return_value={"max_concurrent_children": 6})
def test_configured_value_returned(self, mock_cfg):
self.assertEqual(_get_max_concurrent_children(), 6)
# =========================================================================
# max_spawn_depth clamping
# =========================================================================
class TestMaxSpawnDepth(unittest.TestCase):
"""Tests for _get_max_spawn_depth clamping and fallback behavior."""
@patch("tools.delegate_tool._load_config", return_value={})
def test_max_spawn_depth_defaults_to_1(self, mock_cfg):
from tools.delegate_tool import _get_max_spawn_depth
self.assertEqual(_get_max_spawn_depth(), 1)
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 0})
def test_max_spawn_depth_clamped_below_one(self, mock_cfg):
import logging
from tools.delegate_tool import _get_max_spawn_depth
with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm:
result = _get_max_spawn_depth()
self.assertEqual(result, 1)
self.assertTrue(any("clamping to 1" in m for m in cm.output))
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 99})
def test_max_spawn_depth_clamped_above_three(self, mock_cfg):
import logging
from tools.delegate_tool import _get_max_spawn_depth
with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm:
result = _get_max_spawn_depth()
self.assertEqual(result, 3)
self.assertTrue(any("clamping to 3" in m for m in cm.output))
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": "not-a-number"})
def test_max_spawn_depth_invalid_falls_back_to_default(self, mock_cfg):
from tools.delegate_tool import _get_max_spawn_depth
self.assertEqual(_get_max_spawn_depth(), 1)
# =========================================================================
# role param plumbing
# =========================================================================
#
# These tests cover the schema + signature + stash plumbing of the role
# param. The full role-honoring behavior (toolset re-add, role-aware
# prompt) lives in TestOrchestratorRoleBehavior below; these tests only
# assert on _delegate_role stashing and on the schema shape.
class TestOrchestratorRoleSchema(unittest.TestCase):
"""Tests that the role param reaches the child via dispatch."""
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def _run_with_mock_child(self, role_arg, mock_cfg, mock_creds):
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True,
"api_calls": 1, "messages": [],
}
mock_child._delegate_saved_tool_names = []
mock_child._credential_pool = None
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.model = "test"
MockAgent.return_value = mock_child
kwargs = {"goal": "test", "parent_agent": parent}
if role_arg is not _SENTINEL:
kwargs["role"] = role_arg
delegate_task(**kwargs)
return mock_child
def test_default_role_is_leaf(self):
child = self._run_with_mock_child(_SENTINEL)
self.assertEqual(child._delegate_role, "leaf")
def test_explicit_orchestrator_role_stashed(self):
"""role='orchestrator' reaches _build_child_agent and is stashed.
Full behavior (toolset re-add) lands in commit 3; commit 2 only
verifies the plumbing."""
child = self._run_with_mock_child("orchestrator")
self.assertEqual(child._delegate_role, "orchestrator")
def test_unknown_role_coerces_to_leaf(self):
"""role='nonsense' → _normalize_role warns and returns 'leaf'."""
import logging
with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm:
child = self._run_with_mock_child("nonsense")
self.assertEqual(child._delegate_role, "leaf")
self.assertTrue(any("coercing" in m.lower() for m in cm.output))
def test_schema_has_role_top_level_and_per_task(self):
from tools.delegate_tool import DELEGATE_TASK_SCHEMA
props = DELEGATE_TASK_SCHEMA["parameters"]["properties"]
self.assertIn("role", props)
self.assertEqual(props["role"]["enum"], ["leaf", "orchestrator"])
task_props = props["tasks"]["items"]["properties"]
self.assertIn("role", task_props)
self.assertEqual(task_props["role"]["enum"], ["leaf", "orchestrator"])
# Sentinel used to distinguish "role kwarg omitted" from "role=None".
_SENTINEL = object()
# =========================================================================
# role-honoring behavior
# =========================================================================
def _make_role_mock_child():
"""Helper: mock child with minimal fields for delegate_task to process."""
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True,
"api_calls": 1, "messages": [],
}
mock_child._delegate_saved_tool_names = []
mock_child._credential_pool = None
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.model = "test"
return mock_child
class TestOrchestratorRoleBehavior(unittest.TestCase):
"""Tests that role='orchestrator' actually changes toolset + prompt."""
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_orchestrator_role_keeps_delegation_at_depth_1(
self, mock_cfg, mock_creds
):
"""role='orchestrator' + depth-0 parent with max_spawn_depth=2 →
child at depth 1 gets 'delegation' in enabled_toolsets (can
further delegate). Requires max_spawn_depth>=2 since the new
default is 1 (flat)."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file"]
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator", parent_agent=parent)
kwargs = MockAgent.call_args[1]
self.assertIn("delegation", kwargs["enabled_toolsets"])
self.assertEqual(mock_child._delegate_role, "orchestrator")
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_orchestrator_blocked_at_max_spawn_depth(
self, mock_cfg, mock_creds
):
"""Parent at depth 1 with max_spawn_depth=2 spawns child
at depth 2 (the floor); role='orchestrator' degrades to leaf."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=1)
parent.enabled_toolsets = ["terminal", "delegation"]
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator", parent_agent=parent)
kwargs = MockAgent.call_args[1]
self.assertNotIn("delegation", kwargs["enabled_toolsets"])
self.assertEqual(mock_child._delegate_role, "leaf")
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config", return_value={})
def test_orchestrator_blocked_at_default_flat_depth(
self, mock_cfg, mock_creds
):
"""With default max_spawn_depth=1 (flat), role='orchestrator'
on a depth-0 parent produces a depth-1 child that is already at
the floor — the role degrades to 'leaf' and the delegation
toolset is stripped. This is the new default posture."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file", "delegation"]
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator", parent_agent=parent)
kwargs = MockAgent.call_args[1]
self.assertNotIn("delegation", kwargs["enabled_toolsets"])
self.assertEqual(mock_child._delegate_role, "leaf")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_orchestrator_enabled_false_forces_leaf(self, mock_creds):
"""Kill switch delegation.orchestrator_enabled=false overrides
role='orchestrator'."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "delegation"]
with patch("tools.delegate_tool._load_config",
return_value={"orchestrator_enabled": False}):
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator",
parent_agent=parent)
kwargs = MockAgent.call_args[1]
self.assertNotIn("delegation", kwargs["enabled_toolsets"])
self.assertEqual(mock_child._delegate_role, "leaf")
# ── Role-aware system prompt ────────────────────────────────────────
def test_leaf_prompt_does_not_mention_delegation(self):
prompt = _build_child_system_prompt(
"Fix tests", role="leaf",
max_spawn_depth=2, child_depth=1,
)
self.assertNotIn("delegate_task", prompt)
self.assertNotIn("Orchestrator Role", prompt)
def test_orchestrator_prompt_mentions_delegation_capability(self):
prompt = _build_child_system_prompt(
"Survey approaches", role="orchestrator",
max_spawn_depth=2, child_depth=1,
)
self.assertIn("delegate_task", prompt)
self.assertIn("Orchestrator Role", prompt)
# Depth/max-depth note present and literal:
self.assertIn("depth 1", prompt)
self.assertIn("max_spawn_depth=2", prompt)
def test_orchestrator_prompt_at_depth_floor_says_children_are_leaves(self):
"""With max_spawn_depth=2 and child_depth=1, the orchestrator's
own children would be at depth 2 (the floor) → must be leaves."""
prompt = _build_child_system_prompt(
"Survey", role="orchestrator",
max_spawn_depth=2, child_depth=1,
)
self.assertIn("MUST be leaves", prompt)
def test_orchestrator_prompt_below_floor_allows_more_nesting(self):
"""With max_spawn_depth=3 and child_depth=1, the orchestrator's
own children can themselves be orchestrators (depth 2 < 3)."""
prompt = _build_child_system_prompt(
"Deep work", role="orchestrator",
max_spawn_depth=3, child_depth=1,
)
self.assertIn("can themselves be orchestrators", prompt)
# ── Batch mode and intersection ─────────────────────────────────────
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_batch_mode_per_task_role_override(self, mock_cfg, mock_creds):
"""Per-task role beats top-level; no top-level role → "leaf".
tasks=[{role:'orchestrator'},{role:'leaf'},{}] → first gets
delegation, second and third don't. Requires max_spawn_depth>=2
(raised explicitly here) since the new default is 1 (flat).
"""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file", "delegation"]
built_toolsets = []
def _factory(*a, **kw):
m = _make_role_mock_child()
built_toolsets.append(kw.get("enabled_toolsets"))
return m
with patch("run_agent.AIAgent", side_effect=_factory):
delegate_task(
tasks=[
{"goal": "A", "role": "orchestrator"},
{"goal": "B", "role": "leaf"},
{"goal": "C"}, # no role → falls back to top_role (leaf)
],
parent_agent=parent,
)
self.assertIn("delegation", built_toolsets[0])
self.assertNotIn("delegation", built_toolsets[1])
self.assertNotIn("delegation", built_toolsets[2])
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_intersection_preserves_delegation_bound(
self, mock_cfg, mock_creds
):
"""Design decision: orchestrator capability is granted by role,
NOT inherited from the parent's toolset. A parent without
'delegation' in its enabled_toolsets can still spawn an
orchestrator child — the re-add in _build_child_agent runs
unconditionally for orchestrators (when max_spawn_depth allows).
If you want to change to "parent must have delegation too",
update _build_child_agent to check parent_toolsets before the
re-add and update this test to match.
"""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file"] # no delegation
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator",
parent_agent=parent)
self.assertIn("delegation", MockAgent.call_args[1]["enabled_toolsets"])
class TestOrchestratorEndToEnd(unittest.TestCase):
"""End-to-end: parent -> orchestrator -> two-leaf nested orchestration.
Covers the acceptance gate: parent delegates to an orchestrator
child; the orchestrator delegates to two leaf grandchildren; the
role/toolset/depth chain all resolve correctly.
Mock strategy: a single AIAgent patch with a side_effect factory
that keys on the child's ephemeral_system_prompt — orchestrator
prompts contain the string "Orchestrator Role" (see
_build_child_system_prompt), leaves don't. The orchestrator
mock's run_conversation recursively calls delegate_task with
tasks=[{goal:...},{goal:...}] to spawn two leaves. This keeps
the test in one patch context and avoids depth-indexed nesting.
"""
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_end_to_end_nested_orchestration(self, mock_cfg, mock_creds):
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file", "delegation"]
# (enabled_toolsets, _delegate_role) for each agent built
built_agents: list = []
# Keep the orchestrator mock around so the re-entrant delegate_task
# can reach it via closure.
orch_mock = {}
def _factory(*a, **kw):
prompt = kw.get("ephemeral_system_prompt", "") or ""
is_orchestrator = "Orchestrator Role" in prompt
m = _make_role_mock_child()
built_agents.append({
"enabled_toolsets": list(kw.get("enabled_toolsets") or []),
"is_orchestrator_prompt": is_orchestrator,
})
if is_orchestrator:
# Prepare the orchestrator mock as a parent-capable object
# so the nested delegate_task call succeeds.
m._delegate_depth = 1
m._delegate_role = "orchestrator"
m._active_children = []
m._active_children_lock = threading.Lock()
m._session_db = None
m.platform = "cli"
m.enabled_toolsets = ["terminal", "file", "delegation"]
m.api_key = "***"
m.base_url = ""
m.provider = None
m.api_mode = None
m.providers_allowed = None
m.providers_ignored = None
m.providers_order = None
m.provider_sort = None
m._print_fn = None
m.tool_progress_callback = None
m.thinking_callback = None
orch_mock["agent"] = m
def _orchestrator_run(user_message=None):
# Re-entrant: orchestrator spawns two leaves
delegate_task(
tasks=[{"goal": "leaf-A"}, {"goal": "leaf-B"}],
parent_agent=m,
)
return {
"final_response": "orchestrated 2 workers",
"completed": True, "api_calls": 1,
"messages": [],
}
m.run_conversation.side_effect = _orchestrator_run
return m
with patch("run_agent.AIAgent", side_effect=_factory) as MockAgent:
delegate_task(
goal="top-level orchestration",
role="orchestrator",
parent_agent=parent,
)
# 1 orchestrator + 2 leaf grandchildren = 3 agents
self.assertEqual(MockAgent.call_count, 3)
# First built = the orchestrator (parent's direct child)
self.assertIn("delegation", built_agents[0]["enabled_toolsets"])
self.assertTrue(built_agents[0]["is_orchestrator_prompt"])
# Next two = leaves (grandchildren)
self.assertNotIn("delegation", built_agents[1]["enabled_toolsets"])
self.assertFalse(built_agents[1]["is_orchestrator_prompt"])
self.assertNotIn("delegation", built_agents[2]["enabled_toolsets"])
self.assertFalse(built_agents[2]["is_orchestrator_prompt"])
if __name__ == "__main__":
unittest.main()