From 0ddd21c74ef255b8cf6f5793aa5409cebf4eb1a1 Mon Sep 17 00:00:00 2001 From: Ben Barclay Date: Thu, 18 Jun 2026 15:25:29 +1000 Subject: [PATCH] feat(relay): managed-boot self-provision client (Phase 3, gateway side) (#48242) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gateway half of relay Phase 3. On a MANAGED boot with relay configured and no secret pinned, the runtime self-provisions its relay credentials IN-PROCESS: resolve the agent's own Nous access token (resolve_nous_access_token) -> POST the connector's /relay/provision asserting its own endpoint + route keys -> set GATEWAY_RELAY_ID/SECRET/DELIVERY_KEY into os.environ so the immediately- following register_relay_adapter() reads them and dials out authenticated. No human, no enrollment token, no disk write — the creds live only in process memory (save_env_value refuses under managed anyway, and keeping the secret off any volume is the stronger posture). Stateless: process-env creds don't survive a restart, so a managed container re-provisions every boot; the connector's rotation window covers a still-connected prior instance. An explicitly-pinned GATEWAY_RELAY_SECRET is respected (skip). Self-hosted is unchanged: humans keep using `hermes gateway enroll`. Endpoint provenance is gateway-asserted (GATEWAY_RELAY_ENDPOINT + GATEWAY_RELAY_ROUTE_KEYS, env or gateway.relay_* config) — uniform code path whether the operator sets it (self-hosted) or NAS stamps it (hosted, the only case NAS knows the public URL). Both absent -> outbound-only provisioning (credentials, no inbound routes). The connector scopes the asserted endpoint to the verified tenant, so it stays within the security model. - gateway/relay/__init__.py: relay_endpoint(), relay_route_keys(), _provision_url(), _post_provision(), self_provision_if_managed() (never raises — a provision failure logs and boots without relay auth). - gateway/run.py: call self_provision_if_managed() immediately before register_relay_adapter() in the startup path. Tests: 12 unit (trigger logic, respect-pinned-secret, in-process env wiring, endpoint+routes vs outbound-only, fail-soft on token/connector failure); mutation-checked (drop is_managed guard / pinned-secret guard -> tests fail). Cross-repo live E2E driver lands on the connector side (depends on this). EXPERIMENTAL: relay auth scheme may change until >=2 Class-1 platforms validate. --- gateway/relay/__init__.py | 221 +++++++++++++++++++++ gateway/run.py | 12 +- tests/gateway/relay/test_self_provision.py | 166 ++++++++++++++++ 3 files changed, 398 insertions(+), 1 deletion(-) create mode 100644 tests/gateway/relay/test_self_provision.py diff --git a/gateway/relay/__init__.py b/gateway/relay/__init__.py index f1deb20e5c7..421fe0ac240 100644 --- a/gateway/relay/__init__.py +++ b/gateway/relay/__init__.py @@ -113,6 +113,227 @@ def relay_inbound_config() -> tuple[Optional[str], Optional[str], int]: return (key or None, host or "0.0.0.0", port) +def relay_endpoint() -> Optional[str]: + """The gateway's own PUBLIC inbound URL, asserted to the connector at provision. + + The connector delivers signed inbound POSTs to this URL and stores it on the + tenant's route rows. It is gateway-asserted (the connector scopes it to the + verified tenant, so a dishonest gateway can only misdirect its OWN inbound). + The *source* of the value differs by deployment but the code path is uniform: + a self-hosted operator sets ``GATEWAY_RELAY_ENDPOINT`` (mirrors how they set + ``HERMES_DASHBOARD_PUBLIC_URL``); a hosted/NAS container has the same var + stamped in (NAS knows the public URL only in that case). Absent -> the + gateway provisions outbound-only (no inbound routes written). + + Env first (Docker), then ``gateway.relay_endpoint`` in config.yaml. + """ + url = os.environ.get("GATEWAY_RELAY_ENDPOINT", "").strip() + if not url: + try: + from gateway.run import _load_gateway_config # late import to avoid cycle + + cfg = (_load_gateway_config().get("gateway") or {}) + url = str(cfg.get("relay_endpoint", "") or "").strip() + except Exception: # noqa: BLE001 - config absence/parse must never crash boot + url = "" + return url.rstrip("/") or None + + +def relay_route_keys() -> list[str]: + """Discriminators (guild_ids / chat_ids / paths) this gateway's tenant owns. + + Gateway-provided config, paired with ``relay_endpoint()``: the connector + writes one route row per (routeKey -> tenant, endpoint), so route keys only + take effect alongside an endpoint. Empty -> outbound-only provisioning (the + connector accepts an empty set and writes no route rows). + + ``GATEWAY_RELAY_ROUTE_KEYS`` is comma-separated; config.yaml + ``gateway.relay_route_keys`` may be a list or a comma string. + """ + raw = os.environ.get("GATEWAY_RELAY_ROUTE_KEYS", "").strip() + if not raw: + try: + from gateway.run import _load_gateway_config # late import to avoid cycle + + cfg = (_load_gateway_config().get("gateway") or {}) + val = cfg.get("relay_route_keys", "") + if isinstance(val, (list, tuple)): + return [str(k).strip() for k in val if str(k).strip()] + raw = str(val or "").strip() + except Exception: # noqa: BLE001 + raw = "" + return [k.strip() for k in raw.split(",") if k.strip()] + + +def _provision_url(relay_dial_url: str) -> str: + """Map the ``ws(s)://…/relay`` dial URL to the ``http(s)://…/relay/provision`` POST URL.""" + raw = relay_dial_url.rstrip("/") + if raw.startswith("ws://"): + raw = "http://" + raw[len("ws://"):] + elif raw.startswith("wss://"): + raw = "https://" + raw[len("wss://"):] + if raw.endswith("/relay"): + raw = raw[: -len("/relay")] + return f"{raw}/relay/provision" + + +def _post_provision( + *, + provision_url: str, + access_token: str, + gateway_id: str, + platform: str, + bot_id: str, + gateway_endpoint: Optional[str], + route_keys: list[str], + timeout: float = 15.0, +) -> dict: + """POST to the connector's ``/relay/provision`` and return the JSON body. + + The connector validates ``access_token`` against NAS, derives the + authoritative tenant, mints the per-gateway secret + per-tenant delivery key, + upserts the tenant's route rows, and returns + ``{secret, deliveryKey, tenant, gatewayId, routeKeys}``. Raises RuntimeError + with a user-facing message on any non-2xx / transport failure. + """ + import json + import urllib.error + import urllib.request + + body: dict = { + "gatewayId": gateway_id, + "platform": platform, + "botId": bot_id, + "gatewayEndpoint": gateway_endpoint or "", + "routeKeys": route_keys, + } + data = json.dumps(body).encode("utf-8") + req = urllib.request.Request( + provision_url, + data=data, + method="POST", + headers={ + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + "Accept": "application/json", + }, + ) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + payload = json.loads(resp.read().decode()) + except urllib.error.HTTPError as exc: + detail = "" + try: + detail = (json.loads(exc.read().decode()) or {}).get("error", "") + except Exception: + pass + raise RuntimeError( + f"connector returned HTTP {exc.code}" + (f": {detail}" if detail else "") + ) from exc + except urllib.error.URLError as exc: + raise RuntimeError(f"could not reach connector: {exc.reason}") from exc + + if not isinstance(payload, dict) or not payload.get("secret"): + raise RuntimeError("connector returned an unexpected response (no secret)") + return payload + + +def self_provision_if_managed() -> bool: + """Managed-boot self-provision: mint relay creds in-process, no human, no disk. + + Fires only on a MANAGED boot (``is_managed()``) with relay configured + (``relay_url()`` set) and NO per-gateway secret already present. In that case + the runtime resolves the agent's own Nous access token (the same + ``resolve_nous_access_token()`` the enroll CLI / dashboard register use), + POSTs ``/relay/provision`` asserting its own endpoint + route keys, and sets + ``GATEWAY_RELAY_ID`` / ``GATEWAY_RELAY_SECRET`` / ``GATEWAY_RELAY_DELIVERY_KEY`` + into ``os.environ`` so the subsequent ``register_relay_adapter()`` picks them + up. The creds live ONLY in process memory — never written to ``~/.hermes/.env`` + (``save_env_value`` refuses under managed anyway, and keeping the secret off + any volume is the stronger posture). + + Stateless: process-env creds don't survive a restart, so a managed container + re-provisions every boot; the connector's rotation window covers a still- + connected prior instance. An explicitly-pinned ``GATEWAY_RELAY_SECRET`` (env + or config) is RESPECTED — self-provision skips so an operator pin isn't + stomped. + + Returns True if it provisioned, False otherwise. NEVER raises: a provision + failure logs and returns False so the gateway still boots (and + ``register_relay_adapter`` will simply dial unauthenticated / be rejected, + rather than the whole gateway crashing). + """ + import logging + + logger = logging.getLogger("gateway.relay") + + try: + from hermes_cli.config import is_managed + except Exception: # noqa: BLE001 + return False + + if not is_managed(): + return False + dial_url = relay_url() + if not dial_url: + return False + + # Respect an already-present (pinned/stamped) secret — don't stomp it. + existing_id, existing_secret = relay_connection_auth() + if existing_id and existing_secret: + logger.info("relay self-provision skipped: GATEWAY_RELAY_SECRET already set") + return False + + try: + from hermes_cli.auth import resolve_nous_access_token + + access_token = resolve_nous_access_token() + except Exception as exc: # noqa: BLE001 - boot must survive a token failure + logger.warning("relay self-provision skipped: could not resolve Nous token (%s)", exc) + return False + + platform, bot_id = relay_platform_identity() + # gatewayId default mirrors the enroll CLI's hostname-based slug. + import socket + + try: + host = socket.gethostname().strip() + except Exception: # noqa: BLE001 + host = "" + gateway_id = os.environ.get("GATEWAY_RELAY_ID", "").strip() or f"gw-{host or 'hermes'}" + endpoint = relay_endpoint() + route_keys = relay_route_keys() + + try: + result = _post_provision( + provision_url=_provision_url(dial_url), + access_token=access_token, + gateway_id=gateway_id, + platform=platform, + bot_id=bot_id, + gateway_endpoint=endpoint, + route_keys=route_keys, + ) + except RuntimeError as exc: + logger.warning("relay self-provision failed (%s); gateway will boot without relay auth", exc) + return False + + # Set creds in-process so register_relay_adapter() + relay_inbound_config() + # read them from os.environ. Never logged. + os.environ["GATEWAY_RELAY_ID"] = str(result.get("gatewayId") or gateway_id) + os.environ["GATEWAY_RELAY_SECRET"] = str(result.get("secret") or "") + os.environ["GATEWAY_RELAY_DELIVERY_KEY"] = str(result.get("deliveryKey") or "") + tenant = str(result.get("tenant") or "") + logger.info( + "relay self-provisioned (gateway_id=%s tenant=%s routes=%d inbound=%s)", + os.environ["GATEWAY_RELAY_ID"], + tenant or "?", + len(route_keys), + "yes" if endpoint else "outbound-only", + ) + return True + + def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bool: """Register the generic ``relay`` platform via the platform registry. diff --git a/gateway/run.py b/gateway/run.py index 4b41cfc6aec..8f139341793 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5116,7 +5116,17 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # adapter dials the connector over a WebSocket, negotiates its capability # descriptor at handshake, and bridges inbound/outbound like any platform. try: - from gateway.relay import register_relay_adapter, relay_url + from gateway.relay import ( + register_relay_adapter, + relay_url, + self_provision_if_managed, + ) + + # Managed boot: self-provision relay creds in-process (resolve the + # agent's NAS token -> POST /relay/provision -> set GATEWAY_RELAY_* in + # os.environ) BEFORE registration reads them. No-op when not managed, + # relay unconfigured, or a secret is already pinned. Never raises. + self_provision_if_managed() if register_relay_adapter(): logger.info("relay adapter registered (connector at %s)", relay_url()) diff --git a/tests/gateway/relay/test_self_provision.py b/tests/gateway/relay/test_self_provision.py new file mode 100644 index 00000000000..4b4a6070e7e --- /dev/null +++ b/tests/gateway/relay/test_self_provision.py @@ -0,0 +1,166 @@ +"""Unit tests for managed-boot relay self-provisioning. + +Covers gateway.relay.self_provision_if_managed() + the relay_endpoint() / +relay_route_keys() config readers. The connector HTTP POST is monkeypatched +(the cross-repo E2E exercises the real /relay/provision); these prove the +TRIGGER logic, in-process env wiring, and fail-soft boot behaviour. +""" + +from __future__ import annotations + +import pytest + +import gateway.relay as relay + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + for k in ( + "GATEWAY_RELAY_URL", + "GATEWAY_RELAY_ID", + "GATEWAY_RELAY_SECRET", + "GATEWAY_RELAY_DELIVERY_KEY", + "GATEWAY_RELAY_ENDPOINT", + "GATEWAY_RELAY_ROUTE_KEYS", + "GATEWAY_RELAY_PLATFORM", + "GATEWAY_RELAY_BOT_ID", + ): + monkeypatch.delenv(k, raising=False) + # Never read config.yaml off disk in these tests. + monkeypatch.setattr("gateway.run._load_gateway_config", lambda: {}, raising=False) + + +def _stub_post(captured: dict): + """A fake _post_provision that records its kwargs and returns creds.""" + + def _fake(**kwargs): + captured.update(kwargs) + return { + "secret": "a" * 64, + "deliveryKey": "b" * 64, + "tenant": "org-tenant-x", + "gatewayId": kwargs["gateway_id"], + "routeKeys": kwargs["route_keys"], + } + + return _fake + + +def _arm(monkeypatch, *, managed=True, url="wss://connector.example/relay", token="nas-token"): + monkeypatch.setattr("hermes_cli.config.is_managed", lambda: managed) + monkeypatch.setattr(relay, "relay_url", lambda: url) + monkeypatch.setattr("hermes_cli.auth.resolve_nous_access_token", lambda: token) + + +# ─────────────────────────── config readers ─────────────────────────── + +def test_relay_endpoint_from_env(monkeypatch): + monkeypatch.setenv("GATEWAY_RELAY_ENDPOINT", "https://gw.example.com/inbound/") + assert relay.relay_endpoint() == "https://gw.example.com/inbound" + + +def test_relay_endpoint_absent_is_none(): + assert relay.relay_endpoint() is None + + +def test_relay_route_keys_csv(monkeypatch): + monkeypatch.setenv("GATEWAY_RELAY_ROUTE_KEYS", "guild-1, guild-2 ,, guild-3") + assert relay.relay_route_keys() == ["guild-1", "guild-2", "guild-3"] + + +def test_relay_route_keys_empty(): + assert relay.relay_route_keys() == [] + + +def test_provision_url_maps_ws_to_http(): + assert relay._provision_url("wss://c.example/relay") == "https://c.example/relay/provision" + assert relay._provision_url("ws://c.example/relay") == "http://c.example/relay/provision" + assert relay._provision_url("https://c.example") == "https://c.example/relay/provision" + + +# ─────────────────────────── trigger logic ─────────────────────────── + +def test_skips_when_not_managed(monkeypatch): + _arm(monkeypatch, managed=False) + called = {"n": 0} + monkeypatch.setattr(relay, "_post_provision", lambda **k: called.__setitem__("n", called["n"] + 1) or {}) + assert relay.self_provision_if_managed() is False + assert called["n"] == 0 + + +def test_skips_when_relay_not_configured(monkeypatch): + _arm(monkeypatch, url=None) + called = {"n": 0} + monkeypatch.setattr(relay, "_post_provision", lambda **k: called.__setitem__("n", called["n"] + 1) or {}) + assert relay.self_provision_if_managed() is False + assert called["n"] == 0 + + +def test_skips_when_secret_already_pinned(monkeypatch): + _arm(monkeypatch) + monkeypatch.setenv("GATEWAY_RELAY_ID", "gw-pinned") + monkeypatch.setenv("GATEWAY_RELAY_SECRET", "deadbeef") + called = {"n": 0} + monkeypatch.setattr(relay, "_post_provision", lambda **k: called.__setitem__("n", called["n"] + 1) or {}) + assert relay.self_provision_if_managed() is False + assert called["n"] == 0 + # The pinned secret is untouched. + assert relay.relay_connection_auth() == ("gw-pinned", "deadbeef") + + +# ─────────────────────────── happy path ─────────────────────────── + +def test_provisions_and_sets_env_in_process(monkeypatch): + _arm(monkeypatch) + monkeypatch.setenv("GATEWAY_RELAY_ENDPOINT", "https://gw.example.com/inbound") + monkeypatch.setenv("GATEWAY_RELAY_ROUTE_KEYS", "guild-1,guild-2") + captured: dict = {} + monkeypatch.setattr(relay, "_post_provision", _stub_post(captured)) + + assert relay.self_provision_if_managed() is True + # The connector POST carried the gateway-asserted endpoint + route keys. + assert captured["provision_url"] == "https://connector.example/relay/provision" + assert captured["access_token"] == "nas-token" + assert captured["gateway_endpoint"] == "https://gw.example.com/inbound" + assert captured["route_keys"] == ["guild-1", "guild-2"] + # Creds landed in os.environ (in-process), so register_relay_adapter() reads them. + gid, secret = relay.relay_connection_auth() + assert gid and secret == "a" * 64 + key, _host, _port = relay.relay_inbound_config() + assert key == "b" * 64 + + +def test_outbound_only_when_no_endpoint(monkeypatch): + _arm(monkeypatch) + captured: dict = {} + monkeypatch.setattr(relay, "_post_provision", _stub_post(captured)) + + assert relay.self_provision_if_managed() is True + assert captured["gateway_endpoint"] is None + assert captured["route_keys"] == [] + assert relay.relay_connection_auth()[1] == "a" * 64 + + +# ─────────────────────────── fail-soft ─────────────────────────── + +def test_token_failure_is_non_fatal(monkeypatch): + _arm(monkeypatch) + + def _boom(): + raise RuntimeError("no token") + + monkeypatch.setattr("hermes_cli.auth.resolve_nous_access_token", _boom) + # Must not raise; returns False; no creds set. + assert relay.self_provision_if_managed() is False + assert relay.relay_connection_auth() == (None, None) + + +def test_connector_failure_is_non_fatal(monkeypatch): + _arm(monkeypatch) + + def _boom(**kwargs): + raise RuntimeError("connector returned HTTP 503") + + monkeypatch.setattr(relay, "_post_provision", _boom) + assert relay.self_provision_if_managed() is False + assert relay.relay_connection_auth() == (None, None)