mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(web_server,whatsapp-bridge): validate Host header against bound interface (#13530)
DNS rebinding attack: a victim browser that has the dashboard (or the WhatsApp bridge) open could be tricked into fetching from an attacker-controlled hostname that TTL-flips to 127.0.0.1. Same-origin and CORS checks don't help — the browser now treats the attacker origin as same-origin with the local service. Validating the Host header at the app layer rejects any request whose Host isn't one we bound for. Changes: hermes_cli/web_server.py: - New host_header_middleware runs before auth_middleware. Reads app.state.bound_host (set by start_server) and rejects requests whose Host header doesn't match the bound interface with HTTP 400. - Loopback binds accept localhost / 127.0.0.1 / ::1. Non-loopback binds require exact match. 0.0.0.0 binds skip the check (explicit --insecure opt-in; no app-layer defence possible). - IPv6 bracket notation parsed correctly: [::1] and [::1]:9119 both accepted. scripts/whatsapp-bridge/bridge.js: - Express middleware rejects non-loopback Host headers. Bridge already binds 127.0.0.1-only, this adds the complementary app-layer check for DNS rebinding defence. Tests: 8 new in tests/hermes_cli/test_web_server_host_header.py covering loopback/non-loopback/zero-zero binds, IPv6 brackets, case insensitivity, and end-to-end middleware rejection via TestClient. Reported in GHSA-ppp5-vxwm-4cf7 by @bupt-Yy-young. Hardening — not CVE per SECURITY.md §3. The dashboard's main trust boundary is the loopback bind + session token; DNS rebinding defeats the bind assumption but not the token (since the rebinding browser still sees a first-party fetch to 127.0.0.1 with the token-gated API). Host-header validation adds the missing belt-and-braces layer.
This commit is contained in:
parent
16accd44bd
commit
244ae6db15
3 changed files with 268 additions and 0 deletions
|
|
@ -114,6 +114,91 @@ def _require_token(request: Request) -> None:
|
||||||
raise HTTPException(status_code=401, detail="Unauthorized")
|
raise HTTPException(status_code=401, detail="Unauthorized")
|
||||||
|
|
||||||
|
|
||||||
|
# Accepted Host header values for loopback binds. DNS rebinding attacks
|
||||||
|
# point a victim browser at an attacker-controlled hostname (evil.test)
|
||||||
|
# which resolves to 127.0.0.1 after a TTL flip — bypassing same-origin
|
||||||
|
# checks because the browser now considers evil.test and our dashboard
|
||||||
|
# "same origin". Validating the Host header at the app layer rejects any
|
||||||
|
# request whose Host isn't one we bound for. See GHSA-ppp5-vxwm-4cf7.
|
||||||
|
_LOOPBACK_HOST_VALUES: frozenset = frozenset({
|
||||||
|
"localhost", "127.0.0.1", "::1",
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def _is_accepted_host(host_header: str, bound_host: str) -> bool:
|
||||||
|
"""True if the Host header targets the interface we bound to.
|
||||||
|
|
||||||
|
Accepts:
|
||||||
|
- Exact bound host (with or without port suffix)
|
||||||
|
- Loopback aliases when bound to loopback
|
||||||
|
- Any host when bound to 0.0.0.0 (explicit opt-in to non-loopback,
|
||||||
|
no protection possible at this layer)
|
||||||
|
"""
|
||||||
|
if not host_header:
|
||||||
|
return False
|
||||||
|
# Strip port suffix. IPv6 addresses use bracket notation:
|
||||||
|
# [::1] — no port
|
||||||
|
# [::1]:9119 — with port
|
||||||
|
# Plain hosts/v4:
|
||||||
|
# localhost:9119
|
||||||
|
# 127.0.0.1:9119
|
||||||
|
h = host_header.strip()
|
||||||
|
if h.startswith("["):
|
||||||
|
# IPv6 bracketed — port (if any) follows "]:"
|
||||||
|
close = h.find("]")
|
||||||
|
if close != -1:
|
||||||
|
host_only = h[1:close] # strip brackets
|
||||||
|
else:
|
||||||
|
host_only = h.strip("[]")
|
||||||
|
else:
|
||||||
|
host_only = h.rsplit(":", 1)[0] if ":" in h else h
|
||||||
|
host_only = host_only.lower()
|
||||||
|
|
||||||
|
# 0.0.0.0 bind means operator explicitly opted into all-interfaces
|
||||||
|
# (requires --insecure per web_server.start_server). No Host-layer
|
||||||
|
# defence can protect that mode; rely on operator network controls.
|
||||||
|
if bound_host in ("0.0.0.0", "::"):
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Loopback bind: accept the loopback names
|
||||||
|
bound_lc = bound_host.lower()
|
||||||
|
if bound_lc in _LOOPBACK_HOST_VALUES:
|
||||||
|
return host_only in _LOOPBACK_HOST_VALUES
|
||||||
|
|
||||||
|
# Explicit non-loopback bind: require exact host match
|
||||||
|
return host_only == bound_lc
|
||||||
|
|
||||||
|
|
||||||
|
@app.middleware("http")
|
||||||
|
async def host_header_middleware(request: Request, call_next):
|
||||||
|
"""Reject requests whose Host header doesn't match the bound interface.
|
||||||
|
|
||||||
|
Defends against DNS rebinding: a victim browser on a localhost
|
||||||
|
dashboard is tricked into fetching from an attacker hostname that
|
||||||
|
TTL-flips to 127.0.0.1. CORS and same-origin checks don't help —
|
||||||
|
the browser now treats the attacker origin as same-origin with the
|
||||||
|
dashboard. Host-header validation at the app layer catches it.
|
||||||
|
|
||||||
|
See GHSA-ppp5-vxwm-4cf7.
|
||||||
|
"""
|
||||||
|
# Store the bound host on app.state so this middleware can read it —
|
||||||
|
# set by start_server() at listen time.
|
||||||
|
bound_host = getattr(app.state, "bound_host", None)
|
||||||
|
if bound_host:
|
||||||
|
host_header = request.headers.get("host", "")
|
||||||
|
if not _is_accepted_host(host_header, bound_host):
|
||||||
|
return JSONResponse(
|
||||||
|
status_code=400,
|
||||||
|
content={
|
||||||
|
"detail": (
|
||||||
|
"Invalid Host header. Dashboard requests must use "
|
||||||
|
"the hostname the server was bound to."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return await call_next(request)
|
||||||
|
|
||||||
|
|
||||||
@app.middleware("http")
|
@app.middleware("http")
|
||||||
async def auth_middleware(request: Request, call_next):
|
async def auth_middleware(request: Request, call_next):
|
||||||
"""Require the session token on all /api/ routes except the public list."""
|
"""Require the session token on all /api/ routes except the public list."""
|
||||||
|
|
@ -2323,6 +2408,10 @@ def start_server(
|
||||||
"authentication. Only use on trusted networks.", host,
|
"authentication. Only use on trusted networks.", host,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Record the bound host so host_header_middleware can validate incoming
|
||||||
|
# Host headers against it. Defends against DNS rebinding (GHSA-ppp5-vxwm-4cf7).
|
||||||
|
app.state.bound_host = host
|
||||||
|
|
||||||
if open_browser:
|
if open_browser:
|
||||||
import webbrowser
|
import webbrowser
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -372,6 +372,37 @@ async function startSocket() {
|
||||||
const app = express();
|
const app = express();
|
||||||
app.use(express.json());
|
app.use(express.json());
|
||||||
|
|
||||||
|
// Host-header validation — defends against DNS rebinding.
|
||||||
|
// The bridge binds loopback-only (127.0.0.1) but a victim browser on
|
||||||
|
// the same machine could be tricked into fetching from an attacker
|
||||||
|
// hostname that TTL-flips to 127.0.0.1. Reject any request whose Host
|
||||||
|
// header doesn't resolve to a loopback alias.
|
||||||
|
// See GHSA-ppp5-vxwm-4cf7.
|
||||||
|
const _ACCEPTED_HOST_VALUES = new Set([
|
||||||
|
'localhost',
|
||||||
|
'127.0.0.1',
|
||||||
|
'[::1]',
|
||||||
|
'::1',
|
||||||
|
]);
|
||||||
|
|
||||||
|
app.use((req, res, next) => {
|
||||||
|
const raw = (req.headers.host || '').trim();
|
||||||
|
if (!raw) {
|
||||||
|
return res.status(400).json({ error: 'Missing Host header' });
|
||||||
|
}
|
||||||
|
// Strip port suffix: "localhost:3000" → "localhost"
|
||||||
|
const hostOnly = (raw.includes(':')
|
||||||
|
? raw.substring(0, raw.lastIndexOf(':'))
|
||||||
|
: raw
|
||||||
|
).replace(/^\[|\]$/g, '').toLowerCase();
|
||||||
|
if (!_ACCEPTED_HOST_VALUES.has(hostOnly)) {
|
||||||
|
return res.status(400).json({
|
||||||
|
error: 'Invalid Host header. Bridge accepts loopback hosts only.',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
next();
|
||||||
|
});
|
||||||
|
|
||||||
// Poll for new messages (long-poll style)
|
// Poll for new messages (long-poll style)
|
||||||
app.get('/messages', (req, res) => {
|
app.get('/messages', (req, res) => {
|
||||||
const msgs = messageQueue.splice(0, messageQueue.length);
|
const msgs = messageQueue.splice(0, messageQueue.length);
|
||||||
|
|
|
||||||
148
tests/hermes_cli/test_web_server_host_header.py
Normal file
148
tests/hermes_cli/test_web_server_host_header.py
Normal file
|
|
@ -0,0 +1,148 @@
|
||||||
|
"""Tests for GHSA-ppp5-vxwm-4cf7 — Host-header validation.
|
||||||
|
|
||||||
|
DNS rebinding defence: a victim browser that has the dashboard open
|
||||||
|
could be tricked into fetching from an attacker-controlled hostname
|
||||||
|
that TTL-flips to 127.0.0.1. Same-origin / CORS checks won't help —
|
||||||
|
the browser now treats the attacker origin as same-origin. Validating
|
||||||
|
the Host header at the application layer rejects the attack.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
_repo = str(Path(__file__).resolve().parents[1])
|
||||||
|
if _repo not in sys.path:
|
||||||
|
sys.path.insert(0, _repo)
|
||||||
|
|
||||||
|
|
||||||
|
class TestHostHeaderValidator:
|
||||||
|
"""Unit test the _is_accepted_host helper directly — cheaper and
|
||||||
|
more thorough than spinning up the full FastAPI app."""
|
||||||
|
|
||||||
|
def test_loopback_bind_accepts_loopback_names(self):
|
||||||
|
from hermes_cli.web_server import _is_accepted_host
|
||||||
|
|
||||||
|
for bound in ("127.0.0.1", "localhost", "::1"):
|
||||||
|
for host_header in (
|
||||||
|
"127.0.0.1", "127.0.0.1:9119",
|
||||||
|
"localhost", "localhost:9119",
|
||||||
|
"[::1]", "[::1]:9119",
|
||||||
|
):
|
||||||
|
assert _is_accepted_host(host_header, bound), (
|
||||||
|
f"bound={bound} must accept host={host_header}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_loopback_bind_rejects_attacker_hostnames(self):
|
||||||
|
"""The core rebinding defence: attacker-controlled hosts that
|
||||||
|
TTL-flip to 127.0.0.1 must be rejected."""
|
||||||
|
from hermes_cli.web_server import _is_accepted_host
|
||||||
|
|
||||||
|
for bound in ("127.0.0.1", "localhost"):
|
||||||
|
for attacker in (
|
||||||
|
"evil.example",
|
||||||
|
"evil.example:9119",
|
||||||
|
"rebind.attacker.test:80",
|
||||||
|
"localhost.attacker.test", # subdomain trick
|
||||||
|
"127.0.0.1.evil.test", # lookalike IP prefix
|
||||||
|
"", # missing Host
|
||||||
|
):
|
||||||
|
assert not _is_accepted_host(attacker, bound), (
|
||||||
|
f"bound={bound} must reject attacker host={attacker!r}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_zero_zero_bind_accepts_anything(self):
|
||||||
|
"""0.0.0.0 means operator explicitly opted into all-interfaces
|
||||||
|
(requires --insecure). No Host-layer defence is possible — rely
|
||||||
|
on operator network controls."""
|
||||||
|
from hermes_cli.web_server import _is_accepted_host
|
||||||
|
|
||||||
|
for host in ("10.0.0.5", "evil.example", "my-server.corp.net"):
|
||||||
|
assert _is_accepted_host(host, "0.0.0.0")
|
||||||
|
assert _is_accepted_host(host + ":9119", "0.0.0.0")
|
||||||
|
|
||||||
|
def test_explicit_non_loopback_bind_requires_exact_match(self):
|
||||||
|
"""If the operator bound to a specific non-loopback hostname,
|
||||||
|
the Host header must match exactly."""
|
||||||
|
from hermes_cli.web_server import _is_accepted_host
|
||||||
|
|
||||||
|
assert _is_accepted_host("my-server.corp.net", "my-server.corp.net")
|
||||||
|
assert _is_accepted_host("my-server.corp.net:9119", "my-server.corp.net")
|
||||||
|
# Different host — reject
|
||||||
|
assert not _is_accepted_host("evil.example", "my-server.corp.net")
|
||||||
|
# Loopback — reject (we bound to a specific non-loopback name)
|
||||||
|
assert not _is_accepted_host("localhost", "my-server.corp.net")
|
||||||
|
|
||||||
|
def test_case_insensitive_comparison(self):
|
||||||
|
"""Host headers are case-insensitive per RFC — accept variations."""
|
||||||
|
from hermes_cli.web_server import _is_accepted_host
|
||||||
|
|
||||||
|
assert _is_accepted_host("LOCALHOST", "127.0.0.1")
|
||||||
|
assert _is_accepted_host("LocalHost:9119", "127.0.0.1")
|
||||||
|
|
||||||
|
|
||||||
|
class TestHostHeaderMiddleware:
|
||||||
|
"""End-to-end test via the FastAPI app — verify the middleware
|
||||||
|
rejects bad Host headers with 400."""
|
||||||
|
|
||||||
|
def test_rebinding_request_rejected(self):
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from hermes_cli.web_server import app
|
||||||
|
|
||||||
|
# Simulate start_server having set the bound_host
|
||||||
|
app.state.bound_host = "127.0.0.1"
|
||||||
|
try:
|
||||||
|
client = TestClient(app)
|
||||||
|
# The TestClient sends Host: testserver by default — which is
|
||||||
|
# NOT a loopback alias, so the middleware must reject it.
|
||||||
|
resp = client.get(
|
||||||
|
"/api/status",
|
||||||
|
headers={"Host": "evil.example"},
|
||||||
|
)
|
||||||
|
assert resp.status_code == 400
|
||||||
|
assert "Invalid Host header" in resp.json()["detail"]
|
||||||
|
finally:
|
||||||
|
# Clean up so other tests don't inherit the bound_host
|
||||||
|
if hasattr(app.state, "bound_host"):
|
||||||
|
del app.state.bound_host
|
||||||
|
|
||||||
|
def test_legit_loopback_request_accepted(self):
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from hermes_cli.web_server import app
|
||||||
|
|
||||||
|
app.state.bound_host = "127.0.0.1"
|
||||||
|
try:
|
||||||
|
client = TestClient(app)
|
||||||
|
# /api/status is in _PUBLIC_API_PATHS — passes auth — so the
|
||||||
|
# only thing that can reject is the host header middleware
|
||||||
|
resp = client.get(
|
||||||
|
"/api/status",
|
||||||
|
headers={"Host": "localhost:9119"},
|
||||||
|
)
|
||||||
|
# Either 200 (endpoint served) or some other non-400 —
|
||||||
|
# just not the host-rejection 400
|
||||||
|
assert resp.status_code != 400 or (
|
||||||
|
"Invalid Host header" not in resp.json().get("detail", "")
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
if hasattr(app.state, "bound_host"):
|
||||||
|
del app.state.bound_host
|
||||||
|
|
||||||
|
def test_no_bound_host_skips_validation(self):
|
||||||
|
"""If app.state.bound_host isn't set (e.g. running under test
|
||||||
|
infra without calling start_server), middleware must pass through
|
||||||
|
rather than crash."""
|
||||||
|
from fastapi.testclient import TestClient
|
||||||
|
from hermes_cli.web_server import app
|
||||||
|
|
||||||
|
# Make sure bound_host isn't set
|
||||||
|
if hasattr(app.state, "bound_host"):
|
||||||
|
del app.state.bound_host
|
||||||
|
|
||||||
|
client = TestClient(app)
|
||||||
|
resp = client.get("/api/status")
|
||||||
|
# Should get through to the status endpoint, not a 400
|
||||||
|
assert resp.status_code != 400
|
||||||
Loading…
Add table
Add a link
Reference in a new issue