From dd8ab40556cc25e7e70e730ff3eaca803cc93550 Mon Sep 17 00:00:00 2001 From: Kongxi Date: Wed, 22 Apr 2026 11:20:16 +0800 Subject: [PATCH] fix(delegation): add hard timeout and stale detection for subagent execution (#13770) - Wrap child.run_conversation() in a ThreadPoolExecutor with configurable timeout (delegation.child_timeout_seconds, default 300s) to prevent indefinite blocking when a subagent's API call or tool HTTP request hangs. - Add heartbeat stale detection: if a child's api_call_count doesn't advance for 5 consecutive heartbeat cycles (~2.5 min), stop touching the parent's activity timestamp so the gateway inactivity timeout can fire as a last resort. - Add 'timeout' as a new exit_reason/status alongside the existing completed/max_iterations/interrupted states. - Use shutdown(wait=False) on the timeout executor to avoid the ThreadPoolExecutor.__exit__ deadlock when a child is stuck on blocking I/O. Closes #13768 --- scripts/release.py | 1 + tools/delegate_tool.py | 108 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/scripts/release.py b/scripts/release.py index 705099313..0a6f7b88d 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -334,6 +334,7 @@ AUTHOR_MAP = { "asslaenn5@gmail.com": "Aslaaen", "shalompmc0505@naver.com": "pinion05", "105142614+VTRiot@users.noreply.github.com": "VTRiot", + "vivien000812@gmail.com": "iamagenius00", } diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index ebf771d15..8bac6eba5 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -23,7 +23,7 @@ logger = logging.getLogger(__name__) import os import threading import time -from concurrent.futures import ThreadPoolExecutor, as_completed +from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError, as_completed from typing import Any, Dict, List, Optional from toolsets import TOOLSETS @@ -112,6 +112,31 @@ def _get_max_concurrent_children() -> int: return _DEFAULT_MAX_CONCURRENT_CHILDREN +def _get_child_timeout() -> float: + """Read delegation.child_timeout_seconds from config. + + Returns the number of seconds a single child agent is allowed to run + before being considered stuck. Default: 300 s (5 minutes). + """ + cfg = _load_config() + val = cfg.get("child_timeout_seconds") + if val is not None: + try: + return max(30.0, float(val)) + except (TypeError, ValueError): + logger.warning( + "delegation.child_timeout_seconds=%r is not a valid number; " + "using default %d", val, DEFAULT_CHILD_TIMEOUT, + ) + env_val = os.getenv("DELEGATION_CHILD_TIMEOUT_SECONDS") + if env_val: + try: + return max(30.0, float(env_val)) + except (TypeError, ValueError): + pass + return float(DEFAULT_CHILD_TIMEOUT) + + def _get_max_spawn_depth() -> int: """Read delegation.max_spawn_depth from config, clamped to [1, 3]. @@ -165,7 +190,9 @@ def _get_orchestrator_enabled() -> bool: DEFAULT_MAX_ITERATIONS = 50 +DEFAULT_CHILD_TIMEOUT = 300 # seconds before a child agent is considered stuck _HEARTBEAT_INTERVAL = 30 # seconds between parent activity heartbeats during delegation +_HEARTBEAT_STALE_CYCLES = 5 # mark child stale after this many heartbeats with no iteration progress DEFAULT_TOOLSETS = ["terminal", "file", "web"] @@ -689,6 +716,8 @@ def _run_single_child( # Without this, the parent's _last_activity_ts freezes when delegate_task # starts and the gateway eventually kills the agent for "no activity". _heartbeat_stop = threading.Event() + _last_seen_iter = [0] # mutable container for heartbeat stale detection + _stale_count = [0] def _heartbeat_loop(): while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL): @@ -704,6 +733,25 @@ def _run_single_child( child_tool = child_summary.get("current_tool") child_iter = child_summary.get("api_call_count", 0) child_max = child_summary.get("max_iterations", 0) + + # Stale detection: if iteration count hasn't advanced, + # increment stale counter. After N cycles with no + # progress, stop masking the hang so the gateway + # inactivity timeout can fire as a last resort. + if child_iter <= _last_seen_iter[0]: + _stale_count[0] += 1 + else: + _last_seen_iter[0] = child_iter + _stale_count[0] = 0 + + if _stale_count[0] >= _HEARTBEAT_STALE_CYCLES: + logger.warning( + "Subagent %d appears stale (no iteration progress " + "for %d heartbeat cycles) — stopping heartbeat", + task_index, _stale_count[0], + ) + break # stop touching parent, let gateway timeout fire + if child_tool: desc = (f"delegate_task: subagent running {child_tool} " f"(iteration {child_iter}/{child_max})") @@ -744,7 +792,63 @@ def _run_single_child( if parent_task_id else [] ) - result = child.run_conversation(user_message=goal, task_id=child_task_id) + # Run child with a hard timeout to prevent indefinite blocking + # when the child's API call or tool-level HTTP request hangs. + child_timeout = _get_child_timeout() + _timeout_executor = ThreadPoolExecutor(max_workers=1) + _child_future = _timeout_executor.submit( + child.run_conversation, user_message=goal, task_id=child_task_id, + ) + try: + result = _child_future.result(timeout=child_timeout) + except Exception as _timeout_exc: + # Signal the child to stop so its thread can exit cleanly. + try: + if hasattr(child, 'interrupt'): + child.interrupt() + elif hasattr(child, '_interrupt_requested'): + child._interrupt_requested = True + except Exception: + pass + + is_timeout = isinstance(_timeout_exc, (FuturesTimeoutError, TimeoutError)) + duration = round(time.monotonic() - child_start, 2) + logger.warning( + "Subagent %d %s after %.1fs", + task_index, + "timed out" if is_timeout else f"raised {type(_timeout_exc).__name__}", + duration, + ) + + if child_progress_cb: + try: + child_progress_cb( + "subagent.complete", + preview=f"Timed out after {duration}s" if is_timeout else str(_timeout_exc), + status="timeout" if is_timeout else "error", + duration_seconds=duration, + summary="", + ) + except Exception: + pass + + return { + "task_index": task_index, + "status": "timeout" if is_timeout else "error", + "summary": None, + "error": ( + f"Subagent timed out after {child_timeout}s with no response. " + "The child may be stuck on a slow API call or unresponsive network request." + ) if is_timeout else str(_timeout_exc), + "exit_reason": "timeout" if is_timeout else "error", + "api_calls": 0, + "duration_seconds": duration, + "_child_role": getattr(child, "_delegate_role", None), + } + finally: + # Shut down executor without waiting — if the child thread + # is stuck on blocking I/O, wait=True would hang forever. + _timeout_executor.shutdown(wait=False) # Flush any remaining batched progress to gateway if child_progress_cb and hasattr(child_progress_cb, '_flush'):