feat(delegate): orchestrator role and configurable spawn depth (default flat)

Adds role='leaf'|'orchestrator' to delegate_task. With max_spawn_depth>=2,
an orchestrator child retains the 'delegation' toolset and can spawn its
own workers; leaf children cannot delegate further (identical to today).

Default posture is flat — max_spawn_depth=1 means a depth-0 parent's
children land at the depth-1 floor and orchestrator role silently
degrades to leaf. Users opt into nested delegation by raising
max_spawn_depth to 2 or 3 in config.yaml.

Also threads acp_command/acp_args through the main agent loop's delegate
dispatch (both were declared in DELEGATE_TASK_SCHEMA but silently dropped
at dispatch) via a new _dispatch_delegate_task helper, and adds a
DelegateEvent enum with legacy-string back-compat for gateway/ACP/CLI
progress consumers.

Config (hermes_cli/config.py defaults):
  delegation.max_concurrent_children: 3   # floor-only, no upper cap
  delegation.max_spawn_depth: 1           # 1=flat (default), 2-3 unlock nested
  delegation.orchestrator_enabled: true   # global kill switch

Salvaged from @pefontana's PR #11215. Overrides vs. the original PR:
concurrency stays at 3 (PR bumped to 5 + cap 8 — we keep the floor only,
no hard ceiling); max_spawn_depth defaults to 1 (PR defaulted to 2 which
silently enabled one level of orchestration for every user).

Co-authored-by: pefontana <fontana.pedro93@gmail.com>
Authored by pefontana on 2026-04-21 14:11:53 -07:00, committed by Teknium
parent e7f8a5fea3, commit 48ecb98f8a
11 changed files with 1003 additions and 64 deletions

@@ -770,9 +770,12 @@ code_execution:
 # Subagent Delegation
 # =============================================================================
 # The delegate_task tool spawns child agents with isolated context.
-# Supports single tasks and batch mode (up to 3 parallel).
+# Supports single tasks and batch mode (default 3 parallel, configurable).
 delegation:
   max_iterations: 50            # Max tool-calling turns per child (default: 50)
+  # max_concurrent_children: 3  # Max parallel child agents (default: 3)
+  # max_spawn_depth: 1          # Tree depth cap (1-3, default: 1 = flat). Raise to 2 or 3 to allow orchestrator children to spawn their own workers.
+  # orchestrator_enabled: true  # Kill switch for role="orchestrator" children (default: true).
   # model: "google/gemini-3-flash-preview"  # Override model for subagents (empty = inherit parent)
   # provider: "openrouter"      # Override provider for subagents (empty = inherit parent)
   #                             # Resolves full credentials (base_url, api_key) automatically.
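
Opting into nested delegation is then a one-line config change; an illustrative config.yaml fragment (values taken from the defaults above):

```yaml
delegation:
  max_iterations: 50    # unchanged default
  max_spawn_depth: 2    # opt in: orchestrator children may spawn leaf workers
  # orchestrator_enabled: true  # kill switch stays on by default
```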

@@ -712,6 +712,12 @@ DEFAULT_CONFIG = {
                                  # independent of the parent's max_iterations)
        "reasoning_effort": "",   # reasoning effort for subagents: "xhigh", "high", "medium",
                                  # "low", "minimal", "none" (empty = inherit parent's level)
+       "max_concurrent_children": 3,  # max parallel children per batch; floor of 1 enforced, no ceiling
+       # Orchestrator role controls (see tools/delegate_tool.py:_get_max_spawn_depth
+       # and _get_orchestrator_enabled). Values are clamped to [1, 3] with a
+       # warning log if out of range.
+       "max_spawn_depth": 1,          # depth cap (1 = flat [default], 2 = orchestrator→leaf, 3 = three-level)
+       "orchestrator_enabled": True,  # kill switch for role="orchestrator"
    },
    # Ephemeral prefill messages file — JSON list of {role, content} dicts

@@ -127,7 +127,7 @@ TIPS = [
     # --- Tools & Capabilities ---
     "execute_code runs Python scripts that call Hermes tools programmatically — results stay out of context.",
-    "delegate_task spawns up to 3 concurrent sub-agents with isolated contexts for parallel work.",
+    "delegate_task spawns up to 3 concurrent sub-agents by default (configurable via delegation.max_concurrent_children) with isolated contexts for parallel work.",
     "web_extract works on PDF URLs — pass any PDF link and it converts to markdown.",
     "search_files is ripgrep-backed and faster than grep — use it instead of terminal grep.",
     "patch uses 9 fuzzy matching strategies so minor whitespace differences won't break edits.",

@@ -7629,8 +7629,27 @@ class AIAgent:
         finally:
             self._executing_tools = False

+    def _dispatch_delegate_task(self, function_args: dict) -> str:
+        """Single call site for delegate_task dispatch.
+
+        New DELEGATE_TASK_SCHEMA fields only need to be added here to reach all
+        invocation paths (concurrent, sequential, inline).
+        """
+        from tools.delegate_tool import delegate_task as _delegate_task
+        return _delegate_task(
+            goal=function_args.get("goal"),
+            context=function_args.get("context"),
+            toolsets=function_args.get("toolsets"),
+            tasks=function_args.get("tasks"),
+            max_iterations=function_args.get("max_iterations"),
+            acp_command=function_args.get("acp_command"),
+            acp_args=function_args.get("acp_args"),
+            role=function_args.get("role"),
+            parent_agent=self,
+        )
+
     def _invoke_tool(self, function_name: str, function_args: dict, effective_task_id: str,
-                     tool_call_id: Optional[str] = None) -> str:
+                     tool_call_id: Optional[str] = None, messages: list = None) -> str:
         """Invoke a single tool and return the result string. No display logic.

         Handles both agent-level tools (todo, memory, etc.) and registry-dispatched
@@ -7698,15 +7717,7 @@ class AIAgent:
                 callback=self.clarify_callback,
             )
         elif function_name == "delegate_task":
-            from tools.delegate_tool import delegate_task as _delegate_task
-            return _delegate_task(
-                goal=function_args.get("goal"),
-                context=function_args.get("context"),
-                toolsets=function_args.get("toolsets"),
-                tasks=function_args.get("tasks"),
-                max_iterations=function_args.get("max_iterations"),
-                parent_agent=self,
-            )
+            return self._dispatch_delegate_task(function_args)
         else:
             return handle_function_call(
                 function_name, function_args, effective_task_id,
@@ -7868,7 +7879,7 @@ class AIAgent:
                 pass
             start = time.time()
             try:
-                result = self._invoke_tool(function_name, function_args, effective_task_id, tool_call.id)
+                result = self._invoke_tool(function_name, function_args, effective_task_id, tool_call.id, messages=messages)
             except Exception as tool_error:
                 result = f"Error executing tool '{function_name}': {tool_error}"
                 logger.error("_invoke_tool raised for %s: %s", function_name, tool_error, exc_info=True)
@@ -8220,7 +8231,6 @@ class AIAgent:
             if self._should_emit_quiet_tool_messages():
                 self._vprint(f"  {_get_cute_tool_message_impl('clarify', function_args, tool_duration, result=function_result)}")
         elif function_name == "delegate_task":
-            from tools.delegate_tool import delegate_task as _delegate_task
             tasks_arg = function_args.get("tasks")
             if tasks_arg and isinstance(tasks_arg, list):
                 spinner_label = f"🔀 delegating {len(tasks_arg)} tasks"
@@ -8235,14 +8245,7 @@ class AIAgent:
                 self._delegate_spinner = spinner
                 _delegate_result = None
                 try:
-                    function_result = _delegate_task(
-                        goal=function_args.get("goal"),
-                        context=function_args.get("context"),
-                        toolsets=function_args.get("toolsets"),
-                        tasks=tasks_arg,
-                        max_iterations=function_args.get("max_iterations"),
-                        parent_agent=self,
-                    )
+                    function_result = self._dispatch_delegate_task(function_args)
                     _delegate_result = function_result
                 finally:
                     self._delegate_spinner = None

@@ -193,7 +193,7 @@ class TestBuildChildProgressCallback:
        # task_index=0 in a batch of 3 → prefix "[1]"
        cb0 = _build_child_progress_callback(0, "test goal", parent, task_count=3)
-       cb0("web_search", "test")
+       cb0("tool.started", "web_search", "test", {})
        output = buf.getvalue()
        assert "[1]" in output
@@ -201,7 +201,7 @@ class TestBuildChildProgressCallback:
        buf.truncate(0)
        buf.seek(0)
        cb2 = _build_child_progress_callback(2, "test goal", parent, task_count=3)
-       cb2("web_search", "test")
+       cb2("tool.started", "web_search", "test", {})
        output = buf.getvalue()
        assert "[3]" in output

@@ -10,7 +10,7 @@ import inspect
 def test_delegation_default_toolsets_removed_from_cli_config():
     """delegation.default_toolsets was dead config — never read by
-    _load_config() or anywhere else. Removed in M0.5.
+    _load_config() or anywhere else. Removed.

     Guards against accidental re-introduction in cli.py's CLI_CONFIG default
     dict. If this test fails, someone re-added the key without wiring it up

@@ -20,11 +20,14 @@ from unittest.mock import MagicMock, patch
 from tools.delegate_tool import (
     DELEGATE_BLOCKED_TOOLS,
     DELEGATE_TASK_SCHEMA,
+    DelegateEvent,
     _get_max_concurrent_children,
+    _LEGACY_EVENT_MAP,
     MAX_DEPTH,
     check_delegate_requirements,
     delegate_task,
     _build_child_agent,
+    _build_child_progress_callback,
     _build_child_system_prompt,
     _strip_blocked_tools,
     _resolve_child_credential_pool,
@@ -568,8 +571,16 @@ class TestBlockedTools(unittest.TestCase):
             self.assertIn(tool, DELEGATE_BLOCKED_TOOLS)

     def test_constants(self):
+        from tools.delegate_tool import (
+            _get_max_spawn_depth, _get_orchestrator_enabled,
+            _MIN_SPAWN_DEPTH, _MAX_SPAWN_DEPTH_CAP,
+        )
         self.assertEqual(_get_max_concurrent_children(), 3)
-        self.assertEqual(MAX_DEPTH, 2)
+        self.assertEqual(MAX_DEPTH, 1)
+        self.assertEqual(_get_max_spawn_depth(), 1)  # default: flat
+        self.assertTrue(_get_orchestrator_enabled())  # default
+        self.assertEqual(_MIN_SPAWN_DEPTH, 1)
+        self.assertEqual(_MAX_SPAWN_DEPTH_CAP, 3)


 class TestDelegationCredentialResolution(unittest.TestCase):
@@ -1325,5 +1336,635 @@ class TestDelegationReasoningEffort(unittest.TestCase):
        self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "medium"})
# =========================================================================
# Dispatch helper, progress events, concurrency
# =========================================================================
class TestDispatchDelegateTask(unittest.TestCase):
"""Tests for the _dispatch_delegate_task helper and full param forwarding."""
@patch("tools.delegate_tool._load_config", return_value={})
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_acp_args_forwarded(self, mock_creds, mock_cfg):
"""Both acp_command and acp_args reach delegate_task via the helper."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
with patch("tools.delegate_tool._build_child_agent") as mock_build:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True,
"api_calls": 1, "messages": [],
}
mock_child._delegate_saved_tool_names = []
mock_child._credential_pool = None
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.model = "test"
mock_build.return_value = mock_child
delegate_task(
goal="test",
acp_command="claude",
acp_args=["--acp", "--stdio"],
parent_agent=parent,
)
_, kwargs = mock_build.call_args
self.assertEqual(kwargs["override_acp_command"], "claude")
self.assertEqual(kwargs["override_acp_args"], ["--acp", "--stdio"])
class TestDelegateEventEnum(unittest.TestCase):
"""Tests for DelegateEvent enum and back-compat aliases."""
def test_enum_values_are_strings(self):
for event in DelegateEvent:
self.assertIsInstance(event.value, str)
self.assertTrue(event.value.startswith("delegate."))
def test_legacy_map_covers_all_old_names(self):
expected_legacy = {"_thinking", "reasoning.available",
"tool.started", "tool.completed", "subagent_progress"}
self.assertEqual(set(_LEGACY_EVENT_MAP.keys()), expected_legacy)
def test_legacy_map_values_are_delegate_events(self):
for old_name, event in _LEGACY_EVENT_MAP.items():
self.assertIsInstance(event, DelegateEvent)
def test_progress_callback_normalises_tool_started(self):
"""_build_child_progress_callback handles tool.started via enum."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = MagicMock()
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
self.assertIsNotNone(cb)
cb("tool.started", tool_name="terminal", preview="ls")
parent._delegate_spinner.print_above.assert_called()
def test_progress_callback_normalises_thinking(self):
"""Both _thinking and reasoning.available route to TASK_THINKING."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb("_thinking", tool_name=None, preview="pondering...")
assert any("💭" in str(c) for c in parent._delegate_spinner.print_above.call_args_list)
parent._delegate_spinner.print_above.reset_mock()
cb("reasoning.available", tool_name=None, preview="hmm")
assert any("💭" in str(c) for c in parent._delegate_spinner.print_above.call_args_list)
def test_progress_callback_tool_completed_is_noop(self):
"""tool.completed is normalised but produces no display output."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb("tool.completed", tool_name="terminal")
parent._delegate_spinner.print_above.assert_not_called()
def test_progress_callback_ignores_unknown_events(self):
"""Unknown event types are silently ignored."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
# Should not raise
cb("some.unknown.event", tool_name="x")
parent._delegate_spinner.print_above.assert_not_called()
def test_progress_callback_accepts_enum_value_directly(self):
"""cb(DelegateEvent.TASK_THINKING, ...) must route to the thinking
branch. Pre-fix the callback only handled legacy strings via
_LEGACY_EVENT_MAP.get and silently dropped enum-typed callers."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = None
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb(DelegateEvent.TASK_THINKING, preview="pondering")
# If the enum was accepted, the thinking emoji got printed.
assert any(
"💭" in str(c)
for c in parent._delegate_spinner.print_above.call_args_list
)
def test_progress_callback_accepts_new_style_string(self):
"""cb('delegate.task_thinking', ...) — the string form of the
enum value must route to the thinking branch too, so new-style
emitters don't have to import DelegateEvent."""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb("delegate.task_thinking", preview="hmm")
assert any(
"💭" in str(c)
for c in parent._delegate_spinner.print_above.call_args_list
)
def test_progress_callback_task_progress_not_misrendered(self):
"""'subagent_progress' (legacy name for TASK_PROGRESS) carries a
pre-batched summary in the tool_name slot. Before the fix, this
fell through to the TASK_TOOL_STARTED rendering path, treating
the summary string as a tool name. After the fix: distinct
render (no tool-start emoji lookup) and pass-through relay
upward (no re-batching).
Regression path only reachable once nested orchestration is
enabled: nested orchestrators relay subagent_progress from
grandchildren upward through this callback.
"""
parent = _make_mock_parent()
parent._delegate_spinner = MagicMock()
parent.tool_progress_callback = MagicMock()
cb = _build_child_progress_callback(0, "test goal", parent, task_count=1)
cb("subagent_progress", tool_name="🔀 [1] terminal, file")
# Spinner gets a distinct 🔀-prefixed line, NOT a tool emoji
# followed by the summary string as if it were a tool name.
calls = parent._delegate_spinner.print_above.call_args_list
self.assertTrue(any("🔀 🔀 [1] terminal, file" in str(c) for c in calls))
# Parent callback receives the relay (pass-through, no re-batching).
parent.tool_progress_callback.assert_called_once()
# No '⚡' tool-start emoji should appear — that's the pre-fix bug.
self.assertFalse(any("⚡" in str(c) for c in calls))
class TestConcurrencyDefaults(unittest.TestCase):
"""Tests for the concurrency default and no hard ceiling."""
@patch("tools.delegate_tool._load_config", return_value={})
def test_default_is_three(self, mock_cfg):
# Clear env var if set
with patch.dict(os.environ, {}, clear=True):
self.assertEqual(_get_max_concurrent_children(), 3)
@patch("tools.delegate_tool._load_config",
return_value={"max_concurrent_children": 10})
def test_no_upper_ceiling(self, mock_cfg):
"""Users can raise concurrency as high as they want — no hard cap."""
self.assertEqual(_get_max_concurrent_children(), 10)
@patch("tools.delegate_tool._load_config",
return_value={"max_concurrent_children": 100})
def test_very_high_values_honored(self, mock_cfg):
self.assertEqual(_get_max_concurrent_children(), 100)
@patch("tools.delegate_tool._load_config",
return_value={"max_concurrent_children": 0})
def test_zero_clamped_to_one(self, mock_cfg):
"""Floor of 1 is enforced; zero or negative values raise to 1."""
self.assertEqual(_get_max_concurrent_children(), 1)
@patch("tools.delegate_tool._load_config", return_value={})
def test_env_var_honored_uncapped(self, mock_cfg):
with patch.dict(os.environ, {"DELEGATION_MAX_CONCURRENT_CHILDREN": "12"}):
self.assertEqual(_get_max_concurrent_children(), 12)
@patch("tools.delegate_tool._load_config",
return_value={"max_concurrent_children": 6})
def test_configured_value_returned(self, mock_cfg):
self.assertEqual(_get_max_concurrent_children(), 6)
# =========================================================================
# max_spawn_depth clamping
# =========================================================================
class TestMaxSpawnDepth(unittest.TestCase):
"""Tests for _get_max_spawn_depth clamping and fallback behavior."""
@patch("tools.delegate_tool._load_config", return_value={})
def test_max_spawn_depth_defaults_to_1(self, mock_cfg):
from tools.delegate_tool import _get_max_spawn_depth
self.assertEqual(_get_max_spawn_depth(), 1)
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 0})
def test_max_spawn_depth_clamped_below_one(self, mock_cfg):
import logging
from tools.delegate_tool import _get_max_spawn_depth
with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm:
result = _get_max_spawn_depth()
self.assertEqual(result, 1)
self.assertTrue(any("clamping to 1" in m for m in cm.output))
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 99})
def test_max_spawn_depth_clamped_above_three(self, mock_cfg):
import logging
from tools.delegate_tool import _get_max_spawn_depth
with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm:
result = _get_max_spawn_depth()
self.assertEqual(result, 3)
self.assertTrue(any("clamping to 3" in m for m in cm.output))
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": "not-a-number"})
def test_max_spawn_depth_invalid_falls_back_to_default(self, mock_cfg):
from tools.delegate_tool import _get_max_spawn_depth
self.assertEqual(_get_max_spawn_depth(), 1)
# =========================================================================
# role param plumbing
# =========================================================================
#
# These tests cover the schema + signature + stash plumbing of the role
# param. The full role-honoring behavior (toolset re-add, role-aware
# prompt) lives in TestOrchestratorRoleBehavior below; these tests only
# assert on _delegate_role stashing and on the schema shape.
class TestOrchestratorRoleSchema(unittest.TestCase):
"""Tests that the role param reaches the child via dispatch."""
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def _run_with_mock_child(self, role_arg, mock_cfg, mock_creds):
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
with patch("run_agent.AIAgent") as MockAgent:
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True,
"api_calls": 1, "messages": [],
}
mock_child._delegate_saved_tool_names = []
mock_child._credential_pool = None
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.model = "test"
MockAgent.return_value = mock_child
kwargs = {"goal": "test", "parent_agent": parent}
if role_arg is not _SENTINEL:
kwargs["role"] = role_arg
delegate_task(**kwargs)
return mock_child
def test_default_role_is_leaf(self):
child = self._run_with_mock_child(_SENTINEL)
self.assertEqual(child._delegate_role, "leaf")
def test_explicit_orchestrator_role_stashed(self):
"""role='orchestrator' reaches _build_child_agent and is stashed.
Full behavior (toolset re-add) lands in commit 3; commit 2 only
verifies the plumbing."""
child = self._run_with_mock_child("orchestrator")
self.assertEqual(child._delegate_role, "orchestrator")
def test_unknown_role_coerces_to_leaf(self):
"""role='nonsense' → _normalize_role warns and returns 'leaf'."""
import logging
with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm:
child = self._run_with_mock_child("nonsense")
self.assertEqual(child._delegate_role, "leaf")
self.assertTrue(any("coercing" in m.lower() for m in cm.output))
def test_schema_has_role_top_level_and_per_task(self):
from tools.delegate_tool import DELEGATE_TASK_SCHEMA
props = DELEGATE_TASK_SCHEMA["parameters"]["properties"]
self.assertIn("role", props)
self.assertEqual(props["role"]["enum"], ["leaf", "orchestrator"])
task_props = props["tasks"]["items"]["properties"]
self.assertIn("role", task_props)
self.assertEqual(task_props["role"]["enum"], ["leaf", "orchestrator"])
# Sentinel used to distinguish "role kwarg omitted" from "role=None".
_SENTINEL = object()
# =========================================================================
# role-honoring behavior
# =========================================================================
def _make_role_mock_child():
"""Helper: mock child with minimal fields for delegate_task to process."""
mock_child = MagicMock()
mock_child.run_conversation.return_value = {
"final_response": "done", "completed": True,
"api_calls": 1, "messages": [],
}
mock_child._delegate_saved_tool_names = []
mock_child._credential_pool = None
mock_child.session_prompt_tokens = 0
mock_child.session_completion_tokens = 0
mock_child.model = "test"
return mock_child
class TestOrchestratorRoleBehavior(unittest.TestCase):
"""Tests that role='orchestrator' actually changes toolset + prompt."""
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_orchestrator_role_keeps_delegation_at_depth_1(
self, mock_cfg, mock_creds
):
"""role='orchestrator' + depth-0 parent with max_spawn_depth=2 →
child at depth 1 gets 'delegation' in enabled_toolsets (can
further delegate). Requires max_spawn_depth>=2 since the new
default is 1 (flat)."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file"]
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator", parent_agent=parent)
kwargs = MockAgent.call_args[1]
self.assertIn("delegation", kwargs["enabled_toolsets"])
self.assertEqual(mock_child._delegate_role, "orchestrator")
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_orchestrator_blocked_at_max_spawn_depth(
self, mock_cfg, mock_creds
):
"""Parent at depth 1 with max_spawn_depth=2 spawns child
at depth 2 (the floor); role='orchestrator' degrades to leaf."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=1)
parent.enabled_toolsets = ["terminal", "delegation"]
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator", parent_agent=parent)
kwargs = MockAgent.call_args[1]
self.assertNotIn("delegation", kwargs["enabled_toolsets"])
self.assertEqual(mock_child._delegate_role, "leaf")
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config", return_value={})
def test_orchestrator_blocked_at_default_flat_depth(
self, mock_cfg, mock_creds
):
"""With default max_spawn_depth=1 (flat), role='orchestrator'
on a depth-0 parent produces a depth-1 child that is already at
the floor: the role degrades to 'leaf' and the delegation
toolset is stripped. This is the new default posture."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file", "delegation"]
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator", parent_agent=parent)
kwargs = MockAgent.call_args[1]
self.assertNotIn("delegation", kwargs["enabled_toolsets"])
self.assertEqual(mock_child._delegate_role, "leaf")
@patch("tools.delegate_tool._resolve_delegation_credentials")
def test_orchestrator_enabled_false_forces_leaf(self, mock_creds):
"""Kill switch delegation.orchestrator_enabled=false overrides
role='orchestrator'."""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "delegation"]
with patch("tools.delegate_tool._load_config",
return_value={"orchestrator_enabled": False}):
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator",
parent_agent=parent)
kwargs = MockAgent.call_args[1]
self.assertNotIn("delegation", kwargs["enabled_toolsets"])
self.assertEqual(mock_child._delegate_role, "leaf")
# ── Role-aware system prompt ────────────────────────────────────────
def test_leaf_prompt_does_not_mention_delegation(self):
prompt = _build_child_system_prompt(
"Fix tests", role="leaf",
max_spawn_depth=2, child_depth=1,
)
self.assertNotIn("delegate_task", prompt)
self.assertNotIn("Orchestrator Role", prompt)
def test_orchestrator_prompt_mentions_delegation_capability(self):
prompt = _build_child_system_prompt(
"Survey approaches", role="orchestrator",
max_spawn_depth=2, child_depth=1,
)
self.assertIn("delegate_task", prompt)
self.assertIn("Orchestrator Role", prompt)
# Depth/max-depth note present and literal:
self.assertIn("depth 1", prompt)
self.assertIn("max_spawn_depth=2", prompt)
def test_orchestrator_prompt_at_depth_floor_says_children_are_leaves(self):
"""With max_spawn_depth=2 and child_depth=1, the orchestrator's
own children would be at depth 2 (the floor) and so must be leaves."""
prompt = _build_child_system_prompt(
"Survey", role="orchestrator",
max_spawn_depth=2, child_depth=1,
)
self.assertIn("MUST be leaves", prompt)
def test_orchestrator_prompt_below_floor_allows_more_nesting(self):
"""With max_spawn_depth=3 and child_depth=1, the orchestrator's
own children can themselves be orchestrators (depth 2 < 3)."""
prompt = _build_child_system_prompt(
"Deep work", role="orchestrator",
max_spawn_depth=3, child_depth=1,
)
self.assertIn("can themselves be orchestrators", prompt)
# ── Batch mode and intersection ─────────────────────────────────────
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_batch_mode_per_task_role_override(self, mock_cfg, mock_creds):
"""Per-task role beats top-level; no top-level role → "leaf".
tasks=[{role:'orchestrator'},{role:'leaf'},{}] first gets
delegation, second and third don't. Requires max_spawn_depth>=2
(raised explicitly here) since the new default is 1 (flat).
"""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file", "delegation"]
built_toolsets = []
def _factory(*a, **kw):
m = _make_role_mock_child()
built_toolsets.append(kw.get("enabled_toolsets"))
return m
with patch("run_agent.AIAgent", side_effect=_factory):
delegate_task(
tasks=[
{"goal": "A", "role": "orchestrator"},
{"goal": "B", "role": "leaf"},
{"goal": "C"}, # no role → falls back to top_role (leaf)
],
parent_agent=parent,
)
self.assertIn("delegation", built_toolsets[0])
self.assertNotIn("delegation", built_toolsets[1])
self.assertNotIn("delegation", built_toolsets[2])
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_intersection_preserves_delegation_bound(
self, mock_cfg, mock_creds
):
"""Design decision: orchestrator capability is granted by role,
NOT inherited from the parent's toolset. A parent without
'delegation' in its enabled_toolsets can still spawn an
        orchestrator child, because the re-add in _build_child_agent
        runs unconditionally for orchestrators (when max_spawn_depth allows).
If you want to change to "parent must have delegation too",
update _build_child_agent to check parent_toolsets before the
re-add and update this test to match.
"""
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file"] # no delegation
with patch("run_agent.AIAgent") as MockAgent:
mock_child = _make_role_mock_child()
MockAgent.return_value = mock_child
delegate_task(goal="test", role="orchestrator",
parent_agent=parent)
self.assertIn("delegation", MockAgent.call_args[1]["enabled_toolsets"])
class TestOrchestratorEndToEnd(unittest.TestCase):
"""End-to-end: parent -> orchestrator -> two-leaf nested orchestration.
Covers the acceptance gate: parent delegates to an orchestrator
child; the orchestrator delegates to two leaf grandchildren; the
role/toolset/depth chain all resolve correctly.
Mock strategy: a single AIAgent patch with a side_effect factory
that keys on the child's ephemeral_system_prompt — orchestrator
prompts contain the string "Orchestrator Role" (see
_build_child_system_prompt), leaves don't. The orchestrator
mock's run_conversation recursively calls delegate_task with
tasks=[{goal:...},{goal:...}] to spawn two leaves. This keeps
the test in one patch context and avoids depth-indexed nesting.
"""
@patch("tools.delegate_tool._resolve_delegation_credentials")
@patch("tools.delegate_tool._load_config",
return_value={"max_spawn_depth": 2})
def test_end_to_end_nested_orchestration(self, mock_cfg, mock_creds):
mock_creds.return_value = {
"provider": None, "base_url": None,
"api_key": None, "api_mode": None, "model": None,
}
parent = _make_mock_parent(depth=0)
parent.enabled_toolsets = ["terminal", "file", "delegation"]
# (enabled_toolsets, _delegate_role) for each agent built
built_agents: list = []
# Keep the orchestrator mock around so the re-entrant delegate_task
# can reach it via closure.
orch_mock = {}
def _factory(*a, **kw):
prompt = kw.get("ephemeral_system_prompt", "") or ""
is_orchestrator = "Orchestrator Role" in prompt
m = _make_role_mock_child()
built_agents.append({
"enabled_toolsets": list(kw.get("enabled_toolsets") or []),
"is_orchestrator_prompt": is_orchestrator,
})
if is_orchestrator:
# Prepare the orchestrator mock as a parent-capable object
# so the nested delegate_task call succeeds.
m._delegate_depth = 1
m._delegate_role = "orchestrator"
m._active_children = []
m._active_children_lock = threading.Lock()
m._session_db = None
m.platform = "cli"
m.enabled_toolsets = ["terminal", "file", "delegation"]
m.api_key = "***"
m.base_url = ""
m.provider = None
m.api_mode = None
m.providers_allowed = None
m.providers_ignored = None
m.providers_order = None
m.provider_sort = None
m._print_fn = None
m.tool_progress_callback = None
m.thinking_callback = None
orch_mock["agent"] = m
def _orchestrator_run(user_message=None):
# Re-entrant: orchestrator spawns two leaves
delegate_task(
tasks=[{"goal": "leaf-A"}, {"goal": "leaf-B"}],
parent_agent=m,
)
return {
"final_response": "orchestrated 2 workers",
"completed": True, "api_calls": 1,
"messages": [],
}
m.run_conversation.side_effect = _orchestrator_run
return m
with patch("run_agent.AIAgent", side_effect=_factory) as MockAgent:
delegate_task(
goal="top-level orchestration",
role="orchestrator",
parent_agent=parent,
)
# 1 orchestrator + 2 leaf grandchildren = 3 agents
self.assertEqual(MockAgent.call_count, 3)
# First built = the orchestrator (parent's direct child)
self.assertIn("delegation", built_agents[0]["enabled_toolsets"])
self.assertTrue(built_agents[0]["is_orchestrator_prompt"])
# Next two = leaves (grandchildren)
self.assertNotIn("delegation", built_agents[1]["enabled_toolsets"])
self.assertFalse(built_agents[1]["is_orchestrator_prompt"])
self.assertNotIn("delegation", built_agents[2]["enabled_toolsets"])
self.assertFalse(built_agents[2]["is_orchestrator_prompt"])
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
@@ -16,6 +16,7 @@ The parent's context only sees the delegation call and the summary result,
never the child's intermediate tool calls or reasoning.
"""
import enum
import json
import logging

logger = logging.getLogger(__name__)
@@ -41,6 +42,12 @@ DELEGATE_BLOCKED_TOOLS = frozenset([
# Build a description fragment listing toolsets available for subagents.
# Excludes toolsets where ALL tools are blocked, composite/platform toolsets
# (hermes-* prefixed), and scenario toolsets.
#
# NOTE: "delegation" is in this exclusion set so the subagent-facing
# capability hint string (_TOOLSET_LIST_STR) doesn't advertise it as a
# toolset to request explicitly — the correct mechanism for nested
# delegation is role='orchestrator', which re-adds "delegation" in
# _build_child_agent regardless of this exclusion.
_EXCLUDED_TOOLSET_NAMES = frozenset({"debugging", "safe", "delegation", "moa", "rl"})
_SUBAGENT_TOOLSETS = sorted(
    name for name, defn in TOOLSETS.items()
@@ -51,13 +58,36 @@ _SUBAGENT_TOOLSETS = sorted(
_TOOLSET_LIST_STR = ", ".join(f"'{n}'" for n in _SUBAGENT_TOOLSETS)

_DEFAULT_MAX_CONCURRENT_CHILDREN = 3
MAX_DEPTH = 1  # flat by default: parent (0) -> child (1); grandchild rejected unless max_spawn_depth raised.
# Configurable depth cap consulted by _get_max_spawn_depth; MAX_DEPTH
# stays as the default fallback and is still the symbol tests import.
_MIN_SPAWN_DEPTH = 1
_MAX_SPAWN_DEPTH_CAP = 3
def _normalize_role(r: Optional[str]) -> str:
"""Normalise a caller-provided role to 'leaf' or 'orchestrator'.
None/empty -> 'leaf'. Unknown strings coerce to 'leaf' with a
warning log (matches the silent-degrade pattern of
_get_orchestrator_enabled). _build_child_agent adds a second
degrade layer for depth/kill-switch bounds.
"""
if r is None or not r:
return "leaf"
r_norm = str(r).strip().lower()
if r_norm in ("leaf", "orchestrator"):
return r_norm
logger.warning("Unknown delegate_task role=%r, coercing to 'leaf'", r)
return "leaf"
def _get_max_concurrent_children() -> int:
    """Read delegation.max_concurrent_children from config, falling back to
    DELEGATION_MAX_CONCURRENT_CHILDREN env var, then the default (3).
    Users can raise this as high as they want; only the floor (1) is enforced.
    Uses the same ``_load_config()`` path that the rest of ``delegate_task``
    uses, keeping config priority consistent (config.yaml > env > default).
    """
@@ -71,18 +101,108 @@ def _get_max_concurrent_children() -> int:
            "delegation.max_concurrent_children=%r is not a valid integer; "
            "using default %d", val, _DEFAULT_MAX_CONCURRENT_CHILDREN,
        )
        return _DEFAULT_MAX_CONCURRENT_CHILDREN
    env_val = os.getenv("DELEGATION_MAX_CONCURRENT_CHILDREN")
    if env_val:
        try:
            return max(1, int(env_val))
        except (TypeError, ValueError):
            return _DEFAULT_MAX_CONCURRENT_CHILDREN
    return _DEFAULT_MAX_CONCURRENT_CHILDREN
def _get_max_spawn_depth() -> int:
"""Read delegation.max_spawn_depth from config, clamped to [1, 3].
depth 0 = parent agent. max_spawn_depth = N means agents at depths
0..N-1 can spawn; depth N is the leaf floor. Default 1 is flat:
parent spawns children (depth 1), depth-1 children cannot spawn
(blocked by this guard AND, for leaf children, by the delegation
toolset strip in _strip_blocked_tools).
Raise to 2 or 3 to unlock nested orchestration. role="orchestrator"
removes the toolset strip for depth-1 children when
max_spawn_depth >= 2, enabling them to spawn their own workers.
"""
cfg = _load_config()
val = cfg.get("max_spawn_depth")
if val is None:
return MAX_DEPTH
try:
ival = int(val)
except (TypeError, ValueError):
logger.warning(
"delegation.max_spawn_depth=%r is not a valid integer; "
"using default %d", val, MAX_DEPTH,
)
return MAX_DEPTH
clamped = max(_MIN_SPAWN_DEPTH, min(_MAX_SPAWN_DEPTH_CAP, ival))
if clamped != ival:
logger.warning(
"delegation.max_spawn_depth=%d out of range [%d, %d]; "
"clamping to %d", ival, _MIN_SPAWN_DEPTH,
_MAX_SPAWN_DEPTH_CAP, clamped,
)
return clamped
def _get_orchestrator_enabled() -> bool:
"""Global kill switch for the orchestrator role.
When False, role="orchestrator" is silently forced to "leaf" in
_build_child_agent and the delegation toolset is stripped as before.
Lets an operator disable the feature without a code revert.
"""
cfg = _load_config()
val = cfg.get("orchestrator_enabled", True)
if isinstance(val, bool):
return val
# Accept "true"/"false" strings from YAML that doesn't auto-coerce.
if isinstance(val, str):
return val.strip().lower() in ("true", "1", "yes", "on")
return True
DEFAULT_MAX_ITERATIONS = 50
_HEARTBEAT_INTERVAL = 30  # seconds between parent activity heartbeats during delegation
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
# ---------------------------------------------------------------------------
# Delegation progress event types
# ---------------------------------------------------------------------------
class DelegateEvent(str, enum.Enum):
"""Formal event types emitted during delegation progress.
_build_child_progress_callback normalises incoming legacy strings
    (``tool.started``, ``_thinking``, ...) to these enum values via
``_LEGACY_EVENT_MAP``. External consumers (gateway SSE, ACP adapter,
CLI) still receive the legacy strings during the deprecation window.
TASK_SPAWNED / TASK_COMPLETED / TASK_FAILED are reserved for
future orchestrator lifecycle events and are not currently emitted.
"""
TASK_SPAWNED = "delegate.task_spawned"
TASK_PROGRESS = "delegate.task_progress"
TASK_COMPLETED = "delegate.task_completed"
TASK_FAILED = "delegate.task_failed"
TASK_THINKING = "delegate.task_thinking"
TASK_TOOL_STARTED = "delegate.tool_started"
TASK_TOOL_COMPLETED = "delegate.tool_completed"
# Legacy event strings → DelegateEvent mapping.
# Incoming child-agent events use the old names; the callback normalises them.
_LEGACY_EVENT_MAP: Dict[str, DelegateEvent] = {
"_thinking": DelegateEvent.TASK_THINKING,
"reasoning.available": DelegateEvent.TASK_THINKING,
"tool.started": DelegateEvent.TASK_TOOL_STARTED,
"tool.completed": DelegateEvent.TASK_TOOL_COMPLETED,
"subagent_progress": DelegateEvent.TASK_PROGRESS,
}
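# A hedged standalone sketch of the normalisation path above — not the
# module's code ("normalise_event" is an invented name; the real logic
# lives inline in _build_child_progress_callback):
#
# ```python
import enum

class _SketchEvent(str, enum.Enum):
    TASK_THINKING = "delegate.task_thinking"
    TASK_TOOL_STARTED = "delegate.tool_started"

_SKETCH_LEGACY = {
    "_thinking": _SketchEvent.TASK_THINKING,
    "tool.started": _SketchEvent.TASK_TOOL_STARTED,
}

def normalise_event(event_type):
    """Accept enum values, legacy strings, and 'delegate.*' strings alike."""
    if isinstance(event_type, _SketchEvent):
        return event_type
    ev = _SKETCH_LEGACY.get(event_type)
    if ev is None:
        try:
            ev = _SketchEvent(event_type)  # new-style "delegate.*" string
        except (ValueError, TypeError):
            return None                    # unknown event -> ignore
    return ev

assert normalise_event("_thinking") is _SketchEvent.TASK_THINKING
assert normalise_event("delegate.tool_started") is _SketchEvent.TASK_TOOL_STARTED
assert normalise_event("bogus") is None
# ```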
def check_delegate_requirements() -> bool:
    """Delegation has no external requirements -- always available."""
    return True
@@ -93,8 +213,18 @@ def _build_child_system_prompt(
    context: Optional[str] = None,
    *,
    workspace_path: Optional[str] = None,
    role: str = "leaf",
    max_spawn_depth: int = 2,
    child_depth: int = 1,
) -> str:
    """Build a focused system prompt for a child agent.
When role='orchestrator', appends a delegation-capability block
modeled on OpenClaw's buildSubagentSystemPrompt (canSpawn branch at
inspiration/openclaw/src/agents/subagent-system-prompt.ts:63-95).
The depth note is literal truth (grounded in the passed config) so
the LLM doesn't confabulate nesting capabilities that don't exist.
"""
    parts = [
        "You are a focused subagent working on a specific delegated task.",
        "",
@@ -120,6 +250,37 @@ def _build_child_system_prompt(
        "Be thorough but concise -- your response is returned to the "
        "parent agent as a summary."
    )
if role == "orchestrator":
child_note = (
"Your own children MUST be leaves (cannot delegate further) "
"because they would be at the depth floor — you cannot pass "
"role='orchestrator' to your own delegate_task calls."
if child_depth + 1 >= max_spawn_depth else
"Your own children can themselves be orchestrators or leaves, "
"depending on the `role` you pass to delegate_task. Default is "
"'leaf'; pass role='orchestrator' explicitly when a child "
"needs to further decompose its work."
)
parts.append(
"\n## Subagent Spawning (Orchestrator Role)\n"
"You have access to the `delegate_task` tool and CAN spawn "
"your own subagents to parallelize independent work.\n\n"
"WHEN to delegate:\n"
"- The goal decomposes into 2+ independent subtasks that can "
"run in parallel (e.g. research A and B simultaneously).\n"
"- A subtask is reasoning-heavy and would flood your context "
"with intermediate data.\n\n"
"WHEN NOT to delegate:\n"
"- Single-step mechanical work — do it directly.\n"
"- Trivial tasks you can execute in one or two tool calls.\n"
"- Re-delegating your entire assigned goal to one worker "
"(that's just pass-through with no value added).\n\n"
"Coordinate your workers' results and synthesize them before "
"reporting back to your parent. You are responsible for the "
"final summary, not your workers.\n\n"
f"NOTE: You are at depth {child_depth}. The delegation tree "
f"is capped at max_spawn_depth={max_spawn_depth}. {child_note}"
)
return "\n".join(parts) return "\n".join(parts)
@ -197,10 +358,9 @@ def _build_child_progress_callback(task_index: int, goal: str, parent_agent, tas
except Exception as e: except Exception as e:
logger.debug("Parent callback failed: %s", e) logger.debug("Parent callback failed: %s", e)
def _callback(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs): def _callback(event_type, tool_name: str = None, preview: str = None, args=None, **kwargs):
# event_type is one of: "tool.started", "tool.completed", # Lifecycle events emitted by the orchestrator itself — handled
# "reasoning.available", "_thinking", "subagent.*" # before enum normalisation since they are not part of DelegateEvent.
if event_type == "subagent.start": if event_type == "subagent.start":
if spinner and goal_label: if spinner and goal_label:
short = (goal_label[:55] + "...") if len(goal_label) > 55 else goal_label short = (goal_label[:55] + "...") if len(goal_label) > 55 else goal_label
@@ -215,8 +375,21 @@ def _build_child_progress_callback(task_index: int, goal: str, parent_agent, tas
            _relay("subagent.complete", preview=preview, **kwargs)
            return

        # Normalise legacy strings, new-style "delegate.*" strings, and
        # DelegateEvent enum values all to a single DelegateEvent. The
# original implementation only accepted the five legacy strings;
# enum-typed callers were silently dropped.
if isinstance(event_type, DelegateEvent):
event = event_type
else:
event = _LEGACY_EVENT_MAP.get(event_type)
if event is None:
try:
event = DelegateEvent(event_type)
except (ValueError, TypeError):
return # Unknown event — ignore
if event == DelegateEvent.TASK_THINKING:
            text = preview or tool_name or ""
            if spinner:
                short = (text[:55] + "...") if len(text) > 55 else text
@@ -227,11 +400,31 @@ def _build_child_progress_callback(task_index: int, goal: str, parent_agent, tas
                _relay("subagent.thinking", preview=text)
            return

        if event == DelegateEvent.TASK_TOOL_COMPLETED:
            return

        if event == DelegateEvent.TASK_PROGRESS:
# Pre-batched progress summary relayed from a nested
# orchestrator's grandchild (upstream emits as
# parent_cb("subagent_progress", summary_string) where the
# summary lands in the tool_name positional slot). Treat as
# a pass-through: render distinctly (not via the tool-start
# emoji lookup, which would mistake the summary string for a
# tool name) and relay upward without re-batching.
summary_text = tool_name or preview or ""
if spinner and summary_text:
try:
spinner.print_above(f" {prefix}├─ 🔀 {summary_text}")
except Exception as e:
logger.debug("Spinner print_above failed: %s", e)
if parent_cb:
try:
parent_cb("subagent_progress", f"{prefix}{summary_text}")
except Exception as e:
logger.debug("Parent callback relay failed: %s", e)
return
# TASK_TOOL_STARTED — display and batch for parent relay
        if spinner:
            short = (preview[:35] + "...") if preview and len(preview) > 35 else (preview or "")
            from agent.display import get_tool_emoji
@@ -280,6 +473,10 @@ def _build_child_agent(
    # ACP transport overrides — lets a non-ACP parent spawn ACP child agents
    override_acp_command: Optional[str] = None,
    override_acp_args: Optional[List[str]] = None,
# Per-call role controlling whether the child can further delegate.
# 'leaf' (default) cannot; 'orchestrator' retains the delegation
# toolset subject to depth/kill-switch bounds applied below.
role: str = "leaf",
):
    """
    Build a child AIAgent on the main thread (thread-safe construction).
@@ -292,6 +489,17 @@ def _build_child_agent(
    """
    from run_agent import AIAgent
# ── Role resolution ─────────────────────────────────────────────────
# Honor the caller's role only when BOTH the kill switch and the
# child's depth allow it. This is the single point where role
# degrades to 'leaf' — keeps the rule predictable. Callers pass
# the normalised role (_normalize_role ran in delegate_task) so
# we only deal with 'leaf' or 'orchestrator' here.
child_depth = getattr(parent_agent, '_delegate_depth', 0) + 1
max_spawn = _get_max_spawn_depth()
orchestrator_ok = _get_orchestrator_enabled() and child_depth < max_spawn
effective_role = role if (role == "orchestrator" and orchestrator_ok) else "leaf"
    # When no explicit toolsets given, inherit from parent's enabled toolsets
    # so disabled tools (e.g. web) don't leak to subagents.
    # Note: enabled_toolsets=None means "all tools enabled" (the default),
@@ -319,8 +527,21 @@ def _build_child_agent(
    else:
        child_toolsets = _strip_blocked_tools(DEFAULT_TOOLSETS)
# Orchestrators retain the 'delegation' toolset that _strip_blocked_tools
# removed. The re-add is unconditional on parent-toolset membership because
# orchestrator capability is granted by role, not inherited — see the
# test_intersection_preserves_delegation_bound test for the design rationale.
if effective_role == "orchestrator" and "delegation" not in child_toolsets:
child_toolsets.append("delegation")
    workspace_hint = _resolve_workspace_hint(parent_agent)
    child_prompt = _build_child_system_prompt(
goal, context,
workspace_path=workspace_hint,
role=effective_role,
max_spawn_depth=max_spawn,
child_depth=child_depth,
)
    # Extract parent's API key so subagents inherit auth (e.g. Nous Portal).
    parent_api_key = getattr(parent_agent, "api_key", None)
    if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"):
@@ -406,7 +627,10 @@ def _build_child_agent(
    )
    child._print_fn = getattr(parent_agent, '_print_fn', None)

    # Set delegation depth so children can't spawn grandchildren
    child._delegate_depth = child_depth
# Stash the post-degrade role for introspection (leaf if the
# kill switch or depth bounded the caller's requested role).
child._delegate_role = effective_role
    # Share a credential pool with the child when possible so subagents can
    # rotate credentials on rate limits instead of getting pinned to one key.
@@ -691,27 +915,40 @@
    max_iterations: Optional[int] = None,
    acp_command: Optional[str] = None,
    acp_args: Optional[List[str]] = None,
role: Optional[str] = None,
    parent_agent=None,
) -> str:
    """
    Spawn one or more child agents to handle delegated tasks.

    Supports two modes:
    - Single: provide goal (+ optional context, toolsets, role)
    - Batch: provide tasks array [{goal, context, toolsets, role}, ...]
The 'role' parameter controls whether a child can further delegate:
'leaf' (default) cannot; 'orchestrator' retains the delegation
toolset and can spawn its own workers, bounded by
delegation.max_spawn_depth. Per-task role beats the top-level one.
    Returns JSON with results array, one entry per task.
    """
    if parent_agent is None:
        return tool_error("delegate_task requires a parent agent context.")

    # Normalise the top-level role once; per-task overrides re-normalise.
top_role = _normalize_role(role)
    # Depth limit — configurable via delegation.max_spawn_depth;
    # default 1 (flat), matching the MAX_DEPTH fallback constant.
    depth = getattr(parent_agent, '_delegate_depth', 0)
    max_spawn = _get_max_spawn_depth()
    if depth >= max_spawn:
        return json.dumps({
            "error": (
                f"Delegation depth limit reached (depth={depth}, "
                f"max_spawn_depth={max_spawn}). Raise "
                f"delegation.max_spawn_depth in config.yaml if deeper "
                f"nesting is required (cap: {_MAX_SPAWN_DEPTH_CAP})."
            )
        })
@@ -743,7 +980,8 @@ def delegate_task(
        )
        task_list = tasks
    elif goal and isinstance(goal, str) and goal.strip():
        task_list = [{"goal": goal, "context": context,
                      "toolsets": toolsets, "role": top_role}]
    else:
        return tool_error("Provide either 'goal' (single task) or 'tasks' (batch).")
@@ -775,6 +1013,9 @@ def delegate_task(
    try:
        for i, t in enumerate(task_list):
            task_acp_args = t.get("acp_args") if "acp_args" in t else None
# Per-task role beats top-level; normalise again so unknown
# per-task values warn and degrade to leaf uniformly.
effective_role = _normalize_role(t.get("role") or top_role)
            child = _build_child_agent(
                task_index=i, goal=t["goal"], context=t.get("context"),
                toolsets=t.get("toolsets") or toolsets, model=creds["model"],
@@ -786,6 +1027,7 @@ def delegate_task(
                override_acp_args=task_acp_args if task_acp_args is not None else (
                    acp_args if acp_args is not None else creds.get("args")
                ),
                role=effective_role,
            )
            # Override with correct parent tool names (before child construction mutated global)
            child._delegate_saved_tool_names = _parent_tool_names
@@ -1119,7 +1361,7 @@ DELEGATE_TASK_SCHEMA = {
        "never enter your context window.\n\n"
        "TWO MODES (one of 'goal' or 'tasks' is required):\n"
        "1. Single task: provide 'goal' (+ optional context, toolsets)\n"
        "2. Batch (parallel): provide 'tasks' array with up to delegation.max_concurrent_children items (default 3). "
        "All run concurrently and results are returned together.\n\n"
        "WHEN TO USE delegate_task:\n"
        "- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n"
@@ -1132,8 +1374,14 @@ DELEGATE_TASK_SCHEMA = {
        "IMPORTANT:\n"
        "- Subagents have NO memory of your conversation. Pass all relevant "
        "info (file paths, error messages, constraints) via the 'context' field.\n"
        "- Leaf subagents (role='leaf', the default) CANNOT call: "
        "delegate_task, clarify, memory, send_message, execute_code.\n"
        "- Orchestrator subagents (role='orchestrator') retain "
        "delegate_task so they can spawn their own workers, but still "
        "cannot use clarify, memory, send_message, or execute_code. "
        "Orchestrators are bounded by delegation.max_spawn_depth "
        "(default 1; raise to 2-3 for nesting) and can be disabled "
        "globally via delegation.orchestrator_enabled=false.\n"
        "- Each subagent gets its own terminal session (separate working directory and state).\n"
        "- Results are always returned as an array, one entry per task."
    ),
@@ -1189,6 +1437,11 @@ DELEGATE_TASK_SCHEMA = {
                    "items": {"type": "string"},
                    "description": "Per-task ACP args override.",
                },
"role": {
"type": "string",
"enum": ["leaf", "orchestrator"],
"description": "Per-task role override. See top-level 'role' for semantics.",
},
            },
            "required": ["goal"],
        },
@@ -1208,6 +1461,19 @@ DELEGATE_TASK_SCHEMA = {
            "Only set lower for simple tasks."
        ),
    },
"role": {
"type": "string",
"enum": ["leaf", "orchestrator"],
"description": (
"Role of the child agent. 'leaf' (default) = focused "
"worker, cannot delegate further. 'orchestrator' = can "
"use delegate_task to spawn its own workers. Requires "
"delegation.max_spawn_depth >= 2 in config; ignored "
"(treated as 'leaf') when the child would exceed "
"max_spawn_depth or when "
"delegation.orchestrator_enabled=false."
),
},
"acp_command": { "acp_command": {
"type": "string", "type": "string",
"description": ( "description": (
@ -1246,6 +1512,7 @@ registry.register(
max_iterations=args.get("max_iterations"), max_iterations=args.get("max_iterations"),
acp_command=args.get("acp_command"), acp_command=args.get("acp_command"),
acp_args=args.get("acp_args"), acp_args=args.get("acp_args"),
role=args.get("role"),
parent_agent=kw.get("parent_agent")), parent_agent=kw.get("parent_agent")),
check_fn=check_delegate_requirements, check_fn=check_delegate_requirements,
emoji="🔀", emoji="🔀",
@@ -216,8 +216,8 @@ Restricting toolsets keeps the subagent focused and prevents accidental side eff
## Constraints

- **Default 3 parallel tasks** — batches default to 3 concurrent subagents (configurable via `delegation.max_concurrent_children` in config.yaml — no hard ceiling, only a floor of 1)
- **Nested delegation is opt-in** — leaf subagents (default) cannot call `delegate_task`, `clarify`, `memory`, `send_message`, or `execute_code`. Orchestrator subagents (`role="orchestrator"`) retain `delegate_task` for further delegation, but only when `delegation.max_spawn_depth` is raised above the default of 1 (1-3 supported); the other four remain blocked. Disable globally via `delegation.orchestrator_enabled: false`.
- **Separate terminals** — each subagent gets its own terminal session with separate working directory and state
- **No conversation history** — subagents see only what you put in `goal` and `context`
- **Default 50 iterations** — set `max_iterations` lower for simple tasks to save cost
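The opt-in rule above can be sketched in a few lines. `effective_role` here is a hypothetical standalone mirror of the degrade logic in `_build_child_agent` (kill switch AND depth floor), not the actual implementation:

```python
def effective_role(requested: str, child_depth: int, max_spawn_depth: int,
                   orchestrator_enabled: bool = True) -> str:
    """Degrade rule: 'orchestrator' survives only when the global kill
    switch is on AND the child sits above the depth floor."""
    if (requested == "orchestrator" and orchestrator_enabled
            and child_depth < max_spawn_depth):
        return "orchestrator"
    return "leaf"

# Default posture (max_spawn_depth=1): orchestrator silently degrades.
assert effective_role("orchestrator", child_depth=1, max_spawn_depth=1) == "leaf"
# Opt-in (max_spawn_depth=2): a depth-1 child keeps delegation.
assert effective_role("orchestrator", child_depth=1, max_spawn_depth=2) == "orchestrator"
# Kill switch wins regardless of depth budget.
assert effective_role("orchestrator", 1, 3, orchestrator_enabled=False) == "leaf"
```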
@@ -20,7 +20,7 @@ delegate_task(
## Parallel Batch

Up to 3 concurrent subagents by default (configurable, no hard ceiling):

```python
delegate_task(tasks=[
@@ -121,8 +121,8 @@ delegate_task(
 When you provide a `tasks` array, subagents run in **parallel** using a thread pool:
-- **Maximum concurrency:** 3 tasks (the `tasks` array is truncated to 3 if longer)
-- **Thread pool:** Uses `ThreadPoolExecutor` with `MAX_CONCURRENT_CHILDREN = 3` workers
+- **Maximum concurrency:** 3 tasks by default (configurable via `delegation.max_concurrent_children` — floor of 1, no hard ceiling)
+- **Thread pool:** Uses `ThreadPoolExecutor` with the configured concurrency limit as max workers
 - **Progress display:** In CLI mode, a tree-view shows tool calls from each subagent in real-time with per-task completion lines. In gateway mode, progress is batched and relayed to the parent's progress callback
 - **Result ordering:** Results are sorted by task index to match input order regardless of completion order
 - **Interrupt propagation:** Interrupting the parent (e.g., sending a new message) interrupts all active children
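The batch behavior in this hunk can be sketched in a few lines of Python. This is a minimal illustration of the documented semantics (pool-limited concurrency, results re-sorted to input order), not the Hermes implementation; `run_batch` and its parameters are hypothetical names:

```python
from concurrent.futures import ThreadPoolExecutor

def run_batch(tasks, run_one, max_children=3):
    """Run child tasks in parallel, capped at the configured concurrency.

    max_children stands in for delegation.max_concurrent_children.
    """
    with ThreadPoolExecutor(max_workers=max_children) as pool:
        # Submit everything; the pool itself enforces the concurrency limit.
        futures = {pool.submit(run_one, task): idx for idx, task in enumerate(tasks)}
        indexed = [(idx, fut.result()) for fut, idx in futures.items()]
    # Sort by task index so results match input order, not completion order.
    return [result for _, result in sorted(indexed)]
```

Sorting by the submission index is what makes completion order invisible to the parent, whichever child finishes first.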
@@ -154,8 +154,8 @@ The `toolsets` parameter controls what tools the subagent has access to. Choose
 | `["file"]` | Read-only analysis, code review without execution |
 | `["terminal"]` | System administration, process management |
-Certain toolsets are **always blocked** for subagents regardless of what you specify:
-- `delegation` — no recursive delegation (prevents infinite spawning)
+Certain toolsets are blocked for subagents regardless of what you specify:
+- `delegation` — blocked for leaf subagents (the default). Retained for `role="orchestrator"` children, bounded by `max_spawn_depth` — see [Depth Limit and Nested Orchestration](#depth-limit-and-nested-orchestration) below.
 - `clarify` — subagents cannot interact with the user
 - `memory` — no writes to shared persistent memory
 - `code_execution` — children should reason step-by-step
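The gating described here can be modeled in a few lines. This is an illustrative sketch of the documented rules (always-blocked set, role, depth floor, kill switch), not the actual Hermes code; `child_toolsets` and its parameters are hypothetical:

```python
# Toolsets every child loses, whatever the caller requests.
ALWAYS_BLOCKED = {"clarify", "memory", "code_execution", "send_message"}

def child_toolsets(requested, role="leaf", child_depth=1,
                   max_spawn_depth=1, orchestrator_enabled=True):
    """Return the toolsets a spawned child actually receives."""
    allowed = set(requested) - ALWAYS_BLOCKED
    keeps_delegation = (
        role == "orchestrator"
        and orchestrator_enabled
        and child_depth < max_spawn_depth  # children at the depth floor go leaf
    )
    if not keeps_delegation:
        # At the defaults (max_spawn_depth=1), orchestrator silently degrades to leaf.
        allowed.discard("delegation")
    return allowed
```

Note that under this model the `role` parameter only ever widens access by one toolset (`delegation`); the other four stay blocked for every child.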
@@ -173,16 +173,32 @@ delegate_task(
 )
 ```
-## Depth Limit
-Delegation has a **depth limit of 2** — a parent (depth 0) can spawn children (depth 1), but children cannot delegate further. This prevents runaway recursive delegation chains.
+## Depth Limit and Nested Orchestration
+By default, delegation is **flat**: a parent (depth 0) spawns children (depth 1), and those children cannot delegate further. This prevents runaway recursive delegation.
+For multi-stage workflows (research → synthesis, or parallel orchestration over sub-problems), a parent can spawn **orchestrator** children that *can* delegate their own workers:
+```python
+delegate_task(
+    goal="Survey three code review approaches and recommend one",
+    role="orchestrator",  # Allows this child to spawn its own workers
+    context="...",
+)
+```
+- `role="leaf"` (default): child cannot delegate further — identical to the flat-delegation behavior.
+- `role="orchestrator"`: child retains the `delegation` toolset. Gated by `delegation.max_spawn_depth` (default **1** = flat, so `role="orchestrator"` is a no-op at defaults). Raise `max_spawn_depth` to 2 to allow orchestrator children to spawn leaf grandchildren; 3 for three levels (cap).
+- `delegation.orchestrator_enabled: false`: global kill switch that forces every child to `leaf` regardless of the `role` parameter.
+**Cost warning:** With `max_spawn_depth: 3` and `max_concurrent_children: 3`, the tree can reach 3×3×3 = 27 concurrent leaf agents. Each extra level multiplies spend — raise `max_spawn_depth` intentionally.
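The cost warning's arithmetic generalizes to any depth and fan-out; a quick sketch (the helper name `worst_case_leaves` is made up for illustration):

```python
def worst_case_leaves(max_spawn_depth, max_concurrent_children):
    # Each level can fan out by max_concurrent_children, so the deepest
    # level holds children**depth concurrent leaf agents in the worst case.
    return max_concurrent_children ** max_spawn_depth
```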
 ## Key Properties
 - Each subagent gets its **own terminal session** (separate from the parent)
-- **No nested delegation** — children cannot delegate further (no grandchildren)
-- Subagents **cannot** call: `delegate_task`, `clarify`, `memory`, `send_message`, `execute_code`
-- **Interrupt propagation** — interrupting the parent interrupts all active children
+- **Nested delegation is opt-in** — only `role="orchestrator"` children can delegate further, and only when `max_spawn_depth` is raised from its default of 1 (flat). Disable globally with `orchestrator_enabled: false`.
+- Leaf subagents **cannot** call: `delegate_task`, `clarify`, `memory`, `send_message`, `execute_code`. Orchestrator subagents retain `delegate_task` but still cannot use the other four.
+- **Interrupt propagation** — interrupting the parent interrupts all active children (including grandchildren under orchestrators)
 - Only the final summary enters the parent's context, keeping token usage efficient
 - Subagents inherit the parent's **API key, provider configuration, and credential pool** (enabling key rotation on rate limits)
@@ -193,7 +209,7 @@ Delegation has a **depth limit of 2** — a parent (depth 0) can spawn children
 | **Reasoning** | Full LLM reasoning loop | Just Python code execution |
 | **Context** | Fresh isolated conversation | No conversation, just script |
 | **Tool access** | All non-blocked tools with reasoning | 7 tools via RPC, no reasoning |
-| **Parallelism** | Up to 3 concurrent subagents | Single script |
+| **Parallelism** | 3 concurrent subagents by default (configurable) | Single script |
 | **Best for** | Complex tasks needing judgment | Mechanical multi-step pipelines |
 | **Token cost** | Higher (full LLM loop) | Lower (only stdout returned) |
 | **User interaction** | None (subagents can't clarify) | None |
@@ -206,6 +222,9 @@ Delegation has a **depth limit of 2** — a parent (depth 0) can spawn children
 # In ~/.hermes/config.yaml
 delegation:
   max_iterations: 50            # Max turns per child (default: 50)
+  # max_concurrent_children: 3  # Parallel children per batch (default: 3)
+  # max_spawn_depth: 1          # Tree depth (1-3, default 1 = flat). Raise to 2 to allow orchestrator children to spawn leaves; 3 for three levels.
+  # orchestrator_enabled: true  # Disable to force all children to leaf role.
   model: "google/gemini-3-flash-preview"  # Optional provider/model override
   provider: "openrouter"        # Optional built-in provider


@@ -20,7 +20,7 @@ Hermes Agent includes a rich set of capabilities that extend far beyond basic ch
 ## Automation
 - **[Scheduled Tasks (Cron)](cron.md)** — Schedule tasks to run automatically with natural language or cron expressions. Jobs can attach skills, deliver results to any platform, and support pause/resume/edit operations.
-- **[Subagent Delegation](delegation.md)** — The `delegate_task` tool spawns child agent instances with isolated context, restricted toolsets, and their own terminal sessions. Run up to 3 concurrent subagents for parallel workstreams.
+- **[Subagent Delegation](delegation.md)** — The `delegate_task` tool spawns child agent instances with isolated context, restricted toolsets, and their own terminal sessions. Run 3 concurrent subagents by default (configurable) for parallel workstreams.
 - **[Code Execution](code-execution.md)** — The `execute_code` tool lets the agent write Python scripts that call Hermes tools programmatically, collapsing multi-step workflows into a single LLM turn via sandboxed RPC execution.
 - **[Event Hooks](hooks.md)** — Run custom code at key lifecycle points. Gateway hooks handle logging, alerts, and webhooks; plugin hooks handle tool interception, metrics, and guardrails.
 - **[Batch Processing](batch-processing.md)** — Run the Hermes agent across hundreds or thousands of prompts in parallel, generating structured ShareGPT-format trajectory data for training data generation or evaluation.