diff --git a/gateway/config.py b/gateway/config.py index 0ebf23e12d0..c63b9523d73 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -2143,5 +2143,24 @@ def _apply_env_overrides(config: GatewayConfig) -> None: except Exception as e: logger.debug("Plugin platform enable pass failed: %s", e) + # Relay (generic connector-fronted platform, EXPERIMENTAL). Enabled when a + # connector relay URL is configured via GATEWAY_RELAY_URL (env) or + # gateway.relay_url (config.yaml). The adapter is registered into the + # platform_registry at gateway startup (gateway.relay.register_relay_adapter) + # and dials OUT to the connector — so, like Telegram/Matrix, it has no public + # inbound port and just needs Platform.RELAY present+enabled in + # config.platforms for start_gateway()'s connect loop to bring it up. The + # connected-checker (Platform.RELAY in _PLATFORM_CONNECTED_CHECKERS) keys on + # extra["relay_url"], so mirror the URL into extra here. + relay_url_env = os.getenv("GATEWAY_RELAY_URL", "").strip() + relay_url_yaml = "" + existing_relay = config.platforms.get(Platform.RELAY) + if existing_relay is not None: + relay_url_yaml = str(existing_relay.extra.get("relay_url") or "").strip() + relay_url_val = relay_url_env or relay_url_yaml + if relay_url_val: + relay_config = _enable_from_env(Platform.RELAY) + relay_config.extra["relay_url"] = relay_url_val.rstrip("/") + for platform_config in config.platforms.values(): platform_config.extra.pop("_enabled_explicit", None) diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index fc4e5f40ee7..a1a7826f8f8 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -57,6 +57,13 @@ class RelayAdapter(BasePlatformAdapter): self._transport = transport # Capability surface read by stream_consumer (getattr(..., 4096)). self.MAX_MESSAGE_LENGTH = descriptor.max_message_length + # chat_id -> guild_id (Discord) / workspace scope, learned from inbound + # events. The connector's egress guard resolves the owning tenant from + # the OUTBOUND action's metadata.guild_id; the gateway's generic delivery + # path (run.py _thread_metadata_for_source) only carries thread_id, so we + # re-attach the scope here from what we saw inbound. Keyed by chat_id + # (channel) since that's what send() receives. See routedEgressGuard.ts. + self._scope_by_chat: Dict[str, str] = {} self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain") # ── capability surface (from descriptor) ───────────────────────────── @@ -108,8 +115,35 @@ class RelayAdapter(BasePlatformAdapter): async def _on_inbound(self, event) -> None: """Bridge a connector-delivered MessageEvent into the normal adapter path.""" + self._capture_scope(event) await self.handle_message(event) + def _capture_scope(self, event) -> None: + """Remember chat_id -> guild scope from an inbound event so our outbound + (the agent's reply) can re-assert it for the connector's egress tenant + resolution. Never raises — scope tracking must not break inbound.""" + try: + src = getattr(event, "source", None) + scope = getattr(src, "guild_id", None) if src else None + chat = getattr(src, "chat_id", None) if src else None + if scope and chat: + self._scope_by_chat[str(chat)] = str(scope) + except Exception: # noqa: BLE001 - scope tracking must never break inbound + pass + + def _with_scope(self, chat_id: str, metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]: + """Ensure the outbound metadata carries guild_id for the connector's + egress tenant resolution. The connector resolves the owning tenant from + metadata.guild_id (Discord); without it egress is declined as + 'target not routed to an onboarded tenant'. No-op when we have no scope + for this chat (e.g. DMs) or it's already present.""" + meta: Dict[str, Any] = dict(metadata or {}) + if not meta.get("guild_id"): + scope = self._scope_by_chat.get(str(chat_id)) + if scope: + meta["guild_id"] = scope + return meta + async def on_interrupt(self, session_key: str, chat_id: str) -> None: """Bridge a connector-delivered /stop into the adapter's interrupt path. @@ -140,7 +174,7 @@ class RelayAdapter(BasePlatformAdapter): "chat_id": chat_id, "content": content, "reply_to": reply_to, - "metadata": metadata or {}, + "metadata": self._with_scope(chat_id, metadata), } ) return SendResult( diff --git a/gateway/relay/ws_transport.py b/gateway/relay/ws_transport.py index b2e8eda09cd..b091d44faa8 100644 --- a/gateway/relay/ws_transport.py +++ b/gateway/relay/ws_transport.py @@ -54,6 +54,35 @@ _HANDSHAKE_TIMEOUT_S = 30.0 _OUTBOUND_TIMEOUT_S = 30.0 +def _ws_dial_url(url: str) -> str: + """Normalize a connector URL to the ``ws(s)://…/relay`` dial target. + + The relay URL is configured once (``GATEWAY_RELAY_URL`` / ``gateway.relay_url``) + as the connector's BASE URL (e.g. ``https://connector.example``) and shared by + both the provision POST (which needs ``http(s)://…/relay/provision`` — see + ``_provision_url``) and the WS dial (which needs ``ws(s)://…/relay``, the path + the connector mounts its ``WebSocketServer`` on). Two normalizations, both + load-bearing: + + - scheme: ``https -> wss``, ``http -> ws`` (``websockets.connect`` raises + "scheme isn't ws or wss" on an http(s) URL). + - path: ensure it ends in ``/relay`` (the connector returns HTTP 400 on an + upgrade to any other path, since the WS server is mounted at ``/relay``). + + Idempotent: an already-``ws(s)://…/relay`` URL is returned unchanged, so a URL + configured WITH the scheme and/or ``/relay`` still works. + """ + raw = (url or "").strip() + if raw.startswith("https://"): + raw = "wss://" + raw[len("https://"):] + elif raw.startswith("http://"): + raw = "ws://" + raw[len("http://"):] + raw = raw.rstrip("/") + if not raw.endswith("/relay"): + raw = f"{raw}/relay" + return raw + + def _event_from_wire(raw: Dict[str, Any]) -> MessageEvent: """Rebuild a MessageEvent from the connector's normalized inbound payload. @@ -118,7 +147,7 @@ class WebSocketRelayTransport: "WebSocketRelayTransport requires the 'websockets' package " "(install the messaging extra)." ) - self._url = url + self._url = _ws_dial_url(url) self._platform = platform self._bot_id = bot_id self._connect_timeout_s = connect_timeout_s diff --git a/tests/gateway/relay/test_relay_adapter.py b/tests/gateway/relay/test_relay_adapter.py index 64d6aab2f86..f176eb5728c 100644 --- a/tests/gateway/relay/test_relay_adapter.py +++ b/tests/gateway/relay/test_relay_adapter.py @@ -75,3 +75,68 @@ async def test_send_without_transport_returns_failure(): result = await a.send("chat1", "hello") assert result.success is False assert result.error == "no transport" + + +class _CaptureTransport: + """Minimal RelayTransport stand-in that records the outbound action.""" + + def __init__(self): + self.sent = None + + def set_inbound_handler(self, h): # noqa: D401 + self._h = h + + async def send_outbound(self, action): + self.sent = action + return {"success": True, "message_id": "m1"} + + +def _make_event(chat_id="chan-1", guild_id="guild-9"): + from gateway.platforms.base import MessageEvent, MessageType + from gateway.session import SessionSource + + src = SessionSource( + platform=Platform.RELAY, + chat_id=chat_id, + chat_type="channel", + guild_id=guild_id, + ) + return MessageEvent(text="hi", source=src, message_type=MessageType.TEXT) + + +@pytest.mark.asyncio +async def test_send_reattaches_guild_id_from_inbound_scope(): + """The connector's egress guard resolves the owning tenant from + metadata.guild_id; the gateway's generic delivery path drops it, so the + relay adapter must re-attach the guild scope learned from the inbound event. + Regression for live 'discord egress declined: target not routed to an + onboarded tenant'.""" + t = _CaptureTransport() + a = RelayAdapter(PlatformConfig(), make_desc(platform="discord"), transport=t) + # Simulate the connector delivering an inbound message in guild-9 / chan-1, + # but don't run the full handle_message pipeline — just the scope capture. + a._capture_scope(_make_event(chat_id="chan-1", guild_id="guild-9")) + + await a.send("chan-1", "the reply") + + assert t.sent["metadata"].get("guild_id") == "guild-9" + + +@pytest.mark.asyncio +async def test_send_without_known_scope_omits_guild_id(): + """A chat we never saw inbound (e.g. a DM) gets no guild_id — no-op, never + invents a scope.""" + t = _CaptureTransport() + a = RelayAdapter(PlatformConfig(), make_desc(platform="discord"), transport=t) + await a.send("unknown-chat", "hi") + assert "guild_id" not in t.sent["metadata"] + + +@pytest.mark.asyncio +async def test_send_preserves_explicit_guild_id(): + """An explicitly-provided metadata.guild_id is never overwritten.""" + t = _CaptureTransport() + a = RelayAdapter(PlatformConfig(), make_desc(platform="discord"), transport=t) + a._capture_scope(_make_event(chat_id="chan-1", guild_id="guild-9")) + await a.send("chan-1", "hi", metadata={"guild_id": "explicit-1"}) + assert t.sent["metadata"]["guild_id"] == "explicit-1" diff --git a/tests/gateway/relay/test_ws_transport.py b/tests/gateway/relay/test_ws_transport.py index dcb3f6c714f..00aa9b43327 100644 --- a/tests/gateway/relay/test_ws_transport.py +++ b/tests/gateway/relay/test_ws_transport.py @@ -177,3 +177,25 @@ async def test_disconnect_fails_pending_waiters_cleanly(server): # After disconnect, an outbound returns a structured failure rather than hanging. result = await t.send_outbound({"op": "send", "chat_id": "c", "content": "x"}) assert result["success"] is False + + +def test_https_url_normalized_to_wss(): + """The relay URL is configured once as the http(s):// BASE (for the provision + POST), but websockets.connect needs ws(s):// and the connector mounts its WS + server at /relay. The transport must convert scheme AND ensure the /relay + path. Regression for the live staging failures 'scheme isn't ws or wss' then + 'server rejected WebSocket connection: HTTP 400' (wrong path).""" + t = WebSocketRelayTransport("https://connector.example", "discord", "b") + assert t._url == "wss://connector.example/relay" + t2 = WebSocketRelayTransport("http://connector.local:8080", "discord", "b") + assert t2._url == "ws://connector.local:8080/relay" + + +def test_ws_dial_url_idempotent_with_scheme_and_path(): + # Already ws(s):// and/or already ending in /relay -> unchanged (no double append). + t = WebSocketRelayTransport("wss://connector.example/relay", "discord", "b") + assert t._url == "wss://connector.example/relay" + t2 = WebSocketRelayTransport("https://connector.example/relay/", "discord", "b") + assert t2._url == "wss://connector.example/relay" + t3 = WebSocketRelayTransport("ws://127.0.0.1:9", "discord", "b") + assert t3._url == "ws://127.0.0.1:9/relay" diff --git a/tests/gateway/test_config.py b/tests/gateway/test_config.py index 9e74dd355ad..9f38f9b8a0d 100644 --- a/tests/gateway/test_config.py +++ b/tests/gateway/test_config.py @@ -311,6 +311,55 @@ class TestLoadGatewayConfig: assert config.quick_commands == {"limits": {"type": "exec", "command": "echo ok"}} + def test_relay_platform_enabled_from_env_url(self, tmp_path, monkeypatch): + """GATEWAY_RELAY_URL must enable Platform.RELAY in config.platforms so + start_gateway()'s connect loop actually dials the connector. Registering + the adapter in the platform_registry is NOT enough — the connect loop + iterates config.platforms, so an un-enabled RELAY never connects (the + 'relay registered but no inbound' bug).""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.setenv("GATEWAY_RELAY_URL", "https://connector.example/relay/") + + config = load_gateway_config() + + assert Platform.RELAY in config.platforms + relay = config.platforms[Platform.RELAY] + assert relay.enabled is True + # Trailing slash stripped; mirrored into extra for the connected-checker. + assert relay.extra.get("relay_url") == "https://connector.example/relay" + assert Platform.RELAY in config.get_connected_platforms() + + def test_relay_platform_absent_when_url_unset(self, tmp_path, monkeypatch): + """No relay URL -> no RELAY platform, so direct/single-tenant gateways + are unaffected.""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.delenv("GATEWAY_RELAY_URL", raising=False) + + config = load_gateway_config() + + assert Platform.RELAY not in config.platforms + + def test_relay_platform_enabled_from_config_yaml(self, tmp_path, monkeypatch): + """gateway.relay_url in config.yaml also enables RELAY (env-less path).""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + config_path = hermes_home / "config.yaml" + config_path.write_text( + "gateway:\n platforms:\n relay:\n extra:\n relay_url: https://connector.example/relay\n", + encoding="utf-8", + ) + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + monkeypatch.delenv("GATEWAY_RELAY_URL", raising=False) + + config = load_gateway_config() + + assert Platform.RELAY in config.platforms + assert config.platforms[Platform.RELAY].enabled is True + def test_bridges_group_sessions_per_user_from_config_yaml(self, tmp_path, monkeypatch): hermes_home = tmp_path / ".hermes" hermes_home.mkdir()