feat(relay): managed-boot self-provision client (Phase 3, gateway side) (#48242)

The gateway half of relay Phase 3. On a MANAGED boot with relay configured and
no secret pinned, the runtime self-provisions its relay credentials IN-PROCESS:
resolve the agent's own Nous access token (resolve_nous_access_token) -> POST
the connector's /relay/provision asserting its own endpoint + route keys ->
set GATEWAY_RELAY_ID/SECRET/DELIVERY_KEY into os.environ so the immediately-
following register_relay_adapter() reads them and dials out authenticated.

No human, no enrollment token, no disk write — the creds live only in process
memory (save_env_value refuses under managed anyway, and keeping the secret off
any volume is the stronger posture). Stateless: process-env creds don't survive
a restart, so a managed container re-provisions every boot; the connector's
rotation window covers a still-connected prior instance. An explicitly-pinned
GATEWAY_RELAY_SECRET is respected (skip). Self-hosted is unchanged: humans keep
using `hermes gateway enroll`.

Endpoint provenance is gateway-asserted (GATEWAY_RELAY_ENDPOINT +
GATEWAY_RELAY_ROUTE_KEYS, env or gateway.relay_* config) — uniform code path
whether the operator sets it (self-hosted) or NAS stamps it (hosted, the only
case NAS knows the public URL). Both absent -> outbound-only provisioning
(credentials, no inbound routes). The connector scopes the asserted endpoint to
the verified tenant, so it stays within the security model.

- gateway/relay/__init__.py: relay_endpoint(), relay_route_keys(),
  _provision_url(), _post_provision(), self_provision_if_managed() (never
  raises — a provision failure logs and boots without relay auth).
- gateway/run.py: call self_provision_if_managed() immediately before
  register_relay_adapter() in the startup path.

Tests: 12 unit (trigger logic, respect-pinned-secret, in-process env wiring,
endpoint+routes vs outbound-only, fail-soft on token/connector failure);
mutation-checked (drop is_managed guard / pinned-secret guard -> tests fail).
Cross-repo live E2E driver lands on the connector side (depends on this).

EXPERIMENTAL: relay auth scheme may change until >=2 Class-1 platforms validate.
This commit is contained in:
Ben Barclay 2026-06-18 15:25:29 +10:00 committed by GitHub
parent 4440d77bf3
commit 0ddd21c74e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 398 additions and 1 deletions

View file

@ -113,6 +113,227 @@ def relay_inbound_config() -> tuple[Optional[str], Optional[str], int]:
return (key or None, host or "0.0.0.0", port)
def relay_endpoint() -> Optional[str]:
"""The gateway's own PUBLIC inbound URL, asserted to the connector at provision.
The connector delivers signed inbound POSTs to this URL and stores it on the
tenant's route rows. It is gateway-asserted (the connector scopes it to the
verified tenant, so a dishonest gateway can only misdirect its OWN inbound).
The *source* of the value differs by deployment but the code path is uniform:
a self-hosted operator sets ``GATEWAY_RELAY_ENDPOINT`` (mirrors how they set
``HERMES_DASHBOARD_PUBLIC_URL``); a hosted/NAS container has the same var
stamped in (NAS knows the public URL only in that case). Absent -> the
gateway provisions outbound-only (no inbound routes written).
Env first (Docker), then ``gateway.relay_endpoint`` in config.yaml.
"""
url = os.environ.get("GATEWAY_RELAY_ENDPOINT", "").strip()
if not url:
try:
from gateway.run import _load_gateway_config # late import to avoid cycle
cfg = (_load_gateway_config().get("gateway") or {})
url = str(cfg.get("relay_endpoint", "") or "").strip()
except Exception: # noqa: BLE001 - config absence/parse must never crash boot
url = ""
return url.rstrip("/") or None
def relay_route_keys() -> list[str]:
"""Discriminators (guild_ids / chat_ids / paths) this gateway's tenant owns.
Gateway-provided config, paired with ``relay_endpoint()``: the connector
writes one route row per (routeKey -> tenant, endpoint), so route keys only
take effect alongside an endpoint. Empty -> outbound-only provisioning (the
connector accepts an empty set and writes no route rows).
``GATEWAY_RELAY_ROUTE_KEYS`` is comma-separated; config.yaml
``gateway.relay_route_keys`` may be a list or a comma string.
"""
raw = os.environ.get("GATEWAY_RELAY_ROUTE_KEYS", "").strip()
if not raw:
try:
from gateway.run import _load_gateway_config # late import to avoid cycle
cfg = (_load_gateway_config().get("gateway") or {})
val = cfg.get("relay_route_keys", "")
if isinstance(val, (list, tuple)):
return [str(k).strip() for k in val if str(k).strip()]
raw = str(val or "").strip()
except Exception: # noqa: BLE001
raw = ""
return [k.strip() for k in raw.split(",") if k.strip()]
def _provision_url(relay_dial_url: str) -> str:
"""Map the ``ws(s)://…/relay`` dial URL to the ``http(s)://…/relay/provision`` POST URL."""
raw = relay_dial_url.rstrip("/")
if raw.startswith("ws://"):
raw = "http://" + raw[len("ws://"):]
elif raw.startswith("wss://"):
raw = "https://" + raw[len("wss://"):]
if raw.endswith("/relay"):
raw = raw[: -len("/relay")]
return f"{raw}/relay/provision"
def _post_provision(
*,
provision_url: str,
access_token: str,
gateway_id: str,
platform: str,
bot_id: str,
gateway_endpoint: Optional[str],
route_keys: list[str],
timeout: float = 15.0,
) -> dict:
"""POST to the connector's ``/relay/provision`` and return the JSON body.
The connector validates ``access_token`` against NAS, derives the
authoritative tenant, mints the per-gateway secret + per-tenant delivery key,
upserts the tenant's route rows, and returns
``{secret, deliveryKey, tenant, gatewayId, routeKeys}``. Raises RuntimeError
with a user-facing message on any non-2xx / transport failure.
"""
import json
import urllib.error
import urllib.request
body: dict = {
"gatewayId": gateway_id,
"platform": platform,
"botId": bot_id,
"gatewayEndpoint": gateway_endpoint or "",
"routeKeys": route_keys,
}
data = json.dumps(body).encode("utf-8")
req = urllib.request.Request(
provision_url,
data=data,
method="POST",
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
payload = json.loads(resp.read().decode())
except urllib.error.HTTPError as exc:
detail = ""
try:
detail = (json.loads(exc.read().decode()) or {}).get("error", "")
except Exception:
pass
raise RuntimeError(
f"connector returned HTTP {exc.code}" + (f": {detail}" if detail else "")
) from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"could not reach connector: {exc.reason}") from exc
if not isinstance(payload, dict) or not payload.get("secret"):
raise RuntimeError("connector returned an unexpected response (no secret)")
return payload
def self_provision_if_managed() -> bool:
"""Managed-boot self-provision: mint relay creds in-process, no human, no disk.
Fires only on a MANAGED boot (``is_managed()``) with relay configured
(``relay_url()`` set) and NO per-gateway secret already present. In that case
the runtime resolves the agent's own Nous access token (the same
``resolve_nous_access_token()`` the enroll CLI / dashboard register use),
POSTs ``/relay/provision`` asserting its own endpoint + route keys, and sets
``GATEWAY_RELAY_ID`` / ``GATEWAY_RELAY_SECRET`` / ``GATEWAY_RELAY_DELIVERY_KEY``
into ``os.environ`` so the subsequent ``register_relay_adapter()`` picks them
up. The creds live ONLY in process memory never written to ``~/.hermes/.env``
(``save_env_value`` refuses under managed anyway, and keeping the secret off
any volume is the stronger posture).
Stateless: process-env creds don't survive a restart, so a managed container
re-provisions every boot; the connector's rotation window covers a still-
connected prior instance. An explicitly-pinned ``GATEWAY_RELAY_SECRET`` (env
or config) is RESPECTED self-provision skips so an operator pin isn't
stomped.
Returns True if it provisioned, False otherwise. NEVER raises: a provision
failure logs and returns False so the gateway still boots (and
``register_relay_adapter`` will simply dial unauthenticated / be rejected,
rather than the whole gateway crashing).
"""
import logging
logger = logging.getLogger("gateway.relay")
try:
from hermes_cli.config import is_managed
except Exception: # noqa: BLE001
return False
if not is_managed():
return False
dial_url = relay_url()
if not dial_url:
return False
# Respect an already-present (pinned/stamped) secret — don't stomp it.
existing_id, existing_secret = relay_connection_auth()
if existing_id and existing_secret:
logger.info("relay self-provision skipped: GATEWAY_RELAY_SECRET already set")
return False
try:
from hermes_cli.auth import resolve_nous_access_token
access_token = resolve_nous_access_token()
except Exception as exc: # noqa: BLE001 - boot must survive a token failure
logger.warning("relay self-provision skipped: could not resolve Nous token (%s)", exc)
return False
platform, bot_id = relay_platform_identity()
# gatewayId default mirrors the enroll CLI's hostname-based slug.
import socket
try:
host = socket.gethostname().strip()
except Exception: # noqa: BLE001
host = ""
gateway_id = os.environ.get("GATEWAY_RELAY_ID", "").strip() or f"gw-{host or 'hermes'}"
endpoint = relay_endpoint()
route_keys = relay_route_keys()
try:
result = _post_provision(
provision_url=_provision_url(dial_url),
access_token=access_token,
gateway_id=gateway_id,
platform=platform,
bot_id=bot_id,
gateway_endpoint=endpoint,
route_keys=route_keys,
)
except RuntimeError as exc:
logger.warning("relay self-provision failed (%s); gateway will boot without relay auth", exc)
return False
# Set creds in-process so register_relay_adapter() + relay_inbound_config()
# read them from os.environ. Never logged.
os.environ["GATEWAY_RELAY_ID"] = str(result.get("gatewayId") or gateway_id)
os.environ["GATEWAY_RELAY_SECRET"] = str(result.get("secret") or "")
os.environ["GATEWAY_RELAY_DELIVERY_KEY"] = str(result.get("deliveryKey") or "")
tenant = str(result.get("tenant") or "")
logger.info(
"relay self-provisioned (gateway_id=%s tenant=%s routes=%d inbound=%s)",
os.environ["GATEWAY_RELAY_ID"],
tenant or "?",
len(route_keys),
"yes" if endpoint else "outbound-only",
)
return True
def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bool:
"""Register the generic ``relay`` platform via the platform registry.

View file

@ -5116,7 +5116,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# adapter dials the connector over a WebSocket, negotiates its capability
# descriptor at handshake, and bridges inbound/outbound like any platform.
try:
from gateway.relay import register_relay_adapter, relay_url
from gateway.relay import (
register_relay_adapter,
relay_url,
self_provision_if_managed,
)
# Managed boot: self-provision relay creds in-process (resolve the
# agent's NAS token -> POST /relay/provision -> set GATEWAY_RELAY_* in
# os.environ) BEFORE registration reads them. No-op when not managed,
# relay unconfigured, or a secret is already pinned. Never raises.
self_provision_if_managed()
if register_relay_adapter():
logger.info("relay adapter registered (connector at %s)", relay_url())

View file

@ -0,0 +1,166 @@
"""Unit tests for managed-boot relay self-provisioning.
Covers gateway.relay.self_provision_if_managed() + the relay_endpoint() /
relay_route_keys() config readers. The connector HTTP POST is monkeypatched
(the cross-repo E2E exercises the real /relay/provision); these prove the
TRIGGER logic, in-process env wiring, and fail-soft boot behaviour.
"""
from __future__ import annotations
import pytest
import gateway.relay as relay
@pytest.fixture(autouse=True)
def _clean_env(monkeypatch):
for k in (
"GATEWAY_RELAY_URL",
"GATEWAY_RELAY_ID",
"GATEWAY_RELAY_SECRET",
"GATEWAY_RELAY_DELIVERY_KEY",
"GATEWAY_RELAY_ENDPOINT",
"GATEWAY_RELAY_ROUTE_KEYS",
"GATEWAY_RELAY_PLATFORM",
"GATEWAY_RELAY_BOT_ID",
):
monkeypatch.delenv(k, raising=False)
# Never read config.yaml off disk in these tests.
monkeypatch.setattr("gateway.run._load_gateway_config", lambda: {}, raising=False)
def _stub_post(captured: dict):
"""A fake _post_provision that records its kwargs and returns creds."""
def _fake(**kwargs):
captured.update(kwargs)
return {
"secret": "a" * 64,
"deliveryKey": "b" * 64,
"tenant": "org-tenant-x",
"gatewayId": kwargs["gateway_id"],
"routeKeys": kwargs["route_keys"],
}
return _fake
def _arm(monkeypatch, *, managed=True, url="wss://connector.example/relay", token="nas-token"):
monkeypatch.setattr("hermes_cli.config.is_managed", lambda: managed)
monkeypatch.setattr(relay, "relay_url", lambda: url)
monkeypatch.setattr("hermes_cli.auth.resolve_nous_access_token", lambda: token)
# ─────────────────────────── config readers ───────────────────────────
def test_relay_endpoint_from_env(monkeypatch):
monkeypatch.setenv("GATEWAY_RELAY_ENDPOINT", "https://gw.example.com/inbound/")
assert relay.relay_endpoint() == "https://gw.example.com/inbound"
def test_relay_endpoint_absent_is_none():
assert relay.relay_endpoint() is None
def test_relay_route_keys_csv(monkeypatch):
monkeypatch.setenv("GATEWAY_RELAY_ROUTE_KEYS", "guild-1, guild-2 ,, guild-3")
assert relay.relay_route_keys() == ["guild-1", "guild-2", "guild-3"]
def test_relay_route_keys_empty():
assert relay.relay_route_keys() == []
def test_provision_url_maps_ws_to_http():
assert relay._provision_url("wss://c.example/relay") == "https://c.example/relay/provision"
assert relay._provision_url("ws://c.example/relay") == "http://c.example/relay/provision"
assert relay._provision_url("https://c.example") == "https://c.example/relay/provision"
# ─────────────────────────── trigger logic ───────────────────────────
def test_skips_when_not_managed(monkeypatch):
_arm(monkeypatch, managed=False)
called = {"n": 0}
monkeypatch.setattr(relay, "_post_provision", lambda **k: called.__setitem__("n", called["n"] + 1) or {})
assert relay.self_provision_if_managed() is False
assert called["n"] == 0
def test_skips_when_relay_not_configured(monkeypatch):
_arm(monkeypatch, url=None)
called = {"n": 0}
monkeypatch.setattr(relay, "_post_provision", lambda **k: called.__setitem__("n", called["n"] + 1) or {})
assert relay.self_provision_if_managed() is False
assert called["n"] == 0
def test_skips_when_secret_already_pinned(monkeypatch):
_arm(monkeypatch)
monkeypatch.setenv("GATEWAY_RELAY_ID", "gw-pinned")
monkeypatch.setenv("GATEWAY_RELAY_SECRET", "deadbeef")
called = {"n": 0}
monkeypatch.setattr(relay, "_post_provision", lambda **k: called.__setitem__("n", called["n"] + 1) or {})
assert relay.self_provision_if_managed() is False
assert called["n"] == 0
# The pinned secret is untouched.
assert relay.relay_connection_auth() == ("gw-pinned", "deadbeef")
# ─────────────────────────── happy path ───────────────────────────
def test_provisions_and_sets_env_in_process(monkeypatch):
_arm(monkeypatch)
monkeypatch.setenv("GATEWAY_RELAY_ENDPOINT", "https://gw.example.com/inbound")
monkeypatch.setenv("GATEWAY_RELAY_ROUTE_KEYS", "guild-1,guild-2")
captured: dict = {}
monkeypatch.setattr(relay, "_post_provision", _stub_post(captured))
assert relay.self_provision_if_managed() is True
# The connector POST carried the gateway-asserted endpoint + route keys.
assert captured["provision_url"] == "https://connector.example/relay/provision"
assert captured["access_token"] == "nas-token"
assert captured["gateway_endpoint"] == "https://gw.example.com/inbound"
assert captured["route_keys"] == ["guild-1", "guild-2"]
# Creds landed in os.environ (in-process), so register_relay_adapter() reads them.
gid, secret = relay.relay_connection_auth()
assert gid and secret == "a" * 64
key, _host, _port = relay.relay_inbound_config()
assert key == "b" * 64
def test_outbound_only_when_no_endpoint(monkeypatch):
_arm(monkeypatch)
captured: dict = {}
monkeypatch.setattr(relay, "_post_provision", _stub_post(captured))
assert relay.self_provision_if_managed() is True
assert captured["gateway_endpoint"] is None
assert captured["route_keys"] == []
assert relay.relay_connection_auth()[1] == "a" * 64
# ─────────────────────────── fail-soft ───────────────────────────
def test_token_failure_is_non_fatal(monkeypatch):
_arm(monkeypatch)
def _boom():
raise RuntimeError("no token")
monkeypatch.setattr("hermes_cli.auth.resolve_nous_access_token", _boom)
# Must not raise; returns False; no creds set.
assert relay.self_provision_if_managed() is False
assert relay.relay_connection_auth() == (None, None)
def test_connector_failure_is_non_fatal(monkeypatch):
_arm(monkeypatch)
def _boom(**kwargs):
raise RuntimeError("connector returned HTTP 503")
monkeypatch.setattr(relay, "_post_provision", _boom)
assert relay.self_provision_if_managed() is False
assert relay.relay_connection_auth() == (None, None)