From eddfecd2cedebe7eda3efd2c356376722a0073f6 Mon Sep 17 00:00:00 2001 From: Ben Barclay Date: Mon, 29 Jun 2026 15:18:01 +1000 Subject: [PATCH] fix(vision): cap vision_analyze fan-out concurrency process-wide MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A single agent turn can fan out N vision_analyze calls at once — the classic trigger is "analyze every frame of this video", where ffmpeg explodes a clip into dozens of frames and the model calls vision_analyze on each. Every call does a CPU-heavy base64-encode/resize burst AND holds a long-lived LLM stream open. The tool executor runs concurrent tool calls on a per-session ThreadPoolExecutor (_MAX_TOOL_WORKERS=8), and multiple agent sessions share one process (the dashboard runs the agent in-process), so there was no global ceiling. In prod (June 2026) a video-frame fan-out pinned a worker thread at ~100% CPU and starved the shared asyncio event loop that also serves the dashboard's /api/status liveness probe, flapping the instance to UNHEALTHY even though nothing had crashed. Add a process-global threading.BoundedSemaphore that bounds how many vision analyses run concurrently across the whole process, held across the entire analysis (image load + encode + LLM call) in the single _handle_vision_analyze chokepoint (covers both the native fast path and the legacy aux-LLM path). It is a threading semaphore, NOT asyncio: each vision call is dispatched through model_tools._run_async on a per-thread event loop, so an asyncio primitive bound to one loop cannot coordinate across them. The acquire is offloaded via run_in_executor so waiting for a slot never blocks the calling loop. Default: min(host CPUs, 4), floored at 1 — respect the host's concurrency, or lower. Override via auxiliary.vision.max_concurrency (config.yaml) or HERMES_VISION_MAX_CONCURRENCY (env). Values < 1 are ignored so the cap can never be disabled into an unbounded fan-out. 
Tests: bounded-fan-out regression guard + a control proving it would fail without the cap; resolver tests for host-cpu default, ceiling clamp, low-cpu host, env override, and sub-1 rejection. Pre-existing handler tests updated for the now-async _handle_vision_analyze. Verified via the real registry.dispatch -> _run_async per-thread-loop path (16 concurrent calls, peak bounded to cap). --- tests/agent/test_auxiliary_config_bridge.py | 24 +- tests/tools/test_vision_tools.py | 205 ++++++++++++++++-- tools/vision_tools.py | 150 +++++++++++-- .../docs/reference/environment-variables.md | 1 + website/docs/user-guide/configuration.md | 3 + 5 files changed, 346 insertions(+), 37 deletions(-) diff --git a/tests/agent/test_auxiliary_config_bridge.py b/tests/agent/test_auxiliary_config_bridge.py index b2727d33608..450f7e7fe4c 100644 --- a/tests/agent/test_auxiliary_config_bridge.py +++ b/tests/agent/test_auxiliary_config_bridge.py @@ -7,7 +7,9 @@ Also tests the vision_tools and browser_tool model override env vars. 
import os import sys from pathlib import Path -from unittest.mock import patch, MagicMock +from unittest.mock import patch, MagicMock, AsyncMock + +import pytest sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..")) @@ -238,22 +240,30 @@ class TestGatewayBridgeCodeParity: class TestVisionModelOverride: """Test that AUXILIARY_VISION_MODEL env var overrides the default model in the handler.""" - def test_env_var_overrides_default(self, monkeypatch): + @pytest.mark.asyncio + async def test_env_var_overrides_default(self, monkeypatch): monkeypatch.setenv("AUXILIARY_VISION_MODEL", "openai/gpt-4o") from tools.vision_tools import _handle_vision_analyze - with patch("tools.vision_tools.vision_analyze_tool", new_callable=MagicMock) as mock_tool: + with ( + patch("tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock) as mock_tool, + patch("tools.vision_tools._should_use_native_vision_fast_path", return_value=False), + ): mock_tool.return_value = '{"success": true}' - _handle_vision_analyze({"image_url": "http://test.jpg", "question": "test"}) + await _handle_vision_analyze({"image_url": "http://test.jpg", "question": "test"}) call_args = mock_tool.call_args # 3rd positional arg = model assert call_args[0][2] == "openai/gpt-4o" - def test_default_model_when_no_override(self, monkeypatch): + @pytest.mark.asyncio + async def test_default_model_when_no_override(self, monkeypatch): monkeypatch.delenv("AUXILIARY_VISION_MODEL", raising=False) from tools.vision_tools import _handle_vision_analyze - with patch("tools.vision_tools.vision_analyze_tool", new_callable=MagicMock) as mock_tool: + with ( + patch("tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock) as mock_tool, + patch("tools.vision_tools._should_use_native_vision_fast_path", return_value=False), + ): mock_tool.return_value = '{"success": true}' - _handle_vision_analyze({"image_url": "http://test.jpg", "question": "test"}) + await _handle_vision_analyze({"image_url": 
"http://test.jpg", "question": "test"}) call_args = mock_tool.call_args # With no AUXILIARY_VISION_MODEL env var, model should be None # (the centralized call_llm router picks the provider default) diff --git a/tests/tools/test_vision_tools.py b/tests/tools/test_vision_tools.py index 9373d08f25a..54567ac02d7 100644 --- a/tests/tools/test_vision_tools.py +++ b/tests/tools/test_vision_tools.py @@ -191,57 +191,70 @@ class TestHandleVisionAnalyze: # Clean up the coroutine to avoid RuntimeWarning result.close() - def test_prompt_contains_question(self): + @pytest.mark.asyncio + async def test_prompt_contains_question(self): """The full prompt should incorporate the user's question.""" - with patch( - "tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock - ) as mock_tool: + with ( + patch( + "tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock + ) as mock_tool, + patch( + "tools.vision_tools._should_use_native_vision_fast_path", + return_value=False, + ), + ): mock_tool.return_value = json.dumps({"result": "ok"}) - coro = _handle_vision_analyze( + await _handle_vision_analyze( { "image_url": "https://example.com/img.png", "question": "Describe the cat", } ) - # Clean up coroutine - coro.close() call_args = mock_tool.call_args full_prompt = call_args[0][1] # second positional arg assert "Describe the cat" in full_prompt assert "Fully describe and explain" in full_prompt - def test_uses_auxiliary_vision_model_env(self): + @pytest.mark.asyncio + async def test_uses_auxiliary_vision_model_env(self): """AUXILIARY_VISION_MODEL env var should override DEFAULT_VISION_MODEL.""" with ( patch( "tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock ) as mock_tool, + patch( + "tools.vision_tools._should_use_native_vision_fast_path", + return_value=False, + ), patch.dict(os.environ, {"AUXILIARY_VISION_MODEL": "custom/model-v1"}), ): mock_tool.return_value = json.dumps({"result": "ok"}) - coro = _handle_vision_analyze( + await _handle_vision_analyze( 
{"image_url": "https://example.com/img.png", "question": "test"} ) - coro.close() call_args = mock_tool.call_args model = call_args[0][2] # third positional arg assert model == "custom/model-v1" - def test_falls_back_to_default_model(self): + @pytest.mark.asyncio + async def test_falls_back_to_default_model(self): """Without AUXILIARY_VISION_MODEL, model should be None (let call_llm resolve default).""" with ( patch( "tools.vision_tools.vision_analyze_tool", new_callable=AsyncMock ) as mock_tool, + patch( + "tools.vision_tools._should_use_native_vision_fast_path", + return_value=False, + ), patch.dict(os.environ, {}, clear=False), ): # Ensure AUXILIARY_VISION_MODEL is not set os.environ.pop("AUXILIARY_VISION_MODEL", None) mock_tool.return_value = json.dumps({"result": "ok"}) - coro = _handle_vision_analyze( + await _handle_vision_analyze( {"image_url": "https://example.com/img.png", "question": "test"} ) - coro.close() call_args = mock_tool.call_args model = call_args[0][2] # With no AUXILIARY_VISION_MODEL set, model should be None @@ -1069,3 +1082,169 @@ class TestDownloadRetryClassification: # All three attempts used, two backoff sleeps between them. assert mock_client.get.await_count == 3 assert mock_sleep.await_count == 2 + + +# --------------------------------------------------------------------------- +# Fan-out concurrency cap — a single turn (or several concurrent sessions in +# one process) can launch dozens of vision_analyze calls at once. The +# process-global semaphore must bound how many run simultaneously so a video- +# frame storm can't pin a worker thread and starve the dashboard event loop. 
+# --------------------------------------------------------------------------- + + +class TestVisionFanoutConcurrencyCap: + """The process-global semaphore bounds concurrent vision analyses.""" + + def test_resolver_defaults_to_min_cpus_and_ceiling(self): + from tools import vision_tools as vt + + with ( + patch.dict(os.environ, {}, clear=False), + patch("tools.vision_tools._detect_host_cpus", return_value=64), + ): + os.environ.pop("HERMES_VISION_MAX_CONCURRENCY", None) + # No config override available in the test env → falls to default, + # which is clamped to the ceiling even on a 64-core host. + with patch("hermes_cli.config.load_config", side_effect=Exception): + assert vt._resolve_vision_max_concurrency() == vt._VISION_DEFAULT_CONCURRENCY_CEILING + + def test_resolver_respects_low_host_cpu_count(self): + from tools import vision_tools as vt + + with ( + patch.dict(os.environ, {}, clear=False), + patch("tools.vision_tools._detect_host_cpus", return_value=2), + patch("hermes_cli.config.load_config", side_effect=Exception), + ): + os.environ.pop("HERMES_VISION_MAX_CONCURRENCY", None) + # 2-core host → cap is 2 (host limit, below the ceiling of 4). + assert vt._resolve_vision_max_concurrency() == 2 + + def test_resolver_env_override(self): + from tools import vision_tools as vt + + with patch.dict(os.environ, {"HERMES_VISION_MAX_CONCURRENCY": "1"}): + assert vt._resolve_vision_max_concurrency() == 1 + + def test_resolver_rejects_sub_one_override(self): + from tools import vision_tools as vt + + with ( + patch.dict(os.environ, {"HERMES_VISION_MAX_CONCURRENCY": "0"}), + patch("tools.vision_tools._detect_host_cpus", return_value=2), + patch("hermes_cli.config.load_config", side_effect=Exception), + ): + # 0 is ignored (cap can never be disabled) → falls back to default. 
+ assert vt._resolve_vision_max_concurrency() == 2 + + @pytest.mark.asyncio + async def test_fanout_is_bounded_by_semaphore(self): + """Firing many concurrent vision calls must never exceed the cap in flight. + + This is the regression guard for the prod incident: an unbounded + fan-out pinned the event loop. With the cap, peak concurrency is + clamped to the semaphore value regardless of how many calls launch. + """ + import asyncio + import importlib + import threading + # Resolve the module fresh and drive BOTH the handler and the patch + # targets through that SAME module object. Sibling suites + # (test_vision_routing_31179) delete tools.vision_tools from + # sys.modules, so the top-level ``_handle_vision_analyze`` import can + # be bound to a stale module while ``patch`` hits the current one — + # patching the wrong object lets the real function run (peak stays 0). + vt = importlib.import_module("tools.vision_tools") + + CAP = 3 + in_flight = 0 + peak = 0 + lock = asyncio.Lock() + + async def fake_native(image_url, question): + nonlocal in_flight, peak + async with lock: + in_flight += 1 + peak = max(peak, in_flight) + try: + # Hold the slot long enough that, without a cap, all callers + # would overlap and drive peak up to N. + await asyncio.sleep(0.05) + finally: + async with lock: + in_flight -= 1 + return json.dumps({"ok": True}) + + N = 12 + # Install a fresh semaphore at the test cap so the assertion is + # deterministic regardless of the host's core count. 
+ with ( + patch.object(vt, "_vision_concurrency_semaphore", + threading.BoundedSemaphore(CAP)), + patch.object(vt, "_should_use_native_vision_fast_path", + return_value=True), + patch.object(vt, "_vision_analyze_native", side_effect=fake_native), + ): + await asyncio.gather(*[ + vt._handle_vision_analyze( + {"image_url": f"https://example.com/frame_{i}.png", + "question": "what is this"} + ) + for i in range(N) + ]) + + assert peak <= CAP, f"peak concurrency {peak} exceeded cap {CAP}" + # Sanity: with N > CAP and a real wait, we should have actually + # saturated the cap (otherwise the test proves nothing). + assert peak == CAP, f"expected to saturate cap {CAP}, only reached {peak}" + + @pytest.mark.asyncio + async def test_unbounded_fanout_would_exceed_cap_without_semaphore(self): + """Control: with a no-op (effectively unbounded) semaphore, peak blows past CAP. + + Proves the guard above would fail if the semaphore weren't enforcing + the limit — i.e. the test is actually exercising the cap. + """ + import asyncio + import importlib + import threading + vt = importlib.import_module("tools.vision_tools") + + CAP = 3 + in_flight = 0 + peak = 0 + lock = asyncio.Lock() + + async def fake_native(image_url, question): + nonlocal in_flight, peak + async with lock: + in_flight += 1 + peak = max(peak, in_flight) + try: + await asyncio.sleep(0.05) + finally: + async with lock: + in_flight -= 1 + return json.dumps({"ok": True}) + + N = 12 + # A semaphore sized to N imposes no real limit for this workload. 
+ with ( + patch.object(vt, "_vision_concurrency_semaphore", + threading.BoundedSemaphore(N)), + patch.object(vt, "_should_use_native_vision_fast_path", + return_value=True), + patch.object(vt, "_vision_analyze_native", side_effect=fake_native), + ): + await asyncio.gather(*[ + vt._handle_vision_analyze( + {"image_url": f"https://example.com/frame_{i}.png", + "question": "what is this"} + ) + for i in range(N) + ]) + + assert peak > CAP, ( + "control failed: peak did not exceed CAP even without a real cap " + f"(peak={peak})" + ) diff --git a/tools/vision_tools.py b/tools/vision_tools.py index cfc933dae0f..69bbc383276 100644 --- a/tools/vision_tools.py +++ b/tools/vision_tools.py @@ -29,6 +29,8 @@ Usage: """ import base64 +import contextlib +import asyncio import json import logging import os @@ -74,6 +76,91 @@ _VISION_DOWNLOAD_TIMEOUT = _resolve_download_timeout() _VISION_MAX_DOWNLOAD_BYTES = 50 * 1024 * 1024 +# --------------------------------------------------------------------------- +# Fan-out concurrency cap +# --------------------------------------------------------------------------- +# A single agent turn can fan out N vision_analyze calls at once (the classic +# trigger is "analyze every frame of this video" — ffmpeg explodes a clip into +# dozens of frames, the model then calls vision_analyze on each). Every call +# does a CPU-heavy base64-encode/resize burst AND holds a long-lived LLM stream +# open. The tool executor runs concurrent tool calls on a ThreadPoolExecutor +# (agent.tool_executor._MAX_TOOL_WORKERS = 8) PER SESSION, and several agent +# sessions share one process (the dashboard runs the agent in-process). With no +# global ceiling, a video-frame fan-out across one or more sessions pins a +# worker thread at ~100% CPU and starves the shared asyncio event loop that also +# serves the dashboard's /api/status liveness probe — so the instance flaps to +# UNHEALTHY even though nothing has actually crashed (observed in prod, June +# 2026). 
+# +# This semaphore bounds the number of vision analyses running concurrently +# across the WHOLE process, regardless of how many sessions or worker threads +# issue them. It is a threading.Semaphore (NOT asyncio.Semaphore): each vision +# call is dispatched through model_tools._run_async on a PER-THREAD event loop, +# so an asyncio primitive bound to one loop cannot coordinate across them. A +# threading semaphore is loop- and thread-agnostic, which is exactly what we +# need here. +# +# Default: min(host CPU count, 4), floored at 1 — "respect the host's +# concurrency, or lower". 4 is a conservative ceiling: vision work is a mix of +# CPU (encode/resize) and network (LLM stream), and we would rather under- +# subscribe than let a frame storm wedge the loop. Override with +# HERMES_VISION_MAX_CONCURRENCY (env) or auxiliary.vision.max_concurrency +# (config.yaml). 0 / negative / unparseable falls through to the next source. +import threading + + +def _detect_host_cpus() -> int: + """Best-effort host CPU count, honoring cgroup/affinity limits when set. + + Prefers ``os.sched_getaffinity`` (the CPUs this process may actually run + on — respects container/cpuset pinning) and falls back to + ``os.cpu_count()``. Returns at least 1. + """ + try: + return max(1, len(os.sched_getaffinity(0))) # type: ignore[attr-defined] + except (AttributeError, OSError): + return max(1, os.cpu_count() or 1) + + +# Absolute ceiling for the default (not for explicit overrides): even on a +# many-core host, more than this many simultaneous in-process vision analyses +# is rarely worth the event-loop pressure. +_VISION_DEFAULT_CONCURRENCY_CEILING = 4 + + +def _resolve_vision_max_concurrency() -> int: + """Resolve the max concurrent vision analyses for this process. + + Resolution order: HERMES_VISION_MAX_CONCURRENCY env → config.yaml + auxiliary.vision.max_concurrency → default ``min(host_cpus, 4)``. 
Any + value that parses to < 1 is ignored in favor of the next source so the + cap can never be disabled into an unbounded fan-out. + """ + env_val = os.getenv("HERMES_VISION_MAX_CONCURRENCY", "").strip() + if env_val: + try: + parsed = int(env_val) + if parsed >= 1: + return parsed + except ValueError: + pass + try: + from hermes_cli.config import cfg_get, load_config + cfg = load_config() + val = cfg_get(cfg, "auxiliary", "vision", "max_concurrency") + if val is not None: + parsed = int(val) + if parsed >= 1: + return parsed + except Exception: + pass + return max(1, min(_detect_host_cpus(), _VISION_DEFAULT_CONCURRENCY_CEILING)) + + +_VISION_MAX_CONCURRENCY = _resolve_vision_max_concurrency() +_vision_concurrency_semaphore = threading.BoundedSemaphore(_VISION_MAX_CONCURRENCY) + + def _image_url_shape_ok(url: str) -> bool: """HTTP(S) shape check only (scheme, netloc). No DNS.""" if not url or not isinstance(url, str): @@ -685,6 +772,26 @@ def _build_native_vision_tool_result( } +@contextlib.asynccontextmanager +async def _vision_concurrency_slot(): + """Hold one process-global vision-concurrency slot for the duration. + + Acquires :data:`_vision_concurrency_semaphore` before yielding and always + releases it on exit. The blocking acquire is offloaded to a worker thread + via ``run_in_executor`` so that waiting for a slot never blocks the calling + event loop (callers run on per-thread loops; blocking the acquire on the + loop thread would freeze that loop's other tasks while we wait). The + semaphore is a ``BoundedSemaphore`` so a double-release would raise rather + than silently inflate the limit. 
+ """ + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, _vision_concurrency_semaphore.acquire) + try: + yield + finally: + _vision_concurrency_semaphore.release() + + async def _vision_analyze_native( image_url: str, question: str, @@ -1194,27 +1301,36 @@ VISION_ANALYZE_SCHEMA = { } -def _handle_vision_analyze(args: Dict[str, Any], **kw: Any) -> Awaitable[str]: +async def _handle_vision_analyze(args: Dict[str, Any], **kw: Any) -> str: image_url = args.get("image_url", "") question = args.get("question", "") - # Fast path: when native image routing is in effect for the active main - # model (provider accepts images in tool results, or the user set the - # model.supports_vision override), short-circuit the auxiliary LLM and - # return the image bytes as a multimodal tool-result envelope. The main - # model sees the pixels directly on its next turn — no aux call, no - # information loss, no extra latency. - if _should_use_native_vision_fast_path(): - logger.info("vision_analyze: native fast path") - return _vision_analyze_native(image_url, question) + # Bound process-wide vision fan-out: a single turn (or several concurrent + # sessions sharing this process) can launch dozens of vision_analyze calls + # at once — e.g. "analyze every frame of this video". Each one is a + # CPU-heavy encode/resize plus a long LLM stream; unbounded, they pin a + # worker thread and starve the shared event loop that serves /api/status, + # flapping the instance to UNHEALTHY. The slot is held across the WHOLE + # analysis (image load + encode + LLM call), and acquiring it waits off the + # event loop, so excess calls queue instead of piling on simultaneously. 
+ async with _vision_concurrency_slot(): + # Fast path: when native image routing is in effect for the active main + # model (provider accepts images in tool results, or the user set the + # model.supports_vision override), short-circuit the auxiliary LLM and + # return the image bytes as a multimodal tool-result envelope. The main + # model sees the pixels directly on its next turn — no aux call, no + # information loss, no extra latency. + if _should_use_native_vision_fast_path(): + logger.info("vision_analyze: native fast path") + return await _vision_analyze_native(image_url, question) - # Legacy path: aux LLM describes the image and we return its text. - full_prompt = ( - "Fully describe and explain everything about this image, then answer the " - f"following question:\n\n{question}" - ) - model = os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None - return vision_analyze_tool(image_url, full_prompt, model) + # Legacy path: aux LLM describes the image and we return its text. + full_prompt = ( + "Fully describe and explain everything about this image, then answer the " + f"following question:\n\n{question}" + ) + model = os.getenv("AUXILIARY_VISION_MODEL", "").strip() or None + return await vision_analyze_tool(image_url, full_prompt, model) registry.register( diff --git a/website/docs/reference/environment-variables.md b/website/docs/reference/environment-variables.md index c0ab01c0195..83c08397be9 100644 --- a/website/docs/reference/environment-variables.md +++ b/website/docs/reference/environment-variables.md @@ -684,6 +684,7 @@ Advanced per-platform knobs for throttling the outbound message batcher. Most us | `HERMES_FEISHU_DEDUP_CACHE_SIZE` | Size of the Feishu webhook dedup cache (default: `1024`). | | `HERMES_WECOM_TEXT_BATCH_DELAY_SECONDS` / `_SPLIT_DELAY_SECONDS` | WeCom batcher tuning. | | `HERMES_VISION_DOWNLOAD_TIMEOUT` | Timeout in seconds for downloading an image before handing it to vision models (default: `30`). 
| +| `HERMES_VISION_MAX_CONCURRENCY` | Max vision analyses running concurrently across the whole process (override for `auxiliary.vision.max_concurrency`; default `min(host CPUs, 4)`). Bounds video-frame fan-out so it can't saturate the event loop. Values `< 1` are ignored. | | `HERMES_RESTART_DRAIN_TIMEOUT` | Gateway: seconds to wait for active runs to drain on `/restart` before forcing the restart (default: `900`). | | `HERMES_GATEWAY_PLATFORM_CONNECT_TIMEOUT` | Per-platform connect timeout during gateway startup (seconds). | | `HERMES_GATEWAY_BUSY_INPUT_MODE` | Default gateway busy-input behavior: `queue`, `steer`, or `interrupt`. Can be overridden per chat with `/busy`. | diff --git a/website/docs/user-guide/configuration.md b/website/docs/user-guide/configuration.md index ae76793499b..e1ba4c6b525 100644 --- a/website/docs/user-guide/configuration.md +++ b/website/docs/user-guide/configuration.md @@ -1005,6 +1005,9 @@ auxiliary: api_key: "" # API key for base_url (falls back to OPENAI_API_KEY) timeout: 120 # seconds — LLM API call timeout; vision payloads need generous timeout download_timeout: 30 # seconds — image HTTP download; increase for slow connections + max_concurrency: 4 # max vision analyses running at once across the whole process + # (default: min(host CPUs, 4)) — bounds video-frame fan-out so it + # can't saturate the event loop. Minimum 1; values < 1 are ignored. # Web page summarization + browser page text extraction web_extract: