fix(vision): cap vision_analyze fan-out concurrency process-wide

A single agent turn can fan out N vision_analyze calls at once — the
classic trigger is "analyze every frame of this video", where ffmpeg
explodes a clip into dozens of frames and the model calls vision_analyze
on each. Every call does a CPU-heavy base64-encode/resize burst AND holds
a long-lived LLM stream open. The tool executor runs concurrent tool calls
on a per-session ThreadPoolExecutor (_MAX_TOOL_WORKERS=8), and multiple
agent sessions share one process (the dashboard runs the agent in-process),
so there was no global ceiling. In prod (June 2026) a video-frame fan-out
pinned a worker thread at ~100% CPU and starved the shared asyncio event
loop that also serves the dashboard's /api/status liveness probe, flapping
the instance to UNHEALTHY even though nothing had crashed.

Add a process-global threading.BoundedSemaphore that bounds how many vision
analyses run concurrently across the whole process, held across the entire
analysis (image load + encode + LLM call) in the single _handle_vision_analyze
chokepoint (covers both the native fast path and the legacy aux-LLM path).

It is a threading semaphore, NOT asyncio: each vision call is dispatched
through model_tools._run_async on a per-thread event loop, so an asyncio
primitive bound to one loop cannot coordinate across them. The acquire is
offloaded via run_in_executor so waiting for a slot never blocks the calling
loop.

Default: min(host CPUs, 4), floored at 1 — respect the host's concurrency,
or lower. Override via auxiliary.vision.max_concurrency (config.yaml) or
HERMES_VISION_MAX_CONCURRENCY (env). Values < 1 are ignored so the cap can
never be disabled into an unbounded fan-out.

Tests: bounded-fan-out regression guard + a control proving it would fail
without the cap; resolver tests for host-cpu default, ceiling clamp, low-cpu
host, env override, and sub-1 rejection. Pre-existing handler tests updated
for the now-async _handle_vision_analyze. Verified via the real
registry.dispatch -> _run_async per-thread-loop path (16 concurrent calls,
peak bounded to cap).
This commit is contained in:
Ben Barclay 2026-06-29 15:18:01 +10:00 committed by Teknium
parent 115e78c377
commit eddfecd2ce
5 changed files with 346 additions and 37 deletions

View file

