From e499d69e3eed4b7fc5b90edc5844ff9ddfa84f2e Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 21 Jun 2026 07:26:03 -0700 Subject: [PATCH] feat(api-server): configurable concurrent-run cap to prevent DoS (#50007) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The OpenAI-compatible API server only enforced a hardcoded cap of 10 concurrent runs on /v1/runs, leaving /v1/chat/completions and /v1/responses unbounded — a request flood could exhaust CPU, memory, and upstream LLM quota (#7483). - Add gateway.api_server.max_concurrent_runs (config.yaml, default 10, 0 disables). No env var. - Shared concurrency gate across all three agent-serving endpoints, counting both the chat/responses in-flight counter and the /v1/runs stream set. Returns OpenAI-style 429 + Retry-After when at the cap. - Remove the dead hardcoded _MAX_CONCURRENT_RUNS class attribute. Closes #7483. --- gateway/platforms/api_server.py | 86 +++++++++++++++++++++++++++++--- hermes_cli/config.py | 12 +++++ tests/gateway/test_api_server.py | 57 +++++++++++++++++++++ 3 files changed, 147 insertions(+), 8 deletions(-) diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index aa968dcb98c..1d2dfea8a4c 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -782,6 +782,15 @@ class APIServerAdapter(BasePlatformAdapter): # in-flight run by run_id. self._run_approval_sessions: Dict[str, str] = {} self._session_db: Optional[Any] = None # Lazy-init SessionDB for session continuity + # Concurrency cap shared across all agent-serving endpoints + # (/v1/chat/completions, /v1/responses, /v1/runs). Read from + # config.yaml gateway.api_server.max_concurrent_runs; 0 disables + # the cap. Bounds CPU / memory / upstream-LLM-quota exhaustion + # from a request flood (#7483). + self._max_concurrent_runs: int = self._resolve_max_concurrent_runs() + # Number of in-flight runs on the non-streaming chat/responses paths + # (the /v1/runs path tracks its own in-flight set via _run_streams). + self._inflight_agent_runs: int = 0 @staticmethod def _parse_cors_origins(value: Any) -> tuple[str, ...]: @@ -798,6 +807,30 @@ class APIServerAdapter(BasePlatformAdapter): return tuple(str(item).strip() for item in items if str(item).strip()) + @staticmethod + def _resolve_max_concurrent_runs() -> int: + """Read the concurrent-run cap from config.yaml (0 disables). + + gateway.api_server.max_concurrent_runs. Falls back to the historical + default of 10 when unset or malformed. Negative values are clamped + to 0 (disabled). + """ + default = 10 + try: + from hermes_cli.config import cfg_get, load_config + + raw = cfg_get( + load_config(), + "gateway", + "api_server", + "max_concurrent_runs", + default=default, + ) + value = int(raw) + except Exception: + return default + return max(0, value) + @staticmethod def _resolve_model_name(explicit: str) -> str: """Derive the advertised model name for /v1/models. @@ -1767,6 +1800,11 @@ class APIServerAdapter(BasePlatformAdapter): if auth_err: return auth_err + # Bound total in-flight agent runs (configurable; #7483). + limited = self._concurrency_limited_response() + if limited is not None: + return limited + # Parse request body try: body = await request.json() @@ -2836,6 +2874,11 @@ class APIServerAdapter(BasePlatformAdapter): if auth_err: return auth_err + # Bound total in-flight agent runs (configurable; #7483). + limited = self._concurrency_limited_response() + if limited is not None: + return limited + # Long-term memory scope header (see chat_completions for details). gateway_session_key, key_err = self._parse_session_key_header(request) if key_err is not None: @@ -3587,6 +3630,31 @@ class APIServerAdapter(BasePlatformAdapter): # Agent execution # ------------------------------------------------------------------ + def _concurrency_limited_response(self) -> Optional["web.Response"]: + """Return a 429 response if the concurrent-run cap is reached, else None. + + The cap bounds total in-flight agent activity across every + agent-serving endpoint: the non-streaming chat/responses paths + (tracked by ``_inflight_agent_runs``) plus the ``/v1/runs`` streaming + path (tracked by ``_run_streams``). A configured value of 0 disables + the cap entirely. + """ + limit = self._max_concurrent_runs + if limit <= 0: + return None + inflight = self._inflight_agent_runs + len(self._run_streams) + if inflight >= limit: + return web.json_response( + _openai_error( + f"Too many concurrent runs (max {limit})", + err_type="rate_limit_error", + code="rate_limit_exceeded", + ), + status=429, + headers={"Retry-After": "1"}, + ) + return None + async def _run_agent( self, user_message: str, @@ -3655,13 +3723,16 @@ class APIServerAdapter(BasePlatformAdapter): finally: clear_session_vars(tokens) - return await loop.run_in_executor(None, _run) + self._inflight_agent_runs += 1 + try: + return await loop.run_in_executor(None, _run) + finally: + self._inflight_agent_runs -= 1 # ------------------------------------------------------------------ # /v1/runs — structured event streaming # ------------------------------------------------------------------ - _MAX_CONCURRENT_RUNS = 10 # Prevent unbounded resource allocation _RUN_STREAM_TTL = 300 # seconds before orphaned runs are swept _RUN_STATUS_TTL = 3600 # seconds to retain terminal run status for polling @@ -3737,12 +3808,11 @@ class APIServerAdapter(BasePlatformAdapter): if key_err is not None: return key_err - # Enforce concurrency limit - if len(self._run_streams) >= self._MAX_CONCURRENT_RUNS: - return web.json_response( - _openai_error(f"Too many concurrent runs (max {self._MAX_CONCURRENT_RUNS})", code="rate_limit_exceeded"), - status=429, - ) + # Enforce concurrency limit (shared across all agent-serving + # endpoints; configurable via gateway.api_server.max_concurrent_runs). + limited = self._concurrency_limited_response() + if limited is not None: + return limited try: body = await request.json() diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 260d0da5c2b..c44bf8de6c0 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -2511,6 +2511,18 @@ DEFAULT_CONFIG = { # multi-tool agent turn. Bridged to HERMES_MEDIA_TRUST_RECENT_SECONDS. # Only consulted when ``strict`` is true. "trust_recent_files_seconds": 600, + + # OpenAI-compatible API server platform + # (gateway/platforms/api_server.py). + "api_server": { + # Maximum number of agent runs the API server will service + # concurrently. Requests to /v1/chat/completions, /v1/responses, + # and /v1/runs that arrive while this many runs are already + # in flight are rejected with HTTP 429 + a Retry-After header, + # bounding CPU / memory / upstream-LLM-quota exhaustion from a + # request flood. Set to 0 to disable the cap entirely. + "max_concurrent_runs": 10, + }, }, # Real-time token streaming to messaging platforms (Telegram, Discord, diff --git a/tests/gateway/test_api_server.py b/tests/gateway/test_api_server.py index 6588a70fa7a..a941d4afc93 100644 --- a/tests/gateway/test_api_server.py +++ b/tests/gateway/test_api_server.py @@ -420,6 +420,63 @@ class TestAuth: assert result.status == 401 +# --------------------------------------------------------------------------- +# Concurrency cap (gateway.api_server.max_concurrent_runs) — #7483 +# --------------------------------------------------------------------------- + + +class TestConcurrencyCap: + def test_resolve_defaults_to_10_when_unset(self): + with patch("hermes_cli.config.load_config", return_value={}): + assert APIServerAdapter._resolve_max_concurrent_runs() == 10 + + def test_resolve_reads_config_value(self): + cfg = {"gateway": {"api_server": {"max_concurrent_runs": 3}}} + with patch("hermes_cli.config.load_config", return_value=cfg): + assert APIServerAdapter._resolve_max_concurrent_runs() == 3 + + def test_resolve_clamps_negative_to_zero(self): + cfg = {"gateway": {"api_server": {"max_concurrent_runs": -5}}} + with patch("hermes_cli.config.load_config", return_value=cfg): + assert APIServerAdapter._resolve_max_concurrent_runs() == 0 + + def test_resolve_malformed_falls_back_to_default(self): + cfg = {"gateway": {"api_server": {"max_concurrent_runs": "not-an-int"}}} + with patch("hermes_cli.config.load_config", return_value=cfg): + assert APIServerAdapter._resolve_max_concurrent_runs() == 10 + + def test_under_cap_returns_none(self): + adapter = _make_adapter() + adapter._max_concurrent_runs = 5 + adapter._inflight_agent_runs = 2 + assert adapter._concurrency_limited_response() is None + + def test_at_cap_returns_429_with_retry_after(self): + adapter = _make_adapter() + adapter._max_concurrent_runs = 3 + adapter._inflight_agent_runs = 3 + resp = adapter._concurrency_limited_response() + assert resp is not None + assert resp.status == 429 + assert resp.headers.get("Retry-After") + + def test_cap_counts_both_buckets(self): + # /v1/runs (tracked by _run_streams) + chat/responses (inflight) + adapter = _make_adapter() + adapter._max_concurrent_runs = 4 + adapter._inflight_agent_runs = 2 + adapter._run_streams = {"r1": object(), "r2": object()} + resp = adapter._concurrency_limited_response() + assert resp is not None + assert resp.status == 429 + + def test_zero_disables_cap(self): + adapter = _make_adapter() + adapter._max_concurrent_runs = 0 + adapter._inflight_agent_runs = 9999 + assert adapter._concurrency_limited_response() is None + + # --------------------------------------------------------------------------- # Helpers for HTTP tests # ---------------------------------------------------------------------------