feat(relay): connector⇄gateway channel auth + signed-HTTP inbound receiver + enroll CLI (#48147)

* feat(relay): authenticate the connector⇄gateway WS channel

The relay gateway may be customer-managed and internet-exposed, so the
connector⇄gateway channel is itself authenticated (distinct from the
platform crypto the relay path sheds). Add gateway/relay/auth.py — a
Python port of the connector's HMAC token + delivery-signature schemes
(relayAuthToken.ts / deliverySigning.ts), verified byte-for-byte against
the connector's compiled TypeScript via cross-language test vectors.

Present an Authorization bearer on the /relay WS upgrade keyed by the
per-gateway secret (resolved from GATEWAY_RELAY_ID / GATEWAY_RELAY_SECRET
in env or config). The connector rejects an unauthenticated/invalid/
revoked upgrade with close 4401.

* feat(relay): signed-HTTP inbound delivery receiver

The connector delivers normalized inbound events to a tenant's gateway
over a signed HTTP POST, not the outbound /relay WS: the connector
instance owning a platform socket is generally not the instance a given
gateway dialed out to, so inbound targets a tenant endpoint that may
load-balance across gateway instances.

Add gateway/relay/inbound_receiver.py — verifies x-relay-signature /
x-relay-timestamp over the EXACT raw request bytes (re-serializing would
break the HMAC: JS JSON.stringify is compact, Python json.dumps spaces)
against the per-tenant delivery key verify list within a 300s replay
window, then dispatches messages to handle_message and interrupts to the
interrupt handler. Wire it into the adapter lifecycle (start in connect()
when a delivery key + bind port are configured, tear down in disconnect();
a purely-outbound dev gateway runs without it).

Refine test_relay_sheds_crypto to distinguish PLATFORM crypto (Discord
ed25519, Twilio/WeCom HMAC — still shed) from the connector⇄gateway
CHANNEL auth (intended): auth.py / inbound_receiver.py are exempt from
the platform-symbol scan but still banned from importing platform-crypto
modules, plus a positive guard that auth.py uses only stdlib hmac/hashlib.

* feat(relay): hermes gateway enroll CLI

Add the gateway half of zero-touch enrollment. `hermes gateway enroll`
resolves a fresh Nous Portal access token (the tenant-proving identity),
POSTs {enrollmentToken, gatewayId} to the connector's /relay/enroll, and
persists GATEWAY_RELAY_ID / GATEWAY_RELAY_SECRET / GATEWAY_RELAY_DELIVERY_KEY
to ~/.hermes/.env. The per-gateway secret authenticates the WS upgrade;
the per-tenant delivery key verifies signed inbound deliveries.

Refuses under is_managed() (hosted installs get the secret stamped in by
the orchestrator). Added as an 'enroll' subcommand on the existing
gateway subparser — not a new top-level command.

* docs(relay): inbound is signed HTTP, not WS; document channel auth

Fix the stale contract: §3/§5 said inbound rode the WS socket (single-
instance only, predates the multi-instance socket-ownership + channel-auth
model). Inbound + connector→gateway interrupt are signed HTTP POSTs to the
tenant endpoint. Add §6.1 documenting the two channel-auth schemes (per-
gateway WS-upgrade secret, per-tenant inbound delivery key) and how they
differ from the platform crypto the relay path sheds.

* test(relay): update build_gateway_parser callers for cmd_gateway_enroll

The enroll subcommand added cmd_gateway_enroll as a required keyword-only
arg to build_gateway_parser, but two existing parser-extraction tests still
called it with only cmd_gateway/cmd_proxy — failing CI with TypeError.
Thread the new handler through both call sites and add a test asserting
`gateway enroll` dispatches to cmd_gateway_enroll with its flags parsed.
This commit is contained in:
Ben Barclay 2026-06-18 12:01:54 +10:00 committed by GitHub
parent fcf6cb3d73
commit c276b017ad
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1279 additions and 16 deletions

View file

@ -61,10 +61,34 @@ live platform adapter's capability methods.
## 3. Inbound: `MessageEvent` envelope
The connector normalizes each platform wire event into a `MessageEvent`
(`gateway/platforms/base.py`) and delivers it to the gateway's inbound handler.
The gateway keys the session via `build_session_key()` from the embedded
`SessionSource` — so populating the right discriminators is the single
highest-correctness responsibility of the connector.
(`gateway/platforms/base.py`) and delivers it to the gateway. **Inbound is
delivered over a signed HTTP POST, not the outbound `/relay` WebSocket** (see
the transport note below). The gateway keys the session via `build_session_key()`
from the embedded `SessionSource` — so populating the right discriminators is
the single highest-correctness responsibility of the connector.
### Inbound transport (signed HTTP POST, not the outbound WS)
The gateway dials **out** to the connector's `/relay` WebSocket for the
handshake + outbound actions (§4) + its own `/stop` egress (§5). Inbound,
however, is delivered the other way: the connector **POSTs** the normalized
event to the gateway's inbound endpoint (`HttpGatewayDelivery` on the connector;
`gateway/relay/inbound_receiver.py` on the gateway). The reason is
multi-instance: the connector instance that owns a platform's socket (and thus
produces inbound events) is generally **not** the instance a given gateway
dialed its outbound WS into, so inbound must target a tenant **endpoint** (which
may load-balance across gateway instances) rather than ride one gateway's
outbound socket. Each delivery is HMAC-signed with the per-tenant **delivery
key** (§6.1); the gateway verifies the signature over the exact raw bytes before
accepting the event. Two POST targets:
- `POST {gatewayEndpoint}``{"type":"message", "event": <MessageEvent>}`
- `POST {gatewayEndpoint}/interrupt``{"type":"interrupt", "session_key", "reason"?}` (§5)
> An earlier draft of this contract delivered inbound over the WS `inbound`
> frame. That only works single-instance and predates the multi-instance
> socket-ownership + channel-auth model; the signed-HTTP path above is the
> shipped design.
### SessionSource fields (the wire surface)
@ -151,14 +175,16 @@ gateway holds zero capability material). Source of truth:
## 5. Interrupt (`/stop`) routing
- **Gateway → connector:** `send_interrupt(session_key, reason?)` egresses a
mid-turn `/stop`. The connector MUST forward it down the socket owned by the
mid-turn `/stop` over the outbound WS. The connector MUST forward it to the
gateway instance running that `session_key` (the routing invariant).
- **Connector → gateway:** an inbound interrupt for a `session_key` is bridged
by the adapter's `on_interrupt(session_key, chat_id)` into the existing
per-session interrupt mechanism, cancelling exactly that turn (siblings
untouched).
- **Connector → gateway:** an inbound interrupt for a `session_key` is delivered
as a **signed HTTP POST** to `{gatewayEndpoint}/interrupt` (§3 transport note),
and bridged by the adapter's `on_interrupt(session_key, chat_id)` into the
existing per-session interrupt mechanism, cancelling exactly that turn
(siblings untouched).
The interrupt rides the same per-turn bidirectional socket as inbound/outbound.
The gateway→connector `/stop` rides the outbound WS; the connector→gateway
interrupt rides the same signed-HTTP inbound path as a normalized event.
---
@ -201,6 +227,27 @@ relay planes — both are "verify at the edge → emit a normalized event," diff
only in transport. See `docs/capability-trust-boundary.md` (connector repo:
`gateway-gateway`) for the full A2 rationale and the connector-side vault.
### 6.1 Channel authentication (the connector⇄gateway link itself)
A2 makes the connector the sole holder of platform secrets while the gateway may
be **customer-managed and internet-exposed**, so the connector⇄gateway channel
is itself authenticated. The gateway holds two enrollment-issued credentials
(`hermes gateway enroll` → connector `/relay/enroll`): a **per-gateway secret**
and a **per-tenant delivery key**. Both are HMAC-SHA256 schemes with a
multi-secret rotation verify list (gateway side: `gateway/relay/auth.py`;
connector side: `src/core/relayAuthToken.ts` + `src/core/deliverySigning.ts`).
| Leg | Credential | Mechanism |
|-----|-----------|-----------|
| Gateway → connector WS upgrade | per-gateway secret | An `Authorization` bearer header on the `/relay` upgrade. The token is `base64url(payload:exp:sig)` where `payload = gatewayId` and `sig = HMAC(payload:exp, secret)`. Connector verifies and rejects the upgrade (**close 4401**) on mismatch/absence/revocation. The authenticated tenant comes from the connector's store, never the `hello` frame. |
| Connector → gateway inbound POST | per-tenant delivery key | Two headers: `x-relay-timestamp` (unix seconds) and `x-relay-signature` (hex `HMAC(ts.rawBody, deliveryKey)`). Gateway verifies over the **exact raw bytes** within a ±300s replay window before accepting the event; rejects **401** otherwise. |
This is the **channel** authenticator — distinct from platform crypto, which the
relay path still sheds entirely (§6). The gateway holds zero platform secrets;
these two keys authenticate only the connector link. Full threat model +
enrollment/rotation/kill-switch design: `docs/connector-gateway-auth-design.md`
(connector repo).
---
## 7. Versioning policy

