fix(delegation): add hard timeout and stale detection for subagent execution (#13770)

- Wrap child.run_conversation() in a ThreadPoolExecutor with configurable
  timeout (delegation.child_timeout_seconds, default 300s) to prevent
  indefinite blocking when a subagent's API call or tool HTTP request hangs.

- Add heartbeat stale detection: if a child's api_call_count doesn't
  advance for 5 consecutive heartbeat cycles (~2.5 min), stop touching
  the parent's activity timestamp so the gateway inactivity timeout
  can fire as a last resort.

- Add 'timeout' as a new exit_reason/status alongside the existing
  completed/max_iterations/interrupted states.

- Use shutdown(wait=False) on the timeout executor to avoid the
  ThreadPoolExecutor.__exit__ deadlock when a child is stuck on
  blocking I/O.

Closes #13768
This commit is contained in:
Kongxi 2026-04-22 11:20:16 +08:00 committed by GitHub
parent c832ebd67c
commit dd8ab40556
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 107 additions and 2 deletions

View file

@ -23,7 +23,7 @@ logger = logging.getLogger(__name__)
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError, as_completed
from typing import Any, Dict, List, Optional
from toolsets import TOOLSETS
@ -112,6 +112,31 @@ def _get_max_concurrent_children() -> int:
return _DEFAULT_MAX_CONCURRENT_CHILDREN
def _get_child_timeout() -> float:
    """Return the hard timeout, in seconds, for a single child agent run.

    Sources are consulted in this order; the first usable value wins:

    1. ``child_timeout_seconds`` in the mapping returned by ``_load_config()``
       (the ``delegation.child_timeout_seconds`` setting).
    2. The ``DELEGATION_CHILD_TIMEOUT_SECONDS`` environment variable.
    3. ``DEFAULT_CHILD_TIMEOUT``.

    Values taken from config or the environment are clamped to a minimum of
    30 seconds.  A value that cannot be converted to a finite float is
    reported with a warning and skipped, so the next source is tried.

    Returns:
        The timeout in seconds as a ``float``.
    """
    # Function-scope import so this block does not depend on the module's
    # top-level import list.
    import math

    def _coerce(raw):
        # float() accepts "nan"/"inf"; an infinite timeout would defeat the
        # purpose of a hard limit, so treat non-finite values as invalid.
        seconds = float(raw)
        if not math.isfinite(seconds):
            raise ValueError(f"non-finite timeout value: {raw!r}")
        return max(30.0, seconds)

    # OverflowError covers float() on an integer too large to represent.
    invalid = (TypeError, ValueError, OverflowError)

    cfg = _load_config()
    val = cfg.get("child_timeout_seconds")
    if val is not None:
        try:
            return _coerce(val)
        except invalid:
            # Not necessarily "the default": the env var is still consulted
            # below, so say so in the message.
            logger.warning(
                "delegation.child_timeout_seconds=%r is not a valid number; "
                "falling back to DELEGATION_CHILD_TIMEOUT_SECONDS or "
                "default %d",
                val, DEFAULT_CHILD_TIMEOUT,
            )

    env_val = os.getenv("DELEGATION_CHILD_TIMEOUT_SECONDS")
    if env_val:  # unset and empty string are both treated as "not provided"
        try:
            return _coerce(env_val)
        except invalid:
            # Previously ignored silently; surface the misconfiguration.
            logger.warning(
                "DELEGATION_CHILD_TIMEOUT_SECONDS=%r is not a valid number; "
                "using default %d",
                env_val, DEFAULT_CHILD_TIMEOUT,
            )

    return float(DEFAULT_CHILD_TIMEOUT)
def _get_max_spawn_depth() -> int:
"""Read delegation.max_spawn_depth from config, clamped to [1, 3].
@ -165,7 +190,9 @@ def _get_orchestrator_enabled() -> bool:
DEFAULT_MAX_ITERATIONS = 50
# Fallback hard timeout for one child run, used by _get_child_timeout() when
# neither the config key child_timeout_seconds nor the
# DELEGATION_CHILD_TIMEOUT_SECONDS environment variable supplies a usable value.
DEFAULT_CHILD_TIMEOUT = 300 # seconds before a child agent is considered stuck
# Period of the heartbeat loop's wait() between parent-activity updates.
_HEARTBEAT_INTERVAL = 30 # seconds between parent activity heartbeats during delegation
# The heartbeat loop exits once api_call_count has failed to advance for this
# many consecutive cycles (5 * 30 s = 150 s with the values here), so the
# gateway inactivity timeout is no longer masked.
_HEARTBEAT_STALE_CYCLES = 5 # mark child stale after this many heartbeats with no iteration progress
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
@ -689,6 +716,8 @@ def _run_single_child(
# Without this, the parent's _last_activity_ts freezes when delegate_task
# starts and the gateway eventually kills the agent for "no activity".
_heartbeat_stop = threading.Event()
_last_seen_iter = [0] # mutable container for heartbeat stale detection
_stale_count = [0]
def _heartbeat_loop():
while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
@ -704,6 +733,25 @@ def _run_single_child(
child_tool = child_summary.get("current_tool")
child_iter = child_summary.get("api_call_count", 0)
child_max = child_summary.get("max_iterations", 0)
# Stale detection: if iteration count hasn't advanced,
# increment stale counter. After N cycles with no
# progress, stop masking the hang so the gateway
# inactivity timeout can fire as a last resort.
if child_iter <= _last_seen_iter[0]:
_stale_count[0] += 1
else:
_last_seen_iter[0] = child_iter
_stale_count[0] = 0
if _stale_count[0] >= _HEARTBEAT_STALE_CYCLES:
logger.warning(
"Subagent %d appears stale (no iteration progress "
"for %d heartbeat cycles) — stopping heartbeat",
task_index, _stale_count[0],
)
break # stop touching parent, let gateway timeout fire
if child_tool:
desc = (f"delegate_task: subagent running {child_tool} "
f"(iteration {child_iter}/{child_max})")
@ -744,7 +792,63 @@ def _run_single_child(
if parent_task_id else []
)
result = child.run_conversation(user_message=goal, task_id=child_task_id)
# Run child with a hard timeout to prevent indefinite blocking
# when the child's API call or tool-level HTTP request hangs.
child_timeout = _get_child_timeout()
_timeout_executor = ThreadPoolExecutor(max_workers=1)
_child_future = _timeout_executor.submit(
child.run_conversation, user_message=goal, task_id=child_task_id,
)
try:
result = _child_future.result(timeout=child_timeout)
except Exception as _timeout_exc:
# Signal the child to stop so its thread can exit cleanly.
try:
if hasattr(child, 'interrupt'):
child.interrupt()
elif hasattr(child, '_interrupt_requested'):
child._interrupt_requested = True
except Exception:
pass
is_timeout = isinstance(_timeout_exc, (FuturesTimeoutError, TimeoutError))
duration = round(time.monotonic() - child_start, 2)
logger.warning(
"Subagent %d %s after %.1fs",
task_index,
"timed out" if is_timeout else f"raised {type(_timeout_exc).__name__}",
duration,
)
if child_progress_cb:
try:
child_progress_cb(
"subagent.complete",
preview=f"Timed out after {duration}s" if is_timeout else str(_timeout_exc),
status="timeout" if is_timeout else "error",
duration_seconds=duration,
summary="",
)
except Exception:
pass
return {
"task_index": task_index,
"status": "timeout" if is_timeout else "error",
"summary": None,
"error": (
f"Subagent timed out after {child_timeout}s with no response. "
"The child may be stuck on a slow API call or unresponsive network request."
) if is_timeout else str(_timeout_exc),
"exit_reason": "timeout" if is_timeout else "error",
"api_calls": 0,
"duration_seconds": duration,
"_child_role": getattr(child, "_delegate_role", None),
}
finally:
# Shut down executor without waiting — if the child thread
# is stuck on blocking I/O, wait=True would hang forever.
_timeout_executor.shutdown(wait=False)
# Flush any remaining batched progress to gateway
if child_progress_cb and hasattr(child_progress_cb, '_flush'):