fix(security): enforce API_SERVER_KEY for non-loopback binding

Add is_network_accessible() helper using Python's ipaddress module to
robustly classify bind addresses (IPv4/IPv6 loopback, wildcards,
mapped addresses, hostname resolution with DNS-failure-fails-closed).

The API server connect() now refuses to start when the bind address is
network-accessible and no API_SERVER_KEY is set, preventing RCE from
other machines on the network.

Co-authored-by: entropidelic <entropidelic@users.noreply.github.com>
This commit is contained in:
entropidelic 2026-04-10 16:40:54 -07:00 committed by Teknium
parent 2a6cbf52d0
commit 989b950fbc
6 changed files with 188 additions and 8 deletions

View file

@ -25,6 +25,7 @@ import hmac
import json
import logging
import os
import socket as _socket
import re
import sqlite3
import time
@ -42,6 +43,7 @@ from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
SendResult,
is_network_accessible,
)
logger = logging.getLogger(__name__)
@ -406,7 +408,8 @@ class APIServerAdapter(BasePlatformAdapter):
Validate Bearer token from Authorization header.
Returns None if auth is OK, or a 401 web.Response on failure.
If no API key is configured, all requests are allowed.
If no API key is configured, all requests are allowed (only when API
server is local).
"""
if not self._api_key:
return None # No key configured — allow all (local-only use)
@ -1713,8 +1716,16 @@ class APIServerAdapter(BasePlatformAdapter):
if hasattr(sweep_task, "add_done_callback"):
sweep_task.add_done_callback(self._background_tasks.discard)
# Refuse to start network-accessible without authentication
if is_network_accessible(self._host) and not self._api_key:
logger.error(
"[%s] Refusing to start: binding to %s requires API_SERVER_KEY. "
"Set API_SERVER_KEY or use the default 127.0.0.1.",
self.name, self._host,
)
return False
# Port conflict detection — fail fast if port is already in use
import socket as _socket
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
_s.settimeout(1)

View file

@ -6,10 +6,12 @@ and implement the required methods.
"""
import asyncio
import ipaddress
import logging
import os
import random
import re
import socket as _socket
import subprocess
import sys
import uuid
@ -19,6 +21,41 @@ from urllib.parse import urlsplit
logger = logging.getLogger(__name__)
def is_network_accessible(host: str) -> bool:
"""Return True if *host* would expose the server beyond loopback.
Loopback addresses (127.0.0.1, ::1, IPv4-mapped ::ffff:127.0.0.1)
are local-only. Unspecified addresses (0.0.0.0, ::) bind all
interfaces. Hostnames are resolved; DNS failure fails closed.
"""
try:
addr = ipaddress.ip_address(host)
if addr.is_loopback:
return False
# ::ffff:127.0.0.1 — Python reports is_loopback=False for mapped
# addresses, so check the underlying IPv4 explicitly.
if getattr(addr, "ipv4_mapped", None) and addr.ipv4_mapped.is_loopback:
return False
return True
except ValueError:
# when host variable is a hostname, we should try to resolve below
pass
try:
resolved = _socket.getaddrinfo(
host, None, _socket.AF_UNSPEC, _socket.SOCK_STREAM,
)
# if the hostname resolves into at least one non-loopback address,
# then we consider it to be network accessible
for _family, _type, _proto, _canonname, sockaddr in resolved:
addr = ipaddress.ip_address(sockaddr[0])
if not addr.is_loopback:
return True
return False
except (_socket.gaierror, OSError):
return True
def _detect_macos_system_proxy() -> str | None:
"""Read the macOS system HTTP(S) proxy via ``scutil --proxy``.

View file

@ -1209,8 +1209,8 @@ OPTIONAL_ENV_VARS = {
"advanced": True,
},
"API_SERVER_KEY": {
"description": "Bearer token for API server authentication. If empty, all requests are allowed (local use only).",
"prompt": "API server auth key (optional)",
"description": "Bearer token for API server authentication. Required for non-loopback binding; server refuses to start without it. On loopback (127.0.0.1), all requests are allowed if empty.",
"prompt": "API server auth key (required for network access)",
"url": None,
"password": True,
"category": "messaging",
@ -1225,7 +1225,7 @@ OPTIONAL_ENV_VARS = {
"advanced": True,
},
"API_SERVER_HOST": {
"description": "Host/bind address for the API server (default: 127.0.0.1). Use 0.0.0.0 for network access — requires API_SERVER_KEY for security.",
"description": "Host/bind address for the API server (default: 127.0.0.1). Use 0.0.0.0 for network access — server refuses to start without API_SERVER_KEY.",
"prompt": "API server host",
"url": None,
"password": False,

View file

@ -0,0 +1,132 @@
"""Tests for the API server bind-address startup guard.
Validates that is_network_accessible() correctly classifies addresses and
that connect() refuses to start on non-loopback without API_SERVER_KEY.
"""
import socket
from unittest.mock import AsyncMock, patch
import pytest
from gateway.config import PlatformConfig
from gateway.platforms.api_server import APIServerAdapter
from gateway.platforms.base import is_network_accessible
# ---------------------------------------------------------------------------
# Unit tests: is_network_accessible()
# ---------------------------------------------------------------------------
class TestIsNetworkAccessible:
"""Direct tests for the address classification helper."""
# -- Loopback (safe, should return False) --
def test_ipv4_loopback(self):
assert is_network_accessible("127.0.0.1") is False
def test_ipv6_loopback(self):
assert is_network_accessible("::1") is False
def test_ipv4_mapped_loopback(self):
# ::ffff:127.0.0.1 — Python's is_loopback returns False for mapped
# addresses; the helper must unwrap and check ipv4_mapped.
assert is_network_accessible("::ffff:127.0.0.1") is False
# -- Network-accessible (should return True) --
def test_ipv4_wildcard(self):
assert is_network_accessible("0.0.0.0") is True
def test_ipv6_wildcard(self):
# This is the bypass vector that the string-based check missed.
assert is_network_accessible("::") is True
def test_ipv4_mapped_unspecified(self):
assert is_network_accessible("::ffff:0.0.0.0") is True
def test_private_ipv4(self):
assert is_network_accessible("10.0.0.1") is True
def test_private_ipv4_class_c(self):
assert is_network_accessible("192.168.1.1") is True
def test_public_ipv4(self):
assert is_network_accessible("8.8.8.8") is True
# -- Hostname resolution --
def test_localhost_resolves_to_loopback(self):
loopback_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=loopback_result):
assert is_network_accessible("localhost") is False
def test_hostname_resolving_to_non_loopback(self):
non_loopback_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=non_loopback_result):
assert is_network_accessible("my-server.local") is True
def test_hostname_mixed_resolution(self):
"""If a hostname resolves to both loopback and non-loopback, it's
network-accessible (any non-loopback address is enough)."""
mixed_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0)),
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=mixed_result):
assert is_network_accessible("dual-host.local") is True
def test_dns_failure_fails_closed(self):
"""Unresolvable hostnames should require an API key (fail closed)."""
with patch(
"gateway.platforms.base._socket.getaddrinfo",
side_effect=socket.gaierror("Name resolution failed"),
):
assert is_network_accessible("nonexistent.invalid") is True
# ---------------------------------------------------------------------------
# Integration tests: connect() startup guard
# ---------------------------------------------------------------------------
class TestConnectBindGuard:
"""Verify that connect() refuses dangerous configurations."""
@pytest.mark.asyncio
async def test_refuses_ipv4_wildcard_without_key(self):
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "0.0.0.0"}))
result = await adapter.connect()
assert result is False
@pytest.mark.asyncio
async def test_refuses_ipv6_wildcard_without_key(self):
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "::"}))
result = await adapter.connect()
assert result is False
def test_allows_loopback_without_key(self):
"""Loopback with no key should pass the guard."""
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "127.0.0.1"}))
assert adapter._api_key == ""
# The guard condition: is_network_accessible(host) AND NOT api_key
# For loopback, is_network_accessible is False so the guard does not block.
assert is_network_accessible(adapter._host) is False
@pytest.mark.asyncio
async def test_allows_wildcard_with_key(self):
"""Non-loopback with a key should pass the guard."""
adapter = APIServerAdapter(
PlatformConfig(enabled=True, extra={"host": "0.0.0.0", "key": "sk-test"})
)
# The guard checks: is_network_accessible(host) AND NOT api_key
# With a key set, the guard should not block.
assert adapter._api_key == "sk-test"
assert is_network_accessible("0.0.0.0") is True
# Combined: the guard condition is False (key is set), so it passes

View file

@ -269,10 +269,10 @@ For cloud sandbox backends, persistence is filesystem-oriented. `TERMINAL_LIFETI
| `WEBHOOK_PORT` | HTTP server port for receiving webhooks (default: `8644`) |
| `WEBHOOK_SECRET` | Global HMAC secret for webhook signature validation (used as fallback when routes don't specify their own) |
| `API_SERVER_ENABLED` | Enable the OpenAI-compatible API server (`true`/`false`). Runs alongside other platforms. |
| `API_SERVER_KEY` | Bearer token for API server authentication. Strongly recommended; required for any network-accessible deployment. |
| `API_SERVER_KEY` | Bearer token for API server authentication. Enforced for non-loopback binding. |
| `API_SERVER_CORS_ORIGINS` | Comma-separated browser origins allowed to call the API server directly (for example `http://localhost:3000,http://127.0.0.1:3000`). Default: disabled. |
| `API_SERVER_PORT` | Port for the API server (default: `8642`) |
| `API_SERVER_HOST` | Host/bind address for the API server (default: `127.0.0.1`). Use `0.0.0.0` for network access only with `API_SERVER_KEY` and a narrow `API_SERVER_CORS_ORIGINS` allowlist. |
| `API_SERVER_HOST` | Host/bind address for the API server (default: `127.0.0.1`). Use `0.0.0.0` for network access — requires `API_SERVER_KEY` and a narrow `API_SERVER_CORS_ORIGINS` allowlist. |
| `API_SERVER_MODEL_NAME` | Model name advertised on `/v1/models`. Defaults to the profile name (or `hermes-agent` for the default profile). Useful for multi-user setups where frontends like Open WebUI need distinct model names per connection. |
| `MESSAGING_CWD` | Working directory for terminal commands in messaging mode (default: `~`) |
| `GATEWAY_ALLOWED_USERS` | Comma-separated user IDs allowed across all platforms |

View file

@ -177,7 +177,7 @@ Authorization: Bearer ***
Configure the key via `API_SERVER_KEY` env var. If you need a browser to call Hermes directly, also set `API_SERVER_CORS_ORIGINS` to an explicit allowlist.
:::warning Security
The API server gives full access to hermes-agent's toolset, **including terminal commands**. If you change the bind address to `0.0.0.0` (network-accessible), **always set `API_SERVER_KEY`** and keep `API_SERVER_CORS_ORIGINS` narrow — without that, remote callers may be able to execute arbitrary commands on your machine.
The API server gives full access to hermes-agent's toolset, **including terminal commands**. When binding to a non-loopback address like `0.0.0.0`, `API_SERVER_KEY` is **required**. Also keep `API_SERVER_CORS_ORIGINS` narrow to control browser access.
The default bind address (`127.0.0.1`) is for local-only use. Browser access is disabled by default; enable it only for explicit trusted origins.
:::