#!/usr/bin/env python3 """ Tests for the subagent delegation tool. Uses mock AIAgent instances to test the delegation logic without requiring API keys or real LLM calls. Run with: python -m pytest tests/test_delegate.py -v or: python tests/test_delegate.py """ import json import os import sys import threading import time import unittest from unittest.mock import MagicMock, patch from tools.delegate_tool import ( DELEGATE_BLOCKED_TOOLS, DELEGATE_TASK_SCHEMA, DelegateEvent, _get_max_concurrent_children, _LEGACY_EVENT_MAP, MAX_DEPTH, check_delegate_requirements, delegate_task, _build_child_agent, _build_child_progress_callback, _build_child_system_prompt, _strip_blocked_tools, _resolve_child_credential_pool, _resolve_delegation_credentials, ) def _make_mock_parent(depth=0): """Create a mock parent agent with the fields delegate_task expects.""" parent = MagicMock() parent.base_url = "https://openrouter.ai/api/v1" parent.api_key="***" parent.provider = "openrouter" parent.api_mode = "chat_completions" parent.model = "anthropic/claude-sonnet-4" parent.platform = "cli" parent.providers_allowed = None parent.providers_ignored = None parent.providers_order = None parent.provider_sort = None parent._session_db = None parent._delegate_depth = depth parent._active_children = [] parent._active_children_lock = threading.Lock() parent._print_fn = None parent.tool_progress_callback = None parent.thinking_callback = None return parent class TestDelegateRequirements(unittest.TestCase): def test_always_available(self): self.assertTrue(check_delegate_requirements()) def test_schema_valid(self): self.assertEqual(DELEGATE_TASK_SCHEMA["name"], "delegate_task") props = DELEGATE_TASK_SCHEMA["parameters"]["properties"] self.assertIn("goal", props) self.assertIn("tasks", props) self.assertIn("context", props) self.assertIn("toolsets", props) self.assertIn("max_iterations", props) self.assertNotIn("maxItems", props["tasks"]) # removed — limit is now runtime-configurable class TestChildSystemPrompt(unittest.TestCase): def test_goal_only(self): prompt = _build_child_system_prompt("Fix the tests") self.assertIn("Fix the tests", prompt) self.assertIn("YOUR TASK", prompt) self.assertNotIn("CONTEXT", prompt) def test_goal_with_context(self): prompt = _build_child_system_prompt("Fix the tests", "Error: assertion failed in test_foo.py line 42") self.assertIn("Fix the tests", prompt) self.assertIn("CONTEXT", prompt) self.assertIn("assertion failed", prompt) def test_empty_context_ignored(self): prompt = _build_child_system_prompt("Do something", " ") self.assertNotIn("CONTEXT", prompt) class TestStripBlockedTools(unittest.TestCase): def test_removes_blocked_toolsets(self): result = _strip_blocked_tools(["terminal", "file", "delegation", "clarify", "memory", "code_execution"]) self.assertEqual(sorted(result), ["file", "terminal"]) def test_preserves_allowed_toolsets(self): result = _strip_blocked_tools(["terminal", "file", "web", "browser"]) self.assertEqual(sorted(result), ["browser", "file", "terminal", "web"]) def test_empty_input(self): result = _strip_blocked_tools([]) self.assertEqual(result, []) class TestDelegateTask(unittest.TestCase): def test_no_parent_agent(self): result = json.loads(delegate_task(goal="test")) self.assertIn("error", result) self.assertIn("parent agent", result["error"]) def test_depth_limit(self): parent = _make_mock_parent(depth=2) result = json.loads(delegate_task(goal="test", parent_agent=parent)) self.assertIn("error", result) self.assertIn("depth limit", result["error"].lower()) def test_no_goal_or_tasks(self): parent = _make_mock_parent() result = json.loads(delegate_task(parent_agent=parent)) self.assertIn("error", result) def test_empty_goal(self): parent = _make_mock_parent() result = json.loads(delegate_task(goal=" ", parent_agent=parent)) self.assertIn("error", result) def test_task_missing_goal(self): parent = _make_mock_parent() result = json.loads(delegate_task(tasks=[{"context": "no goal here"}], parent_agent=parent)) self.assertIn("error", result) @patch("tools.delegate_tool._run_single_child") def test_single_task_mode(self, mock_run): mock_run.return_value = { "task_index": 0, "status": "completed", "summary": "Done!", "api_calls": 3, "duration_seconds": 5.0 } parent = _make_mock_parent() result = json.loads(delegate_task(goal="Fix tests", context="error log...", parent_agent=parent)) self.assertIn("results", result) self.assertEqual(len(result["results"]), 1) self.assertEqual(result["results"][0]["status"], "completed") self.assertEqual(result["results"][0]["summary"], "Done!") mock_run.assert_called_once() @patch("tools.delegate_tool._run_single_child") def test_batch_mode(self, mock_run): mock_run.side_effect = [ {"task_index": 0, "status": "completed", "summary": "Result A", "api_calls": 2, "duration_seconds": 3.0}, {"task_index": 1, "status": "completed", "summary": "Result B", "api_calls": 4, "duration_seconds": 6.0}, ] parent = _make_mock_parent() tasks = [ {"goal": "Research topic A"}, {"goal": "Research topic B"}, ] result = json.loads(delegate_task(tasks=tasks, parent_agent=parent)) self.assertIn("results", result) self.assertEqual(len(result["results"]), 2) self.assertEqual(result["results"][0]["summary"], "Result A") self.assertEqual(result["results"][1]["summary"], "Result B") self.assertIn("total_duration_seconds", result) @patch("tools.delegate_tool._run_single_child") def test_batch_capped_at_3(self, mock_run): mock_run.return_value = { "task_index": 0, "status": "completed", "summary": "Done", "api_calls": 1, "duration_seconds": 1.0 } parent = _make_mock_parent() limit = _get_max_concurrent_children() tasks = [{"goal": f"Task {i}"} for i in range(limit + 2)] result = json.loads(delegate_task(tasks=tasks, parent_agent=parent)) # Should return an error instead of silently truncating self.assertIn("error", result) self.assertIn("Too many tasks", result["error"]) mock_run.assert_not_called() @patch("tools.delegate_tool._run_single_child") def test_batch_ignores_toplevel_goal(self, mock_run): """When tasks array is provided, top-level goal/context/toolsets are ignored.""" mock_run.return_value = { "task_index": 0, "status": "completed", "summary": "Done", "api_calls": 1, "duration_seconds": 1.0 } parent = _make_mock_parent() result = json.loads(delegate_task( goal="This should be ignored", tasks=[{"goal": "Actual task"}], parent_agent=parent, )) # The mock was called with the tasks array item, not the top-level goal call_args = mock_run.call_args self.assertEqual(call_args.kwargs.get("goal") or call_args[1].get("goal", call_args[0][1] if len(call_args[0]) > 1 else None), "Actual task") @patch("tools.delegate_tool._run_single_child") def test_failed_child_included_in_results(self, mock_run): mock_run.return_value = { "task_index": 0, "status": "error", "summary": None, "error": "Something broke", "api_calls": 0, "duration_seconds": 0.5 } parent = _make_mock_parent() result = json.loads(delegate_task(goal="Break things", parent_agent=parent)) self.assertEqual(result["results"][0]["status"], "error") self.assertIn("Something broke", result["results"][0]["error"]) def test_depth_increments(self): """Verify child gets parent's depth + 1.""" parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1 } MockAgent.return_value = mock_child delegate_task(goal="Test depth", parent_agent=parent) self.assertEqual(mock_child._delegate_depth, 1) def test_active_children_tracking(self): """Verify children are registered/unregistered for interrupt propagation.""" parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1 } MockAgent.return_value = mock_child delegate_task(goal="Test tracking", parent_agent=parent) self.assertEqual(len(parent._active_children), 0) def test_child_inherits_runtime_credentials(self): parent = _make_mock_parent(depth=0) parent.base_url = "https://chatgpt.com/backend-api/codex" parent.api_key="***" parent.provider = "openai-codex" parent.api_mode = "codex_responses" with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "ok", "completed": True, "api_calls": 1, } MockAgent.return_value = mock_child delegate_task(goal="Test runtime inheritance", parent_agent=parent) _, kwargs = MockAgent.call_args self.assertEqual(kwargs["base_url"], parent.base_url) self.assertEqual(kwargs["api_key"], parent.api_key) self.assertEqual(kwargs["provider"], parent.provider) self.assertEqual(kwargs["api_mode"], parent.api_mode) def test_child_inherits_parent_print_fn(self): parent = _make_mock_parent(depth=0) sink = MagicMock() parent._print_fn = sink with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() MockAgent.return_value = mock_child _build_child_agent( task_index=0, goal="Keep stdout clean", context=None, toolsets=None, model=None, max_iterations=10, parent_agent=parent, task_count=1, ) self.assertIs(mock_child._print_fn, sink) def test_child_uses_thinking_callback_when_progress_callback_available(self): parent = _make_mock_parent(depth=0) parent.tool_progress_callback = MagicMock() with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() MockAgent.return_value = mock_child _build_child_agent( task_index=0, goal="Avoid raw child spinners", context=None, toolsets=None, model=None, max_iterations=10, parent_agent=parent, task_count=1, ) self.assertTrue(callable(mock_child.thinking_callback)) mock_child.thinking_callback("deliberating...") parent.tool_progress_callback.assert_not_called() class TestToolNamePreservation(unittest.TestCase): """Verify _last_resolved_tool_names is restored after subagent runs.""" def test_global_tool_names_restored_after_delegation(self): """The process-global _last_resolved_tool_names must be restored after a subagent completes so the parent's execute_code sandbox generates correct imports.""" import model_tools parent = _make_mock_parent(depth=0) original_tools = ["terminal", "read_file", "web_search", "execute_code", "delegate_task"] model_tools._last_resolved_tool_names = list(original_tools) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1, } MockAgent.return_value = mock_child delegate_task(goal="Test tool preservation", parent_agent=parent) self.assertEqual(model_tools._last_resolved_tool_names, original_tools) def test_global_tool_names_restored_after_child_failure(self): """Even when the child agent raises, the global must be restored.""" import model_tools parent = _make_mock_parent(depth=0) original_tools = ["terminal", "read_file", "web_search"] model_tools._last_resolved_tool_names = list(original_tools) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.side_effect = RuntimeError("boom") MockAgent.return_value = mock_child result = json.loads(delegate_task(goal="Crash test", parent_agent=parent)) self.assertEqual(result["results"][0]["status"], "error") self.assertEqual(model_tools._last_resolved_tool_names, original_tools) def test_build_child_agent_does_not_raise_name_error(self): """Regression: _build_child_agent must not reference _saved_tool_names. The bug introduced by the e7844e9c merge conflict: line 235 inside _build_child_agent read `list(_saved_tool_names)` where that variable is only defined later in _run_single_child. Calling _build_child_agent standalone (without _run_single_child's scope) must never raise NameError. """ parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent"): try: _build_child_agent( task_index=0, goal="regression check", context=None, toolsets=None, model=None, max_iterations=10, parent_agent=parent, task_count=1, ) except NameError as exc: self.fail( f"_build_child_agent raised NameError — " f"_saved_tool_names leaked back into wrong scope: {exc}" ) def test_saved_tool_names_set_on_child_before_run(self): """_run_single_child must set _delegate_saved_tool_names on the child from model_tools._last_resolved_tool_names before run_conversation.""" import model_tools parent = _make_mock_parent(depth=0) expected_tools = ["read_file", "web_search", "execute_code"] model_tools._last_resolved_tool_names = list(expected_tools) captured = {} with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() def capture_and_return(user_message, task_id=None): captured["saved"] = list(mock_child._delegate_saved_tool_names) return {"final_response": "ok", "completed": True, "api_calls": 1} mock_child.run_conversation.side_effect = capture_and_return MockAgent.return_value = mock_child delegate_task(goal="capture test", parent_agent=parent) self.assertEqual(captured["saved"], expected_tools) class TestDelegateObservability(unittest.TestCase): """Tests for enriched metadata returned by _run_single_child.""" def test_observability_fields_present(self): """Completed child should return tool_trace, tokens, model, exit_reason.""" parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.model = "claude-sonnet-4-6" mock_child.session_prompt_tokens = 5000 mock_child.session_completion_tokens = 1200 mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "interrupted": False, "api_calls": 3, "messages": [ {"role": "user", "content": "do something"}, {"role": "assistant", "tool_calls": [ {"id": "tc_1", "function": {"name": "web_search", "arguments": '{"query": "test"}'}} ]}, {"role": "tool", "tool_call_id": "tc_1", "content": '{"results": [1,2,3]}'}, {"role": "assistant", "content": "done"}, ], } MockAgent.return_value = mock_child result = json.loads(delegate_task(goal="Test observability", parent_agent=parent)) entry = result["results"][0] # Core observability fields self.assertEqual(entry["model"], "claude-sonnet-4-6") self.assertEqual(entry["exit_reason"], "completed") self.assertEqual(entry["tokens"]["input"], 5000) self.assertEqual(entry["tokens"]["output"], 1200) # Tool trace self.assertEqual(len(entry["tool_trace"]), 1) self.assertEqual(entry["tool_trace"][0]["tool"], "web_search") self.assertIn("args_bytes", entry["tool_trace"][0]) self.assertIn("result_bytes", entry["tool_trace"][0]) self.assertEqual(entry["tool_trace"][0]["status"], "ok") def test_tool_trace_detects_error(self): """Tool results containing 'error' should be marked as error status.""" parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.model = "claude-sonnet-4-6" mock_child.session_prompt_tokens = 0 mock_child.session_completion_tokens = 0 mock_child.run_conversation.return_value = { "final_response": "failed", "completed": True, "interrupted": False, "api_calls": 1, "messages": [ {"role": "assistant", "tool_calls": [ {"id": "tc_1", "function": {"name": "terminal", "arguments": '{"cmd": "ls"}'}} ]}, {"role": "tool", "tool_call_id": "tc_1", "content": "Error: command not found"}, ], } MockAgent.return_value = mock_child result = json.loads(delegate_task(goal="Test error trace", parent_agent=parent)) trace = result["results"][0]["tool_trace"] self.assertEqual(trace[0]["status"], "error") def test_parallel_tool_calls_paired_correctly(self): """Parallel tool calls should each get their own result via tool_call_id matching.""" parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.model = "claude-sonnet-4-6" mock_child.session_prompt_tokens = 3000 mock_child.session_completion_tokens = 800 mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "interrupted": False, "api_calls": 1, "messages": [ {"role": "assistant", "tool_calls": [ {"id": "tc_a", "function": {"name": "web_search", "arguments": '{"q": "a"}'}}, {"id": "tc_b", "function": {"name": "web_search", "arguments": '{"q": "b"}'}}, {"id": "tc_c", "function": {"name": "terminal", "arguments": '{"cmd": "ls"}'}}, ]}, {"role": "tool", "tool_call_id": "tc_a", "content": '{"ok": true}'}, {"role": "tool", "tool_call_id": "tc_b", "content": "Error: rate limited"}, {"role": "tool", "tool_call_id": "tc_c", "content": "file1.txt\nfile2.txt"}, {"role": "assistant", "content": "done"}, ], } MockAgent.return_value = mock_child result = json.loads(delegate_task(goal="Test parallel", parent_agent=parent)) trace = result["results"][0]["tool_trace"] # All three tool calls should have results self.assertEqual(len(trace), 3) # First: web_search → ok self.assertEqual(trace[0]["tool"], "web_search") self.assertEqual(trace[0]["status"], "ok") self.assertIn("result_bytes", trace[0]) # Second: web_search → error self.assertEqual(trace[1]["tool"], "web_search") self.assertEqual(trace[1]["status"], "error") self.assertIn("result_bytes", trace[1]) # Third: terminal → ok self.assertEqual(trace[2]["tool"], "terminal") self.assertEqual(trace[2]["status"], "ok") self.assertIn("result_bytes", trace[2]) def test_exit_reason_interrupted(self): """Interrupted child should report exit_reason='interrupted'.""" parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.model = "claude-sonnet-4-6" mock_child.session_prompt_tokens = 0 mock_child.session_completion_tokens = 0 mock_child.run_conversation.return_value = { "final_response": "", "completed": False, "interrupted": True, "api_calls": 2, "messages": [], } MockAgent.return_value = mock_child result = json.loads(delegate_task(goal="Test interrupt", parent_agent=parent)) self.assertEqual(result["results"][0]["exit_reason"], "interrupted") def test_exit_reason_max_iterations(self): """Child that didn't complete and wasn't interrupted hit max_iterations.""" parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.model = "claude-sonnet-4-6" mock_child.session_prompt_tokens = 0 mock_child.session_completion_tokens = 0 mock_child.run_conversation.return_value = { "final_response": "", "completed": False, "interrupted": False, "api_calls": 50, "messages": [], } MockAgent.return_value = mock_child result = json.loads(delegate_task(goal="Test max iter", parent_agent=parent)) self.assertEqual(result["results"][0]["exit_reason"], "max_iterations") class TestBlockedTools(unittest.TestCase): def test_blocked_tools_constant(self): for tool in ["delegate_task", "clarify", "memory", "send_message", "execute_code"]: self.assertIn(tool, DELEGATE_BLOCKED_TOOLS) def test_constants(self): from tools.delegate_tool import ( _get_max_spawn_depth, _get_orchestrator_enabled, _MIN_SPAWN_DEPTH, _MAX_SPAWN_DEPTH_CAP, ) self.assertEqual(_get_max_concurrent_children(), 3) self.assertEqual(MAX_DEPTH, 1) self.assertEqual(_get_max_spawn_depth(), 1) # default: flat self.assertTrue(_get_orchestrator_enabled()) # default self.assertEqual(_MIN_SPAWN_DEPTH, 1) self.assertEqual(_MAX_SPAWN_DEPTH_CAP, 3) class TestDelegationCredentialResolution(unittest.TestCase): """Tests for provider:model credential resolution in delegation config.""" def test_no_provider_returns_none_credentials(self): """When delegation.provider is empty, all credentials are None (inherit parent).""" parent = _make_mock_parent(depth=0) cfg = {"model": "", "provider": ""} creds = _resolve_delegation_credentials(cfg, parent) self.assertIsNone(creds["provider"]) self.assertIsNone(creds["base_url"]) self.assertIsNone(creds["api_key"]) self.assertIsNone(creds["api_mode"]) self.assertIsNone(creds["model"]) def test_model_only_no_provider(self): """When only model is set (no provider), model is returned but credentials are None.""" parent = _make_mock_parent(depth=0) cfg = {"model": "google/gemini-3-flash-preview", "provider": ""} creds = _resolve_delegation_credentials(cfg, parent) self.assertEqual(creds["model"], "google/gemini-3-flash-preview") self.assertIsNone(creds["provider"]) self.assertIsNone(creds["base_url"]) self.assertIsNone(creds["api_key"]) @patch("hermes_cli.runtime_provider.resolve_runtime_provider") def test_provider_resolves_full_credentials(self, mock_resolve): """When delegation.provider is set, full credentials are resolved.""" mock_resolve.return_value = { "provider": "openrouter", "base_url": "https://openrouter.ai/api/v1", "api_key": "sk-or-test-key", "api_mode": "chat_completions", } parent = _make_mock_parent(depth=0) cfg = {"model": "google/gemini-3-flash-preview", "provider": "openrouter"} creds = _resolve_delegation_credentials(cfg, parent) self.assertEqual(creds["model"], "google/gemini-3-flash-preview") self.assertEqual(creds["provider"], "openrouter") self.assertEqual(creds["base_url"], "https://openrouter.ai/api/v1") self.assertEqual(creds["api_key"], "sk-or-test-key") self.assertEqual(creds["api_mode"], "chat_completions") mock_resolve.assert_called_once_with(requested="openrouter") def test_direct_endpoint_uses_configured_base_url_and_api_key(self): parent = _make_mock_parent(depth=0) cfg = { "model": "qwen2.5-coder", "provider": "openrouter", "base_url": "http://localhost:1234/v1", "api_key": "local-key", } creds = _resolve_delegation_credentials(cfg, parent) self.assertEqual(creds["model"], "qwen2.5-coder") self.assertEqual(creds["provider"], "custom") self.assertEqual(creds["base_url"], "http://localhost:1234/v1") self.assertEqual(creds["api_key"], "local-key") self.assertEqual(creds["api_mode"], "chat_completions") def test_direct_endpoint_falls_back_to_openai_api_key_env(self): parent = _make_mock_parent(depth=0) cfg = { "model": "qwen2.5-coder", "base_url": "http://localhost:1234/v1", } with patch.dict(os.environ, {"OPENAI_API_KEY": "env-openai-key"}, clear=False): creds = _resolve_delegation_credentials(cfg, parent) self.assertEqual(creds["api_key"], "env-openai-key") self.assertEqual(creds["provider"], "custom") def test_direct_endpoint_does_not_fall_back_to_openrouter_api_key_env(self): parent = _make_mock_parent(depth=0) cfg = { "model": "qwen2.5-coder", "base_url": "http://localhost:1234/v1", } with patch.dict( os.environ, { "OPENROUTER_API_KEY": "env-openrouter-key", "OPENAI_API_KEY": "", }, clear=False, ): with self.assertRaises(ValueError) as ctx: _resolve_delegation_credentials(cfg, parent) self.assertIn("OPENAI_API_KEY", str(ctx.exception)) @patch("hermes_cli.runtime_provider.resolve_runtime_provider") def test_nous_provider_resolves_nous_credentials(self, mock_resolve): """Nous provider resolves Nous Portal base_url and api_key.""" mock_resolve.return_value = { "provider": "nous", "base_url": "https://inference-api.nousresearch.com/v1", "api_key": "nous-agent-key-xyz", "api_mode": "chat_completions", } parent = _make_mock_parent(depth=0) cfg = {"model": "hermes-3-llama-3.1-8b", "provider": "nous"} creds = _resolve_delegation_credentials(cfg, parent) self.assertEqual(creds["provider"], "nous") self.assertEqual(creds["base_url"], "https://inference-api.nousresearch.com/v1") self.assertEqual(creds["api_key"], "nous-agent-key-xyz") mock_resolve.assert_called_once_with(requested="nous") @patch("hermes_cli.runtime_provider.resolve_runtime_provider") def test_provider_resolution_failure_raises_valueerror(self, mock_resolve): """When provider resolution fails, ValueError is raised with helpful message.""" mock_resolve.side_effect = RuntimeError("OPENROUTER_API_KEY not set") parent = _make_mock_parent(depth=0) cfg = {"model": "some-model", "provider": "openrouter"} with self.assertRaises(ValueError) as ctx: _resolve_delegation_credentials(cfg, parent) self.assertIn("openrouter", str(ctx.exception).lower()) self.assertIn("Cannot resolve", str(ctx.exception)) @patch("hermes_cli.runtime_provider.resolve_runtime_provider") def test_provider_resolves_but_no_api_key_raises(self, mock_resolve): """When provider resolves but has no API key, ValueError is raised.""" mock_resolve.return_value = { "provider": "openrouter", "base_url": "https://openrouter.ai/api/v1", "api_key": "", "api_mode": "chat_completions", } parent = _make_mock_parent(depth=0) cfg = {"model": "some-model", "provider": "openrouter"} with self.assertRaises(ValueError) as ctx: _resolve_delegation_credentials(cfg, parent) self.assertIn("no API key", str(ctx.exception)) def test_missing_config_keys_inherit_parent(self): """When config dict has no model/provider keys at all, inherits parent.""" parent = _make_mock_parent(depth=0) cfg = {"max_iterations": 45} creds = _resolve_delegation_credentials(cfg, parent) self.assertIsNone(creds["model"]) self.assertIsNone(creds["provider"]) class TestDelegationProviderIntegration(unittest.TestCase): """Integration tests: delegation config → _run_single_child → AIAgent construction.""" @patch("tools.delegate_tool._load_config") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_config_provider_credentials_reach_child_agent(self, mock_creds, mock_cfg): """When delegation.provider is configured, child agent gets resolved credentials.""" mock_cfg.return_value = { "max_iterations": 45, "model": "google/gemini-3-flash-preview", "provider": "openrouter", } mock_creds.return_value = { "model": "google/gemini-3-flash-preview", "provider": "openrouter", "base_url": "https://openrouter.ai/api/v1", "api_key": "sk-or-delegation-key", "api_mode": "chat_completions", } parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1 } MockAgent.return_value = mock_child delegate_task(goal="Test provider routing", parent_agent=parent) _, kwargs = MockAgent.call_args self.assertEqual(kwargs["model"], "google/gemini-3-flash-preview") self.assertEqual(kwargs["provider"], "openrouter") self.assertEqual(kwargs["base_url"], "https://openrouter.ai/api/v1") self.assertEqual(kwargs["api_key"], "sk-or-delegation-key") self.assertEqual(kwargs["api_mode"], "chat_completions") @patch("tools.delegate_tool._load_config") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_cross_provider_delegation(self, mock_creds, mock_cfg): """Parent on Nous, subagent on OpenRouter — full credential switch.""" mock_cfg.return_value = { "max_iterations": 45, "model": "google/gemini-3-flash-preview", "provider": "openrouter", } mock_creds.return_value = { "model": "google/gemini-3-flash-preview", "provider": "openrouter", "base_url": "https://openrouter.ai/api/v1", "api_key": "sk-or-key", "api_mode": "chat_completions", } parent = _make_mock_parent(depth=0) parent.provider = "nous" parent.base_url = "https://inference-api.nousresearch.com/v1" parent.api_key = "nous-key-abc" with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1 } MockAgent.return_value = mock_child delegate_task(goal="Cross-provider test", parent_agent=parent) _, kwargs = MockAgent.call_args # Child should use OpenRouter, NOT Nous self.assertEqual(kwargs["provider"], "openrouter") self.assertEqual(kwargs["base_url"], "https://openrouter.ai/api/v1") self.assertEqual(kwargs["api_key"], "sk-or-key") self.assertNotEqual(kwargs["base_url"], parent.base_url) self.assertNotEqual(kwargs["api_key"], parent.api_key) @patch("tools.delegate_tool._load_config") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_direct_endpoint_credentials_reach_child_agent(self, mock_creds, mock_cfg): mock_cfg.return_value = { "max_iterations": 45, "model": "qwen2.5-coder", "base_url": "http://localhost:1234/v1", "api_key": "local-key", } mock_creds.return_value = { "model": "qwen2.5-coder", "provider": "custom", "base_url": "http://localhost:1234/v1", "api_key": "local-key", "api_mode": "chat_completions", } parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1 } MockAgent.return_value = mock_child delegate_task(goal="Direct endpoint test", parent_agent=parent) _, kwargs = MockAgent.call_args self.assertEqual(kwargs["model"], "qwen2.5-coder") self.assertEqual(kwargs["provider"], "custom") self.assertEqual(kwargs["base_url"], "http://localhost:1234/v1") self.assertEqual(kwargs["api_key"], "local-key") self.assertEqual(kwargs["api_mode"], "chat_completions") @patch("tools.delegate_tool._load_config") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_empty_config_inherits_parent(self, mock_creds, mock_cfg): """When delegation config is empty, child inherits parent credentials.""" mock_cfg.return_value = {"max_iterations": 45, "model": "", "provider": ""} mock_creds.return_value = { "model": None, "provider": None, "base_url": None, "api_key": None, "api_mode": None, } parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1 } MockAgent.return_value = mock_child delegate_task(goal="Test inherit", parent_agent=parent) _, kwargs = MockAgent.call_args self.assertEqual(kwargs["model"], parent.model) self.assertEqual(kwargs["provider"], parent.provider) self.assertEqual(kwargs["base_url"], parent.base_url) @patch("tools.delegate_tool._load_config") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_credential_error_returns_json_error(self, mock_creds, mock_cfg): """When credential resolution fails, delegate_task returns a JSON error.""" mock_cfg.return_value = {"model": "bad-model", "provider": "nonexistent"} mock_creds.side_effect = ValueError( "Cannot resolve delegation provider 'nonexistent': Unknown provider" ) parent = _make_mock_parent(depth=0) result = json.loads(delegate_task(goal="Should fail", parent_agent=parent)) self.assertIn("error", result) self.assertIn("Cannot resolve", result["error"]) self.assertIn("nonexistent", result["error"]) @patch("tools.delegate_tool._load_config") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_batch_mode_all_children_get_credentials(self, mock_creds, mock_cfg): """In batch mode, all children receive the resolved credentials.""" mock_cfg.return_value = { "max_iterations": 45, "model": "meta-llama/llama-4-scout", "provider": "openrouter", } mock_creds.return_value = { "model": "meta-llama/llama-4-scout", "provider": "openrouter", "base_url": "https://openrouter.ai/api/v1", "api_key": "sk-or-batch", "api_mode": "chat_completions", } parent = _make_mock_parent(depth=0) # Patch _build_child_agent since credentials are now passed there # (agents are built in the main thread before being handed to workers) with patch("tools.delegate_tool._build_child_agent") as mock_build, \ patch("tools.delegate_tool._run_single_child") as mock_run: mock_child = MagicMock() mock_build.return_value = mock_child mock_run.return_value = { "task_index": 0, "status": "completed", "summary": "Done", "api_calls": 1, "duration_seconds": 1.0 } tasks = [{"goal": "Task A"}, {"goal": "Task B"}] delegate_task(tasks=tasks, parent_agent=parent) self.assertEqual(mock_build.call_count, 2) for call in mock_build.call_args_list: self.assertEqual(call.kwargs.get("model"), "meta-llama/llama-4-scout") self.assertEqual(call.kwargs.get("override_provider"), "openrouter") self.assertEqual(call.kwargs.get("override_base_url"), "https://openrouter.ai/api/v1") self.assertEqual(call.kwargs.get("override_api_key"), "sk-or-batch") self.assertEqual(call.kwargs.get("override_api_mode"), "chat_completions") @patch("tools.delegate_tool._load_config") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_delegation_acp_runtime_reaches_child_agent(self, mock_creds, mock_cfg): """Resolved ACP runtime command/args must be forwarded to child agents.""" mock_cfg.return_value = { "max_iterations": 45, "model": "copilot-model", "provider": "copilot-acp", } mock_creds.return_value = { "model": "copilot-model", "provider": "copilot-acp", "base_url": "acp://copilot", "api_key": "copilot-acp", "api_mode": "chat_completions", "command": "custom-copilot", "args": ["--stdio-custom"], } parent = _make_mock_parent(depth=0) with patch("tools.delegate_tool._build_child_agent") as mock_build, \ patch("tools.delegate_tool._run_single_child") as mock_run: mock_child = MagicMock() mock_build.return_value = mock_child mock_run.return_value = { "task_index": 0, "status": "completed", "summary": "Done", "api_calls": 1, "duration_seconds": 1.0 } delegate_task(goal="ACP delegation test", parent_agent=parent) _, kwargs = mock_build.call_args self.assertEqual(kwargs.get("override_provider"), "copilot-acp") self.assertEqual(kwargs.get("override_base_url"), "acp://copilot") self.assertEqual(kwargs.get("override_api_key"), "copilot-acp") self.assertEqual(kwargs.get("override_api_mode"), "chat_completions") self.assertEqual(kwargs.get("override_acp_command"), "custom-copilot") self.assertEqual(kwargs.get("override_acp_args"), ["--stdio-custom"]) @patch("tools.delegate_tool._load_config") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_model_only_no_provider_inherits_parent_credentials(self, mock_creds, mock_cfg): """Setting only model (no provider) changes model but keeps parent credentials.""" mock_cfg.return_value = { "max_iterations": 45, "model": "google/gemini-3-flash-preview", "provider": "", } mock_creds.return_value = { "model": "google/gemini-3-flash-preview", "provider": None, "base_url": None, "api_key": None, "api_mode": None, } parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1 } MockAgent.return_value = mock_child delegate_task(goal="Model only test", parent_agent=parent) _, kwargs = MockAgent.call_args # Model should be overridden self.assertEqual(kwargs["model"], "google/gemini-3-flash-preview") # But provider/base_url/api_key should inherit from parent self.assertEqual(kwargs["provider"], parent.provider) self.assertEqual(kwargs["base_url"], parent.base_url) class TestChildCredentialPoolResolution(unittest.TestCase): def test_same_provider_shares_parent_pool(self): parent = _make_mock_parent() mock_pool = MagicMock() parent._credential_pool = mock_pool result = _resolve_child_credential_pool("openrouter", parent) self.assertIs(result, mock_pool) def test_no_provider_inherits_parent_pool(self): parent = _make_mock_parent() mock_pool = MagicMock() parent._credential_pool = mock_pool result = _resolve_child_credential_pool(None, parent) self.assertIs(result, mock_pool) def test_different_provider_loads_own_pool(self): parent = _make_mock_parent() parent._credential_pool = MagicMock() mock_pool = MagicMock() mock_pool.has_credentials.return_value = True with patch("agent.credential_pool.load_pool", return_value=mock_pool): result = _resolve_child_credential_pool("anthropic", parent) self.assertIs(result, mock_pool) def test_different_provider_empty_pool_returns_none(self): parent = _make_mock_parent() parent._credential_pool = MagicMock() mock_pool = MagicMock() mock_pool.has_credentials.return_value = False with patch("agent.credential_pool.load_pool", return_value=mock_pool): result = _resolve_child_credential_pool("anthropic", parent) self.assertIsNone(result) def test_different_provider_load_failure_returns_none(self): parent = _make_mock_parent() parent._credential_pool = MagicMock() with patch("agent.credential_pool.load_pool", side_effect=Exception("disk error")): result = _resolve_child_credential_pool("anthropic", parent) self.assertIsNone(result) def test_build_child_agent_assigns_parent_pool_when_shared(self): parent = _make_mock_parent() mock_pool = MagicMock() parent._credential_pool = mock_pool with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() MockAgent.return_value = mock_child _build_child_agent( task_index=0, goal="Test pool assignment", context=None, toolsets=["terminal"], model=None, max_iterations=10, parent_agent=parent, task_count=1, ) self.assertEqual(mock_child._credential_pool, mock_pool) @patch("tools.delegate_tool._load_config", return_value={}) def test_build_child_agent_does_not_preserve_mcp_toolsets_by_default(self, mock_cfg): parent = _make_mock_parent() parent.enabled_toolsets = ["web", "browser", "mcp-MiniMax"] with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() MockAgent.return_value = mock_child _build_child_agent( task_index=0, goal="Test narrowed toolsets", context=None, toolsets=["web", "browser"], model=None, max_iterations=10, parent_agent=parent, task_count=1, ) self.assertEqual( MockAgent.call_args[1]["enabled_toolsets"], ["web", "browser"], ) @patch( "tools.delegate_tool._load_config", return_value={"inherit_mcp_toolsets": True}, ) def test_build_child_agent_can_preserve_parent_mcp_toolsets(self, mock_cfg): parent = _make_mock_parent() parent.enabled_toolsets = ["web", "browser", "mcp-MiniMax"] with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() MockAgent.return_value = mock_child _build_child_agent( task_index=0, goal="Test narrowed toolsets", context=None, toolsets=["web", "browser"], model=None, max_iterations=10, parent_agent=parent, task_count=1, ) self.assertEqual( MockAgent.call_args[1]["enabled_toolsets"], ["web", "browser", "mcp-MiniMax"], ) class TestChildCredentialLeasing(unittest.TestCase): def test_run_single_child_acquires_and_releases_lease(self): from tools.delegate_tool import _run_single_child leased_entry = MagicMock() leased_entry.id = "cred-b" child = MagicMock() child._credential_pool = MagicMock() child._credential_pool.acquire_lease.return_value = "cred-b" child._credential_pool.current.return_value = leased_entry child.run_conversation.return_value = { "final_response": "done", "completed": True, "interrupted": False, "api_calls": 1, "messages": [], } result = _run_single_child( task_index=0, goal="Investigate rate limits", child=child, parent_agent=_make_mock_parent(), ) self.assertEqual(result["status"], "completed") child._credential_pool.acquire_lease.assert_called_once_with() child._swap_credential.assert_called_once_with(leased_entry) child._credential_pool.release_lease.assert_called_once_with("cred-b") def test_run_single_child_releases_lease_after_failure(self): from tools.delegate_tool import _run_single_child child = MagicMock() child._credential_pool = MagicMock() child._credential_pool.acquire_lease.return_value = "cred-a" child._credential_pool.current.return_value = MagicMock(id="cred-a") child.run_conversation.side_effect = RuntimeError("boom") result = _run_single_child( task_index=1, goal="Trigger failure", child=child, parent_agent=_make_mock_parent(), ) self.assertEqual(result["status"], "error") child._credential_pool.release_lease.assert_called_once_with("cred-a") class TestDelegateHeartbeat(unittest.TestCase): """Heartbeat propagates child activity to parent during delegation. Without the heartbeat, the gateway inactivity timeout fires because the parent's _last_activity_ts freezes when delegate_task starts. """ def test_heartbeat_touches_parent_activity_during_child_run(self): """Parent's _touch_activity is called while child.run_conversation blocks.""" from tools.delegate_tool import _run_single_child parent = _make_mock_parent() touch_calls = [] parent._touch_activity = lambda desc: touch_calls.append(desc) child = MagicMock() child.get_activity_summary.return_value = { "current_tool": "terminal", "api_call_count": 3, "max_iterations": 50, "last_activity_desc": "executing tool: terminal", } # Make run_conversation block long enough for heartbeats to fire def slow_run(**kwargs): time.sleep(0.25) return {"final_response": "done", "completed": True, "api_calls": 3} child.run_conversation.side_effect = slow_run # Patch the heartbeat interval to fire quickly with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): _run_single_child( task_index=0, goal="Test heartbeat", child=child, parent_agent=parent, ) # Heartbeat should have fired at least once during the 0.25s sleep self.assertGreater(len(touch_calls), 0, "Heartbeat did not propagate activity to parent") # Verify the description includes child's current tool detail self.assertTrue( any("terminal" in desc for desc in touch_calls), f"Heartbeat descriptions should include child tool info: {touch_calls}") def test_heartbeat_stops_after_child_completes(self): """Heartbeat thread is cleaned up when the child finishes.""" from tools.delegate_tool import _run_single_child parent = _make_mock_parent() touch_calls = [] parent._touch_activity = lambda desc: touch_calls.append(desc) child = MagicMock() child.get_activity_summary.return_value = { "current_tool": None, "api_call_count": 1, "max_iterations": 50, "last_activity_desc": "done", } child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1, } with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): _run_single_child( task_index=0, goal="Test cleanup", child=child, parent_agent=parent, ) # Record count after completion, wait, and verify no more calls count_after = len(touch_calls) time.sleep(0.15) self.assertEqual(len(touch_calls), count_after, "Heartbeat continued firing after child completed") def test_heartbeat_stops_after_child_error(self): """Heartbeat thread is cleaned up even when the child raises.""" from tools.delegate_tool import _run_single_child parent = _make_mock_parent() touch_calls = [] parent._touch_activity = lambda desc: touch_calls.append(desc) child = MagicMock() child.get_activity_summary.return_value = { "current_tool": "web_search", "api_call_count": 2, "max_iterations": 50, "last_activity_desc": "executing tool: web_search", } def slow_fail(**kwargs): time.sleep(0.15) raise RuntimeError("network timeout") child.run_conversation.side_effect = slow_fail with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): result = _run_single_child( task_index=0, goal="Test error cleanup", child=child, parent_agent=parent, ) self.assertEqual(result["status"], "error") # Verify heartbeat stopped count_after = len(touch_calls) time.sleep(0.15) self.assertEqual(len(touch_calls), count_after, "Heartbeat continued firing after child error") def test_heartbeat_includes_child_activity_desc_when_no_tool(self): """When child has no current_tool, heartbeat uses last_activity_desc.""" from tools.delegate_tool import _run_single_child parent = _make_mock_parent() touch_calls = [] parent._touch_activity = lambda desc: touch_calls.append(desc) child = MagicMock() child.get_activity_summary.return_value = { "current_tool": None, "api_call_count": 5, "max_iterations": 90, "last_activity_desc": "API call #5 completed", } def slow_run(**kwargs): time.sleep(0.15) return {"final_response": "done", "completed": True, "api_calls": 5} child.run_conversation.side_effect = slow_run with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): _run_single_child( task_index=0, goal="Test desc fallback", child=child, parent_agent=parent, ) self.assertGreater(len(touch_calls), 0) self.assertTrue( any("API call #5 completed" in desc for desc in touch_calls), f"Heartbeat should include last_activity_desc: {touch_calls}") class TestDelegationReasoningEffort(unittest.TestCase): """Tests for delegation.reasoning_effort config override.""" @patch("tools.delegate_tool._load_config") @patch("run_agent.AIAgent") def test_inherits_parent_reasoning_when_no_override(self, MockAgent, mock_cfg): """With no delegation.reasoning_effort, child inherits parent's config.""" mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": ""} MockAgent.return_value = MagicMock() parent = _make_mock_parent() parent.reasoning_config = {"enabled": True, "effort": "xhigh"} _build_child_agent( task_index=0, goal="test", context=None, toolsets=None, model=None, max_iterations=50, parent_agent=parent, task_count=1, ) call_kwargs = MockAgent.call_args[1] self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "xhigh"}) @patch("tools.delegate_tool._load_config") @patch("run_agent.AIAgent") def test_override_reasoning_effort_from_config(self, MockAgent, mock_cfg): """delegation.reasoning_effort overrides the parent's level.""" mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "low"} MockAgent.return_value = MagicMock() parent = _make_mock_parent() parent.reasoning_config = {"enabled": True, "effort": "xhigh"} _build_child_agent( task_index=0, goal="test", context=None, toolsets=None, model=None, max_iterations=50, parent_agent=parent, task_count=1, ) call_kwargs = MockAgent.call_args[1] self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "low"}) @patch("tools.delegate_tool._load_config") @patch("run_agent.AIAgent") def test_override_reasoning_effort_none_disables(self, MockAgent, mock_cfg): """delegation.reasoning_effort: 'none' disables thinking for subagents.""" mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "none"} MockAgent.return_value = MagicMock() parent = _make_mock_parent() parent.reasoning_config = {"enabled": True, "effort": "high"} _build_child_agent( task_index=0, goal="test", context=None, toolsets=None, model=None, max_iterations=50, parent_agent=parent, task_count=1, ) call_kwargs = MockAgent.call_args[1] self.assertEqual(call_kwargs["reasoning_config"], {"enabled": False}) @patch("tools.delegate_tool._load_config") @patch("run_agent.AIAgent") def test_invalid_reasoning_effort_falls_back_to_parent(self, MockAgent, mock_cfg): """Invalid delegation.reasoning_effort falls back to parent's config.""" mock_cfg.return_value = {"max_iterations": 50, "reasoning_effort": "banana"} MockAgent.return_value = MagicMock() parent = _make_mock_parent() parent.reasoning_config = {"enabled": True, "effort": "medium"} _build_child_agent( task_index=0, goal="test", context=None, toolsets=None, model=None, max_iterations=50, parent_agent=parent, task_count=1, ) call_kwargs = MockAgent.call_args[1] self.assertEqual(call_kwargs["reasoning_config"], {"enabled": True, "effort": "medium"}) # ========================================================================= # Dispatch helper, progress events, concurrency # ========================================================================= class TestDispatchDelegateTask(unittest.TestCase): """Tests for the _dispatch_delegate_task helper and full param forwarding.""" @patch("tools.delegate_tool._load_config", return_value={}) @patch("tools.delegate_tool._resolve_delegation_credentials") def test_acp_args_forwarded(self, mock_creds, mock_cfg): """Both acp_command and acp_args reach delegate_task via the helper.""" mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=0) with patch("tools.delegate_tool._build_child_agent") as mock_build: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1, "messages": [], } mock_child._delegate_saved_tool_names = [] mock_child._credential_pool = None mock_child.session_prompt_tokens = 0 mock_child.session_completion_tokens = 0 mock_child.model = "test" mock_build.return_value = mock_child delegate_task( goal="test", acp_command="claude", acp_args=["--acp", "--stdio"], parent_agent=parent, ) _, kwargs = mock_build.call_args self.assertEqual(kwargs["override_acp_command"], "claude") self.assertEqual(kwargs["override_acp_args"], ["--acp", "--stdio"]) class TestDelegateEventEnum(unittest.TestCase): """Tests for DelegateEvent enum and back-compat aliases.""" def test_enum_values_are_strings(self): for event in DelegateEvent: self.assertIsInstance(event.value, str) self.assertTrue(event.value.startswith("delegate.")) def test_legacy_map_covers_all_old_names(self): expected_legacy = {"_thinking", "reasoning.available", "tool.started", "tool.completed", "subagent_progress"} self.assertEqual(set(_LEGACY_EVENT_MAP.keys()), expected_legacy) def test_legacy_map_values_are_delegate_events(self): for old_name, event in _LEGACY_EVENT_MAP.items(): self.assertIsInstance(event, DelegateEvent) def test_progress_callback_normalises_tool_started(self): """_build_child_progress_callback handles tool.started via enum.""" parent = _make_mock_parent() parent._delegate_spinner = MagicMock() parent.tool_progress_callback = MagicMock() cb = _build_child_progress_callback(0, "test goal", parent, task_count=1) self.assertIsNotNone(cb) cb("tool.started", tool_name="terminal", preview="ls") parent._delegate_spinner.print_above.assert_called() def test_progress_callback_normalises_thinking(self): """Both _thinking and reasoning.available route to TASK_THINKING.""" parent = _make_mock_parent() parent._delegate_spinner = MagicMock() parent.tool_progress_callback = None cb = _build_child_progress_callback(0, "test goal", parent, task_count=1) cb("_thinking", tool_name=None, preview="pondering...") assert any("💭" in str(c) for c in parent._delegate_spinner.print_above.call_args_list) parent._delegate_spinner.print_above.reset_mock() cb("reasoning.available", tool_name=None, preview="hmm") assert any("💭" in str(c) for c in parent._delegate_spinner.print_above.call_args_list) def test_progress_callback_tool_completed_is_noop(self): """tool.completed is normalised but produces no display output.""" parent = _make_mock_parent() parent._delegate_spinner = MagicMock() parent.tool_progress_callback = None cb = _build_child_progress_callback(0, "test goal", parent, task_count=1) cb("tool.completed", tool_name="terminal") parent._delegate_spinner.print_above.assert_not_called() def test_progress_callback_ignores_unknown_events(self): """Unknown event types are silently ignored.""" parent = _make_mock_parent() parent._delegate_spinner = MagicMock() cb = _build_child_progress_callback(0, "test goal", parent, task_count=1) # Should not raise cb("some.unknown.event", tool_name="x") parent._delegate_spinner.print_above.assert_not_called() def test_progress_callback_accepts_enum_value_directly(self): """cb(DelegateEvent.TASK_THINKING, ...) must route to the thinking branch. Pre-fix the callback only handled legacy strings via _LEGACY_EVENT_MAP.get and silently dropped enum-typed callers.""" parent = _make_mock_parent() parent._delegate_spinner = MagicMock() parent.tool_progress_callback = None cb = _build_child_progress_callback(0, "test goal", parent, task_count=1) cb(DelegateEvent.TASK_THINKING, preview="pondering") # If the enum was accepted, the thinking emoji got printed. assert any( "💭" in str(c) for c in parent._delegate_spinner.print_above.call_args_list ) def test_progress_callback_accepts_new_style_string(self): """cb('delegate.task_thinking', ...) — the string form of the enum value — must route to the thinking branch too, so new-style emitters don't have to import DelegateEvent.""" parent = _make_mock_parent() parent._delegate_spinner = MagicMock() cb = _build_child_progress_callback(0, "test goal", parent, task_count=1) cb("delegate.task_thinking", preview="hmm") assert any( "💭" in str(c) for c in parent._delegate_spinner.print_above.call_args_list ) def test_progress_callback_task_progress_not_misrendered(self): """'subagent_progress' (legacy name for TASK_PROGRESS) carries a pre-batched summary in the tool_name slot. Before the fix, this fell through to the TASK_TOOL_STARTED rendering path, treating the summary string as a tool name. After the fix: distinct render (no tool-start emoji lookup) and pass-through relay upward (no re-batching). Regression path only reachable once nested orchestration is enabled: nested orchestrators relay subagent_progress from grandchildren upward through this callback. """ parent = _make_mock_parent() parent._delegate_spinner = MagicMock() parent.tool_progress_callback = MagicMock() cb = _build_child_progress_callback(0, "test goal", parent, task_count=1) cb("subagent_progress", tool_name="🔀 [1] terminal, file") # Spinner gets a distinct 🔀-prefixed line, NOT a tool emoji # followed by the summary string as if it were a tool name. calls = parent._delegate_spinner.print_above.call_args_list self.assertTrue(any("🔀 🔀 [1] terminal, file" in str(c) for c in calls)) # Parent callback receives the relay (pass-through, no re-batching). parent.tool_progress_callback.assert_called_once() # No '⚡' tool-start emoji should appear — that's the pre-fix bug. self.assertFalse(any("⚡" in str(c) for c in calls)) class TestConcurrencyDefaults(unittest.TestCase): """Tests for the concurrency default and no hard ceiling.""" @patch("tools.delegate_tool._load_config", return_value={}) def test_default_is_three(self, mock_cfg): # Clear env var if set with patch.dict(os.environ, {}, clear=True): self.assertEqual(_get_max_concurrent_children(), 3) @patch("tools.delegate_tool._load_config", return_value={"max_concurrent_children": 10}) def test_no_upper_ceiling(self, mock_cfg): """Users can raise concurrency as high as they want — no hard cap.""" self.assertEqual(_get_max_concurrent_children(), 10) @patch("tools.delegate_tool._load_config", return_value={"max_concurrent_children": 100}) def test_very_high_values_honored(self, mock_cfg): self.assertEqual(_get_max_concurrent_children(), 100) @patch("tools.delegate_tool._load_config", return_value={"max_concurrent_children": 0}) def test_zero_clamped_to_one(self, mock_cfg): """Floor of 1 is enforced; zero or negative values raise to 1.""" self.assertEqual(_get_max_concurrent_children(), 1) @patch("tools.delegate_tool._load_config", return_value={}) def test_env_var_honored_uncapped(self, mock_cfg): with patch.dict(os.environ, {"DELEGATION_MAX_CONCURRENT_CHILDREN": "12"}): self.assertEqual(_get_max_concurrent_children(), 12) @patch("tools.delegate_tool._load_config", return_value={"max_concurrent_children": 6}) def test_configured_value_returned(self, mock_cfg): self.assertEqual(_get_max_concurrent_children(), 6) # ========================================================================= # max_spawn_depth clamping # ========================================================================= class TestMaxSpawnDepth(unittest.TestCase): """Tests for _get_max_spawn_depth clamping and fallback behavior.""" @patch("tools.delegate_tool._load_config", return_value={}) def test_max_spawn_depth_defaults_to_1(self, mock_cfg): from tools.delegate_tool import _get_max_spawn_depth self.assertEqual(_get_max_spawn_depth(), 1) @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": 0}) def test_max_spawn_depth_clamped_below_one(self, mock_cfg): import logging from tools.delegate_tool import _get_max_spawn_depth with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm: result = _get_max_spawn_depth() self.assertEqual(result, 1) self.assertTrue(any("clamping to 1" in m for m in cm.output)) @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": 99}) def test_max_spawn_depth_clamped_above_three(self, mock_cfg): import logging from tools.delegate_tool import _get_max_spawn_depth with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm: result = _get_max_spawn_depth() self.assertEqual(result, 3) self.assertTrue(any("clamping to 3" in m for m in cm.output)) @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": "not-a-number"}) def test_max_spawn_depth_invalid_falls_back_to_default(self, mock_cfg): from tools.delegate_tool import _get_max_spawn_depth self.assertEqual(_get_max_spawn_depth(), 1) # ========================================================================= # role param plumbing # ========================================================================= # # These tests cover the schema + signature + stash plumbing of the role # param. The full role-honoring behavior (toolset re-add, role-aware # prompt) lives in TestOrchestratorRoleBehavior below; these tests only # assert on _delegate_role stashing and on the schema shape. class TestOrchestratorRoleSchema(unittest.TestCase): """Tests that the role param reaches the child via dispatch.""" @patch("tools.delegate_tool._resolve_delegation_credentials") @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": 2}) def _run_with_mock_child(self, role_arg, mock_cfg, mock_creds): mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=0) with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1, "messages": [], } mock_child._delegate_saved_tool_names = [] mock_child._credential_pool = None mock_child.session_prompt_tokens = 0 mock_child.session_completion_tokens = 0 mock_child.model = "test" MockAgent.return_value = mock_child kwargs = {"goal": "test", "parent_agent": parent} if role_arg is not _SENTINEL: kwargs["role"] = role_arg delegate_task(**kwargs) return mock_child def test_default_role_is_leaf(self): child = self._run_with_mock_child(_SENTINEL) self.assertEqual(child._delegate_role, "leaf") def test_explicit_orchestrator_role_stashed(self): """role='orchestrator' reaches _build_child_agent and is stashed. Full behavior (toolset re-add) lands in commit 3; commit 2 only verifies the plumbing.""" child = self._run_with_mock_child("orchestrator") self.assertEqual(child._delegate_role, "orchestrator") def test_unknown_role_coerces_to_leaf(self): """role='nonsense' → _normalize_role warns and returns 'leaf'.""" import logging with self.assertLogs("tools.delegate_tool", level=logging.WARNING) as cm: child = self._run_with_mock_child("nonsense") self.assertEqual(child._delegate_role, "leaf") self.assertTrue(any("coercing" in m.lower() for m in cm.output)) def test_schema_has_role_top_level_and_per_task(self): from tools.delegate_tool import DELEGATE_TASK_SCHEMA props = DELEGATE_TASK_SCHEMA["parameters"]["properties"] self.assertIn("role", props) self.assertEqual(props["role"]["enum"], ["leaf", "orchestrator"]) task_props = props["tasks"]["items"]["properties"] self.assertIn("role", task_props) self.assertEqual(task_props["role"]["enum"], ["leaf", "orchestrator"]) # Sentinel used to distinguish "role kwarg omitted" from "role=None". _SENTINEL = object() # ========================================================================= # role-honoring behavior # ========================================================================= def _make_role_mock_child(): """Helper: mock child with minimal fields for delegate_task to process.""" mock_child = MagicMock() mock_child.run_conversation.return_value = { "final_response": "done", "completed": True, "api_calls": 1, "messages": [], } mock_child._delegate_saved_tool_names = [] mock_child._credential_pool = None mock_child.session_prompt_tokens = 0 mock_child.session_completion_tokens = 0 mock_child.model = "test" return mock_child class TestOrchestratorRoleBehavior(unittest.TestCase): """Tests that role='orchestrator' actually changes toolset + prompt.""" @patch("tools.delegate_tool._resolve_delegation_credentials") @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": 2}) def test_orchestrator_role_keeps_delegation_at_depth_1( self, mock_cfg, mock_creds ): """role='orchestrator' + depth-0 parent with max_spawn_depth=2 → child at depth 1 gets 'delegation' in enabled_toolsets (can further delegate). Requires max_spawn_depth>=2 since the new default is 1 (flat).""" mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=0) parent.enabled_toolsets = ["terminal", "file"] with patch("run_agent.AIAgent") as MockAgent: mock_child = _make_role_mock_child() MockAgent.return_value = mock_child delegate_task(goal="test", role="orchestrator", parent_agent=parent) kwargs = MockAgent.call_args[1] self.assertIn("delegation", kwargs["enabled_toolsets"]) self.assertEqual(mock_child._delegate_role, "orchestrator") @patch("tools.delegate_tool._resolve_delegation_credentials") @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": 2}) def test_orchestrator_blocked_at_max_spawn_depth( self, mock_cfg, mock_creds ): """Parent at depth 1 with max_spawn_depth=2 spawns child at depth 2 (the floor); role='orchestrator' degrades to leaf.""" mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=1) parent.enabled_toolsets = ["terminal", "delegation"] with patch("run_agent.AIAgent") as MockAgent: mock_child = _make_role_mock_child() MockAgent.return_value = mock_child delegate_task(goal="test", role="orchestrator", parent_agent=parent) kwargs = MockAgent.call_args[1] self.assertNotIn("delegation", kwargs["enabled_toolsets"]) self.assertEqual(mock_child._delegate_role, "leaf") @patch("tools.delegate_tool._resolve_delegation_credentials") @patch("tools.delegate_tool._load_config", return_value={}) def test_orchestrator_blocked_at_default_flat_depth( self, mock_cfg, mock_creds ): """With default max_spawn_depth=1 (flat), role='orchestrator' on a depth-0 parent produces a depth-1 child that is already at the floor — the role degrades to 'leaf' and the delegation toolset is stripped. This is the new default posture.""" mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=0) parent.enabled_toolsets = ["terminal", "file", "delegation"] with patch("run_agent.AIAgent") as MockAgent: mock_child = _make_role_mock_child() MockAgent.return_value = mock_child delegate_task(goal="test", role="orchestrator", parent_agent=parent) kwargs = MockAgent.call_args[1] self.assertNotIn("delegation", kwargs["enabled_toolsets"]) self.assertEqual(mock_child._delegate_role, "leaf") @patch("tools.delegate_tool._resolve_delegation_credentials") def test_orchestrator_enabled_false_forces_leaf(self, mock_creds): """Kill switch delegation.orchestrator_enabled=false overrides role='orchestrator'.""" mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=0) parent.enabled_toolsets = ["terminal", "delegation"] with patch("tools.delegate_tool._load_config", return_value={"orchestrator_enabled": False}): with patch("run_agent.AIAgent") as MockAgent: mock_child = _make_role_mock_child() MockAgent.return_value = mock_child delegate_task(goal="test", role="orchestrator", parent_agent=parent) kwargs = MockAgent.call_args[1] self.assertNotIn("delegation", kwargs["enabled_toolsets"]) self.assertEqual(mock_child._delegate_role, "leaf") # ── Role-aware system prompt ──────────────────────────────────────── def test_leaf_prompt_does_not_mention_delegation(self): prompt = _build_child_system_prompt( "Fix tests", role="leaf", max_spawn_depth=2, child_depth=1, ) self.assertNotIn("delegate_task", prompt) self.assertNotIn("Orchestrator Role", prompt) def test_orchestrator_prompt_mentions_delegation_capability(self): prompt = _build_child_system_prompt( "Survey approaches", role="orchestrator", max_spawn_depth=2, child_depth=1, ) self.assertIn("delegate_task", prompt) self.assertIn("Orchestrator Role", prompt) # Depth/max-depth note present and literal: self.assertIn("depth 1", prompt) self.assertIn("max_spawn_depth=2", prompt) def test_orchestrator_prompt_at_depth_floor_says_children_are_leaves(self): """With max_spawn_depth=2 and child_depth=1, the orchestrator's own children would be at depth 2 (the floor) → must be leaves.""" prompt = _build_child_system_prompt( "Survey", role="orchestrator", max_spawn_depth=2, child_depth=1, ) self.assertIn("MUST be leaves", prompt) def test_orchestrator_prompt_below_floor_allows_more_nesting(self): """With max_spawn_depth=3 and child_depth=1, the orchestrator's own children can themselves be orchestrators (depth 2 < 3).""" prompt = _build_child_system_prompt( "Deep work", role="orchestrator", max_spawn_depth=3, child_depth=1, ) self.assertIn("can themselves be orchestrators", prompt) # ── Batch mode and intersection ───────────────────────────────────── @patch("tools.delegate_tool._resolve_delegation_credentials") @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": 2}) def test_batch_mode_per_task_role_override(self, mock_cfg, mock_creds): """Per-task role beats top-level; no top-level role → "leaf". tasks=[{role:'orchestrator'},{role:'leaf'},{}] → first gets delegation, second and third don't. Requires max_spawn_depth>=2 (raised explicitly here) since the new default is 1 (flat). """ mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=0) parent.enabled_toolsets = ["terminal", "file", "delegation"] built_toolsets = [] def _factory(*a, **kw): m = _make_role_mock_child() built_toolsets.append(kw.get("enabled_toolsets")) return m with patch("run_agent.AIAgent", side_effect=_factory): delegate_task( tasks=[ {"goal": "A", "role": "orchestrator"}, {"goal": "B", "role": "leaf"}, {"goal": "C"}, # no role → falls back to top_role (leaf) ], parent_agent=parent, ) self.assertIn("delegation", built_toolsets[0]) self.assertNotIn("delegation", built_toolsets[1]) self.assertNotIn("delegation", built_toolsets[2]) @patch("tools.delegate_tool._resolve_delegation_credentials") @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": 2}) def test_intersection_preserves_delegation_bound( self, mock_cfg, mock_creds ): """Design decision: orchestrator capability is granted by role, NOT inherited from the parent's toolset. A parent without 'delegation' in its enabled_toolsets can still spawn an orchestrator child — the re-add in _build_child_agent runs unconditionally for orchestrators (when max_spawn_depth allows). If you want to change to "parent must have delegation too", update _build_child_agent to check parent_toolsets before the re-add and update this test to match. """ mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=0) parent.enabled_toolsets = ["terminal", "file"] # no delegation with patch("run_agent.AIAgent") as MockAgent: mock_child = _make_role_mock_child() MockAgent.return_value = mock_child delegate_task(goal="test", role="orchestrator", parent_agent=parent) self.assertIn("delegation", MockAgent.call_args[1]["enabled_toolsets"]) class TestOrchestratorEndToEnd(unittest.TestCase): """End-to-end: parent -> orchestrator -> two-leaf nested orchestration. Covers the acceptance gate: parent delegates to an orchestrator child; the orchestrator delegates to two leaf grandchildren; the role/toolset/depth chain all resolve correctly. Mock strategy: a single AIAgent patch with a side_effect factory that keys on the child's ephemeral_system_prompt — orchestrator prompts contain the string "Orchestrator Role" (see _build_child_system_prompt), leaves don't. The orchestrator mock's run_conversation recursively calls delegate_task with tasks=[{goal:...},{goal:...}] to spawn two leaves. This keeps the test in one patch context and avoids depth-indexed nesting. """ @patch("tools.delegate_tool._resolve_delegation_credentials") @patch("tools.delegate_tool._load_config", return_value={"max_spawn_depth": 2}) def test_end_to_end_nested_orchestration(self, mock_cfg, mock_creds): mock_creds.return_value = { "provider": None, "base_url": None, "api_key": None, "api_mode": None, "model": None, } parent = _make_mock_parent(depth=0) parent.enabled_toolsets = ["terminal", "file", "delegation"] # (enabled_toolsets, _delegate_role) for each agent built built_agents: list = [] # Keep the orchestrator mock around so the re-entrant delegate_task # can reach it via closure. orch_mock = {} def _factory(*a, **kw): prompt = kw.get("ephemeral_system_prompt", "") or "" is_orchestrator = "Orchestrator Role" in prompt m = _make_role_mock_child() built_agents.append({ "enabled_toolsets": list(kw.get("enabled_toolsets") or []), "is_orchestrator_prompt": is_orchestrator, }) if is_orchestrator: # Prepare the orchestrator mock as a parent-capable object # so the nested delegate_task call succeeds. m._delegate_depth = 1 m._delegate_role = "orchestrator" m._active_children = [] m._active_children_lock = threading.Lock() m._session_db = None m.platform = "cli" m.enabled_toolsets = ["terminal", "file", "delegation"] m.api_key = "***" m.base_url = "" m.provider = None m.api_mode = None m.providers_allowed = None m.providers_ignored = None m.providers_order = None m.provider_sort = None m._print_fn = None m.tool_progress_callback = None m.thinking_callback = None orch_mock["agent"] = m def _orchestrator_run(user_message=None, task_id=None): # Re-entrant: orchestrator spawns two leaves delegate_task( tasks=[{"goal": "leaf-A"}, {"goal": "leaf-B"}], parent_agent=m, ) return { "final_response": "orchestrated 2 workers", "completed": True, "api_calls": 1, "messages": [], } m.run_conversation.side_effect = _orchestrator_run return m with patch("run_agent.AIAgent", side_effect=_factory) as MockAgent: delegate_task( goal="top-level orchestration", role="orchestrator", parent_agent=parent, ) # 1 orchestrator + 2 leaf grandchildren = 3 agents self.assertEqual(MockAgent.call_count, 3) # First built = the orchestrator (parent's direct child) self.assertIn("delegation", built_agents[0]["enabled_toolsets"]) self.assertTrue(built_agents[0]["is_orchestrator_prompt"]) # Next two = leaves (grandchildren) self.assertNotIn("delegation", built_agents[1]["enabled_toolsets"]) self.assertFalse(built_agents[1]["is_orchestrator_prompt"]) self.assertNotIn("delegation", built_agents[2]["enabled_toolsets"]) self.assertFalse(built_agents[2]["is_orchestrator_prompt"]) if __name__ == "__main__": unittest.main()