Merge remote-tracking branch 'origin/main' into sid/types-and-lints

# Conflicts:
#	gateway/run.py
#	tools/delegate_tool.py
This commit is contained in:
alt-glitch 2026-04-22 14:27:37 +05:30
commit 847ffca715
171 changed files with 15125 additions and 1675 deletions

View file

@ -16,16 +16,18 @@ The parent's context only sees the delegation call and the summary result,
never the child's intermediate tool calls or reasoning.
"""
import enum
import json
import logging
logger = logging.getLogger(__name__)
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Optional
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError, as_completed
from typing import Any, Dict, List, Optional
from toolsets import TOOLSETS
from tools import file_state
from utils import base_url_hostname
@ -41,6 +43,12 @@ DELEGATE_BLOCKED_TOOLS = frozenset([
# Build a description fragment listing toolsets available for subagents.
# Excludes toolsets where ALL tools are blocked, composite/platform toolsets
# (hermes-* prefixed), and scenario toolsets.
#
# NOTE: "delegation" is in this exclusion set so the subagent-facing
# capability hint string (_TOOLSET_LIST_STR) doesn't advertise it as a
# toolset to request explicitly — the correct mechanism for nested
# delegation is role='orchestrator', which re-adds "delegation" in
# _build_child_agent regardless of this exclusion.
_EXCLUDED_TOOLSET_NAMES = frozenset({"debugging", "safe", "delegation", "moa", "rl"})
_SUBAGENT_TOOLSETS = sorted(
name for name, defn in TOOLSETS.items()
@ -51,13 +59,36 @@ _SUBAGENT_TOOLSETS = sorted(
_TOOLSET_LIST_STR = ", ".join(f"'{n}'" for n in _SUBAGENT_TOOLSETS)
_DEFAULT_MAX_CONCURRENT_CHILDREN = 3
MAX_DEPTH = 2 # parent (0) -> child (1) -> grandchild rejected (2)
MAX_DEPTH = 1 # flat by default: parent (0) -> child (1); grandchild rejected unless max_spawn_depth raised.
# Configurable depth cap consulted by _get_max_spawn_depth; MAX_DEPTH
# stays as the default fallback and is still the symbol tests import.
_MIN_SPAWN_DEPTH = 1
_MAX_SPAWN_DEPTH_CAP = 3
def _normalize_role(r: Optional[str]) -> str:
"""Normalise a caller-provided role to 'leaf' or 'orchestrator'.
None/empty -> 'leaf'. Unknown strings coerce to 'leaf' with a
warning log (matches the silent-degrade pattern of
_get_orchestrator_enabled). _build_child_agent adds a second
degrade layer for depth/kill-switch bounds.
"""
if r is None or not r:
return "leaf"
r_norm = str(r).strip().lower()
if r_norm in ("leaf", "orchestrator"):
return r_norm
logger.warning("Unknown delegate_task role=%r, coercing to 'leaf'", r)
return "leaf"
def _get_max_concurrent_children() -> int:
"""Read delegation.max_concurrent_children from config, falling back to
DELEGATION_MAX_CONCURRENT_CHILDREN env var, then the default (3).
Users can raise this as high as they want; only the floor (1) is enforced.
Uses the same ``_load_config()`` path that the rest of ``delegate_task``
uses, keeping config priority consistent (config.yaml > env > default).
"""
@ -71,18 +102,135 @@ def _get_max_concurrent_children() -> int:
"delegation.max_concurrent_children=%r is not a valid integer; "
"using default %d", val, _DEFAULT_MAX_CONCURRENT_CHILDREN,
)
return _DEFAULT_MAX_CONCURRENT_CHILDREN
env_val = os.getenv("DELEGATION_MAX_CONCURRENT_CHILDREN")
if env_val:
try:
return max(1, int(env_val))
except (TypeError, ValueError):
pass
return _DEFAULT_MAX_CONCURRENT_CHILDREN
return _DEFAULT_MAX_CONCURRENT_CHILDREN
def _get_child_timeout() -> float:
    """Read delegation.child_timeout_seconds from config.

    Priority: config.yaml > DELEGATION_CHILD_TIMEOUT_SECONDS env var >
    DEFAULT_CHILD_TIMEOUT (300 s). All values are floored at 30 s so a
    typo cannot make children time out instantly.

    Returns:
        Seconds a single child agent is allowed to run before being
        considered stuck.
    """
    cfg = _load_config()
    val = cfg.get("child_timeout_seconds")
    if val is not None:
        try:
            return max(30.0, float(val))
        except (TypeError, ValueError):
            # Fall through to the env var, then the default. The original
            # message said "using default %d", which was wrong whenever
            # the env var was set and valid.
            logger.warning(
                "delegation.child_timeout_seconds=%r is not a valid number; "
                "falling back to env var or default %d",
                val, DEFAULT_CHILD_TIMEOUT,
            )
    env_val = os.getenv("DELEGATION_CHILD_TIMEOUT_SECONDS")
    if env_val:
        try:
            return max(30.0, float(env_val))
        except (TypeError, ValueError):
            pass
    return float(DEFAULT_CHILD_TIMEOUT)
def _get_max_spawn_depth() -> int:
    """Read delegation.max_spawn_depth from config, clamped to [1, 3].

    Depth 0 is the parent agent. max_spawn_depth = N means agents at
    depths 0..N-1 may spawn; depth N is the leaf floor. The default
    (MAX_DEPTH = 1) keeps the tree flat: the parent spawns depth-1
    children, which cannot spawn further (blocked by this guard AND,
    for leaf children, by the delegation toolset strip in
    _strip_blocked_tools). Raise to 2 or 3 to unlock nested
    orchestration — when max_spawn_depth >= 2, role="orchestrator"
    lifts the toolset strip for depth-1 children so they can spawn
    their own workers.
    """
    configured = _load_config().get("max_spawn_depth")
    if configured is None:
        return MAX_DEPTH
    try:
        requested = int(configured)
    except (TypeError, ValueError):
        logger.warning(
            "delegation.max_spawn_depth=%r is not a valid integer; "
            "using default %d", configured, MAX_DEPTH,
        )
        return MAX_DEPTH
    # Bound to the supported window; warn when we had to adjust so the
    # operator notices the config value was not honoured verbatim.
    bounded = min(max(requested, _MIN_SPAWN_DEPTH), _MAX_SPAWN_DEPTH_CAP)
    if bounded != requested:
        logger.warning(
            "delegation.max_spawn_depth=%d out of range [%d, %d]; "
            "clamping to %d", requested, _MIN_SPAWN_DEPTH,
            _MAX_SPAWN_DEPTH_CAP, bounded,
        )
    return bounded
def _get_orchestrator_enabled() -> bool:
    """Global kill switch for the orchestrator role.

    When False, role="orchestrator" is silently forced to "leaf" in
    _build_child_agent and the delegation toolset is stripped as before.
    Lets an operator disable the feature without a code revert.

    Returns:
        True when the orchestrator role is allowed. Bools pass through;
        string values are parsed truthy/falsy; numeric values use their
        truthiness; anything else defaults to enabled.
    """
    cfg = _load_config()
    val = cfg.get("orchestrator_enabled", True)
    if isinstance(val, bool):
        return val
    # Accept "true"/"false" strings from YAML that doesn't auto-coerce.
    if isinstance(val, str):
        return val.strip().lower() in ("true", "1", "yes", "on")
    # BUGFIX: a numeric YAML value (orchestrator_enabled: 0) previously
    # fell through to `return True`, silently keeping the feature on.
    # Honour numeric truthiness instead. (bool is checked above since
    # it is an int subclass.)
    if isinstance(val, (int, float)):
        return bool(val)
    return True
# Default iteration budget for a child agent when the caller does not pass
# max_iterations — presumably consumed by the child's conversation loop;
# confirm against AIAgent construction.
DEFAULT_MAX_ITERATIONS = 50
DEFAULT_CHILD_TIMEOUT = 300 # seconds before a child agent is considered stuck
_HEARTBEAT_INTERVAL = 30 # seconds between parent activity heartbeats during delegation
_HEARTBEAT_STALE_CYCLES = 5 # mark child stale after this many heartbeats with no iteration progress
# Toolsets granted to a child when neither the call nor the parent's enabled
# set supplies any (see the fallback branch in _build_child_agent).
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
# ---------------------------------------------------------------------------
# Delegation progress event types
# ---------------------------------------------------------------------------
class DelegateEvent(str, enum.Enum):
"""Formal event types emitted during delegation progress.
_build_child_progress_callback normalises incoming legacy strings
(``tool.started``, ``_thinking``, ) to these enum values via
``_LEGACY_EVENT_MAP``. External consumers (gateway SSE, ACP adapter,
CLI) still receive the legacy strings during the deprecation window.
TASK_SPAWNED / TASK_COMPLETED / TASK_FAILED are reserved for
future orchestrator lifecycle events and are not currently emitted.
"""
TASK_SPAWNED = "delegate.task_spawned"
TASK_PROGRESS = "delegate.task_progress"
TASK_COMPLETED = "delegate.task_completed"
TASK_FAILED = "delegate.task_failed"
TASK_THINKING = "delegate.task_thinking"
TASK_TOOL_STARTED = "delegate.tool_started"
TASK_TOOL_COMPLETED = "delegate.tool_completed"
# Legacy event strings → DelegateEvent mapping.
# Incoming child-agent events use the old names; the callback normalises them.
_LEGACY_EVENT_MAP: Dict[str, DelegateEvent] = {
"_thinking": DelegateEvent.TASK_THINKING,
"reasoning.available": DelegateEvent.TASK_THINKING,
"tool.started": DelegateEvent.TASK_TOOL_STARTED,
"tool.completed": DelegateEvent.TASK_TOOL_COMPLETED,
"subagent_progress": DelegateEvent.TASK_PROGRESS,
}
def check_delegate_requirements() -> bool:
    """Availability probe for the delegation toolset.

    Delegation needs no external binaries or services, so it is
    unconditionally available.
    """
    return True
@ -93,8 +241,18 @@ def _build_child_system_prompt(
context: Optional[str] = None,
*,
workspace_path: Optional[str] = None,
role: str = "leaf",
max_spawn_depth: int = 2,
child_depth: int = 1,
) -> str:
"""Build a focused system prompt for a child agent."""
"""Build a focused system prompt for a child agent.
When role='orchestrator', appends a delegation-capability block
modeled on OpenClaw's buildSubagentSystemPrompt (canSpawn branch at
inspiration/openclaw/src/agents/subagent-system-prompt.ts:63-95).
The depth note is literal truth (grounded in the passed config) so
the LLM doesn't confabulate nesting capabilities that don't exist.
"""
parts = [
"You are a focused subagent working on a specific delegated task.",
"",
@ -120,6 +278,37 @@ def _build_child_system_prompt(
"Be thorough but concise -- your response is returned to the "
"parent agent as a summary."
)
if role == "orchestrator":
child_note = (
"Your own children MUST be leaves (cannot delegate further) "
"because they would be at the depth floor — you cannot pass "
"role='orchestrator' to your own delegate_task calls."
if child_depth + 1 >= max_spawn_depth else
"Your own children can themselves be orchestrators or leaves, "
"depending on the `role` you pass to delegate_task. Default is "
"'leaf'; pass role='orchestrator' explicitly when a child "
"needs to further decompose its work."
)
parts.append(
"\n## Subagent Spawning (Orchestrator Role)\n"
"You have access to the `delegate_task` tool and CAN spawn "
"your own subagents to parallelize independent work.\n\n"
"WHEN to delegate:\n"
"- The goal decomposes into 2+ independent subtasks that can "
"run in parallel (e.g. research A and B simultaneously).\n"
"- A subtask is reasoning-heavy and would flood your context "
"with intermediate data.\n\n"
"WHEN NOT to delegate:\n"
"- Single-step mechanical work — do it directly.\n"
"- Trivial tasks you can execute in one or two tool calls.\n"
"- Re-delegating your entire assigned goal to one worker "
"(that's just pass-through with no value added).\n\n"
"Coordinate your workers' results and synthesize them before "
"reporting back to your parent. You are responsible for the "
"final summary, not your workers.\n\n"
f"NOTE: You are at depth {child_depth}. The delegation tree "
f"is capped at max_spawn_depth={max_spawn_depth}. {child_note}"
)
return "\n".join(parts)
@ -197,10 +386,9 @@ def _build_child_progress_callback(task_index: int, goal: str, parent_agent, tas
except Exception as e:
logger.debug("Parent callback failed: %s", e)
def _callback(event_type: str, tool_name: str = None, preview: str = None, args=None, **kwargs):
# event_type is one of: "tool.started", "tool.completed",
# "reasoning.available", "_thinking", "subagent.*"
def _callback(event_type, tool_name: str = None, preview: str = None, args=None, **kwargs):
# Lifecycle events emitted by the orchestrator itself — handled
# before enum normalisation since they are not part of DelegateEvent.
if event_type == "subagent.start":
if spinner and goal_label:
short = (goal_label[:55] + "...") if len(goal_label) > 55 else goal_label
@ -215,8 +403,21 @@ def _build_child_progress_callback(task_index: int, goal: str, parent_agent, tas
_relay("subagent.complete", preview=preview, **kwargs)
return
# "_thinking" / reasoning events
if event_type in ("_thinking", "reasoning.available"):
# Normalise legacy strings, new-style "delegate.*" strings, and
# DelegateEvent enum values all to a single DelegateEvent. The
# original implementation only accepted the five legacy strings;
# enum-typed callers were silently dropped.
if isinstance(event_type, DelegateEvent):
event = event_type
else:
event = _LEGACY_EVENT_MAP.get(event_type)
if event is None:
try:
event = DelegateEvent(event_type)
except (ValueError, TypeError):
return # Unknown event — ignore
if event == DelegateEvent.TASK_THINKING:
text = preview or tool_name or ""
if spinner:
short = (text[:55] + "...") if len(text) > 55 else text
@ -227,11 +428,31 @@ def _build_child_progress_callback(task_index: int, goal: str, parent_agent, tas
_relay("subagent.thinking", preview=text)
return
# tool.completed — no display needed here (spinner shows on started)
if event_type == "tool.completed":
if event == DelegateEvent.TASK_TOOL_COMPLETED:
return
# tool.started — display and batch for parent relay
if event == DelegateEvent.TASK_PROGRESS:
# Pre-batched progress summary relayed from a nested
# orchestrator's grandchild (upstream emits as
# parent_cb("subagent_progress", summary_string) where the
# summary lands in the tool_name positional slot). Treat as
# a pass-through: render distinctly (not via the tool-start
# emoji lookup, which would mistake the summary string for a
# tool name) and relay upward without re-batching.
summary_text = tool_name or preview or ""
if spinner and summary_text:
try:
spinner.print_above(f" {prefix}├─ 🔀 {summary_text}")
except Exception as e:
logger.debug("Spinner print_above failed: %s", e)
if parent_cb:
try:
parent_cb("subagent_progress", f"{prefix}{summary_text}")
except Exception as e:
logger.debug("Parent callback relay failed: %s", e)
return
# TASK_TOOL_STARTED — display and batch for parent relay
if spinner:
short = (preview[:35] + "...") if preview and len(preview) > 35 else (preview or "")
from agent.display import get_tool_emoji
@ -280,6 +501,10 @@ def _build_child_agent(
# ACP transport overrides — lets a non-ACP parent spawn ACP child agents
override_acp_command: Optional[str] = None,
override_acp_args: Optional[List[str]] = None,
# Per-call role controlling whether the child can further delegate.
# 'leaf' (default) cannot; 'orchestrator' retains the delegation
# toolset subject to depth/kill-switch bounds applied below.
role: str = "leaf",
):
"""
Build a child AIAgent on the main thread (thread-safe construction).
@ -292,6 +517,17 @@ def _build_child_agent(
"""
from run_agent import AIAgent
# ── Role resolution ─────────────────────────────────────────────────
# Honor the caller's role only when BOTH the kill switch and the
# child's depth allow it. This is the single point where role
# degrades to 'leaf' — keeps the rule predictable. Callers pass
# the normalised role (_normalize_role ran in delegate_task) so
# we only deal with 'leaf' or 'orchestrator' here.
child_depth = getattr(parent_agent, '_delegate_depth', 0) + 1
max_spawn = _get_max_spawn_depth()
orchestrator_ok = _get_orchestrator_enabled() and child_depth < max_spawn
effective_role = role if (role == "orchestrator" and orchestrator_ok) else "leaf"
# When no explicit toolsets given, inherit from parent's enabled toolsets
# so disabled tools (e.g. web) don't leak to subagents.
# Note: enabled_toolsets=None means "all tools enabled" (the default),
@ -319,8 +555,21 @@ def _build_child_agent(
else:
child_toolsets = _strip_blocked_tools(DEFAULT_TOOLSETS)
# Orchestrators retain the 'delegation' toolset that _strip_blocked_tools
# removed. The re-add is unconditional on parent-toolset membership because
# orchestrator capability is granted by role, not inherited — see the
# test_intersection_preserves_delegation_bound test for the design rationale.
if effective_role == "orchestrator" and "delegation" not in child_toolsets:
child_toolsets.append("delegation")
workspace_hint = _resolve_workspace_hint(parent_agent)
child_prompt = _build_child_system_prompt(goal, context, workspace_path=workspace_hint)
child_prompt = _build_child_system_prompt(
goal, context,
workspace_path=workspace_hint,
role=effective_role,
max_spawn_depth=max_spawn,
child_depth=child_depth,
)
# Extract parent's API key so subagents inherit auth (e.g. Nous Portal).
parent_api_key = getattr(parent_agent, "api_key", None)
if (not parent_api_key) and hasattr(parent_agent, "_client_kwargs"):
@ -406,7 +655,10 @@ def _build_child_agent(
)
child._print_fn = getattr(parent_agent, '_print_fn', None)
# Set delegation depth so children can't spawn grandchildren
child._delegate_depth = getattr(parent_agent, '_delegate_depth', 0) + 1
child._delegate_depth = child_depth
# Stash the post-degrade role for introspection (leaf if the
# kill switch or depth bounded the caller's requested role).
child._delegate_role = effective_role
# Share a credential pool with the child when possible so subagents can
# rotate credentials on rate limits instead of getting pinned to one key.
@ -464,6 +716,8 @@ def _run_single_child(
# Without this, the parent's _last_activity_ts freezes when delegate_task
# starts and the gateway eventually kills the agent for "no activity".
_heartbeat_stop = threading.Event()
_last_seen_iter = [0] # mutable container for heartbeat stale detection
_stale_count = [0]
def _heartbeat_loop():
while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
@ -479,6 +733,25 @@ def _run_single_child(
child_tool = child_summary.get("current_tool")
child_iter = child_summary.get("api_call_count", 0)
child_max = child_summary.get("max_iterations", 0)
# Stale detection: if iteration count hasn't advanced,
# increment stale counter. After N cycles with no
# progress, stop masking the hang so the gateway
# inactivity timeout can fire as a last resort.
if child_iter <= _last_seen_iter[0]:
_stale_count[0] += 1
else:
_last_seen_iter[0] = child_iter
_stale_count[0] = 0
if _stale_count[0] >= _HEARTBEAT_STALE_CYCLES:
logger.warning(
"Subagent %d appears stale (no iteration progress "
"for %d heartbeat cycles) — stopping heartbeat",
task_index, _stale_count[0],
)
break # stop touching parent, let gateway timeout fire
if child_tool:
desc = (f"delegate_task: subagent running {child_tool} "
f"(iteration {child_iter}/{child_max})")
@ -504,7 +777,78 @@ def _run_single_child(
except Exception as e:
logger.debug("Progress callback start failed: %s", e)
result = child.run_conversation(user_message=goal)
# File-state coordination: generate a stable child task_id so the
# file_state registry can attribute writes back to this subagent,
# and snapshot the parent's read set at launch time. After the
# child returns we compare to detect "sibling modified files the
# parent previously read" and surface it as a reminder on the
# returned summary.
import uuid as _uuid
child_task_id = f"subagent-{task_index}-{_uuid.uuid4().hex[:8]}"
parent_task_id = getattr(parent_agent, "_current_task_id", None)
wall_start = time.time()
parent_reads_snapshot = (
list(file_state.known_reads(parent_task_id))
if parent_task_id else []
)
# Run child with a hard timeout to prevent indefinite blocking
# when the child's API call or tool-level HTTP request hangs.
child_timeout = _get_child_timeout()
_timeout_executor = ThreadPoolExecutor(max_workers=1)
_child_future = _timeout_executor.submit(
child.run_conversation, user_message=goal, task_id=child_task_id,
)
try:
result = _child_future.result(timeout=child_timeout)
except Exception as _timeout_exc:
# Signal the child to stop so its thread can exit cleanly.
try:
if hasattr(child, 'interrupt'):
child.interrupt()
elif hasattr(child, '_interrupt_requested'):
child._interrupt_requested = True
except Exception:
pass
is_timeout = isinstance(_timeout_exc, (FuturesTimeoutError, TimeoutError))
duration = round(time.monotonic() - child_start, 2)
logger.warning(
"Subagent %d %s after %.1fs",
task_index,
"timed out" if is_timeout else f"raised {type(_timeout_exc).__name__}",
duration,
)
if child_progress_cb:
try:
child_progress_cb(
"subagent.complete",
preview=f"Timed out after {duration}s" if is_timeout else str(_timeout_exc),
status="timeout" if is_timeout else "error",
duration_seconds=duration,
summary="",
)
except Exception:
pass
return {
"task_index": task_index,
"status": "timeout" if is_timeout else "error",
"summary": None,
"error": (
f"Subagent timed out after {child_timeout}s with no response. "
"The child may be stuck on a slow API call or unresponsive network request."
) if is_timeout else str(_timeout_exc),
"exit_reason": "timeout" if is_timeout else "error",
"api_calls": 0,
"duration_seconds": duration,
"_child_role": getattr(child, "_delegate_role", None),
}
finally:
# Shut down executor without waiting — if the child thread
# is stuck on blocking I/O, wait=True would hang forever.
_timeout_executor.shutdown(wait=False)
# Flush any remaining batched progress to gateway
if child_progress_cb and hasattr(child_progress_cb, '_flush'):
@ -602,6 +946,36 @@ def _run_single_child(
if status == "failed":
entry["error"] = result.get("error", "Subagent did not produce a response.")
# Cross-agent file-state reminder. If this subagent wrote any
# files the parent had already read, surface it so the parent
# knows to re-read before editing — the scenario that motivated
# the registry. We check writes by ANY non-parent task_id (not
# just this child's), which also covers transitive writes from
# nested orchestrator→worker chains.
try:
if parent_task_id and parent_reads_snapshot:
sibling_writes = file_state.writes_since(
parent_task_id, wall_start, parent_reads_snapshot
)
if sibling_writes:
mod_paths = sorted(
{p for paths in sibling_writes.values() for p in paths}
)
if mod_paths:
reminder = (
"\n\n[NOTE: subagent modified files the parent "
"previously read — re-read before editing: "
+ ", ".join(mod_paths[:8])
+ (f" (+{len(mod_paths) - 8} more)" if len(mod_paths) > 8 else "")
+ "]"
)
if entry.get("summary"):
entry["summary"] = entry["summary"] + reminder
else:
entry["stale_paths"] = mod_paths
except Exception:
logger.debug("file_state sibling-write check failed", exc_info=True)
if child_progress_cb:
try:
child_progress_cb(
@ -691,27 +1065,40 @@ def delegate_task(
max_iterations: Optional[int] = None,
acp_command: Optional[str] = None,
acp_args: Optional[List[str]] = None,
role: Optional[str] = None,
parent_agent=None,
) -> str:
"""
Spawn one or more child agents to handle delegated tasks.
Supports two modes:
- Single: provide goal (+ optional context, toolsets)
- Batch: provide tasks array [{goal, context, toolsets}, ...]
- Single: provide goal (+ optional context, toolsets, role)
- Batch: provide tasks array [{goal, context, toolsets, role}, ...]
The 'role' parameter controls whether a child can further delegate:
'leaf' (default) cannot; 'orchestrator' retains the delegation
toolset and can spawn its own workers, bounded by
delegation.max_spawn_depth. Per-task role beats the top-level one.
Returns JSON with results array, one entry per task.
"""
if parent_agent is None:
return tool_error("delegate_task requires a parent agent context.")
# Depth limit
# Normalise the top-level role once; per-task overrides re-normalise.
top_role = _normalize_role(role)
# Depth limit — configurable via delegation.max_spawn_depth,
# default 2 for parity with the original MAX_DEPTH constant.
depth = getattr(parent_agent, '_delegate_depth', 0)
if depth >= MAX_DEPTH:
max_spawn = _get_max_spawn_depth()
if depth >= max_spawn:
return json.dumps({
"error": (
f"Delegation depth limit reached ({MAX_DEPTH}). "
"Subagents cannot spawn further subagents."
f"Delegation depth limit reached (depth={depth}, "
f"max_spawn_depth={max_spawn}). Raise "
f"delegation.max_spawn_depth in config.yaml if deeper "
f"nesting is required (cap: {_MAX_SPAWN_DEPTH_CAP})."
)
})
@ -743,7 +1130,8 @@ def delegate_task(
)
task_list = tasks
elif goal and isinstance(goal, str) and goal.strip():
task_list = [{"goal": goal, "context": context, "toolsets": toolsets}]
task_list = [{"goal": goal, "context": context,
"toolsets": toolsets, "role": top_role}]
else:
return tool_error("Provide either 'goal' (single task) or 'tasks' (batch).")
@ -775,6 +1163,9 @@ def delegate_task(
try:
for i, t in enumerate(task_list):
task_acp_args = t.get("acp_args") if "acp_args" in t else None
# Per-task role beats top-level; normalise again so unknown
# per-task values warn and degrade to leaf uniformly.
effective_role = _normalize_role(t.get("role") or top_role)
child = _build_child_agent(
task_index=i, goal=t["goal"], context=t.get("context"),
toolsets=t.get("toolsets") or toolsets, model=creds["model"],
@ -786,6 +1177,7 @@ def delegate_task(
override_acp_args=task_acp_args if task_acp_args is not None else (
acp_args if acp_args is not None else creds.get("args")
),
role=effective_role,
)
# Override with correct parent tool names (before child construction mutated global)
child._delegate_saved_tool_names = _parent_tool_names
@ -1034,6 +1426,9 @@ def _resolve_delegation_credentials(cfg: dict, parent_agent) -> dict:
elif base_url_hostname(configured_base_url) == "api.anthropic.com":
provider = "anthropic"
api_mode = "anthropic_messages"
elif "api.kimi.com/coding" in base_lower:
provider = "custom"
api_mode = "anthropic_messages"
return {
"model": configured_model,
@ -1119,7 +1514,7 @@ DELEGATE_TASK_SCHEMA = {
"never enter your context window.\n\n"
"TWO MODES (one of 'goal' or 'tasks' is required):\n"
"1. Single task: provide 'goal' (+ optional context, toolsets)\n"
"2. Batch (parallel): provide 'tasks' array with up to 3 items. "
"2. Batch (parallel): provide 'tasks' array with up to delegation.max_concurrent_children items (default 3). "
"All run concurrently and results are returned together.\n\n"
"WHEN TO USE delegate_task:\n"
"- Reasoning-heavy subtasks (debugging, code review, research synthesis)\n"
@ -1132,8 +1527,14 @@ DELEGATE_TASK_SCHEMA = {
"IMPORTANT:\n"
"- Subagents have NO memory of your conversation. Pass all relevant "
"info (file paths, error messages, constraints) via the 'context' field.\n"
"- Subagents CANNOT call: delegate_task, clarify, memory, send_message, "
"execute_code.\n"
"- Leaf subagents (role='leaf', the default) CANNOT call: "
"delegate_task, clarify, memory, send_message, execute_code.\n"
"- Orchestrator subagents (role='orchestrator') retain "
"delegate_task so they can spawn their own workers, but still "
"cannot use clarify, memory, send_message, or execute_code. "
"Orchestrators are bounded by delegation.max_spawn_depth "
"(default 2) and can be disabled globally via "
"delegation.orchestrator_enabled=false.\n"
"- Each subagent gets its own terminal session (separate working directory and state).\n"
"- Results are always returned as an array, one entry per task."
),
@ -1189,6 +1590,11 @@ DELEGATE_TASK_SCHEMA = {
"items": {"type": "string"},
"description": "Per-task ACP args override.",
},
"role": {
"type": "string",
"enum": ["leaf", "orchestrator"],
"description": "Per-task role override. See top-level 'role' for semantics.",
},
},
"required": ["goal"],
},
@ -1208,6 +1614,19 @@ DELEGATE_TASK_SCHEMA = {
"Only set lower for simple tasks."
),
},
"role": {
"type": "string",
"enum": ["leaf", "orchestrator"],
"description": (
"Role of the child agent. 'leaf' (default) = focused "
"worker, cannot delegate further. 'orchestrator' = can "
"use delegate_task to spawn its own workers. Requires "
"delegation.max_spawn_depth >= 2 in config; ignored "
"(treated as 'leaf') when the child would exceed "
"max_spawn_depth or when "
"delegation.orchestrator_enabled=false."
),
},
"acp_command": {
"type": "string",
"description": (
@ -1246,6 +1665,7 @@ registry.register(
max_iterations=args.get("max_iterations"),
acp_command=args.get("acp_command"),
acp_args=args.get("acp_args"),
role=args.get("role"),
parent_agent=kw.get("parent_agent")),
check_fn=check_delegate_requirements,
emoji="🔀",

332
tools/file_state.py Normal file
View file

@ -0,0 +1,332 @@
"""Cross-agent file state coordination.
Prevents mangled edits when concurrent subagents (same process, same
filesystem) touch the same file. Complements the single-agent path-overlap
check in ``run_agent._should_parallelize_tool_batch`` this module catches
the case where subagent B writes a file that subagent A already read, so
A's next write would overwrite B's changes with stale content.
Design
------
A process-wide singleton ``FileStateRegistry`` tracks, per resolved path:
* per-agent read stamps: {task_id: {path: (mtime, read_ts, partial)}}
* last writer globally: {path: (task_id, write_ts)}
* per-path ``threading.Lock`` for readmodifywrite critical sections
Three public hooks are used by the file tools:
* ``record_read(task_id, path, *, partial)`` called by read_file
* ``note_write(task_id, path)`` called after write_file / patch
* ``check_stale(task_id, path)`` called BEFORE write_file / patch
Plus ``lock_path(path)`` a context-manager returning a per-path lock to
wrap the whole readmodifywrite block. And ``writes_since(task_id,
since_ts, paths)`` for the subagent-completion reminder in delegate_tool.
All methods are no-ops when ``HERMES_DISABLE_FILE_STATE_GUARD=1`` is set.
This module is intentionally separate from ``_read_tracker`` in
``file_tools.py`` that tracker is per-task and handles consecutive-read
loop detection, which is a different concern.
"""
from __future__ import annotations
import os
import threading
import time
from collections import defaultdict
from contextlib import contextmanager
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple
# ── Public stamp type ────────────────────────────────────────────────
# (mtime, read_ts, partial). partial=True when read_file returned a
# windowed view (offset > 1 or limit < total_lines) — writes that happen
# after a partial read should still warn so the model re-reads in full.
ReadStamp = Tuple[float, float, bool]

# Number of resolved-path entries retained per agent. Bounded to keep
# long sessions from accumulating unbounded state. On overflow we drop
# the oldest entries by insertion order (dicts preserve insertion order,
# so eviction is FIFO — see the _cap_dict helper).
_MAX_PATHS_PER_AGENT = 4096

# Global last-writer map cap. Same FIFO-on-overflow policy.
_MAX_GLOBAL_WRITERS = 4096
class FileStateRegistry:
"""Process-wide coordinator for cross-agent file edits."""
def __init__(self) -> None:
    """Initialise empty bookkeeping maps and the two guard locks."""
    # {task_id: {resolved_path: ReadStamp}} — what each agent last read.
    self._reads: Dict[str, Dict[str, ReadStamp]] = defaultdict(dict)
    # {resolved_path: (task_id, write_ts)} — most recent writer globally.
    self._last_writer: Dict[str, Tuple[str, float]] = {}
    # Lazily created per-path locks, handed out via _lock_for()/lock_path().
    self._path_locks: Dict[str, threading.Lock] = {}
    self._meta_lock = threading.Lock() # guards _path_locks
    self._state_lock = threading.Lock() # guards _reads + _last_writer
# ── Path lock management ────────────────────────────────────────
def _lock_for(self, resolved: str) -> threading.Lock:
    """Return the per-path lock for *resolved*, creating it on first use.

    Creation is guarded by ``_meta_lock`` so two threads racing on a
    fresh path cannot end up holding different lock objects.
    """
    with self._meta_lock:
        return self._path_locks.setdefault(resolved, threading.Lock())
@contextmanager
def lock_path(self, resolved: str):
    """Context manager serializing a read→modify→write section on *resolved*.

    Threads in this process contending on the same path take turns;
    distinct paths are independent and proceed in parallel.
    """
    # `with` on the lock is equivalent to acquire()/try/finally release().
    with self._lock_for(resolved):
        yield
# ── Read/write accounting ───────────────────────────────────────
def record_read(
    self,
    task_id: str,
    resolved: str,
    *,
    partial: bool = False,
    mtime: Optional[float] = None,
) -> None:
    """Stamp *resolved* as read by *task_id*.

    No-op when the guard is disabled. When *mtime* is not supplied it
    is taken from disk; if the file has vanished, nothing is recorded.
    """
    if _disabled():
        return
    if mtime is None:
        try:
            mtime = os.path.getmtime(resolved)
        except OSError:
            # File disappeared between the read and the stamp — skip.
            return
    stamp: ReadStamp = (float(mtime), time.time(), bool(partial))
    with self._state_lock:
        reads_for_agent = self._reads[task_id]
        reads_for_agent[resolved] = stamp
        _cap_dict(reads_for_agent, _MAX_PATHS_PER_AGENT)
def note_write(
    self,
    task_id: str,
    resolved: str,
    *,
    mtime: Optional[float] = None,
) -> None:
    """Record a successful write by *task_id* to *resolved*.

    Besides updating the global last-writer map, the writer's own read
    stamp is refreshed — having just written the file, the agent by
    definition knows its current content.
    """
    if _disabled():
        return
    if mtime is None:
        try:
            mtime = os.path.getmtime(resolved)
        except OSError:
            return
    stamp_ts = time.time()
    with self._state_lock:
        self._last_writer[resolved] = (task_id, stamp_ts)
        _cap_dict(self._last_writer, _MAX_GLOBAL_WRITERS)
        # Refresh the writer's own view as an up-to-date, full read.
        own_reads = self._reads[task_id]
        own_reads[resolved] = (float(mtime), stamp_ts, False)
        _cap_dict(own_reads, _MAX_PATHS_PER_AGENT)
def check_stale(self, task_id: str, resolved: str) -> Optional[str]:
    """Return a model-facing warning if this write would be stale.

    Three staleness classes, in order of severity:

    1. Sibling subagent wrote this file after this agent's last read.
    2. External/unknown change (mtime differs from our last read).
    3. Agent never read the file (write-without-read).

    Returns ``None`` when the write is safe. Does not raise — callers
    decide whether to block or warn.
    """
    if _disabled():
        return None
    # Snapshot both maps under the state lock; the stat call and the
    # comparisons below run lock-free on the snapshots.
    with self._state_lock:
        stamp = self._reads.get(task_id, {}).get(resolved)
        last_writer = self._last_writer.get(resolved)
    # Case 3: never read AND we have no write record — net-new file or
    # first touch by this agent. Let existing _check_sensitive_path
    # and file-exists logic handle it; nothing to warn about here.
    if stamp is None and last_writer is None:
        return None
    try:
        current_mtime = os.path.getmtime(resolved)
    except OSError:
        # File doesn't exist — write will create it; not stale.
        return None
    # Case 1: sibling subagent modified after our last read.
    if last_writer is not None:
        writer_tid, writer_ts = last_writer
        if writer_tid != task_id:
            if stamp is None:
                return (
                    f"{resolved} was modified by sibling subagent "
                    f"{writer_tid!r} but this agent never read it. "
                    "Read the file before writing to avoid overwriting "
                    "the sibling's changes."
                )
            # ReadStamp layout is (mtime, read_ts, partial); index 1 is
            # the wall-clock time of this agent's last read.
            read_ts = stamp[1]
            if writer_ts > read_ts:
                return (
                    f"{resolved} was modified by sibling subagent "
                    f"{writer_tid!r} at {_fmt_ts(writer_ts)} — after "
                    f"this agent's last read at {_fmt_ts(read_ts)}. "
                    "Re-read the file before writing."
                )
    # Case 2: external / unknown modification (mtime drifted).
    if stamp is not None:
        read_mtime, _read_ts, partial = stamp
        if current_mtime != read_mtime:
            return (
                f"{resolved} was modified since you last read it "
                "on disk (external edit or unrecorded writer). "
                "Re-read the file before writing."
            )
        if partial:
            # Read was done with offset/limit — the agent never saw the
            # whole file, so a full overwrite could drop unseen content.
            return (
                f"{resolved} was last read with offset/limit pagination "
                "(partial view). Re-read the whole file before "
                "overwriting it."
            )
    # Case 3b: agent truly never read the file. NOTE(review): reaching
    # here with stamp None implies the last writer was this same agent
    # (note_write also stamps a read) — presumably the read stamp was
    # evicted by _cap_dict; confirm before relying on this branch.
    if stamp is None:
        return (
            f"{resolved} was not read by this agent. "
            "Read the file first so you can write an informed edit."
        )
    return None
# ── Reminder helper for delegate_tool ───────────────────────────
def writes_since(
    self,
    exclude_task_id: str,
    since_ts: float,
    paths: Iterable[str],
) -> Dict[str, List[str]]:
    """Map ``writer_task_id -> [paths]`` for recent foreign writes.

    Only writes at/after ``since_ts`` by agents OTHER than
    ``exclude_task_id``, restricted to ``paths``, are reported. Used by
    delegate_task to append a "subagent modified files the parent
    previously read" reminder to the delegation result.
    """
    if _disabled():
        return {}
    wanted = set(paths)
    grouped: Dict[str, List[str]] = defaultdict(list)
    with self._state_lock:
        for path, (writer_tid, ts) in self._last_writer.items():
            # Single merged guard: foreign writer, recent enough, and a
            # path the caller actually cares about.
            if writer_tid != exclude_task_id and ts >= since_ts and path in wanted:
                grouped[writer_tid].append(path)
    return dict(grouped)
def known_reads(self, task_id: str) -> List[str]:
    """Return the resolved paths this agent has read (possibly empty)."""
    if _disabled():
        return []
    with self._state_lock:
        # Unpacking the mapping yields its keys — same as list(...keys()).
        return [*self._reads.get(task_id, {})]
# ── Testing hooks ───────────────────────────────────────────────
def clear(self) -> None:
    """Drop every tracked read/write and path lock. Tests only."""
    # State first, then the lock table — same lock order as the rest of
    # the class, so this can't deadlock against concurrent callers.
    with self._state_lock:
        for mapping in (self._reads, self._last_writer):
            mapping.clear()
    with self._meta_lock:
        self._path_locks.clear()
# ── Module-level singleton + helpers ─────────────────────────────────
# Process-wide registry shared by every tool call in this process;
# tests reset it via FileStateRegistry.clear().
_registry: FileStateRegistry = FileStateRegistry()

def get_registry() -> FileStateRegistry:
    """Return the process-wide FileStateRegistry singleton."""
    return _registry
def _disabled() -> bool:
# Re-read each call so tests can toggle via monkeypatch.setenv.
return os.environ.get("HERMES_DISABLE_FILE_STATE_GUARD", "").strip() == "1"
def _fmt_ts(ts: float) -> str:
# Short relative wall-clock for error messages; avoids pulling in
# datetime formatting overhead on the hot path.
return time.strftime("%H:%M:%S", time.localtime(ts))
def _cap_dict(d: dict, limit: int) -> None:
"""Trim a dict to ``limit`` entries by dropping insertion-order oldest."""
over = len(d) - limit
if over <= 0:
return
# dict preserves insertion order (PY>=3.7) — pop the oldest keys.
it = iter(d)
for _ in range(over):
try:
d.pop(next(it))
except (StopIteration, KeyError):
break
# ── Convenience wrappers (short names used at call sites) ────────────
def record_read(task_id: str, resolved_or_path: str | Path, *, partial: bool = False) -> None:
    """Module-level shorthand: forward to the singleton, coercing Path→str."""
    path = str(resolved_or_path)
    _registry.record_read(task_id, path, partial=partial)
def note_write(task_id: str, resolved_or_path: str | Path) -> None:
    """Module-level shorthand: forward to the singleton, coercing Path→str."""
    path = str(resolved_or_path)
    _registry.note_write(task_id, path)
def check_stale(task_id: str, resolved_or_path: str | Path) -> Optional[str]:
    """Module-level shorthand: forward to the singleton, coercing Path→str."""
    path = str(resolved_or_path)
    return _registry.check_stale(task_id, path)
def lock_path(resolved_or_path: str | Path):
    """Module-level shorthand: per-path lock from the singleton registry."""
    path = str(resolved_or_path)
    return _registry.lock_path(path)
def writes_since(
    exclude_task_id: str,
    since_ts: float,
    paths: Iterable[str | Path],
) -> Dict[str, List[str]]:
    """Module-level shorthand: forward to the singleton, coercing each path."""
    normalized = [str(p) for p in paths]
    return _registry.writes_since(exclude_task_id, since_ts, normalized)
def known_reads(task_id: str) -> List[str]:
    """Module-level shorthand: resolved paths *task_id* has read."""
    return _registry.known_reads(task_id)
# Public API of this module. The singleton itself stays private —
# obtain it through get_registry(); the bare-name wrappers above cover
# the common call sites.
__all__ = [
    "FileStateRegistry",
    "get_registry",
    "record_read",
    "note_write",
    "check_stale",
    "lock_path",
    "writes_since",
    "known_reads",
]

View file

@ -12,6 +12,7 @@ from typing import Optional
from agent.file_safety import get_read_block_error
from tools.binary_extensions import has_binary_extension
from tools.file_operations import ShellFileOperations
from tools import file_state
from agent.redact import redact_sensitive_text
logger = logging.getLogger(__name__)
@ -483,6 +484,19 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
# accumulate megabytes of dict/set state. See _cap_read_tracker_data.
_cap_read_tracker_data(task_data)
# Cross-agent file-state registry (separate from per-task read
# tracker above): records that THIS agent has read this path so
# write/patch can detect sibling-subagent writes that happened
# after our read. Partial read when offset>1 or the read was
# truncated (large file with more content than limit covered).
# Outside the _read_tracker_lock so the registry's own locking
# isn't nested under ours.
try:
_partial = (offset > 1) or bool(result_dict.get("truncated"))
file_state.record_read(task_id, resolved_str, partial=_partial)
except Exception:
logger.debug("file_state.record_read failed", exc_info=True)
if count >= 4:
# Hard block: stop returning content to break the loop
return json.dumps({
@ -602,15 +616,43 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
if sensitive_err:
return tool_error(sensitive_err)
try:
stale_warning = _check_file_staleness(path, task_id)
file_ops = _get_file_ops(task_id)
result = file_ops.write_file(path, content)
result_dict = result.to_dict()
if stale_warning:
result_dict["_warning"] = stale_warning
# Refresh the stored timestamp so consecutive writes by this
# task don't trigger false staleness warnings.
_update_read_timestamp(path, task_id)
# Resolve once for the registry lock + stale check. Failures here
# fall back to the legacy path — write proceeds, per-task staleness
# check below still runs.
try:
_resolved = str(_resolve_path(path))
except Exception:
_resolved = None
if _resolved is None:
stale_warning = _check_file_staleness(path, task_id)
file_ops = _get_file_ops(task_id)
result = file_ops.write_file(path, content)
result_dict = result.to_dict()
if stale_warning:
result_dict["_warning"] = stale_warning
_update_read_timestamp(path, task_id)
return json.dumps(result_dict, ensure_ascii=False)
# Serialize the read→modify→write region per-path so concurrent
# subagents can't interleave on the same file. Different paths
# remain fully parallel.
with file_state.lock_path(_resolved):
# Cross-agent staleness wins over per-task warning when both
# fire — its message names the sibling subagent.
cross_warning = file_state.check_stale(task_id, _resolved)
stale_warning = _check_file_staleness(path, task_id)
file_ops = _get_file_ops(task_id)
result = file_ops.write_file(path, content)
result_dict = result.to_dict()
effective_warning = cross_warning or stale_warning
if effective_warning:
result_dict["_warning"] = effective_warning
# Refresh stamps after the successful write so consecutive
# writes by this task don't trigger false staleness warnings.
_update_read_timestamp(path, task_id)
if not result_dict.get("error"):
file_state.note_write(task_id, _resolved)
return json.dumps(result_dict, ensure_ascii=False)
except Exception as e:
if _is_expected_write_exception(e):
@ -637,36 +679,70 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
if sensitive_err:
return tool_error(sensitive_err)
try:
# Check staleness for all files this patch will touch.
stale_warnings = []
# Resolve paths for locking. Ordered + deduplicated so concurrent
# callers lock in the same order — prevents deadlock on overlapping
# multi-file V4A patches.
_resolved_paths: list[str] = []
_seen: set[str] = set()
for _p in _paths_to_check:
_sw = _check_file_staleness(_p, task_id)
if _sw:
stale_warnings.append(_sw)
try:
_r = str(_resolve_path(_p))
except Exception:
_r = None
if _r and _r not in _seen:
_resolved_paths.append(_r)
_seen.add(_r)
_resolved_paths.sort()
file_ops = _get_file_ops(task_id)
if mode == "replace":
if not path:
return tool_error("path required")
if old_string is None or new_string is None:
return tool_error("old_string and new_string required")
result = file_ops.patch_replace(path, old_string, new_string, replace_all)
elif mode == "patch":
if not patch:
return tool_error("patch content required")
result = file_ops.patch_v4a(patch)
else:
return tool_error(f"Unknown mode: {mode}")
result_dict = result.to_dict()
if stale_warnings:
result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings)
# Refresh stored timestamps for all successfully-patched paths so
# consecutive edits by this task don't trigger false warnings.
if not result_dict.get("error"):
# Acquire per-path locks in sorted order via ExitStack. On single
# path this degenerates to one lock; on empty list (unresolvable)
# it's a no-op and execution falls through unchanged.
from contextlib import ExitStack
with ExitStack() as _locks:
for _r in _resolved_paths:
_locks.enter_context(file_state.lock_path(_r))
# Collect warnings — cross-agent registry first (names sibling),
# then per-task tracker as a fallback.
stale_warnings: list[str] = []
_path_to_resolved: dict[str, str] = {}
for _p in _paths_to_check:
_update_read_timestamp(_p, task_id)
try:
_r = str(_resolve_path(_p))
except Exception:
_r = None
_path_to_resolved[_p] = _r
_cross = file_state.check_stale(task_id, _r) if _r else None
_sw = _cross or _check_file_staleness(_p, task_id)
if _sw:
stale_warnings.append(_sw)
file_ops = _get_file_ops(task_id)
if mode == "replace":
if not path:
return tool_error("path required")
if old_string is None or new_string is None:
return tool_error("old_string and new_string required")
result = file_ops.patch_replace(path, old_string, new_string, replace_all)
elif mode == "patch":
if not patch:
return tool_error("patch content required")
result = file_ops.patch_v4a(patch)
else:
return tool_error(f"Unknown mode: {mode}")
result_dict = result.to_dict()
if stale_warnings:
result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings)
# Refresh stored timestamps for all successfully-patched paths so
# consecutive edits by this task don't trigger false warnings.
if not result_dict.get("error"):
for _p in _paths_to_check:
_update_read_timestamp(_p, task_id)
_r = _path_to_resolved.get(_p)
if _r:
file_state.note_write(task_id, _r)
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when old_string not found — saves iterations where the agent
# retries with stale content instead of re-reading the file.

View file

@ -189,6 +189,38 @@ FAL_MODELS: Dict[str, Dict[str, Any]] = {
},
"upscale": False,
},
"fal-ai/gpt-image-2": {
"display": "GPT Image 2",
"speed": "~20s",
"strengths": "SOTA text rendering + CJK, world-aware photorealism",
"price": "$0.040.06/image",
# GPT Image 2 uses FAL's standard preset enum (unlike 1.5's literal
# dimensions). We map to the 4:3 variants — the 16:9 presets
# (1024x576) fall below GPT-Image-2's 655,360 min-pixel requirement
# and would be rejected. 4:3 keeps us above the minimum on all
# three aspect ratios.
"size_style": "image_size_preset",
"sizes": {
"landscape": "landscape_4_3", # 1024x768
"square": "square_hd", # 1024x1024
"portrait": "portrait_4_3", # 768x1024
},
"defaults": {
# Same quality pinning as gpt-image-1.5: medium keeps Nous
# Portal billing predictable. "high" is 3-4x the per-image
# cost at the same size; "low" is too rough for production use.
"quality": "medium",
"num_images": 1,
"output_format": "png",
},
"supports": {
"prompt", "image_size", "quality", "num_images", "output_format",
"sync_mode",
# openai_api_key (BYOK) intentionally omitted — all users go
# through the shared FAL billing path.
},
"upscale": False,
},
"fal-ai/ideogram/v3": {
"display": "Ideogram V3",
"speed": "~5s",
@ -749,14 +781,41 @@ def check_fal_api_key() -> bool:
def check_image_generation_requirements() -> bool:
"""True if FAL credentials and fal_client SDK are both available."""
"""True if any image gen backend is available.
Providers are considered in this order:
1. The in-tree FAL backend (FAL_KEY or managed gateway).
2. Any plugin-registered provider whose ``is_available()`` returns True.
Plugins win only when the in-tree FAL path is NOT ready, which matches
the historical behavior: shipping hermes with a FAL key configured
should still expose the tool. The active selection among ready
providers is resolved per-call by ``image_gen.provider``.
"""
try:
if not check_fal_api_key():
return False
fal_client # noqa: F401 — SDK presence check
return True
if check_fal_api_key():
fal_client # noqa: F401 — SDK presence check
return True
except ImportError:
return False
pass
# Probe plugin providers. Discovery is idempotent and cheap.
try:
from agent.image_gen_registry import list_providers
from hermes_cli.plugins import _ensure_plugins_discovered
_ensure_plugins_discovered()
for provider in list_providers():
try:
if provider.is_available():
return True
except Exception:
continue
except Exception:
pass
return False
# ---------------------------------------------------------------------------
@ -802,10 +861,11 @@ from tools.registry import registry, tool_error
IMAGE_GENERATE_SCHEMA = {
"name": "image_generate",
"description": (
"Generate high-quality images from text prompts using FAL.ai. "
"The underlying model is user-configured (default: FLUX 2 Klein 9B, "
"sub-1s generation) and is not selectable by the agent. Returns a "
"single image URL. Display it using markdown: ![description](URL)"
"Generate high-quality images from text prompts. The underlying "
"backend (FAL, OpenAI, etc.) and model are user-configured and not "
"selectable by the agent. Returns either a URL or an absolute file "
"path in the `image` field; display it with markdown "
"![description](url-or-path) and the gateway will deliver it."
),
"parameters": {
"type": "object",
@ -826,13 +886,104 @@ IMAGE_GENERATE_SCHEMA = {
}
def _read_configured_image_provider():
"""Return the value of ``image_gen.provider`` from config.yaml, or None.
We only consult the plugin registry when this is explicitly set — an
unset value keeps users on the legacy in-tree FAL path even when other
providers happen to be registered (e.g. a user has OPENAI_API_KEY set
for other features but never asked for OpenAI image gen).
"""
try:
from hermes_cli.config import load_config
cfg = load_config()
section = cfg.get("image_gen") if isinstance(cfg, dict) else None
if isinstance(section, dict):
value = section.get("provider")
if isinstance(value, str) and value.strip():
return value.strip()
except Exception as exc:
logger.debug("Could not read image_gen.provider: %s", exc)
return None
def _dispatch_to_plugin_provider(prompt: str, aspect_ratio: str):
"""Route the call to a plugin-registered provider when one is selected.
Returns a JSON string on dispatch, or ``None`` to fall through to the
built-in FAL path.
Dispatch only fires when ``image_gen.provider`` is explicitly set AND
it does not point to ``fal`` (FAL still lives in-tree in this PR;
a later PR ports it into ``plugins/image_gen/fal/``). Any other value
that matches a registered plugin provider wins.
"""
configured = _read_configured_image_provider()
if not configured or configured == "fal":
return None
try:
# Import locally so plugin discovery isn't triggered just by
# importing this module (tests rely on that).
from agent.image_gen_registry import get_provider
from hermes_cli.plugins import _ensure_plugins_discovered
_ensure_plugins_discovered()
provider = get_provider(configured)
except Exception as exc:
logger.debug("image_gen plugin dispatch skipped: %s", exc)
return None
if provider is None:
return json.dumps({
"success": False,
"image": None,
"error": (
f"image_gen.provider='{configured}' is set but no plugin "
f"registered that name. Run `hermes plugins list` to see "
f"available image gen backends."
),
"error_type": "provider_not_registered",
})
try:
result = provider.generate(prompt=prompt, aspect_ratio=aspect_ratio)
except Exception as exc:
logger.warning(
"Image gen provider '%s' raised: %s",
getattr(provider, "name", "?"), exc,
)
return json.dumps({
"success": False,
"image": None,
"error": f"Provider '{getattr(provider, 'name', '?')}' error: {exc}",
"error_type": "provider_exception",
})
if not isinstance(result, dict):
return json.dumps({
"success": False,
"image": None,
"error": "Provider returned a non-dict result",
"error_type": "provider_contract",
})
return json.dumps(result)
def _handle_image_generate(args, **kw):
prompt = args.get("prompt", "")
if not prompt:
return tool_error("prompt is required for image generation")
aspect_ratio = args.get("aspect_ratio", DEFAULT_ASPECT_RATIO)
# Route to a plugin-registered provider if one is active (and it's
# not the in-tree FAL path).
dispatched = _dispatch_to_plugin_provider(prompt, aspect_ratio)
if dispatched is not None:
return dispatched
return image_generate_tool(
prompt=prompt,
aspect_ratio=args.get("aspect_ratio", DEFAULT_ASPECT_RATIO),
aspect_ratio=aspect_ratio,
)

View file

@ -121,7 +121,80 @@ def _get_default_output_dir() -> str:
return str(get_hermes_dir("cache/audio", "audio_cache"))
DEFAULT_OUTPUT_DIR = _get_default_output_dir()
MAX_TEXT_LENGTH = 4000
# ---------------------------------------------------------------------------
# Per-provider input-character limits (from official provider docs).
# A single global cap was wrong: OpenAI is 4096, xAI is 15k, MiniMax is 10k,
# ElevenLabs is model-dependent (5k / 10k / 30k / 40k), Gemini caps at ~8k
# input tokens. Users can override any of these via
# ``tts.<provider>.max_text_length`` in config.yaml.
# ---------------------------------------------------------------------------
PROVIDER_MAX_TEXT_LENGTH: Dict[str, int] = {
"edge": 5000, # edge-tts practical sync limit
"openai": 4096, # https://platform.openai.com/docs/guides/text-to-speech
"xai": 15000, # https://docs.x.ai/developers/model-capabilities/audio/text-to-speech
"minimax": 10000, # https://platform.minimax.io/docs/api-reference/speech-t2a-http (sync)
"mistral": 4000, # conservative; no published per-request cap
"gemini": 5000, # Gemini TTS caps at ~8k input tokens / ~655s audio
"elevenlabs": 10000, # fallback when model-aware lookup can't resolve (multilingual_v2)
"neutts": 2000, # local model, quality falls off on long text
"kittentts": 2000, # local 25MB model
}
# ElevenLabs caps vary by model_id. https://elevenlabs.io/docs/overview/models
ELEVENLABS_MODEL_MAX_TEXT_LENGTH: Dict[str, int] = {
"eleven_v3": 5000,
"eleven_ttv_v3": 5000,
"eleven_multilingual_v2": 10000,
"eleven_multilingual_v1": 10000,
"eleven_english_sts_v2": 10000,
"eleven_english_sts_v1": 10000,
"eleven_flash_v2": 30000,
"eleven_flash_v2_5": 40000,
}
# Final fallback when provider isn't recognised at all.
FALLBACK_MAX_TEXT_LENGTH = 4000
# Back-compat alias. Prefer ``_resolve_max_text_length()`` for new code.
MAX_TEXT_LENGTH = FALLBACK_MAX_TEXT_LENGTH
def _resolve_max_text_length(
provider: Optional[str],
tts_config: Optional[Dict[str, Any]] = None,
) -> int:
"""Return the input-character cap for *provider*.
Resolution order:
1. ``tts.<provider>.max_text_length`` (user override in config.yaml)
2. ElevenLabs model-aware table (keyed on configured ``model_id``)
3. ``PROVIDER_MAX_TEXT_LENGTH`` default
4. ``FALLBACK_MAX_TEXT_LENGTH`` (4000)
Non-positive or non-integer overrides fall through to the default so a
broken config can't accidentally disable truncation entirely.
"""
if not provider:
return FALLBACK_MAX_TEXT_LENGTH
key = provider.lower().strip()
cfg = tts_config or {}
prov_cfg = cfg.get(key) if isinstance(cfg.get(key), dict) else {}
override = prov_cfg.get("max_text_length") if prov_cfg else None
if isinstance(override, bool):
# bool is an int subclass; treat explicit booleans as "not set"
override = None
if isinstance(override, int) and override > 0:
return override
if key == "elevenlabs":
model_id = (prov_cfg or {}).get("model_id") or DEFAULT_ELEVENLABS_MODEL_ID
mapped = ELEVENLABS_MODEL_MAX_TEXT_LENGTH.get(str(model_id).strip())
if mapped:
return mapped
return PROVIDER_MAX_TEXT_LENGTH.get(key, FALLBACK_MAX_TEXT_LENGTH)
# ===========================================================================
@ -865,14 +938,19 @@ def text_to_speech_tool(
if not text or not text.strip():
return tool_error("Text is required", success=False)
# Truncate very long text with a warning
if len(text) > MAX_TEXT_LENGTH:
logger.warning("TTS text too long (%d chars), truncating to %d", len(text), MAX_TEXT_LENGTH)
text = text[:MAX_TEXT_LENGTH]
tts_config = _load_tts_config()
provider = _get_provider(tts_config)
# Truncate very long text with a warning. The cap is per-provider
# (OpenAI 4096, xAI 15k, MiniMax 10k, ElevenLabs model-aware, etc.).
max_len = _resolve_max_text_length(provider, tts_config)
if len(text) > max_len:
logger.warning(
"TTS text too long for provider %s (%d chars), truncating to %d",
provider, len(text), max_len,
)
text = text[:max_len]
# Detect platform from gateway env var to choose the best output format.
# Telegram voice bubbles require Opus (.ogg); OpenAI and ElevenLabs can
# produce Opus natively (no ffmpeg needed). Edge TTS always outputs MP3
@ -1191,6 +1269,14 @@ def stream_tts_to_speaker(
voice_id = el_config.get("voice_id", voice_id)
model_id = el_config.get("streaming_model_id",
el_config.get("model_id", model_id))
# Per-sentence cap for the streaming path. Look up the cap against
# the *streaming* model_id (defaults to eleven_flash_v2_5 = 40k chars),
# not the sync model_id. A user override
# (tts.elevenlabs.max_text_length) still wins.
stream_max_len = _resolve_max_text_length(
"elevenlabs",
{**tts_config, "elevenlabs": {**el_config, "model_id": model_id}},
)
api_key = os.getenv("ELEVENLABS_API_KEY", "")
if not api_key:
@ -1246,9 +1332,9 @@ def stream_tts_to_speaker(
# Skip audio generation if no TTS client available
if client is None:
return
# Truncate very long sentences
if len(cleaned) > MAX_TEXT_LENGTH:
cleaned = cleaned[:MAX_TEXT_LENGTH]
# Truncate very long sentences (ElevenLabs streaming path)
if len(cleaned) > stream_max_len:
cleaned = cleaned[:stream_max_len]
try:
audio_iter = client.text_to_speech.convert(
text=cleaned,
@ -1406,7 +1492,7 @@ TTS_SCHEMA = {
"properties": {
"text": {
"type": "string",
"description": "The text to convert to speech. Keep under 4000 characters."
"description": "The text to convert to speech. Provider-specific character caps apply and are enforced automatically (OpenAI 4096, xAI 15000, MiniMax 10000, ElevenLabs 5k-40k depending on model); over-long input is truncated."
},
"output_path": {
"type": "string",