View file

@ -55,6 +55,64 @@ def relay_platform_identity() -> tuple[str, str]:
return platform, bot_id
def relay_connection_auth() -> tuple[Optional[str], Optional[str]]:
"""The (gateway_id, upgrade_secret) this gateway authenticates the WS upgrade with.
Both come from enrollment (``hermes gateway enroll`` writes them to
``~/.hermes/.env``): ``GATEWAY_RELAY_ID`` identifies the enrolled instance,
``GATEWAY_RELAY_SECRET`` is the per-gateway signing secret. Either absent ->
``(None, None)`` and the transport dials unauthenticated (dev/test, or a
connector that doesn't enforce auth). Checks env first (Docker), then
``gateway.relay_id`` / ``gateway.relay_secret`` in config.yaml.
"""
gateway_id = os.environ.get("GATEWAY_RELAY_ID", "").strip()
secret = os.environ.get("GATEWAY_RELAY_SECRET", "").strip()
if not (gateway_id and secret):
try:
from gateway.run import _load_gateway_config # late import to avoid cycle
cfg = (_load_gateway_config().get("gateway") or {})
gateway_id = gateway_id or str(cfg.get("relay_id", "") or "").strip()
secret = secret or str(cfg.get("relay_secret", "") or "").strip()
except Exception: # noqa: BLE001 - config absence/parse must never crash registration
pass
return (gateway_id or None, secret or None)
def relay_inbound_config() -> tuple[Optional[str], Optional[str], int]:
"""Resolve (delivery_key, bind_host, bind_port) for the inbound receiver.
The connector delivers normalized inbound events to this gateway over a
SIGNED HTTP POST (not the outbound WS), verified with the per-tenant delivery
key issued at enrollment (``GATEWAY_RELAY_DELIVERY_KEY``). The receiver only
starts when a delivery key AND a bind port are configured a gateway with no
public inbound URL (e.g. a purely outbound dev run) simply doesn't run it.
Env first (Docker), then ``gateway.relay_delivery_key`` /
``gateway.relay_inbound_host`` / ``gateway.relay_inbound_port`` in config.yaml.
Port 0 (default/unset) -> receiver disabled.
"""
key = os.environ.get("GATEWAY_RELAY_DELIVERY_KEY", "").strip()
host = os.environ.get("GATEWAY_RELAY_INBOUND_HOST", "").strip()
port_raw = os.environ.get("GATEWAY_RELAY_INBOUND_PORT", "").strip()
if not (key and port_raw):
try:
from gateway.run import _load_gateway_config # late import to avoid cycle
cfg = (_load_gateway_config().get("gateway") or {})
key = key or str(cfg.get("relay_delivery_key", "") or "").strip()
host = host or str(cfg.get("relay_inbound_host", "") or "").strip()
if not port_raw:
port_raw = str(cfg.get("relay_inbound_port", "") or "").strip()
except Exception: # noqa: BLE001 - config absence/parse must never crash registration
pass
try:
port = int(port_raw) if port_raw else 0
except ValueError:
port = 0
return (key or None, host or "0.0.0.0", port)
def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bool:
"""Register the generic ``relay`` platform via the platform registry.
@ -96,7 +154,14 @@ def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bo
if resolved_url:
from gateway.relay.ws_transport import WebSocketRelayTransport
transport = WebSocketRelayTransport(resolved_url, platform, bot_id)
gateway_id, upgrade_secret = relay_connection_auth()
transport = WebSocketRelayTransport(
resolved_url,
platform,
bot_id,
gateway_id=gateway_id,
upgrade_secret=upgrade_secret,
)
return RelayAdapter(config, placeholder, transport=transport)
platform_registry.register(

View file

@ -58,6 +58,10 @@ class RelayAdapter(BasePlatformAdapter):
# Capability surface read by stream_consumer (getattr(..., 4096)).
self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")
# Inbound delivery receiver (signed connector→gateway HTTP POSTs). Built
# lazily in connect() when a delivery key + bind port are configured; a
# purely-outbound dev gateway runs without it. See inbound_receiver.py.
self._inbound_runner: Any = None
# ── capability surface (from descriptor) ─────────────────────────────
@property
@ -88,8 +92,40 @@ class RelayAdapter(BasePlatformAdapter):
logger.warning("relay handshake failed: %s", exc)
return False
self._apply_descriptor(descriptor)
# Start the signed inbound-delivery receiver if configured (the connector
# POSTs normalized events to it over HTTP, verified with the tenant
# delivery key). Non-fatal: a receiver bind failure must not fail the
# outbound connection — the gateway can still send.
await self._maybe_start_inbound_receiver()
return True
async def _maybe_start_inbound_receiver(self) -> None:
"""Start the inbound HTTP receiver when a delivery key + port are set."""
from gateway.relay import relay_inbound_config
delivery_key, host, port = relay_inbound_config()
if not (delivery_key and port):
return # no inbound URL configured -> outbound-only gateway
try:
from aiohttp import web
from gateway.relay.inbound_receiver import InboundDeliveryReceiver
receiver = InboundDeliveryReceiver(
delivery_key_verify_list=lambda: [delivery_key],
on_message=self._on_inbound,
on_interrupt=self.on_interrupt,
)
runner = web.AppRunner(receiver.build_app(), access_log=None)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
self._inbound_runner = runner
logger.info("relay inbound receiver listening on http://%s:%s", host, port)
except Exception as exc: # noqa: BLE001 - inbound bind failure must not kill outbound
logger.warning("relay inbound receiver failed to start: %s", exc)
self._inbound_runner = None
def _apply_descriptor(self, descriptor: CapabilityDescriptor) -> None:
"""Adopt a (re)negotiated descriptor into the live capability surface."""
self.descriptor = descriptor
@ -112,6 +148,12 @@ class RelayAdapter(BasePlatformAdapter):
await self.interrupt_session_activity(session_key, chat_id)
async def disconnect(self) -> None:
if self._inbound_runner is not None:
try:
await self._inbound_runner.cleanup()
except Exception: # noqa: BLE001 - best-effort teardown
pass
self._inbound_runner = None
if self._transport is not None:
await self._transport.disconnect()

168
gateway/relay/auth.py Normal file
View file

@ -0,0 +1,168 @@
"""Gateway-side relay authentication primitives. EXPERIMENTAL.
The connectorgateway channel is authenticated because a gateway may be
customer-managed and internet-exposed (see the connector repo
``docs/connector-gateway-auth-design.md``). This module is the **gateway half**
of two HMAC schemes whose wire bytes must match the connector's TypeScript
exactly:
1. **WS upgrade auth** (gateway connector): the gateway presents
``Authorization: Bearer <token>`` on the ``/relay`` WebSocket upgrade, where
``token = make_upgrade_token(gateway_id, secret)``. Mirrors the connector's
``relayAuthToken.ts`` ``makeToken`` (``src/core/relayAuthToken.ts``):
``base64url(f"{payload}:{exp}:{sig}")`` with
``sig = HMAC_SHA256(f"{payload}:{exp}", secret).hexdigest()`` and
``payload == gateway_id``.
2. **Inbound delivery signature** (connector gateway): the connector signs
each inbound POST with the per-tenant *delivery key*, carried as
``x-relay-timestamp`` + ``x-relay-signature`` headers; the gateway verifies
before accepting the event. Mirrors the connector's ``deliverySigning.ts``:
``sig = HMAC_SHA256(f"{ts}.{body_json}", key).hexdigest()`` over the EXACT
request body bytes, with a replay-window skew check.
Both schemes use a **multi-secret verify list** (primary first, then a secondary
during a rotation window), exactly like ``api/src/handlers/stats_oauth.ts`` so
a secret rotation doesn't invalidate outstanding tokens.
EXPERIMENTAL: may change without a deprecation cycle until 2 Class-1 platforms
validate the relay contract.
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import time
from typing import Optional, Sequence
# Header names the connector uses for inbound delivery signatures
# (connector ``src/core/deliverySigning.ts`` — DELIVERY_TS_HEADER / SIG_HEADER).
DELIVERY_TS_HEADER = "x-relay-timestamp"
DELIVERY_SIG_HEADER = "x-relay-signature"
# Default replay window for an inbound delivery signature (connector default).
_DEFAULT_MAX_SKEW_SECONDS = 300
# Default TTL for an upgrade token (connector ``makeUpgradeToken`` default).
_DEFAULT_UPGRADE_TTL_SECONDS = 300
def _hmac_hex(payload: str, secret: str) -> str:
"""HMAC-SHA256 hex digest of ``payload`` under ``secret`` (UTF-8)."""
return hmac.new(secret.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256).hexdigest()
def sign(payload: str, secret: str) -> str:
"""HMAC-SHA256 hex digest — the connector's ``sign`` (relayAuthToken.ts)."""
return _hmac_hex(payload, secret)
def verify_signature(payload: str, sig_hex: str, secrets: Sequence[str]) -> bool:
"""Constant-time check that ``sig_hex`` is a valid HMAC of ``payload`` under
ANY of ``secrets`` (rotation window). Length-mismatched candidates are
skipped without a timing leak. Mirrors ``verifySignature``.
"""
try:
sig_buf = bytes.fromhex(sig_hex)
except (ValueError, TypeError):
return False
if len(sig_buf) == 0:
return False
for secret in secrets:
if not secret:
continue
expected = bytes.fromhex(_hmac_hex(payload, secret))
if len(expected) != len(sig_buf):
continue
if hmac.compare_digest(sig_buf, expected):
return True
return False
def make_token(payload: str, secret: str, ttl_seconds: int = 0) -> str:
"""Build a signed, optionally-expiring token — the connector's ``makeToken``.
``base64url(f"{payload}:{exp}:{sig}")`` where ``exp`` is a unix-seconds
expiry (0 = never) and ``sig = HMAC_SHA256(f"{payload}:{exp}", secret)``.
base64url is unpadded to match Node's ``Buffer.toString("base64url")``.
"""
exp = int(time.time()) + ttl_seconds if ttl_seconds > 0 else 0
signed = f"{payload}:{exp}"
sig = _hmac_hex(signed, secret)
raw = f"{signed}:{sig}".encode("utf-8")
return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
def make_upgrade_token(
gateway_id: str, secret: str, ttl_seconds: int = _DEFAULT_UPGRADE_TTL_SECONDS
) -> str:
"""The WS-upgrade bearer token a gateway sends: ``payload = gateway_id``.
The connector peeks ``gateway_id`` (the payload head) to index its secret
verify list, then verifies the signature against that gateway's stored
secret(s). Mirrors the connector's ``makeUpgradeToken``.
"""
return make_token(gateway_id, secret, ttl_seconds)
def verify_token(token: str, secrets: Sequence[str]) -> Optional[str]:
"""Verify a token built by ``make_token``; return the payload or None.
Splits from the right so a payload may itself contain colons (mirrors the
connector's ``verifyToken``). Rejects an expired token and any signature
that doesn't match a secret in the verify list.
"""
try:
# base64url decode with padding restored.
padded = token + "=" * (-len(token) % 4)
decoded = base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
except (ValueError, TypeError):
return None
parts = decoded.split(":")
if len(parts) < 3:
return None
sig = parts[-1]
try:
exp = int(parts[-2])
except ValueError:
return None
payload = ":".join(parts[:-2])
if exp != 0 and int(time.time()) > exp:
return None
signed = f"{payload}:{exp}"
return payload if verify_signature(signed, sig, secrets) else None
def _delivery_payload(ts: int, body_json: str) -> str:
"""Signed material for an inbound delivery: ``f"{ts}.{body_json}"``."""
return f"{ts}.{body_json}"
def verify_delivery_signature(
body_json: str,
timestamp: Optional[str],
signature: Optional[str],
verify_keys: Sequence[str],
max_skew_seconds: int = _DEFAULT_MAX_SKEW_SECONDS,
*,
now: Optional[int] = None,
) -> bool:
"""Verify a connector→gateway inbound delivery signature.
``body_json`` MUST be the exact request body bytes decoded as UTF-8 the
connector signs over the literal serialized body, so the gateway verifies
over the literal received body (no re-serialization). Checks the timestamp
is within ``max_skew_seconds`` of now and the HMAC matches any key in the
rotation verify list. Mirrors the connector's ``verifyDeliverySignature``.
"""
if not timestamp or not signature:
return False
try:
ts = int(timestamp)
except (ValueError, TypeError):
return False
current = now if now is not None else int(time.time())
if abs(current - ts) > max_skew_seconds:
return False
return verify_signature(_delivery_payload(ts, body_json), signature, verify_keys)

