mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-24 10:52:21 +00:00
feat(api-server): configurable concurrent-run cap to prevent DoS (#50007)
The OpenAI-compatible API server only enforced a hardcoded cap of 10 concurrent runs on /v1/runs, leaving /v1/chat/completions and /v1/responses unbounded — a request flood could exhaust CPU, memory, and upstream LLM quota (#7483). - Add gateway.api_server.max_concurrent_runs (config.yaml, default 10, 0 disables). No env var. - Shared concurrency gate across all three agent-serving endpoints, counting both the chat/responses in-flight counter and the /v1/runs stream set. Returns OpenAI-style 429 + Retry-After when at the cap. - Remove the dead hardcoded _MAX_CONCURRENT_RUNS class attribute. Closes #7483.
This commit is contained in:
parent
99233faf78
commit
e499d69e3e
3 changed files with 147 additions and 8 deletions
|
|
@ -782,6 +782,15 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
# in-flight run by run_id.
|
||||
self._run_approval_sessions: Dict[str, str] = {}
|
||||
self._session_db: Optional[Any] = None # Lazy-init SessionDB for session continuity
|
||||
# Concurrency cap shared across all agent-serving endpoints
|
||||
# (/v1/chat/completions, /v1/responses, /v1/runs). Read from
|
||||
# config.yaml gateway.api_server.max_concurrent_runs; 0 disables
|
||||
# the cap. Bounds CPU / memory / upstream-LLM-quota exhaustion
|
||||
# from a request flood (#7483).
|
||||
self._max_concurrent_runs: int = self._resolve_max_concurrent_runs()
|
||||
# Number of in-flight runs on the non-streaming chat/responses paths
|
||||
# (the /v1/runs path tracks its own in-flight set via _run_streams).
|
||||
self._inflight_agent_runs: int = 0
|
||||
|
||||
@staticmethod
|
||||
def _parse_cors_origins(value: Any) -> tuple[str, ...]:
|
||||
|
|
@ -798,6 +807,30 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
|
||||
return tuple(str(item).strip() for item in items if str(item).strip())
|
||||
|
||||
@staticmethod
|
||||
def _resolve_max_concurrent_runs() -> int:
|
||||
"""Read the concurrent-run cap from config.yaml (0 disables).
|
||||
|
||||
gateway.api_server.max_concurrent_runs. Falls back to the historical
|
||||
default of 10 when unset or malformed. Negative values are clamped
|
||||
to 0 (disabled).
|
||||
"""
|
||||
default = 10
|
||||
try:
|
||||
from hermes_cli.config import cfg_get, load_config
|
||||
|
||||
raw = cfg_get(
|
||||
load_config(),
|
||||
"gateway",
|
||||
"api_server",
|
||||
"max_concurrent_runs",
|
||||
default=default,
|
||||
)
|
||||
value = int(raw)
|
||||
except Exception:
|
||||
return default
|
||||
return max(0, value)
|
||||
|
||||
@staticmethod
|
||||
def _resolve_model_name(explicit: str) -> str:
|
||||
"""Derive the advertised model name for /v1/models.
|
||||
|
|
@ -1767,6 +1800,11 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
if auth_err:
|
||||
return auth_err
|
||||
|
||||
# Bound total in-flight agent runs (configurable; #7483).
|
||||
limited = self._concurrency_limited_response()
|
||||
if limited is not None:
|
||||
return limited
|
||||
|
||||
# Parse request body
|
||||
try:
|
||||
body = await request.json()
|
||||
|
|
@ -2836,6 +2874,11 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
if auth_err:
|
||||
return auth_err
|
||||
|
||||
# Bound total in-flight agent runs (configurable; #7483).
|
||||
limited = self._concurrency_limited_response()
|
||||
if limited is not None:
|
||||
return limited
|
||||
|
||||
# Long-term memory scope header (see chat_completions for details).
|
||||
gateway_session_key, key_err = self._parse_session_key_header(request)
|
||||
if key_err is not None:
|
||||
|
|
@ -3587,6 +3630,31 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
# Agent execution
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _concurrency_limited_response(self) -> Optional["web.Response"]:
|
||||
"""Return a 429 response if the concurrent-run cap is reached, else None.
|
||||
|
||||
The cap bounds total in-flight agent activity across every
|
||||
agent-serving endpoint: the non-streaming chat/responses paths
|
||||
(tracked by ``_inflight_agent_runs``) plus the ``/v1/runs`` streaming
|
||||
path (tracked by ``_run_streams``). A configured value of 0 disables
|
||||
the cap entirely.
|
||||
"""
|
||||
limit = self._max_concurrent_runs
|
||||
if limit <= 0:
|
||||
return None
|
||||
inflight = self._inflight_agent_runs + len(self._run_streams)
|
||||
if inflight >= limit:
|
||||
return web.json_response(
|
||||
_openai_error(
|
||||
f"Too many concurrent runs (max {limit})",
|
||||
err_type="rate_limit_error",
|
||||
code="rate_limit_exceeded",
|
||||
),
|
||||
status=429,
|
||||
headers={"Retry-After": "1"},
|
||||
)
|
||||
return None
|
||||
|
||||
async def _run_agent(
|
||||
self,
|
||||
user_message: str,
|
||||
|
|
@ -3655,13 +3723,16 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
finally:
|
||||
clear_session_vars(tokens)
|
||||
|
||||
return await loop.run_in_executor(None, _run)
|
||||
self._inflight_agent_runs += 1
|
||||
try:
|
||||
return await loop.run_in_executor(None, _run)
|
||||
finally:
|
||||
self._inflight_agent_runs -= 1
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# /v1/runs — structured event streaming
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
_MAX_CONCURRENT_RUNS = 10 # Prevent unbounded resource allocation
|
||||
_RUN_STREAM_TTL = 300 # seconds before orphaned runs are swept
|
||||
_RUN_STATUS_TTL = 3600 # seconds to retain terminal run status for polling
|
||||
|
||||
|
|
@ -3737,12 +3808,11 @@ class APIServerAdapter(BasePlatformAdapter):
|
|||
if key_err is not None:
|
||||
return key_err
|
||||
|
||||
# Enforce concurrency limit
|
||||
if len(self._run_streams) >= self._MAX_CONCURRENT_RUNS:
|
||||
return web.json_response(
|
||||
_openai_error(f"Too many concurrent runs (max {self._MAX_CONCURRENT_RUNS})", code="rate_limit_exceeded"),
|
||||
status=429,
|
||||
)
|
||||
# Enforce concurrency limit (shared across all agent-serving
|
||||
# endpoints; configurable via gateway.api_server.max_concurrent_runs).
|
||||
limited = self._concurrency_limited_response()
|
||||
if limited is not None:
|
||||
return limited
|
||||
|
||||
try:
|
||||
body = await request.json()
|
||||
|
|
|
|||
|
|
@ -2511,6 +2511,18 @@ DEFAULT_CONFIG = {
|
|||
# multi-tool agent turn. Bridged to HERMES_MEDIA_TRUST_RECENT_SECONDS.
|
||||
# Only consulted when ``strict`` is true.
|
||||
"trust_recent_files_seconds": 600,
|
||||
|
||||
# OpenAI-compatible API server platform
|
||||
# (gateway/platforms/api_server.py).
|
||||
"api_server": {
|
||||
# Maximum number of agent runs the API server will service
|
||||
# concurrently. Requests to /v1/chat/completions, /v1/responses,
|
||||
# and /v1/runs that arrive while this many runs are already
|
||||
# in flight are rejected with HTTP 429 + a Retry-After header,
|
||||
# bounding CPU / memory / upstream-LLM-quota exhaustion from a
|
||||
# request flood. Set to 0 to disable the cap entirely.
|
||||
"max_concurrent_runs": 10,
|
||||
},
|
||||
},
|
||||
|
||||
# Real-time token streaming to messaging platforms (Telegram, Discord,
|
||||
|
|
|
|||
|
|
@ -420,6 +420,63 @@ class TestAuth:
|
|||
assert result.status == 401
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Concurrency cap (gateway.api_server.max_concurrent_runs) — #7483
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestConcurrencyCap:
|
||||
def test_resolve_defaults_to_10_when_unset(self):
|
||||
with patch("hermes_cli.config.load_config", return_value={}):
|
||||
assert APIServerAdapter._resolve_max_concurrent_runs() == 10
|
||||
|
||||
def test_resolve_reads_config_value(self):
|
||||
cfg = {"gateway": {"api_server": {"max_concurrent_runs": 3}}}
|
||||
with patch("hermes_cli.config.load_config", return_value=cfg):
|
||||
assert APIServerAdapter._resolve_max_concurrent_runs() == 3
|
||||
|
||||
def test_resolve_clamps_negative_to_zero(self):
|
||||
cfg = {"gateway": {"api_server": {"max_concurrent_runs": -5}}}
|
||||
with patch("hermes_cli.config.load_config", return_value=cfg):
|
||||
assert APIServerAdapter._resolve_max_concurrent_runs() == 0
|
||||
|
||||
def test_resolve_malformed_falls_back_to_default(self):
|
||||
cfg = {"gateway": {"api_server": {"max_concurrent_runs": "not-an-int"}}}
|
||||
with patch("hermes_cli.config.load_config", return_value=cfg):
|
||||
assert APIServerAdapter._resolve_max_concurrent_runs() == 10
|
||||
|
||||
def test_under_cap_returns_none(self):
|
||||
adapter = _make_adapter()
|
||||
adapter._max_concurrent_runs = 5
|
||||
adapter._inflight_agent_runs = 2
|
||||
assert adapter._concurrency_limited_response() is None
|
||||
|
||||
def test_at_cap_returns_429_with_retry_after(self):
|
||||
adapter = _make_adapter()
|
||||
adapter._max_concurrent_runs = 3
|
||||
adapter._inflight_agent_runs = 3
|
||||
resp = adapter._concurrency_limited_response()
|
||||
assert resp is not None
|
||||
assert resp.status == 429
|
||||
assert resp.headers.get("Retry-After")
|
||||
|
||||
def test_cap_counts_both_buckets(self):
|
||||
# /v1/runs (tracked by _run_streams) + chat/responses (inflight)
|
||||
adapter = _make_adapter()
|
||||
adapter._max_concurrent_runs = 4
|
||||
adapter._inflight_agent_runs = 2
|
||||
adapter._run_streams = {"r1": object(), "r2": object()}
|
||||
resp = adapter._concurrency_limited_response()
|
||||
assert resp is not None
|
||||
assert resp.status == 429
|
||||
|
||||
def test_zero_disables_cap(self):
|
||||
adapter = _make_adapter()
|
||||
adapter._max_concurrent_runs = 0
|
||||
adapter._inflight_agent_runs = 9999
|
||||
assert adapter._concurrency_limited_response() is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers for HTTP tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue