fix: propagate child activity to parent during delegate_task (#7295)

When delegate_task runs, the parent agent's activity tracker freezes
because child.run_conversation() blocks and the child's own
_touch_activity() never propagates back to the parent. The gateway
inactivity timeout then fires a spurious 'No activity' warning and
eventually kills the agent, even though the subagent is actively working.

Fix: add a heartbeat thread in _run_single_child that calls
parent._touch_activity() every 30 seconds with detail from the child's
activity summary (current tool, iteration count). The thread is a daemon
that starts before child.run_conversation() and is cleaned up in the
finally block.

This also improves the gateway 'Still working...' status messages —
instead of just 'running: delegate_task', users now see what the
subagent is actually doing (e.g., 'delegate_task: subagent running
terminal (iteration 5/50)').
This commit is contained in:
Teknium 2026-04-10 12:51:30 -07:00 committed by GitHub
parent f72faf191c
commit a093eb47f7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 200 additions and 0 deletions

View file

@ -13,6 +13,7 @@ import json
import os
import sys
import threading
import time
import unittest
from unittest.mock import MagicMock, patch
@ -1052,5 +1053,159 @@ class TestChildCredentialLeasing(unittest.TestCase):
child._credential_pool.release_lease.assert_called_once_with("cred-a")
class TestDelegateHeartbeat(unittest.TestCase):
"""Heartbeat propagates child activity to parent during delegation.
Without the heartbeat, the gateway inactivity timeout fires because the
parent's _last_activity_ts freezes when delegate_task starts.
"""
def test_heartbeat_touches_parent_activity_during_child_run(self):
"""Parent's _touch_activity is called while child.run_conversation blocks."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": "terminal",
"api_call_count": 3,
"max_iterations": 50,
"last_activity_desc": "executing tool: terminal",
}
# Make run_conversation block long enough for heartbeats to fire
def slow_run(**kwargs):
time.sleep(0.25)
return {"final_response": "done", "completed": True, "api_calls": 3}
child.run_conversation.side_effect = slow_run
# Patch the heartbeat interval to fire quickly
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test heartbeat",
child=child,
parent_agent=parent,
)
# Heartbeat should have fired at least once during the 0.25s sleep
self.assertGreater(len(touch_calls), 0,
"Heartbeat did not propagate activity to parent")
# Verify the description includes child's current tool detail
self.assertTrue(
any("terminal" in desc for desc in touch_calls),
f"Heartbeat descriptions should include child tool info: {touch_calls}")
def test_heartbeat_stops_after_child_completes(self):
"""Heartbeat thread is cleaned up when the child finishes."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": None,
"api_call_count": 1,
"max_iterations": 50,
"last_activity_desc": "done",
}
child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1,
}
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test cleanup",
child=child,
parent_agent=parent,
)
# Record count after completion, wait, and verify no more calls
count_after = len(touch_calls)
time.sleep(0.15)
self.assertEqual(len(touch_calls), count_after,
"Heartbeat continued firing after child completed")
def test_heartbeat_stops_after_child_error(self):
"""Heartbeat thread is cleaned up even when the child raises."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": "web_search",
"api_call_count": 2,
"max_iterations": 50,
"last_activity_desc": "executing tool: web_search",
}
def slow_fail(**kwargs):
time.sleep(0.15)
raise RuntimeError("network timeout")
child.run_conversation.side_effect = slow_fail
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
result = _run_single_child(
task_index=0,
goal="Test error cleanup",
child=child,
parent_agent=parent,
)
self.assertEqual(result["status"], "error")
# Verify heartbeat stopped
count_after = len(touch_calls)
time.sleep(0.15)
self.assertEqual(len(touch_calls), count_after,
"Heartbeat continued firing after child error")
def test_heartbeat_includes_child_activity_desc_when_no_tool(self):
"""When child has no current_tool, heartbeat uses last_activity_desc."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": None,
"api_call_count": 5,
"max_iterations": 90,
"last_activity_desc": "API call #5 completed",
}
def slow_run(**kwargs):
time.sleep(0.15)
return {"final_response": "done", "completed": True, "api_calls": 5}
child.run_conversation.side_effect = slow_run
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test desc fallback",
child=child,
parent_agent=parent,
)
self.assertGreater(len(touch_calls), 0)
self.assertTrue(
any("API call #5 completed" in desc for desc in touch_calls),
f"Heartbeat should include last_activity_desc: {touch_calls}")
if __name__ == "__main__":
unittest.main()

View file

@ -20,6 +20,7 @@ import json
import logging
logger = logging.getLogger(__name__)
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
@ -37,6 +38,7 @@ DELEGATE_BLOCKED_TOOLS = frozenset([
MAX_CONCURRENT_CHILDREN = 3
MAX_DEPTH = 2 # parent (0) -> child (1) -> grandchild rejected (2)
DEFAULT_MAX_ITERATIONS = 50
_HEARTBEAT_INTERVAL = 30 # seconds between parent activity heartbeats during delegation
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
@ -369,6 +371,44 @@ def _run_single_child(
except Exception as exc:
logger.debug("Failed to bind child to leased credential: %s", exc)
# Heartbeat: periodically propagate child activity to the parent so the
# gateway inactivity timeout doesn't fire while the subagent is working.
# Without this, the parent's _last_activity_ts freezes when delegate_task
# starts and the gateway eventually kills the agent for "no activity".
_heartbeat_stop = threading.Event()
def _heartbeat_loop():
while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
if parent_agent is None:
continue
touch = getattr(parent_agent, '_touch_activity', None)
if not touch:
continue
# Pull detail from the child's own activity tracker
desc = f"delegate_task: subagent {task_index} working"
try:
child_summary = child.get_activity_summary()
child_tool = child_summary.get("current_tool")
child_iter = child_summary.get("api_call_count", 0)
child_max = child_summary.get("max_iterations", 0)
if child_tool:
desc = (f"delegate_task: subagent running {child_tool} "
f"(iteration {child_iter}/{child_max})")
else:
child_desc = child_summary.get("last_activity_desc", "")
if child_desc:
desc = (f"delegate_task: subagent {child_desc} "
f"(iteration {child_iter}/{child_max})")
except Exception:
pass
try:
touch(desc)
except Exception:
pass
_heartbeat_thread = threading.Thread(target=_heartbeat_loop, daemon=True)
_heartbeat_thread.start()
try:
result = child.run_conversation(user_message=goal)
@ -479,6 +519,11 @@ def _run_single_child(
}
finally:
# Stop the heartbeat thread so it doesn't keep touching parent activity
# after the child has finished (or failed).
_heartbeat_stop.set()
_heartbeat_thread.join(timeout=5)
if child_pool is not None and leased_cred_id is not None:
try:
child_pool.release_lease(leased_cred_id)