View file

@ -0,0 +1,204 @@
"""Gateway-side inbound delivery receiver. EXPERIMENTAL.
The connector delivers normalized inbound events to a tenant's gateway over a
**signed HTTP POST** (connector ``src/relay/httpGatewayDelivery.ts``), NOT over
the gateway's outbound ``/relay`` WebSocket: the connector instance that owns a
platform socket is generally not the instance a given gateway dialed out to, so
inbound is delivered to a tenant ENDPOINT (which may load-balance across gateway
instances). Each delivery is HMAC-signed with the per-tenant **delivery key**
(``gateway/relay/auth.py``); this receiver verifies the signature over the EXACT
raw request bytes before accepting the event.
Two routes (mirroring the connector's two POST targets):
POST {base} {"type":"message", "event": <MessageEvent>, ...}
POST {base}/interrupt {"type":"interrupt","session_key": ..., "reason"?}
The receiver:
1. reads the RAW body bytes (never a reparsed/re-serialized form the HMAC is
over the literal bytes the connector signed),
2. verifies ``x-relay-signature`` / ``x-relay-timestamp`` against the delivery
key verify list (primary + secondary during rotation), within the replay
window rejects 401 on any failure,
3. parses the JSON and dispatches: a ``message`` to the inbound handler (the
RelayAdapter's ``handle_message`` via the transport's normal path), an
``interrupt`` to the interrupt handler.
EXPERIMENTAL: the transport protocol may change without a deprecation cycle
until 2 Class-1 platforms validate it. See docs/relay-connector-contract.md.
"""
from __future__ import annotations
import json
import logging
from typing import Any, Awaitable, Callable, Optional, Sequence
from gateway.platforms.base import MessageEvent
from gateway.relay.auth import (
DELIVERY_SIG_HEADER,
DELIVERY_TS_HEADER,
verify_delivery_signature,
)
logger = logging.getLogger(__name__)
# Callbacks the receiver dispatches verified deliveries to.
InboundMessageHandler = Callable[[MessageEvent], Awaitable[None]]
InboundInterruptHandler = Callable[[str, str], Awaitable[None]]
try: # lazy/optional dep — mirrors the other HTTP-receiving adapters
from aiohttp import web
except ImportError: # pragma: no cover - exercised only when the extra is absent
web = None # type: ignore[assignment]
AIOHTTP_AVAILABLE = web is not None
def _event_from_wire(raw: dict) -> MessageEvent:
"""Rebuild a MessageEvent from the connector's normalized inbound payload.
Identical mapping to the WS transport's ``_event_from_wire`` (the wire shape
is the same; only the transport differs). Kept here so the HTTP receiver has
no import dependency on the WS transport module.
"""
from gateway.config import Platform
from gateway.platforms.base import MessageType
from gateway.session import SessionSource
src = raw.get("source", {}) or {}
platform = src.get("platform", "relay")
try:
platform_enum = Platform(platform)
except ValueError:
platform_enum = Platform.RELAY
source = SessionSource(
platform=platform_enum,
chat_id=src.get("chat_id", ""),
chat_type=src.get("chat_type", "dm"),
chat_name=src.get("chat_name"),
user_id=src.get("user_id"),
user_name=src.get("user_name"),
thread_id=src.get("thread_id"),
chat_topic=src.get("chat_topic"),
user_id_alt=src.get("user_id_alt"),
chat_id_alt=src.get("chat_id_alt"),
guild_id=src.get("guild_id"),
parent_chat_id=src.get("parent_chat_id"),
message_id=src.get("message_id"),
)
try:
msg_type = MessageType(raw.get("message_type", "text"))
except ValueError:
msg_type = MessageType.TEXT
return MessageEvent(
text=raw.get("text", ""),
message_type=msg_type,
source=source,
message_id=raw.get("message_id"),
reply_to_message_id=raw.get("reply_to_message_id"),
media_urls=raw.get("media_urls") or [],
)
class InboundDeliveryReceiver:
"""Verifies + dispatches signed connector→gateway inbound deliveries.
Transport-agnostic core: ``handle_raw`` takes the raw body bytes + headers +
which route was hit and returns ``(status, body)``. The aiohttp wiring
(``build_app`` / ``serve``) is a thin shell so the verify+dispatch logic is
unit-testable without a live socket.
"""
def __init__(
self,
*,
delivery_key_verify_list: Callable[[], Sequence[str]],
on_message: InboundMessageHandler,
on_interrupt: Optional[InboundInterruptHandler] = None,
max_skew_seconds: int = 300,
) -> None:
# A callable (not a static list) so a rotated delivery key is picked up
# without rebuilding the receiver — mirrors the connector's verify list.
self._verify_list = delivery_key_verify_list
self._on_message = on_message
self._on_interrupt = on_interrupt
self._max_skew_seconds = max_skew_seconds
async def handle_raw(
self, *, raw_body: bytes, timestamp: Optional[str], signature: Optional[str], is_interrupt: bool
) -> tuple[int, dict]:
"""Verify the signature over ``raw_body`` and dispatch. Returns (status, json).
401 on a missing/invalid/expired signature (never dispatches unverified).
400 on malformed JSON. 200 on a verified, dispatched delivery.
"""
verify_keys = list(self._verify_list() or [])
if not verify_keys:
# No delivery key provisioned -> we cannot verify -> reject. A gateway
# that hasn't enrolled must not accept inbound (fail closed).
logger.warning("relay inbound: no delivery key configured; rejecting")
return 401, {"error": "no delivery key configured"}
# Verify over the EXACT raw bytes the connector signed. Decode to text
# with the same UTF-8 the connector's JSON.stringify produced; a single
# differing byte breaks the HMAC (raw-body-preservation discipline).
body_text = raw_body.decode("utf-8", errors="strict")
if not verify_delivery_signature(
body_text, timestamp, signature, verify_keys, self._max_skew_seconds
):
return 401, {"error": "invalid delivery signature"}
try:
payload = json.loads(body_text)
except json.JSONDecodeError:
return 400, {"error": "invalid JSON body"}
if is_interrupt or payload.get("type") == "interrupt":
session_key = str(payload.get("session_key", ""))
chat_id = str(payload.get("chat_id", "") or payload.get("reason", "") or "")
if self._on_interrupt is not None and session_key:
await self._on_interrupt(session_key, chat_id)
return 200, {"ok": True}
# Default: a normalized inbound message event.
event_raw = payload.get("event")
if not isinstance(event_raw, dict):
return 400, {"error": "missing event"}
event = _event_from_wire(event_raw)
await self._on_message(event)
return 200, {"ok": True}
# ── aiohttp wiring (thin shell over handle_raw) ──────────────────────
def build_app(self) -> Any:
"""Build an aiohttp Application exposing the delivery + interrupt routes."""
if not AIOHTTP_AVAILABLE:
raise RuntimeError(
"InboundDeliveryReceiver requires the 'aiohttp' package "
"(install the messaging extra)."
)
async def _deliver(request: Any) -> Any:
return await self._respond(request, is_interrupt=False)
async def _interrupt(request: Any) -> Any:
return await self._respond(request, is_interrupt=True)
app = web.Application()
app.router.add_get("/healthz", lambda _: web.Response(text="ok"))
app.router.add_post("/", _deliver)
app.router.add_post("/interrupt", _interrupt)
return app
async def _respond(self, request: Any, *, is_interrupt: bool) -> Any:
# Read the RAW bytes — do NOT use request.json() (it reparses and we'd
# verify over a re-serialized form, breaking the HMAC).
raw_body = await request.read()
status, body = await self.handle_raw(
raw_body=raw_body,
timestamp=request.headers.get(DELIVERY_TS_HEADER),
signature=request.headers.get(DELIVERY_SIG_HEADER),
is_interrupt=is_interrupt,
)
return web.json_response(body, status=status)

View file

@ -110,6 +110,8 @@ class WebSocketRelayTransport:
*,
connect_timeout_s: float = _HANDSHAKE_TIMEOUT_S,
outbound_timeout_s: float = _OUTBOUND_TIMEOUT_S,
gateway_id: Optional[str] = None,
upgrade_secret: Optional[str] = None,
) -> None:
if not WEBSOCKETS_AVAILABLE:
raise RuntimeError(
@ -121,6 +123,14 @@ class WebSocketRelayTransport:
self._bot_id = bot_id
self._connect_timeout_s = connect_timeout_s
self._outbound_timeout_s = outbound_timeout_s
# Connection auth (Phase 2): when a per-gateway secret is configured the
# gateway presents an HMAC bearer on the WS upgrade so the connector can
# authenticate it (reject 4401 otherwise). gateway_id identifies the
# enrolled instance — the connector peeks it to index its secret verify
# list, then verifies the signature. Absent -> unauthenticated upgrade
# (dev/test, or a connector that doesn't enforce auth).
self._gateway_id = gateway_id
self._upgrade_secret = upgrade_secret
self._ws: Any = None
self._reader: Optional[asyncio.Task[None]] = None
@ -135,12 +145,33 @@ class WebSocketRelayTransport:
async def connect(self) -> bool:
loop = asyncio.get_running_loop()
self._descriptor_ready = loop.create_future()
self._ws = await websockets.connect(self._url) # type: ignore[union-attr]
headers = self._upgrade_headers()
if headers:
self._ws = await websockets.connect(self._url, additional_headers=headers) # type: ignore[union-attr]
else:
self._ws = await websockets.connect(self._url) # type: ignore[union-attr]
self._reader = asyncio.create_task(self._read_loop(), name="relay-ws-reader")
# Send hello; the descriptor arrives via the reader and resolves handshake().
await self._send({"type": "hello", "platform": self._platform, "botId": self._bot_id})
return True
def _upgrade_headers(self) -> Dict[str, str]:
"""Auth headers for the WS upgrade, or {} when no secret is configured.
Presents ``Authorization: Bearer *** where the token is a signed
bearer built with the per-gateway secret (``gateway/relay/auth.py``
``make_upgrade_token``), keyed by ``gateway_id`` so the connector can
index its verify list. The connector rejects the upgrade (close 4401)
when this is missing/invalid/revoked; an unauthenticated connector
ignores it.
"""
if not (self._upgrade_secret and self._gateway_id):
return {}
from gateway.relay.auth import make_upgrade_token
token = make_upgrade_token(self._gateway_id, self._upgrade_secret)
return {"Authorization": f"Bearer {token}"}
async def disconnect(self) -> None:
self._closing = True
if self._reader is not None:

View file

@ -0,0 +1,250 @@
"""``hermes gateway enroll`` — enroll a self-hosted gateway with a relay connector.
The connectorgateway channel is authenticated (the gateway may be
customer-managed and internet-exposed). This command is the gateway half of the
zero-touch enrollment in the connector repo's
``docs/connector-gateway-auth-design.md``:
1. Resolve a fresh Nous Portal access token from the existing login
(``~/.hermes/auth.json``) the same path ``hermes dashboard register``
uses (``resolve_nous_access_token``). This proves *which Nous org (tenant)*
the caller owns; the connector derives the authoritative tenant from it via
``GET /api/oauth/account`` (never from anything the gateway asserts).
2. POST ``{enrollmentToken, gatewayId}`` to the connector's ``/relay/enroll``
with that token in the ``Authorization`` header, over TLS.
3. The connector verifies the enrollment token (signature + single-use +
tenant match), mints a per-gateway secret, get-or-creates the per-tenant
delivery key, and returns both ONCE.
4. Persist ``GATEWAY_RELAY_ID`` / ``GATEWAY_RELAY_SECRET`` /
``GATEWAY_RELAY_DELIVERY_KEY`` (+ ``GATEWAY_RELAY_URL`` if supplied) into
``~/.hermes/.env``. The per-gateway secret authenticates the WS upgrade;
the per-tenant delivery key verifies signed inbound deliveries.
Managed/hosted installs do NOT self-enroll: the orchestrator (NAS) mints the
secret directly and stamps it into the container env, so this command refuses to
run under ``is_managed()`` (mirrors ``dashboard register``).
EXPERIMENTAL: the relay auth scheme may change without a deprecation cycle until
2 Class-1 platforms validate the contract.
"""
from __future__ import annotations
import json
import os
import socket
import sys
import urllib.error
import urllib.request
from typing import Optional
def _default_gateway_id() -> str:
"""A stable-ish default gateway instance id: ``<hostname>-<pid-free slug>``.
The gatewayId identifies this enrolled instance for kill-switch granularity
(the connector indexes its secret verify list by it). Default to the host
name so a human can recognize it; overridable via ``--gateway-id``.
"""
host = ""
try:
host = socket.gethostname().strip()
except Exception:
host = ""
return f"gw-{host or 'hermes'}"
def _resolve_connector_url(override: Optional[str]) -> Optional[str]:
"""Resolve the connector base URL (no trailing slash) for enrollment.
Precedence: explicit ``--connector-url`` flag > ``GATEWAY_RELAY_URL`` env >
``gateway.relay_url`` in config.yaml. The relay URL is a ``ws(s)://`` dial
target; enrollment is an ``http(s)://`` POST to the same host, so we map the
scheme. Returns None when nothing is configured (the user must supply one).
"""
raw = (override or os.environ.get("GATEWAY_RELAY_URL", "")).strip()
if not raw:
try:
from gateway.run import _load_gateway_config # late import to avoid cycle
cfg = (_load_gateway_config().get("gateway") or {})
raw = str(cfg.get("relay_url", "") or "").strip()
except Exception:
raw = ""
if not raw:
return None
raw = raw.rstrip("/")
# The relay dial URL is ws(s)://…/relay; enrollment posts to http(s)://…/relay/enroll.
if raw.startswith("ws://"):
raw = "http://" + raw[len("ws://"):]
elif raw.startswith("wss://"):
raw = "https://" + raw[len("wss://"):]
# Strip a trailing /relay path segment if the user pasted the dial URL.
if raw.endswith("/relay"):
raw = raw[: -len("/relay")]
return raw
def _post_enroll(
*,
connector_base_url: str,
access_token: str,
enrollment_token: str,
gateway_id: str,
timeout: float = 15.0,
) -> dict:
"""POST to the connector's ``/relay/enroll`` and return the JSON body.
Raises RuntimeError with a user-facing message on any non-2xx / transport
failure. The connector returns ``{secret, deliveryKey, tenant, gatewayId}``
on success, ``{error}`` at 400/401/403.
"""
url = f"{connector_base_url.rstrip('/')}/relay/enroll"
data = json.dumps({"enrollmentToken": enrollment_token, "gatewayId": gateway_id}).encode("utf-8")
req = urllib.request.Request(
url,
data=data,
method="POST",
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"Accept": "application/json",
},
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
payload = json.loads(resp.read().decode())
except urllib.error.HTTPError as exc:
detail = ""
try:
detail = (json.loads(exc.read().decode()) or {}).get("error", "")
except Exception:
pass
if exc.code == 401:
raise RuntimeError(
"Connector rejected the caller identity (401). Your Nous Portal "
"token could not be verified — try `hermes auth login nous` and retry."
) from exc
if exc.code == 403:
raise RuntimeError(
detail
or "Enrollment token invalid, expired, already used, or tenant mismatch (403)."
) from exc
raise RuntimeError(
f"Connector returned HTTP {exc.code}" + (f": {detail}" if detail else "")
) from exc
except urllib.error.URLError as exc:
raise RuntimeError(
f"Could not reach the connector at {connector_base_url}: {exc.reason}"
) from exc
if not isinstance(payload, dict) or not payload.get("secret"):
raise RuntimeError("Connector returned an unexpected response (no secret).")
return payload
def cmd_gateway_enroll(args) -> None:
"""Enroll this gateway with a relay connector; persist the auth creds to .env."""
from hermes_cli.auth import AuthError, resolve_nous_access_token
from hermes_cli.config import is_managed, save_env_value
# Managed installs get GATEWAY_RELAY_* stamped in by the orchestrator (NAS
# mints the secret directly per the design's managed shape). Self-enrolling
# from inside such a container is a mistake — and save_env_value refuses to
# write anyway.
if is_managed():
print(
"✗ `hermes gateway enroll` is not available in a managed/hosted install.\n"
" The relay gateway secret is provisioned by the hosting platform."
)
sys.exit(1)
enrollment_token = (getattr(args, "token", None) or os.environ.get("GATEWAY_RELAY_ENROLL_TOKEN", "")).strip()
if not enrollment_token:
print(
"✗ No enrollment token. Pass --token <token> (or set "
"GATEWAY_RELAY_ENROLL_TOKEN).\n"
" The connector mints this single-use token when your tenant's route "
"is provisioned; it is delivered with your gateway config."
)
sys.exit(1)
connector_base_url = _resolve_connector_url(getattr(args, "connector_url", None))
if not connector_base_url:
print(
"✗ No connector URL. Pass --connector-url <url> (or set GATEWAY_RELAY_URL "
"/ gateway.relay_url in config.yaml)."
)
sys.exit(1)
gateway_id = (getattr(args, "gateway_id", None) or _default_gateway_id()).strip()
# 1. Resolve a fresh Nous access token (the tenant-proving identity).
try:
access_token = resolve_nous_access_token()
except AuthError as exc:
if getattr(exc, "relogin_required", False):
print("✗ You're not logged into Nous Portal.")
print(" Run `hermes setup` (or `hermes auth login nous`) first, then retry.")
else:
print(f"✗ Could not resolve a Nous Portal access token: {exc}")
sys.exit(1)
except Exception as exc:
print(f"✗ Could not resolve a Nous Portal access token: {exc}")
sys.exit(1)
# 2-3. Redeem the enrollment token at the connector.
try:
result = _post_enroll(
connector_base_url=connector_base_url,
access_token=access_token,
enrollment_token=enrollment_token,
gateway_id=gateway_id,
)
except RuntimeError as exc:
print(f"✗ Enrollment failed: {exc}")
sys.exit(1)
secret = str(result.get("secret") or "")
delivery_key = str(result.get("deliveryKey") or "")
tenant = str(result.get("tenant") or "")
resolved_gateway_id = str(result.get("gatewayId") or gateway_id)
# 4. Persist the creds idempotently. The secret + delivery key are sensitive;
# save_env_value writes them to ~/.hermes/.env (0600 dir) and never logs.
to_write = {
"GATEWAY_RELAY_ID": resolved_gateway_id,
"GATEWAY_RELAY_SECRET": secret,
"GATEWAY_RELAY_DELIVERY_KEY": delivery_key,
}
# Persist the connector URL too (as the ws(s):// dial target) when supplied
# explicitly, so the runtime can dial without re-specifying it.
explicit_url = (getattr(args, "connector_url", None) or "").strip()
if explicit_url:
to_write["GATEWAY_RELAY_URL"] = explicit_url.rstrip("/")
for key, value in to_write.items():
if not value:
continue
try:
save_env_value(key, value)
except Exception as exc:
print(f"✗ Failed to write {key} to .env: {exc}")
sys.exit(1)
from hermes_cli.config import get_env_path
print(f'✓ Enrolled gateway "{resolved_gateway_id}"' + (f" for tenant {tenant}" if tenant else ""))
print()
print(f" Wrote to {get_env_path()}:")
print(f" GATEWAY_RELAY_ID={resolved_gateway_id}")
print(" GATEWAY_RELAY_SECRET=<hidden>")
print(" GATEWAY_RELAY_DELIVERY_KEY=<hidden>")
if explicit_url:
print(f" GATEWAY_RELAY_URL={explicit_url.rstrip('/')}")
print()
print(
" The gateway now authenticates its relay WS upgrade with the per-gateway\n"
" secret and verifies signed inbound deliveries with the tenant delivery\n"
" key. Restart the gateway to pick up the new env."
)

View file

@ -11007,6 +11007,13 @@ def cmd_dashboard_register(args):
_impl(args)
def cmd_gateway_enroll(args):
"""Enroll a self-hosted gateway with a relay connector."""
from hermes_cli.gateway_enroll import cmd_gateway_enroll as _impl
_impl(args)
def cmd_completion(args, parser=None):
"""Print shell completion script."""
from hermes_cli.completion import generate_bash, generate_zsh, generate_fish
@ -11699,7 +11706,9 @@ def main():
# =========================================================================
# gateway + proxy commands (parsers built in hermes_cli/subcommands/gateway.py)
# =========================================================================
build_gateway_parser(subparsers, cmd_gateway=cmd_gateway, cmd_proxy=cmd_proxy)
build_gateway_parser(
subparsers, cmd_gateway=cmd_gateway, cmd_proxy=cmd_proxy, cmd_gateway_enroll=cmd_gateway_enroll
)
# =========================================================================
# lsp command

View file

@ -29,7 +29,9 @@ def _add_compat_platform_flag(parser: argparse.ArgumentParser) -> None:
)
def build_gateway_parser(subparsers, *, cmd_gateway: Callable, cmd_proxy: Callable) -> None:
def build_gateway_parser(
subparsers, *, cmd_gateway: Callable, cmd_proxy: Callable, cmd_gateway_enroll: Callable
) -> None:
"""Attach the ``gateway`` and ``proxy`` subcommands to ``subparsers``."""
# =========================================================================
# gateway command
@ -236,6 +238,52 @@ def build_gateway_parser(subparsers, *, cmd_gateway: Callable, cmd_proxy: Callab
help="Skip the confirmation prompt",
)
# gateway enroll — enroll a self-hosted gateway with a relay connector
# (connector⇄gateway auth). Redeems a single-use enrollment token for the
# per-gateway secret + per-tenant delivery key and writes them to .env.
# See docs/relay-connector-contract.md (and the connector repo's
# docs/connector-gateway-auth-design.md). EXPERIMENTAL.
gateway_enroll = gateway_subparsers.add_parser(
"enroll",
help="Enroll this gateway with a relay connector (writes relay auth creds to .env)",
description=(
"Redeem a single-use enrollment token with a relay connector. "
"Authenticates as your Nous Portal account (the connector derives the "
"authoritative tenant from it), mints this gateway's per-gateway secret "
"and per-tenant delivery key, and writes GATEWAY_RELAY_ID / "
"GATEWAY_RELAY_SECRET / GATEWAY_RELAY_DELIVERY_KEY into ~/.hermes/.env. "
"Requires being logged in (hermes setup). Not available in managed installs."
),
)
gateway_enroll.add_argument(
"--token",
default=None,
help=(
"The single-use enrollment token from the connector (delivered with "
"your gateway config). Also settable via GATEWAY_RELAY_ENROLL_TOKEN."
),
)
gateway_enroll.add_argument(
"--connector-url",
dest="connector_url",
default=None,
help=(
"The connector base/relay URL, e.g. wss://connector.example.com/relay "
"or https://connector.example.com. Also settable via GATEWAY_RELAY_URL "
"/ gateway.relay_url in config.yaml."
),
)
gateway_enroll.add_argument(
"--gateway-id",
dest="gateway_id",
default=None,
help=(
"A stable id for this gateway instance (kill-switch granularity). "
"Defaults to gw-<hostname>."
),
)
gateway_enroll.set_defaults(func=cmd_gateway_enroll)
# =========================================================================
# proxy command — local OpenAI-compatible proxy that attaches the user's
# OAuth-authenticated provider credentials to outbound requests. Lets

View file

@ -0,0 +1,167 @@
"""Unit tests for gateway/relay/auth.py — the gateway-side relay auth primitives.
Two layers:
1. **Self-consistency** make_token/verify_token round-trip, delivery-signature
verify, rotation verify list, tamper + skew + expiry rejection.
2. **Cross-implementation conformance** frozen vectors generated by the
connector's TypeScript (``src/core/relayAuthToken.ts`` ``makeToken``/``sign``)
are reproduced byte-for-byte by the Python port. If the connector ever
changes its wire scheme, these vectors must be regenerated in lockstep
(and that is the point the test fails loudly on drift). Regenerate with:
node -e 'import("./dist/core/relayAuthToken.js").then(m=>{ \
const s="00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"; \
console.log(m.makeToken("gw-instance-1", s, 0)); \
console.log(m.sign("1750000000."+JSON.stringify({a:1}), s)); })'
"""
from __future__ import annotations
import json
from gateway.relay.auth import (
DELIVERY_SIG_HEADER,
DELIVERY_TS_HEADER,
make_token,
make_upgrade_token,
sign,
verify_delivery_signature,
verify_signature,
verify_token,
)
# A fixed 256-bit hex secret used for the frozen connector vectors below.
_SECRET = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
# ── Frozen vectors produced by the connector's TypeScript (relayAuthToken.ts).
# Generated via dist/core/relayAuthToken.js makeToken/sign; see module docstring.
_CONN_TOKEN = "Z3ctaW5zdGFuY2UtMTowOjM3YWE3YjE0NWU4NzY0ZDQwM2JhOWM2MzlmMjMwZGQ2M2RlOGVkOTliODhmZWQzNmFhMDI2MjVhOGE3ZTM1NjQ"
# The EXACT bytes the connector signed: JS JSON.stringify emits compact JSON
# (no spaces). The gateway verifies over the literal received body, so the
# vector is the compact form — NOT Python's spaced json.dumps default. This is
# the raw-byte-preservation discipline (a single differing byte breaks the HMAC).
_CONN_BODY = '{"type":"message","event":{"text":"hi","source":{"chat_id":"c1"}}}'
_CONN_TS = 1750000000
_CONN_SIG = "ac9509c8dae52b5590f06378260877334ff1adc4b1c96bafa4b514165fae6dc6"
# ── Self-consistency ──────────────────────────────────────────────────────
def test_token_round_trip_no_expiry():
tok = make_token("payload-123", _SECRET, 0)
assert verify_token(tok, [_SECRET]) == "payload-123"
def test_token_payload_may_contain_colons():
# verify_token must split from the right so a colon-bearing payload survives.
payload = "agent:main:discord:group:chanA"
tok = make_token(payload, _SECRET, 0)
assert verify_token(tok, [_SECRET]) == payload
def test_upgrade_token_is_make_token_of_gateway_id():
assert make_upgrade_token("gw-1", _SECRET, 0) == make_token("gw-1", _SECRET, 0)
def test_token_wrong_secret_rejected():
tok = make_token("p", _SECRET, 0)
assert verify_token(tok, ["deadbeef" * 8]) is None
def test_token_expired_rejected():
# ttl in the past -> exp < now -> rejected.
tok = make_token("p", _SECRET, ttl_seconds=1)
# Force expiry by signing with a manual past exp via the low-level helper.
# Simpler: a 1s ttl token is still valid now; instead assert a clearly-old one.
# Build an already-expired token by hand using the same scheme.
import base64
signed = "p:1" # exp=1 (1970) -> long past
sig = sign(signed, _SECRET)
raw = f"{signed}:{sig}".encode()
expired = base64.urlsafe_b64encode(raw).decode().rstrip("=")
assert verify_token(expired, [_SECRET]) is None
# And the fresh one is accepted.
assert verify_token(tok, [_SECRET]) == "p"
def test_token_rotation_verify_list():
# A token signed with the (old) secondary still verifies during rotation.
old, new = _SECRET, "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100"
tok_old = make_token("p", old, 0)
assert verify_token(tok_old, [new, old]) == "p" # primary=new, secondary=old
assert verify_token(tok_old, [new]) is None
def test_token_garbage_rejected():
assert verify_token("not-base64url!!!", [_SECRET]) is None
assert verify_token("", [_SECRET]) is None
def test_verify_signature_constant_time_multi_secret():
payload = "1700000000.body"
s = sign(payload, _SECRET)
assert verify_signature(payload, s, ["wrong", _SECRET]) is True
assert verify_signature(payload, s, ["wrong"]) is False
assert verify_signature(payload, "zz", [_SECRET]) is False # bad hex
# ── Delivery signature (connector -> gateway inbound) ──────────────────────
def test_delivery_signature_accepts_valid():
body = json.dumps({"type": "message", "event": {"text": "x"}})
ts = 1700000000
s = sign(f"{ts}.{body}", _SECRET)
assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts) is True
def test_delivery_signature_tamper_rejected():
body = json.dumps({"type": "message", "event": {"text": "x"}})
ts = 1700000000
s = sign(f"{ts}.{body}", _SECRET)
# A single changed body byte breaks the HMAC.
assert verify_delivery_signature(body + " ", str(ts), s, [_SECRET], now=ts) is False
def test_delivery_signature_skew_rejected():
body = "{}"
ts = 1700000000
s = sign(f"{ts}.{body}", _SECRET)
# Beyond the 300s replay window in either direction.
assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts + 301) is False
assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts - 301) is False
assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts + 299) is True
def test_delivery_signature_missing_headers_rejected():
assert verify_delivery_signature("{}", None, "abc", [_SECRET]) is False
assert verify_delivery_signature("{}", "1700000000", None, [_SECRET]) is False
assert verify_delivery_signature("{}", "not-an-int", "abc", [_SECRET]) is False
def test_delivery_headers_match_connector_names():
# The gateway reads exactly the header names the connector writes.
assert DELIVERY_TS_HEADER == "x-relay-timestamp"
assert DELIVERY_SIG_HEADER == "x-relay-signature"
# ── Cross-implementation conformance (frozen connector vectors) ────────────
def test_python_make_token_matches_connector_byte_for_byte():
assert make_token("gw-instance-1", _SECRET, 0) == _CONN_TOKEN
def test_python_verifies_connector_token():
assert verify_token(_CONN_TOKEN, [_SECRET]) == "gw-instance-1"
def test_python_sign_matches_connector_delivery_sig():
assert sign(f"{_CONN_TS}.{_CONN_BODY}", _SECRET) == _CONN_SIG
def test_python_verifies_connector_delivery_signature():
assert verify_delivery_signature(_CONN_BODY, str(_CONN_TS), _CONN_SIG, [_SECRET], now=_CONN_TS) is True

View file

@ -0,0 +1,150 @@
"""Unit tests for gateway/relay/inbound_receiver.py.
Covers the verify-then-dispatch core (handle_raw): a correctly-signed message
delivery is verified + dispatched; an interrupt delivery routes to the interrupt
handler; unsigned/tampered/expired/no-key deliveries are rejected 401; malformed
JSON is 400. Signatures are produced with the SAME auth primitives the connector
uses (gateway/relay/auth.py sign), so this exercises the real verify path.
"""
from __future__ import annotations
import json
import time
import pytest
from gateway.relay.auth import sign
from gateway.relay.inbound_receiver import InboundDeliveryReceiver
_KEY = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
def _signed(body_obj: dict, key: str = _KEY, ts: int | None = None) -> tuple[bytes, str, str]:
"""Serialize compactly (as the connector's JSON.stringify does), sign it."""
body = json.dumps(body_obj, separators=(",", ":"))
raw = body.encode("utf-8")
t = ts if ts is not None else int(time.time())
return raw, str(t), sign(f"{t}.{body}", key)
def _receiver(**kw):
received: list = []
interrupts: list = []
async def on_message(ev):
received.append(ev)
async def on_interrupt(sk, chat):
interrupts.append((sk, chat))
r = InboundDeliveryReceiver(
delivery_key_verify_list=lambda: [_KEY],
on_message=on_message,
on_interrupt=on_interrupt,
**kw,
)
return r, received, interrupts
@pytest.mark.asyncio
async def test_valid_message_delivery_dispatched():
r, received, _ = _receiver()
raw, ts, sig = _signed(
{
"type": "message",
"event": {
"text": "hello",
"message_type": "text",
"source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"},
},
}
)
status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 200 and body == {"ok": True}
assert len(received) == 1
assert received[0].text == "hello"
assert received[0].source.guild_id == "guildA"
@pytest.mark.asyncio
async def test_valid_interrupt_delivery_routes_to_interrupt_handler():
r, _, interrupts = _receiver()
raw, ts, sig = _signed({"type": "interrupt", "session_key": "agent:main:discord:group:c:u", "reason": "stop"})
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=True)
assert status == 200
assert interrupts and interrupts[0][0] == "agent:main:discord:group:c:u"
@pytest.mark.asyncio
async def test_tampered_body_rejected_401():
r, received, _ = _receiver()
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw + b" ", timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
assert received == []
@pytest.mark.asyncio
async def test_unsigned_rejected_401():
r, _, _ = _receiver()
raw, _, _ = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw, timestamp=None, signature=None, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_expired_timestamp_rejected_401():
r, _, _ = _receiver(max_skew_seconds=300)
raw, _, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, ts=1)
# ts=1 (1970) is far outside the 300s window vs now.
status, _ = await r.handle_raw(raw_body=raw, timestamp="1", signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_wrong_key_rejected_401():
r, _, _ = _receiver()
other = "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100"
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=other)
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_no_delivery_key_fails_closed_401():
async def on_message(ev):
pass
r = InboundDeliveryReceiver(delivery_key_verify_list=lambda: [], on_message=on_message)
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_rotation_secondary_key_accepted():
new = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
received: list = []
async def on_message(ev):
received.append(ev)
# Connector still signs with the OLD key (secondary); verify list has both.
r = InboundDeliveryReceiver(
delivery_key_verify_list=lambda: [new, _KEY], on_message=on_message
)
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=_KEY)
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 200 and len(received) == 1
@pytest.mark.asyncio
async def test_malformed_json_after_valid_signature_is_400():
r, _, _ = _receiver()
# Sign a non-JSON body so the signature passes but json.loads fails.
raw = b"not json at all"
ts = str(int(time.time()))
sig = sign(f"{ts}.{raw.decode()}", _KEY)
status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 400

View file

@ -47,6 +47,19 @@ def _relay_py_files() -> list[Path]:
return sorted(_RELAY_PKG.glob("*.py"))
# ``auth.py`` is the connector⇄gateway CHANNEL authenticator (the gateway's WS
# upgrade bearer + inbound-delivery signature verification). ``inbound_receiver.py``
# is the signed-inbound-delivery receiver that USES that channel auth to verify
# connector→gateway POSTs. Both are net-new, intended, and the whole point of
# authenticating an untrusted/disposable gateway — they are NOT platform crypto.
# They use HMAC over the connector's per-gateway / per-tenant secrets (NOT any
# platform's signing secret), so they are exempt from the platform-crypto symbol
# scan below. The module-import ban (platform-crypto modules) still applies to
# every file including these — they import only stdlib hmac/hashlib and each
# other, never a platform-crypto module, so they stay clean there.
_CHANNEL_AUTH_FILES = {"auth.py", "inbound_receiver.py"}
def test_relay_package_imports_no_platform_crypto():
"""No module in gateway/relay imports a platform-crypto / verification module."""
offenders: list[str] = []
@ -72,9 +85,19 @@ def test_relay_package_imports_no_platform_crypto():
def test_relay_package_calls_no_signature_verification():
"""No relay module references a signature/crypto-verification symbol by name."""
"""No relay module references a PLATFORM signature/crypto-verification symbol.
Scoped to platform crypto (Discord ed25519, Twilio/WeCom HMAC, webhook
signature checks). The connectorgateway channel authenticator (``auth.py``)
is exempt: its HMAC is over the connector's own per-gateway/per-tenant
secrets to authenticate the relay channel itself the gateway holds NO
platform secret and re-validates NO platform payload. See ``auth.py`` and
docs/connector-gateway-auth-design.md.
"""
offenders: list[str] = []
for path in _relay_py_files():
if path.name in _CHANNEL_AUTH_FILES:
continue
for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
# Skip comments / docstrings-as-prose: only flag code-like usage.
stripped = line.strip()
@ -89,3 +112,30 @@ def test_relay_package_calls_no_signature_verification():
+ "\n ".join(offenders)
+ "\nThe connector verifies at the edge; the gateway re-validates nothing."
)
def test_channel_auth_uses_only_stdlib_crypto_not_platform_modules():
"""auth.py (channel authenticator) imports only stdlib crypto, no platform crypto.
Positive guard: the connectorgateway channel auth is allowed to do HMAC,
but it must do so with stdlib primitives over connector-owned secrets it
must never reach for a platform-crypto module. This keeps the exemption
above honest (auth.py can't smuggle in platform verification).
"""
auth_py = _RELAY_PKG / "auth.py"
assert auth_py.is_file(), "gateway/relay/auth.py (channel authenticator) is missing"
tree = ast.parse(auth_py.read_text(encoding="utf-8"), filename=str(auth_py))
imported: list[str] = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
imported += [a.name for a in node.names]
elif isinstance(node, ast.ImportFrom):
imported.append(node.module or "")
# No platform-crypto module import.
assert not [m for m in imported if any(tok in m for tok in _FORBIDDEN_MODULE_TOKENS)], (
f"auth.py must not import platform crypto; imports={imported}"
)
# It does use stdlib hmac/hashlib (that's how it authenticates the channel).
assert "hmac" in imported and "hashlib" in imported, (
f"auth.py should authenticate the channel with stdlib hmac/hashlib; imports={imported}"
)

View file

@ -305,6 +305,7 @@ def test_gateway_run_force_flag_survives_parser_extraction():
subparsers,
cmd_gateway=lambda _args: None,
cmd_proxy=lambda _args: None,
cmd_gateway_enroll=lambda _args: None,
)
args = parser.parse_args(["gateway", "run", "--force"])

View file

@ -20,6 +20,10 @@ def _h_proxy(args): # pragma: no cover - identity only
return "proxy"
def _h_gateway_enroll(args): # pragma: no cover - identity only
return "gateway_enroll"
def _h_profile(args): # pragma: no cover - identity only
return "profile"
@ -34,7 +38,12 @@ def _profile_parser():
def _gateway_parser():
p = argparse.ArgumentParser(prog="hermes")
sub = p.add_subparsers(dest="command")
build_gateway_parser(sub, cmd_gateway=_h_gateway, cmd_proxy=_h_proxy)
build_gateway_parser(
sub,
cmd_gateway=_h_gateway,
cmd_proxy=_h_proxy,
cmd_gateway_enroll=_h_gateway_enroll,
)
return p
@ -90,3 +99,25 @@ def test_gateway_lifecycle_accepts_legacy_platform_flag():
assert ns.gateway_command == action
assert ns.platform == "photon"
assert ns.func is _h_gateway
def test_gateway_enroll_dispatch():
p = _gateway_parser()
ns = p.parse_args(
[
"gateway",
"enroll",
"--token",
"tok",
"--connector-url",
"wss://connector.example.com/relay",
"--gateway-id",
"gw-1",
]
)
assert ns.command == "gateway"
assert ns.gateway_command == "enroll"
assert ns.func is _h_gateway_enroll
assert ns.token == "tok"
assert ns.connector_url == "wss://connector.example.com/relay"
assert ns.gateway_id == "gw-1"