@ -7,7 +7,9 @@ Also tests the vision_tools and browser_tool model override env vars.
import os
import sys
from pathlib import Path
from unittest.mock import patch, MagicMock
from unittest.mock import patch, MagicMock, AsyncMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))
@ -238,22 +240,30 @@ class TestGatewayBridgeCodeParity:
class TestVisionModelOverride:
"""Test that AUXILIARY_VISION_MODEL env var overrides the default model in the handler."""
def test_env_var_overrides_default(self, monkeypatch):
@pytest.mark.asyncio
async def test_env_var_overrides_default(self, monkeypatch):
monkeypatch.setenv("AUXILIARY_VISION_MODEL", "openai/gpt-4o")
from tools.vision_tools import _handle_vision_analyze
with patch("tools.vision_tools.vision_analyze_tool", new_callable=MagicMock) as mock_tool:
with (
patch("tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock) as mock_tool,
patch("tools.vision_tools._should_use_native_vision_fast_path", return_value=False),
):
mock_tool.return_value = '{"success": true}'
_handle_vision_analyze({"image_url": "http://test.jpg", "question": "test"})
await _handle_vision_analyze({"image_url": "http://test.jpg", "question": "test"})
call_args = mock_tool.call_args
# 3rd positional arg = model
assert call_args[0][2] == "openai/gpt-4o"
def test_default_model_when_no_override(self, monkeypatch):
@pytest.mark.asyncio
async def test_default_model_when_no_override(self, monkeypatch):
monkeypatch.delenv("AUXILIARY_VISION_MODEL", raising=False)
from tools.vision_tools import _handle_vision_analyze
with patch("tools.vision_tools.vision_analyze_tool", new_callable=MagicMock) as mock_tool:
with (
patch("tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock) as mock_tool,
patch("tools.vision_tools._should_use_native_vision_fast_path", return_value=False),
):
mock_tool.return_value = '{"success": true}'
_handle_vision_analyze({"image_url": "http://test.jpg", "question": "test"})
await _handle_vision_analyze({"image_url": "http://test.jpg", "question": "test"})
call_args = mock_tool.call_args
# With no AUXILIARY_VISION_MODEL env var, model should be None
# (the centralized call_llm router picks the provider default)

View file

@ -191,57 +191,70 @@ class TestHandleVisionAnalyze:
# Clean up the coroutine to avoid RuntimeWarning
result.close()
def test_prompt_contains_question(self):
@pytest.mark.asyncio
async def test_prompt_contains_question(self):
"""The full prompt should incorporate the user's question."""
with patch(
"tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock
) as mock_tool:
with (
patch(
"tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock
) as mock_tool,
patch(
"tools.vision_tools._should_use_native_vision_fast_path",
return_value=False,
),
):
mock_tool.return_value = json.dumps({"result": "ok"})
coro = _handle_vision_analyze(
await _handle_vision_analyze(
{
"image_url": "https://example.com/img.png",
"question": "Describe the cat",
}
)
# Clean up coroutine
coro.close()
call_args = mock_tool.call_args
full_prompt = call_args[0][1] # second positional arg
assert "Describe the cat" in full_prompt
assert "Fully describe and explain" in full_prompt
def test_uses_auxiliary_vision_model_env(self):
@pytest.mark.asyncio
async def test_uses_auxiliary_vision_model_env(self):
"""AUXILIARY_VISION_MODEL env var should override DEFAULT_VISION_MODEL."""
with (
patch(
"tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock
) as mock_tool,
patch(
"tools.vision_tools._should_use_native_vision_fast_path",
return_value=False,
),
patch.dict(os.environ, {"AUXILIARY_VISION_MODEL": "custom/model-v1"}),
):
mock_tool.return_value = json.dumps({"result": "ok"})
coro = _handle_vision_analyze(
await _handle_vision_analyze(
{"image_url": "https://example.com/img.png", "question": "test"}
)
coro.close()
call_args = mock_tool.call_args
model = call_args[0][2] # third positional arg
assert model == "custom/model-v1"
def test_falls_back_to_default_model(self):
@pytest.mark.asyncio
async def test_falls_back_to_default_model(self):
"""Without AUXILIARY_VISION_MODEL, model should be None (let call_llm resolve default)."""
with (
patch(
"tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock
) as mock_tool,
patch(
"tools.vision_tools._should_use_native_vision_fast_path",
return_value=False,
),
patch.dict(os.environ, {}, clear=False),
):
# Ensure AUXILIARY_VISION_MODEL is not set
os.environ.pop("AUXILIARY_VISION_MODEL", None)
mock_tool.return_value = json.dumps({"result": "ok"})
coro = _handle_vision_analyze(
await _handle_vision_analyze(
{"image_url": "https://example.com/img.png", "question": "test"}
)
coro.close()
call_args = mock_tool.call_args
model = call_args[0][2]
# With no AUXILIARY_VISION_MODEL set, model should be None
@ -1069,3 +1082,169 @@ class TestDownloadRetryClassification:
# All three attempts used, two backoff sleeps between them.
assert mock_client.get.await_count == 3
assert mock_sleep.await_count == 2
# ---------------------------------------------------------------------------
# Fan-out concurrency cap — a single turn (or several concurrent sessions in
# one process) can launch dozens of vision_analyze calls at once. The
# process-global semaphore must bound how many run simultaneously so a video-
# frame storm can't pin a worker thread and starve the dashboard event loop.
# ---------------------------------------------------------------------------
class TestVisionFanoutConcurrencyCap:
"""The process-global semaphore bounds concurrent vision analyses."""
def test_resolver_defaults_to_min_cpus_and_ceiling(self):
from tools import vision_tools as vt
with (
patch.dict(os.environ, {}, clear=False),
patch("tools.vision_tools._detect_host_cpus", return_value=64),
):
os.environ.pop("HERMES_VISION_MAX_CONCURRENCY", None)
# No config override available in the test env → falls to default,
# which is clamped to the ceiling even on a 64-core host.
with patch("hermes_cli.config.load_config", side_effect=Exception):
assert vt._resolve_vision_max_concurrency() == vt._VISION_DEFAULT_CONCURRENCY_CEILING
def test_resolver_respects_low_host_cpu_count(self):
from tools import vision_tools as vt
with (
patch.dict(os.environ, {}, clear=False),
patch("tools.vision_tools._detect_host_cpus", return_value=2),
patch("hermes_cli.config.load_config", side_effect=Exception),
):
os.environ.pop("HERMES_VISION_MAX_CONCURRENCY", None)
# 2-core host → cap is 2 (host limit, below the ceiling of 4).
assert vt._resolve_vision_max_concurrency() == 2
def test_resolver_env_override(self):
from tools import vision_tools as vt
with patch.dict(os.environ, {"HERMES_VISION_MAX_CONCURRENCY": "1"}):
assert vt._resolve_vision_max_concurrency() == 1
def test_resolver_rejects_sub_one_override(self):
from tools import vision_tools as vt
with (
patch.dict(os.environ, {"HERMES_VISION_MAX_CONCURRENCY": "0"}),
patch("tools.vision_tools._detect_host_cpus", return_value=2),
patch("hermes_cli.config.load_config", side_effect=Exception),
):
# 0 is ignored (cap can never be disabled) → falls back to default.
assert vt._resolve_vision_max_concurrency() == 2
@pytest.mark.asyncio
async def test_fanout_is_bounded_by_semaphore(self):
"""Firing many concurrent vision calls must never exceed the cap in flight.
This is the regression guard for the prod incident: an unbounded
fan-out pinned the event loop. With the cap, peak concurrency is
clamped to the semaphore value regardless of how many calls launch.
"""
import asyncio
import importlib
import threading
# Resolve the module fresh and drive BOTH the handler and the patch
# targets through that SAME module object. Sibling suites
# (test_vision_routing_31179) delete tools.vision_tools from
# sys.modules, so the top-level ``_handle_vision_analyze`` import can
# be bound to a stale module while ``patch`` hits the current one —
# patching the wrong object lets the real function run (peak stays 0).
vt = importlib.import_module("tools.vision_tools")
CAP = 3
in_flight = 0
peak = 0
lock = asyncio.Lock()
async def fake_native(image_url, question):
nonlocal in_flight, peak
async with lock:
in_flight += 1
peak = max(peak, in_flight)
try:
# Hold the slot long enough that, without a cap, all callers
# would overlap and drive peak up to N.
await asyncio.sleep(0.05)
finally:
async with lock:
in_flight -= 1
return json.dumps({"ok": True})
N = 12
# Install a fresh semaphore at the test cap so the assertion is
# deterministic regardless of the host's core count.
with (
patch.object(vt, "_vision_concurrency_semaphore",
threading.BoundedSemaphore(CAP)),
patch.object(vt, "_should_use_native_vision_fast_path",
return_value=True),
patch.object(vt, "_vision_analyze_native", side_effect=fake_native),
):
await asyncio.gather(*[
vt._handle_vision_analyze(
{"image_url": f"https://example.com/frame_{i}.png",
"question": "what is this"}
)
for i in range(N)
])
assert peak <= CAP, f"peak concurrency {peak} exceeded cap {CAP}"
# Sanity: with N > CAP and a real wait, we should have actually
# saturated the cap (otherwise the test proves nothing).
assert peak == CAP, f"expected to saturate cap {CAP}, only reached {peak}"
@pytest.mark.asyncio
async def test_unbounded_fanout_would_exceed_cap_without_semaphore(self):
"""Control: with a no-op (effectively unbounded) semaphore, peak blows past CAP.
Proves the guard above would fail if the semaphore weren't enforcing
the limit i.e. the test is actually exercising the cap.
"""
import asyncio
import importlib
import threading
vt = importlib.import_module("tools.vision_tools")
CAP = 3
in_flight = 0
peak = 0
lock = asyncio.Lock()
async def fake_native(image_url, question):
nonlocal in_flight, peak
async with lock:
in_flight += 1
peak = max(peak, in_flight)
try:
await asyncio.sleep(0.05)
finally:
async with lock:
in_flight -= 1
return json.dumps({"ok": True})
N = 12
# A semaphore sized to N imposes no real limit for this workload.
with (
patch.object(vt, "_vision_concurrency_semaphore",
threading.BoundedSemaphore(N)),
patch.object(vt, "_should_use_native_vision_fast_path",
return_value=True),
patch.object(vt, "_vision_analyze_native", side_effect=fake_native),
):
await asyncio.gather(*[
vt._handle_vision_analyze(
{"image_url": f"https://example.com/frame_{i}.png",
"question": "what is this"}
)
for i in range(N)
])
assert peak > CAP, (
"control failed: peak did not exceed CAP even without a real cap "
f"(peak={peak})"
)

View file

@ -29,6 +29,8 @@ Usage:
"""
import base64
import contextlib
import asyncio
import json
import logging
import os
@ -74,6 +76,91 @@ _VISION_DOWNLOAD_TIMEOUT = _resolve_download_timeout()
_VISION_MAX_DOWNLOAD_BYTES = 50 * 1024 * 1024
# ---------------------------------------------------------------------------
# Fan-out concurrency cap
# ---------------------------------------------------------------------------
# A single agent turn can fan out N vision_analyze calls at once (the classic
# trigger is "analyze every frame of this video" — ffmpeg explodes a clip into
# dozens of frames, the model then calls vision_analyze on each). Every call
# does a CPU-heavy base64-encode/resize burst AND holds a long-lived LLM stream
# open. The tool executor runs concurrent tool calls on a ThreadPoolExecutor
# (agent.tool_executor._MAX_TOOL_WORKERS = 8) PER SESSION, and several agent
# sessions share one process (the dashboard runs the agent in-process). With no
# global ceiling, a video-frame fan-out across one or more sessions pins a
# worker thread at ~100% CPU and starves the shared asyncio event loop that also
# serves the dashboard's /api/status liveness probe — so the instance flaps to
# UNHEALTHY even though nothing has actually crashed (observed in prod, June
# 2026).
#
# This semaphore bounds the number of vision analyses running concurrently
# across the WHOLE process, regardless of how many sessions or worker threads
# issue them. It is a threading.Semaphore (NOT asyncio.Semaphore): each vision
# call is dispatched through model_tools._run_async on a PER-THREAD event loop,
# so an asyncio primitive bound to one loop cannot coordinate across them. A
# threading semaphore is loop- and thread-agnostic, which is exactly what we
# need here.
#
# Default: min(host CPU count, 4), floored at 1 — "respect the host's
# concurrency, or lower". 4 is a conservative ceiling: vision work is a mix of
# CPU (encode/resize) and network (LLM stream), and we would rather under-
# subscribe than let a frame storm wedge the loop. Override with
# HERMES_VISION_MAX_CONCURRENCY (env) or auxiliary.vision.max_concurrency
# (config.yaml). 0 / negative / unparseable falls back to the default.
import threading
def _detect_host_cpus() -> int:
"""Best-effort host CPU count, honoring cgroup/affinity limits when set.
Prefers ``os.sched_getaffinity`` (the CPUs this process may actually run
on respects container/cpuset pinning) and falls back to
``os.cpu_count()``. Returns at least 1.
"""
try:
return max(1, len(os.sched_getaffinity(0))) # type: ignore[attr-defined]
except (AttributeError, OSError):
return max(1, os.cpu_count() or 1)
# Absolute ceiling for the default (not for explicit overrides): even on a
# many-core host, more than this many simultaneous in-process vision analyses
# is rarely worth the event-loop pressure.
_VISION_DEFAULT_CONCURRENCY_CEILING = 4
def _resolve_vision_max_concurrency() -> int:
"""Resolve the max concurrent vision analyses for this process.
Resolution order: HERMES_VISION_MAX_CONCURRENCY env config.yaml
auxiliary.vision.max_concurrency default ``min(host_cpus, 4)``. Any
value that parses to < 1 is ignored in favor of the next source so the
cap can never be disabled into an unbounded fan-out.
"""
env_val = os.getenv("HERMES_VISION_MAX_CONCURRENCY", "").strip()
if env_val:
try:
parsed = int(env_val)
if parsed >= 1:
return parsed
except ValueError:
pass
try:
from hermes_cli.config import cfg_get, load_config
cfg = load_config()
val = cfg_get(cfg, "auxiliary", "vision", "max_concurrency")
if val is not None:
parsed = int(val)
if parsed >= 1:
return parsed
except Exception:
pass
return max(1, min(_detect_host_cpus(), _VISION_DEFAULT_CONCURRENCY_CEILING))
_VISION_MAX_CONCURRENCY = _resolve_vision_max_concurrency()
_vision_concurrency_semaphore = threading.BoundedSemaphore(_VISION_MAX_CONCURRENCY)
def _image_url_shape_ok(url: str) -> bool:
"""HTTP(S) shape check only (scheme, netloc). No DNS."""
if not url or not isinstance(url, str):
@ -685,6 +772,26 @@ def _build_native_vision_tool_result(
}
@contextlib.asynccontextmanager
async def _vision_concurrency_slot():
"""Hold one process-global vision-concurrency slot for the duration.
Acquires :data:`_vision_concurrency_semaphore` before yielding and always
releases it on exit. The blocking acquire is offloaded to a worker thread
via ``run_in_executor`` so that waiting for a slot never blocks the calling
event loop (callers run on per-thread loops; blocking the acquire on the
loop thread would freeze that loop's other tasks while we wait). The
semaphore is a ``BoundedSemaphore`` so a double-release would raise rather
than silently inflate the limit.
"""
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _vision_concurrency_semaphore.acquire)
try:
yield
finally:
_vision_concurrency_semaphore.release()
async def _vision_analyze_native(
image_url: str,
question: str,
@ -1194,27 +1301,36 @@ VISION_ANALYZE_SCHEMA = {
}
def _handle_vision_analyze(args: Dict[str, Any], **kw: Any) -> Awaitable[str]:
async def _handle_vision_analyze(args: Dict[str, Any], **kw: Any) -> str:
image_url = args.get("image_url", "")
question = args.get("question", "")
# Fast path: when native image routing is in effect for the active main
# model (provider accepts images in tool results, or the user set the
# model.supports_vision override), short-circuit the auxiliary LLM and
# return the image bytes as a multimodal tool-result envelope. The main
# model sees the pixels directly on its next turn — no aux call, no
# information loss, no extra latency.
if _should_use_native_vision_fast_path():
logger.info("vision_analyze: native fast path")
return _vision_analyze_native(image_url, question)
# Bound process-wide vision fan-out: a single turn (or several concurrent
# sessions sharing this process) can launch dozens of vision_analyze calls
# at once — e.g. "analyze every frame of this video". Each one is a
# CPU-heavy encode/resize plus a long LLM stream; unbounded, they pin a
# worker thread and starve the shared event loop that serves /api/status,
# flapping the instance to UNHEALTHY. The slot is held across the WHOLE
# analysis (image load + encode + LLM call), and acquiring it waits off the
# event loop, so excess calls queue instead of piling on simultaneously.
async with _vision_concurrency_slot():
# Fast path: when native image routing is in effect for the active main
# model (provider accepts images in tool results, or the user set the
# model.supports_vision override), short-circuit the auxiliary LLM and
# return the image bytes as a multimodal tool-result envelope. The main
# model sees the pixels directly on its next turn — no aux call, no
# information loss, no extra latency.
if _should_use_native_vision_fast_path():
logger.info("vision_analyze: native fast path")
return await _vision_analyze_native(image_url, question)
# Legacy path: aux LLM describes the image and we return its text.
full_prompt = (
"Fully describe and explain everything about this image, then answer the "
f"following question:\n\n{question}"
)
model = os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
return vision_analyze_tool(image_url, full_prompt, model)
# Legacy path: aux LLM describes the image and we return its text.
full_prompt = (
"Fully describe and explain everything about this image, then answer the "
f"following question:\n\n{question}"
)
model = os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None
return await vision_analyze_tool(image_url, full_prompt, model)
registry.register(

View file

@ -684,6 +684,7 @@ Advanced per-platform knobs for throttling the outbound message batcher. Most us
| `HERMES_FEISHU_DEDUP_CACHE_SIZE` | Size of the Feishu webhook dedup cache (default: `1024`). |
| `HERMES_WECOM_TEXT_BATCH_DELAY_SECONDS` / `_SPLIT_DELAY_SECONDS` | WeCom batcher tuning. |
| `HERMES_VISION_DOWNLOAD_TIMEOUT` | Timeout in seconds for downloading an image before handing it to vision models (default: `30`). |
| `HERMES_VISION_MAX_CONCURRENCY` | Max vision analyses running concurrently across the whole process (override for `auxiliary.vision.max_concurrency`; default `min(host CPUs, 4)`). Bounds video-frame fan-out so it can't saturate the event loop. Values `< 1` are ignored. |
| `HERMES_RESTART_DRAIN_TIMEOUT` | Gateway: seconds to wait for active runs to drain on `/restart` before forcing the restart (default: `900`). |
| `HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT` | Per-platform connect timeout during gateway startup (seconds). |
| `HERMES_GATEWAY_BUSY_INPUT_MODE` | Default gateway busy-input behavior: `queue`, `steer`, or `interrupt`. Can be overridden per chat with `/busy`. |

View file

@ -1005,6 +1005,9 @@ auxiliary:
api_key: "" # API key for base_url (falls back to OPENAI_API_KEY)
timeout: 120 # seconds — LLM API call timeout; vision payloads need generous timeout
download_timeout: 30 # seconds — image HTTP download; increase for slow connections
max_concurrency: 4 # max vision analyses running at once across the whole process
# (default: min(host CPUs, 4)) — bounds video-frame fan-out so it
# can't saturate the event loop. Minimum 1; values < 1 are ignored.
# Web page summarization + browser page text extraction
web_extract: