diff --git a/gateway/relay/__init__.py b/gateway/relay/__init__.py index cbdb0f65cbc..7416dcdc0a5 100644 --- a/gateway/relay/__init__.py +++ b/gateway/relay/__init__.py @@ -44,15 +44,81 @@ def relay_url() -> Optional[str]: return None -def relay_platform_identity() -> tuple[str, str]: - """Platform + bot id this gateway fronts over the relay (for the handshake hello). +def relay_platform_identities() -> list[tuple[str, str]]: + """The (platform, bot_id) pairs this gateway fronts over the relay (Phase 1.5). - Defaults to ``("relay", "")``; overridable via ``GATEWAY_RELAY_PLATFORM`` / - ``GATEWAY_RELAY_BOT_ID`` so one connector can front several platforms. + Shape A (multi-platform-per-agent, D-Q1.5c — CUT OVER, no scalar fallback): + one gateway fronts a SET of platforms on one WS connection. The set is the + env-stamped deploy config: + + - ``GATEWAY_RELAY_PLATFORMS`` — comma-sep list (e.g. ``discord,telegram``). + - ``GATEWAY_RELAY_BOT_IDS`` — JSON keyed map + ``{"discord": {"botId": "..."}, "telegram": {"botId": "...", "username": "..."}}``. + + Returns the ordered list of ``(platform, bot_id)`` pairs (the FIRST is the + default the handshake/descriptor falls back to). The connector accepts N + hellos accumulating into its advertised set; outbound frames discriminate + per-frame on the platform (gateway-gateway D-Q1.5b.1). A platform present in + the list but absent from the ids map resolves with an empty bot_id (the + connector rejects an unprovisioned platform with a structured failure). + + Defaults to ``[("relay", "")]`` when nothing is configured (the generic + single-plane fallback for a connector that didn't stamp a platform set). """ - platform = os.environ.get("GATEWAY_RELAY_PLATFORM", "relay").strip() or "relay" - bot_id = os.environ.get("GATEWAY_RELAY_BOT_ID", "").strip() - return platform, bot_id + platforms_raw = os.environ.get("GATEWAY_RELAY_PLATFORMS", "").strip() + platforms = [p.strip() for p in platforms_raw.split(",") if p.strip()] + if not platforms: + return [("relay", "")] + ids = _relay_bot_ids_map() + out: list[tuple[str, str]] = [] + for platform in platforms: + entry = ids.get(platform) or {} + bot_id = str(entry.get("botId", "")).strip() if isinstance(entry, dict) else "" + out.append((platform, bot_id)) + return out + + +def _relay_bot_ids_map() -> dict: + """Parse ``GATEWAY_RELAY_BOT_IDS`` (JSON keyed map). Never raises — a malformed + map yields ``{}`` so a bad config degrades to empty bot ids (the connector + rejects an unprovisioned platform) rather than crashing boot.""" + import json + import logging + + raw = os.environ.get("GATEWAY_RELAY_BOT_IDS", "").strip() + if not raw: + return {} + try: + parsed = json.loads(raw) + return parsed if isinstance(parsed, dict) else {} + except Exception: # noqa: BLE001 - a bad map must not crash boot + logging.getLogger("gateway.relay").warning( + "GATEWAY_RELAY_BOT_IDS is not valid JSON; treating as empty" + ) + return {} + + +def relay_bot_username(platform: str) -> Optional[str]: + """The bot's deep-link username/handle for a platform (e.g. Telegram's + ``@handle`` for ``t.me/``), read from the per-platform entry in + ``GATEWAY_RELAY_BOT_IDS``. None when absent (most platforms don't need one). + """ + entry = _relay_bot_ids_map().get(platform) + if isinstance(entry, dict): + username = entry.get("username") + if username: + return str(username).lstrip("@") + return None + + +def relay_platform_identity() -> tuple[str, str]: + """The PRIMARY (platform, bot_id) — the first identity in the configured set. + + Kept for call sites that need a single representative identity (the default + descriptor platform, the policy projection's primary). The full set is + ``relay_platform_identities()``. Defaults to ``("relay", "")``. + """ + return relay_platform_identities()[0] def relay_connection_auth() -> tuple[Optional[str], Optional[str]]: @@ -217,14 +283,16 @@ def _policy_url(relay_dial_url: str) -> str: return f"{raw}/relay/policy" -def relay_relevance_policy() -> Optional[dict]: - """Project this gateway's RELEVANCE config into the connector's generic vocabulary. +def relay_relevance_policy(platform: Optional[str] = None) -> Optional[dict]: + """Project a fronted platform's RELEVANCE config into the connector's generic vocabulary. The connector's relevance gate (Phase 6 Unit ζ) reasons over a platform-agnostic policy — ``requireAddress`` / ``freeResponseScopes`` / ``allowOtherBots`` — NOT over Discord/Telegram words. This is the gateway side of that contract: it reads the agent's existing relevance knobs and - emits the generic shape the connector stores per-instance. + emits the generic shape the connector stores per-instance (Phase 1.5: the + connector keys the policy by ``(tenant, platform, instanceId)``, so each + fronted platform gets its own row — pass its name here). Mapping (the connector vocabulary ← the gateway's existing config): - ``requireAddress`` ← the platform's ``require_mention`` (the agent @@ -237,11 +305,13 @@ def relay_relevance_policy() -> Optional[dict]: Read from the relay platform's config block (the platform the connector fronts, e.g. ``discord:``), falling back to the bridged top-level keys, then - the ``{PLATFORM}_*`` env. Returns the generic dict, or None when relay isn't + the ``{PLATFORM}_*`` env. ``platform`` defaults to the PRIMARY fronted + platform (back-compat). Returns the generic dict, or None when relay isn't configured or the platform exposes no relevance knobs (⇒ the connector's quiet default already matches, so there's nothing to declare). """ - platform, _bot_id = relay_platform_identity() + if platform is None: + platform, _bot_id = relay_platform_identity() if not platform or platform == "relay": # No concrete fronted platform resolved ⇒ nothing platform-specific to project. return None @@ -428,7 +498,7 @@ def self_provision_relay() -> bool: logger.warning("relay self-provision skipped: could not resolve Nous token (%s)", exc) return False - platform, bot_id = relay_platform_identity() + identities = relay_platform_identities() # gatewayId default mirrors the enroll CLI's hostname-based slug. import socket @@ -442,35 +512,61 @@ def self_provision_relay() -> bool: instance_id = relay_instance_id() wake_url = relay_wake_url() - try: - result = _post_provision( - provision_url=_provision_url(dial_url), - access_token=access_token, - gateway_id=gateway_id, - platform=platform, - bot_id=bot_id, - gateway_endpoint=endpoint, - route_keys=route_keys, - instance_id=instance_id, - wake_url=wake_url, + # Phase 1.5 (D-Q1.5c): provision EACH fronted platform under the SAME + # gatewayId + the SAME (platform-less) per-gateway secret. The connector's + # secret record is (gatewayId -> tenant) only; platform/botId live on the + # per-platform route rows (relayProvision.ts:124/148), so N provision POSTs + # with one gatewayId add N platforms' routes under one secret. The loop is + # PARTIAL-FAILURE-TOLERANT: a platform that fails to provision is logged and + # skipped (it just isn't fronted) — the others still come up. The FIRST + # successful provision sets the in-process creds; later platforms re-provision + # against the same gatewayId (idempotent on the secret, additive on routes). + provisioned: list[str] = [] + result: dict = {} + for platform, bot_id in identities: + try: + result = _post_provision( + provision_url=_provision_url(dial_url), + access_token=access_token, + gateway_id=gateway_id, + platform=platform, + bot_id=bot_id, + gateway_endpoint=endpoint, + route_keys=route_keys, + instance_id=instance_id, + wake_url=wake_url, + ) + except RuntimeError as exc: + logger.warning( + "relay self-provision failed for platform=%s (%s); continuing with the rest", + platform, + exc, + ) + continue + provisioned.append(platform) + # Set creds in-process on the FIRST success so register_relay_adapter() + # reads them from os.environ (the per-gateway secret authenticates the + # outbound WS upgrade). Subsequent platforms share the same gatewayId + + # secret (the connector returns the same record for the same gatewayId). + # Never logged. + if "GATEWAY_RELAY_SECRET" not in os.environ or not os.environ.get("GATEWAY_RELAY_SECRET"): + os.environ["GATEWAY_RELAY_ID"] = str(result.get("gatewayId") or gateway_id) + os.environ["GATEWAY_RELAY_SECRET"] = str(result.get("secret") or "") + os.environ["GATEWAY_RELAY_DELIVERY_KEY"] = str(result.get("deliveryKey") or "") + + if not provisioned: + logger.warning( + "relay self-provision failed for ALL platforms (%s); gateway will boot without relay auth", + ",".join(p for p, _ in identities), ) - except RuntimeError as exc: - logger.warning("relay self-provision failed (%s); gateway will boot without relay auth", exc) return False - # Set creds in-process so register_relay_adapter() reads them from os.environ - # (the per-gateway secret authenticates the outbound WS upgrade). The delivery - # key is still issued by the connector and persisted for forward-compat, but - # inbound now rides the WS (no HTTP receiver), so it is not consumed here. - # Never logged. - os.environ["GATEWAY_RELAY_ID"] = str(result.get("gatewayId") or gateway_id) - os.environ["GATEWAY_RELAY_SECRET"] = str(result.get("secret") or "") - os.environ["GATEWAY_RELAY_DELIVERY_KEY"] = str(result.get("deliveryKey") or "") tenant = str(result.get("tenant") or "") logger.info( - "relay self-provisioned (gateway_id=%s tenant=%s routes=%d inbound=%s instance=%s wake=%s)", - os.environ["GATEWAY_RELAY_ID"], + "relay self-provisioned (gateway_id=%s tenant=%s platforms=%s routes=%d inbound=%s instance=%s wake=%s)", + os.environ.get("GATEWAY_RELAY_ID", gateway_id), tenant or "?", + ",".join(provisioned), len(route_keys), "yes" if endpoint else "outbound-only", instance_id or "unbound", @@ -546,33 +642,50 @@ def send_relay_policy() -> bool: # be unauthenticated too, so there's no instance to attach a policy to). return False - policy = relay_relevance_policy() - if policy is None: - # Nothing non-default to declare ⇒ the connector's quiet default already - # matches; don't write a redundant row. - logger.info("relay policy: no non-default relevance config to declare; using connector default") - return False - + # Phase 1.5: declare a policy PER fronted platform — the connector keys the + # relevance policy by (tenant, platform, instanceId), so each platform this + # gateway fronts gets its own row. Per-platform, partial-tolerant: a platform + # with nothing non-default to declare is skipped; a failed POST for one + # platform doesn't block the others. A single-platform gateway declares one + # policy exactly as before. try: from gateway.relay.auth import make_upgrade_token token = make_upgrade_token(gateway_id, secret) - status = _post_policy(policy_url=_policy_url(dial_url), token=token, policy=policy) - except Exception as exc: # noqa: BLE001 - boot must survive a policy-declare failure - logger.warning("relay policy declaration failed (%s); connector keeps prior/default policy", exc) + except Exception as exc: # noqa: BLE001 - boot must survive a token-build failure + logger.warning("relay policy declaration failed to build token (%s); connector keeps prior policy", exc) return False - if status == 200: - logger.info( - "relay policy declared (platform=%s require_address=%s free_scopes=%d allow_bots=%s)", - policy.get("platform"), - policy.get("requireAddress"), - len(policy.get("freeResponseScopes") or []), - policy.get("allowOtherBots"), - ) - return True - logger.warning("relay policy declaration returned HTTP %s; connector keeps prior/default policy", status) - return False + any_declared = False + for platform, _bot_id in relay_platform_identities(): + policy = relay_relevance_policy(platform) + if policy is None: + # Nothing non-default to declare for this platform ⇒ the connector's + # quiet default already matches; don't write a redundant row. + continue + try: + status = _post_policy(policy_url=_policy_url(dial_url), token=token, policy=policy) + except Exception as exc: # noqa: BLE001 - boot must survive a policy-declare failure + logger.warning( + "relay policy declaration failed for platform=%s (%s); continuing", platform, exc + ) + continue + if status == 200: + any_declared = True + logger.info( + "relay policy declared (platform=%s require_address=%s free_scopes=%d allow_bots=%s)", + policy.get("platform"), + policy.get("requireAddress"), + len(policy.get("freeResponseScopes") or []), + policy.get("allowOtherBots"), + ) + else: + logger.warning( + "relay policy declaration for platform=%s returned HTTP %s; connector keeps prior/default policy", + platform, + status, + ) + return any_declared def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bool: @@ -621,6 +734,12 @@ def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bo resolved_url, platform, bot_id, + # Phase 1.5: the full SET of (platform, bot_id) this gateway fronts. + # The transport sends one hello per identity (the connector + # accumulates them) and resolves the per-frame egress botId from + # this set. A single-platform deploy passes a 1-element list, so + # behaviour is byte-identical to before. + identities=relay_platform_identities(), gateway_id=gateway_id, upgrade_secret=upgrade_secret, # Phase 5 §5.3: re-dial + re-handshake after an unexpected socket diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index 0cf5fb18a60..b7cceb85469 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -71,6 +71,15 @@ class RelayAdapter(BasePlatformAdapter): # recipient's author binding; we re-attach this user_id as # metadata.user_id on the outbound action so it can. See _capture_scope. self._dm_user_by_chat: Dict[str, str] = {} + # chat_id -> the UNDERLYING platform (e.g. "discord", "telegram") this + # chat belongs to (Phase 1.5 multi-platform-per-agent). One relay adapter + # fronts N platforms on one WS; an outbound reply must egress through the + # platform the inbound came from. We remember it per chat_id from the + # inbound event's source.platform and stamp it on the OutboundFrame so the + # connector dispatches to the right sender. Empty for a single-platform + # gateway (the connector falls back to its session default). See + # _capture_scope / send. + self._platform_by_chat: Dict[str, str] = {} self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain") # Phase 7 Unit 7d-B: watches the transport for a terminal auth revocation # (a 4401 close after a successful handshake = the operator opted this @@ -243,6 +252,17 @@ class RelayAdapter(BasePlatformAdapter): chat = getattr(src, "chat_id", None) if not chat: return + # Phase 1.5: remember the underlying platform for this chat so the + # reply egresses through the right sender (one relay adapter fronts N + # platforms). source.platform is a Platform enum (e.g. Platform.DISCORD, + # mapped from the connector's "discord" by ws_transport _frame_to_event); + # record its string VALUE, skipping the generic RELAY fallback (a + # single-platform connector that didn't tag a concrete platform — the + # connector's session default handles egress then). + platform = getattr(src, "platform", None) + platform_value = getattr(platform, "value", platform) + if platform_value and platform_value != "relay": + self._platform_by_chat[str(chat)] = str(platform_value) guild = getattr(src, "guild_id", None) if guild: self._scope_by_chat[str(chat)] = str(guild) @@ -282,6 +302,17 @@ class RelayAdapter(BasePlatformAdapter): meta["user_id"] = dm_user return meta + def _platform_is_fronted(self, platform: str) -> bool: + """Whether ``platform`` is one of the platforms this gateway fronts over + the relay (Phase 1.5). Reads the transport's advertised identity set; used + to decide whether a follow-up's platform-prefixed `kind` names a real + fronted platform worth tagging on the frame (vs. leaving egress to the + session default). Safe when the transport is absent or single-identity.""" + ids = getattr(self._transport, "_identities", None) + if not ids: + return False + return any(p == platform for p, _ in ids) + async def on_interrupt(self, session_key: str, chat_id: str) -> None: """Bridge a connector-delivered /stop into the adapter's interrupt path. @@ -456,7 +487,8 @@ class RelayAdapter(BasePlatformAdapter): "content": content, "reply_to": reply_to, "metadata": self._with_scope(chat_id, metadata), - } + }, + platform=self._platform_by_chat.get(str(chat_id)), ) return SendResult( success=bool(result.get("success")), @@ -487,6 +519,16 @@ class RelayAdapter(BasePlatformAdapter): """ if self._transport is None: return SendResult(success=False, error="no transport") + # Phase 1.5: the capability `kind` is platform-prefixed (e.g. + # "discord.interaction_token"), so derive the egress platform from it when + # it names one we front — that tags the OutboundFrame so a multi-platform + # gateway routes the follow-up through the right sender. Falls back to the + # session default (connector-side) when the prefix isn't a fronted platform. + follow_up_platform = None + if kind and "." in kind: + prefix = kind.split(".", 1)[0] + if self._platform_is_fronted(prefix): + follow_up_platform = prefix result = await self._transport.send_follow_up( { "op": "follow_up", @@ -494,7 +536,8 @@ class RelayAdapter(BasePlatformAdapter): "kind": kind, "content": content, "metadata": metadata or {}, - } + }, + platform=follow_up_platform, ) return SendResult( success=bool(result.get("success")), diff --git a/gateway/relay/transport.py b/gateway/relay/transport.py index 7c0058dd98c..82470780edc 100644 --- a/gateway/relay/transport.py +++ b/gateway/relay/transport.py @@ -70,11 +70,19 @@ class RelayTransport(Protocol): """ ... - async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: + async def send_outbound( + self, action: Dict[str, Any], *, platform: Optional[str] = None + ) -> Dict[str, Any]: """Carry an outbound action (send/edit/typing) to the connector. Returns a result dict; for ``op == "send"`` it carries ``success`` and optionally ``message_id`` / ``error``. + + ``platform`` (Phase 1.5) tags WHICH fronted platform this reply targets, + carried on the OutboundFrame envelope so a gateway fronting N platforms + egresses each reply through the right sender (the transport resolves the + matching advertised botId). Omitted ⇒ the connector falls back to the + session's default platform (single-platform deploys unchanged). """ ... @@ -106,7 +114,9 @@ class RelayTransport(Protocol): """ ... - async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]: + async def send_follow_up( + self, action: Dict[str, Any], *, platform: Optional[str] = None + ) -> Dict[str, Any]: """Act on a shared-identity capability bound to a session (A2 outbound). Some platforms hand the connector a credential that acts on the SHARED diff --git a/gateway/relay/ws_transport.py b/gateway/relay/ws_transport.py index 5d66da9045f..2bc5143d242 100644 --- a/gateway/relay/ws_transport.py +++ b/gateway/relay/ws_transport.py @@ -201,6 +201,7 @@ class WebSocketRelayTransport: platform: str, bot_id: str, *, + identities: Optional[list[tuple[str, str]]] = None, connect_timeout_s: float = _HANDSHAKE_TIMEOUT_S, outbound_timeout_s: float = _OUTBOUND_TIMEOUT_S, gateway_id: Optional[str] = None, @@ -217,6 +218,13 @@ class WebSocketRelayTransport: self._url = _ws_dial_url(url) self._platform = platform self._bot_id = bot_id + # Phase 1.5 (Shape A): the full SET of (platform, bot_id) this gateway + # fronts on this one WS. The handshake sends one `hello` per identity so + # the connector accumulates them into its advertised set (gateway-gateway + # D-Q1.5b.1); the first identity (platform/bot_id above) is the default an + # untagged outbound falls back to. Defaults to the single (platform, bot_id) + # so existing single-platform callers are unchanged. + self._identities = list(identities) if identities else [(platform, bot_id)] self._connect_timeout_s = connect_timeout_s self._outbound_timeout_s = outbound_timeout_s # Connection auth (Phase 2): when a per-gateway secret is configured the @@ -303,8 +311,13 @@ class WebSocketRelayTransport: else: self._ws = await websockets.connect(self._url) # type: ignore[union-attr] self._reader = asyncio.create_task(self._read_loop(), name="relay-ws-reader") - # Send hello; the descriptor arrives via the reader and resolves handshake(). - await self._send({"type": "hello", "platform": self._platform, "botId": self._bot_id}) + # Send one hello PER fronted identity (Phase 1.5 Shape A). The connector + # accumulates them into its advertised set (the first sets the session + # default; each adds to the egress-allowed set). A single-platform gateway + # sends exactly one hello — byte-identical to before. The descriptor for + # the FIRST identity resolves handshake(); later descriptors are absorbed. + for platform, bot_id in self._identities: + await self._send({"type": "hello", "platform": platform, "botId": bot_id}) def _upgrade_headers(self) -> Dict[str, str]: """Auth headers for the WS upgrade, or {} when no secret is configured. @@ -372,14 +385,35 @@ class WebSocketRelayTransport: self._inbound = handler # ── outbound ───────────────────────────────────────────────────────── - async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: - return await self._request_response(action) + async def send_outbound( + self, action: Dict[str, Any], *, platform: Optional[str] = None + ) -> Dict[str, Any]: + return await self._request_response(action, platform=platform) - async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]: + async def send_follow_up( + self, action: Dict[str, Any], *, platform: Optional[str] = None + ) -> Dict[str, Any]: # follow_up rides the same outbound frame; the connector dispatches by # action.op. Kept as a distinct method to satisfy the transport Protocol # and to make the A2 call site explicit. - return await self._request_response(action) + return await self._request_response(action, platform=platform) + + def _bot_id_for(self, platform: Optional[str]) -> Optional[str]: + """The bot_id this transport advertised at hello for ``platform`` (Phase 1.5). + + The connector validates a per-frame egress target against the SET of + ``platform:botId`` pairs it accumulated from the N hellos, so a per-frame + ``platform`` must ride with its MATCHING ``botId`` (the session default + botId belongs to the first identity and would mis-key for a second + platform). Resolved from the identity set this transport was built with. + None when the platform isn't one we front (the connector then rejects it + with a structured failure — never a wrong-credential send).""" + if not platform: + return None + for p, b in self._identities: + if p == platform: + return b + return None async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: result = await self._request_response( @@ -475,7 +509,11 @@ class WebSocketRelayTransport: logger.debug("relay: inbound_ack send failed for %s", buffer_id) async def _request_response( - self, action: Dict[str, Any], frame_type: str = "outbound" + self, + action: Dict[str, Any], + frame_type: str = "outbound", + *, + platform: Optional[str] = None, ) -> Dict[str, Any]: if self._ws is None: return {"success": False, "error": "relay transport not connected"} @@ -483,8 +521,20 @@ class WebSocketRelayTransport: loop = asyncio.get_running_loop() fut: asyncio.Future[Dict[str, Any]] = loop.create_future() self._pending[request_id] = fut + frame: Dict[str, Any] = {"type": frame_type, "requestId": request_id, "action": action} + # Phase 1.5: tag the per-frame egress platform on the OutboundFrame + # envelope (gateway-gateway D-Q1.5b.1), with its MATCHING advertised botId + # so the connector's `${platform}:${botId}` advertised-set check passes. + # Only set when a concrete platform was resolved for this chat so a + # single-platform gateway emits the exact frame shape as before (the + # connector falls back to the session's default platform when absent). + if platform: + frame["platform"] = platform + bot_id = self._bot_id_for(platform) + if bot_id: + frame["botId"] = bot_id try: - await self._send({"type": frame_type, "requestId": request_id, "action": action}) + await self._send(frame) return await asyncio.wait_for(fut, timeout=self._outbound_timeout_s) except asyncio.TimeoutError: return {"success": False, "error": "relay outbound timed out"} diff --git a/tests/gateway/relay/stub_connector.py b/tests/gateway/relay/stub_connector.py index 2d9baab3e41..c549a982c4e 100644 --- a/tests/gateway/relay/stub_connector.py +++ b/tests/gateway/relay/stub_connector.py @@ -30,8 +30,15 @@ class StubConnector: self._passthrough: Optional[Any] = None self.connected = False self.sent: List[Dict[str, Any]] = [] + # Per-frame egress platform recorded alongside each sent action (Phase 1.5). + self.sent_platforms: List[Optional[str]] = [] self.interrupts: List[Dict[str, Any]] = [] self.follow_ups: List[Dict[str, Any]] = [] + self.follow_up_platforms: List[Optional[str]] = [] + # The fronted (platform, bot_id) identity set (Phase 1.5). Mirrors the real + # transport's _identities so RelayAdapter._platform_is_fronted resolves; a + # single-identity default keeps existing tests' behaviour unchanged. + self._identities: List[tuple] = [(descriptor.platform, "")] self.chat_info: Dict[str, Dict[str, Any]] = {} # Canned result for the next send_outbound (override per-test). self.next_send_result: Dict[str, Any] = {"success": True, "message_id": "m1"} @@ -64,8 +71,13 @@ class StubConnector: (Phase 5 §5.1).""" self._passthrough = handler - async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: + async def send_outbound( + self, action: Dict[str, Any], *, platform: Optional[str] = None + ) -> Dict[str, Any]: + # Record the per-frame egress platform (Phase 1.5) alongside the action so + # tests can assert which platform a reply was tagged for. self.sent.append(action) + self.sent_platforms.append(platform) if action.get("op") == "send": return dict(self.next_send_result) return {"success": True} @@ -76,8 +88,11 @@ class StubConnector: async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None: self.interrupts.append({"session_key": session_key, "reason": reason}) - async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]: + async def send_follow_up( + self, action: Dict[str, Any], *, platform: Optional[str] = None + ) -> Dict[str, Any]: self.follow_ups.append(action) + self.follow_up_platforms.append(platform) return dict(self.next_follow_up_result) # ── test driver ────────────────────────────────────────────────────── diff --git a/tests/gateway/relay/test_relay_adapter.py b/tests/gateway/relay/test_relay_adapter.py index b7cea4b3946..cc837624687 100644 --- a/tests/gateway/relay/test_relay_adapter.py +++ b/tests/gateway/relay/test_relay_adapter.py @@ -116,12 +116,16 @@ class _CaptureTransport: def __init__(self): self.sent = None + self.sent_platform = None + # No concrete fronted identities ⇒ _platform_is_fronted is a no-op here. + self._identities = [] def set_inbound_handler(self, h): # noqa: D401 self._h = h - async def send_outbound(self, action): + async def send_outbound(self, action, *, platform=None): self.sent = action + self.sent_platform = platform return {"success": True, "message_id": "m1"} diff --git a/tests/gateway/relay/test_relay_multiplatform.py b/tests/gateway/relay/test_relay_multiplatform.py new file mode 100644 index 00000000000..06fd47e734b --- /dev/null +++ b/tests/gateway/relay/test_relay_multiplatform.py @@ -0,0 +1,229 @@ +"""Unit tests for Phase 1.5 multi-platform-per-agent (relay). + +Covers the agent half of Shape A (gateway-gateway D-Q1.5b.1 / D-Q1.5c): + - relay_platform_identities() parsing the GATEWAY_RELAY_PLATFORMS list + + GATEWAY_RELAY_BOT_IDS keyed map (the cut-over shape — no scalar fallback), + - relay_bot_username() reading the per-platform username, + - self_provision_relay() looping one /relay/provision POST per platform under + one gatewayId + one secret, partial-failure-tolerant, + - the RelayAdapter stamping the per-frame egress platform on outbound from the + chat's inbound source.platform. + +The connector HTTP is monkeypatched; the cross-repo E2E exercises the real path. +""" + +from __future__ import annotations + +import json + +import pytest + +import gateway.relay as relay + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + for k in ( + "GATEWAY_RELAY_URL", + "GATEWAY_RELAY_ID", + "GATEWAY_RELAY_SECRET", + "GATEWAY_RELAY_DELIVERY_KEY", + "GATEWAY_RELAY_PLATFORM", + "GATEWAY_RELAY_BOT_ID", + "GATEWAY_RELAY_PLATFORMS", + "GATEWAY_RELAY_BOT_IDS", + ): + monkeypatch.delenv(k, raising=False) + monkeypatch.setattr("gateway.run._load_gateway_config", lambda: {}, raising=False) + + +# ─────────────────────────── identity parsing ─────────────────────────── + +def test_identities_default_relay_when_unconfigured(): + assert relay.relay_platform_identities() == [("relay", "")] + # The primary helper mirrors the first identity. + assert relay.relay_platform_identity() == ("relay", "") + + +def test_identities_single_platform(monkeypatch): + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") + monkeypatch.setenv("GATEWAY_RELAY_BOT_IDS", json.dumps({"discord": {"botId": "app-1"}})) + assert relay.relay_platform_identities() == [("discord", "app-1")] + assert relay.relay_platform_identity() == ("discord", "app-1") + + +def test_identities_multi_platform_keyed_map(monkeypatch): + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord, telegram") + monkeypatch.setenv( + "GATEWAY_RELAY_BOT_IDS", + json.dumps( + { + "discord": {"botId": "app-1"}, + "telegram": {"botId": "bot-9", "username": "@my_bot"}, + } + ), + ) + # Order preserved; whitespace in the list trimmed. + assert relay.relay_platform_identities() == [("discord", "app-1"), ("telegram", "bot-9")] + # The PRIMARY is the first listed platform. + assert relay.relay_platform_identity() == ("discord", "app-1") + # Username folded into the per-platform entry; the leading @ is stripped. + assert relay.relay_bot_username("telegram") == "my_bot" + assert relay.relay_bot_username("discord") is None + + +def test_identities_platform_missing_from_map_gets_empty_bot_id(monkeypatch): + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord,telegram") + monkeypatch.setenv("GATEWAY_RELAY_BOT_IDS", json.dumps({"discord": {"botId": "app-1"}})) + # telegram is listed but absent from the ids map ⇒ empty bot_id (the + # connector rejects an unprovisioned platform with a structured failure). + assert relay.relay_platform_identities() == [("discord", "app-1"), ("telegram", "")] + + +def test_bot_ids_malformed_json_degrades_to_empty(monkeypatch): + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") + monkeypatch.setenv("GATEWAY_RELAY_BOT_IDS", "{not valid json") + # A bad map must not crash boot — degrades to empty bot ids. + assert relay.relay_platform_identities() == [("discord", "")] + + +# ─────────────────────────── provision loop ─────────────────────────── + +def _arm(monkeypatch, *, url="wss://connector.example/relay", token="nas-token"): + monkeypatch.setattr(relay, "relay_url", lambda: url) + monkeypatch.setattr("hermes_cli.auth.resolve_nous_access_token", lambda: token) + + +def test_self_provision_loops_per_platform(monkeypatch): + _arm(monkeypatch) + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord,telegram") + monkeypatch.setenv( + "GATEWAY_RELAY_BOT_IDS", + json.dumps({"discord": {"botId": "app-1"}, "telegram": {"botId": "bot-9"}}), + ) + calls = [] + + def _fake(**kwargs): + calls.append((kwargs["platform"], kwargs["bot_id"], kwargs["gateway_id"])) + return {"secret": "s" * 64, "deliveryKey": "d" * 64, "tenant": "t", "gatewayId": kwargs["gateway_id"]} + + monkeypatch.setattr(relay, "_post_provision", _fake) + assert relay.self_provision_relay() is True + # One POST per fronted platform, all under the SAME gatewayId. + assert [(p, b) for p, b, _ in calls] == [("discord", "app-1"), ("telegram", "bot-9")] + assert len({gw for _, _, gw in calls}) == 1 + # The in-process secret is set once (from the first success). + import os + + assert os.environ["GATEWAY_RELAY_SECRET"] == "s" * 64 + + +def test_self_provision_partial_failure_tolerant(monkeypatch): + _arm(monkeypatch) + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord,telegram") + monkeypatch.setenv( + "GATEWAY_RELAY_BOT_IDS", + json.dumps({"discord": {"botId": "app-1"}, "telegram": {"botId": "bot-9"}}), + ) + + def _fake(**kwargs): + if kwargs["platform"] == "telegram": + raise RuntimeError("telegram provision boom") + return {"secret": "s" * 64, "deliveryKey": "d" * 64, "tenant": "t", "gatewayId": kwargs["gateway_id"]} + + monkeypatch.setattr(relay, "_post_provision", _fake) + # discord succeeds, telegram fails ⇒ still True (at least one fronted). + assert relay.self_provision_relay() is True + + +def test_self_provision_all_fail_returns_false(monkeypatch): + _arm(monkeypatch) + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord,telegram") + + def _fake(**kwargs): + raise RuntimeError("boom") + + monkeypatch.setattr(relay, "_post_provision", _fake) + assert relay.self_provision_relay() is False + + +# ─────────────────────────── per-frame egress (adapter) ─────────────────────────── + +@pytest.mark.asyncio +async def test_adapter_stamps_per_frame_platform_from_inbound(monkeypatch): + """An inbound from a concrete platform makes the reply egress tagged for it.""" + from gateway.config import Platform, PlatformConfig + from gateway.platforms.base import MessageEvent, MessageType + from gateway.relay.adapter import RelayAdapter + from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor + from gateway.session import SessionSource + + from tests.gateway.relay.stub_connector import StubConnector + + descriptor = CapabilityDescriptor( + contract_version=CONTRACT_VERSION, + platform="relay", + label="Relay", + max_message_length=4096, + supports_draft_streaming=False, + supports_edit=True, + supports_threads=False, + markdown_dialect="plain", + len_unit="chars", + ) + stub = StubConnector(descriptor) + # This gateway fronts both discord and telegram. + stub._identities = [("discord", "app-1"), ("telegram", "bot-9")] + adapter = RelayAdapter(PlatformConfig(), descriptor, transport=stub) + await adapter.connect() + + # A telegram inbound for chat "tg-1". + await stub.push_inbound( + MessageEvent( + text="hi", + message_type=MessageType.TEXT, + source=SessionSource(platform=Platform.TELEGRAM, chat_id="tg-1", chat_type="dm", user_id="u-1"), + ) + ) + await adapter.send("tg-1", "a telegram reply") + # The reply was tagged for telegram (per-frame egress). + assert stub.sent_platforms[-1] == "telegram" + + # A discord inbound for chat "dc-1". + await stub.push_inbound( + MessageEvent( + text="yo", + message_type=MessageType.TEXT, + source=SessionSource(platform=Platform.DISCORD, chat_id="dc-1", chat_type="channel", guild_id="g-1"), + ) + ) + await adapter.send("dc-1", "a discord reply") + assert stub.sent_platforms[-1] == "discord" + + +@pytest.mark.asyncio +async def test_adapter_untagged_when_chat_platform_unknown(monkeypatch): + """A reply to a chat we never saw inbound for carries no per-frame platform + (the connector falls back to the session default).""" + from gateway.config import Platform, PlatformConfig + from gateway.relay.adapter import RelayAdapter + from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor + + from tests.gateway.relay.stub_connector import StubConnector + + descriptor = CapabilityDescriptor( + contract_version=CONTRACT_VERSION, + platform="relay", + label="Relay", + max_message_length=4096, + supports_draft_streaming=False, + supports_edit=True, + supports_threads=False, + markdown_dialect="plain", + len_unit="chars", + ) + stub = StubConnector(descriptor) + adapter = RelayAdapter(PlatformConfig(), descriptor, transport=stub) + await adapter.connect() + await adapter.send("never-seen", "reply") + assert stub.sent_platforms[-1] is None diff --git a/tests/gateway/relay/test_relay_policy_send.py b/tests/gateway/relay/test_relay_policy_send.py index a7c7b79be35..3996eec1956 100644 --- a/tests/gateway/relay/test_relay_policy_send.py +++ b/tests/gateway/relay/test_relay_policy_send.py @@ -23,6 +23,8 @@ def _clean_env(monkeypatch): "GATEWAY_RELAY_SECRET", "GATEWAY_RELAY_PLATFORM", "GATEWAY_RELAY_BOT_ID", + "GATEWAY_RELAY_PLATFORMS", + "GATEWAY_RELAY_BOT_IDS", "DISCORD_ALLOW_BOTS", ): monkeypatch.delenv(k, raising=False) @@ -34,7 +36,7 @@ def _clean_env(monkeypatch): # -------------------------------------------------------------------------- def test_projection_maps_require_mention_and_free_response(monkeypatch): - monkeypatch.setenv("GATEWAY_RELAY_PLATFORM", "discord") + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") monkeypatch.setattr( "gateway.run._load_gateway_config", lambda: {"discord": {"require_mention": True, "free_response_channels": ["c-support", "c-help"]}}, @@ -50,7 +52,7 @@ def test_projection_maps_require_mention_and_free_response(monkeypatch): def test_projection_allow_other_bots_from_env(monkeypatch): - monkeypatch.setenv("GATEWAY_RELAY_PLATFORM", "discord") + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") monkeypatch.setenv("DISCORD_ALLOW_BOTS", "all") monkeypatch.setattr( "gateway.run._load_gateway_config", @@ -62,7 +64,7 @@ def test_projection_allow_other_bots_from_env(monkeypatch): def test_projection_comma_string_free_response(monkeypatch): - monkeypatch.setenv("GATEWAY_RELAY_PLATFORM", "discord") + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") monkeypatch.setattr( "gateway.run._load_gateway_config", lambda: {"discord": {"free_response_channels": "c1, c2 ,c3"}}, @@ -73,7 +75,7 @@ def test_projection_comma_string_free_response(monkeypatch): def test_projection_falls_back_to_top_level_require_mention(monkeypatch): - monkeypatch.setenv("GATEWAY_RELAY_PLATFORM", "discord") + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") monkeypatch.setattr( "gateway.run._load_gateway_config", lambda: {"require_mention": True}, # top-level, no discord: block @@ -86,7 +88,7 @@ def test_projection_falls_back_to_top_level_require_mention(monkeypatch): def test_projection_none_when_all_default(monkeypatch): # No require_mention, no free-response, no allow-bots ⇒ nothing to declare # (the connector's quiet default already matches). - monkeypatch.setenv("GATEWAY_RELAY_PLATFORM", "discord") + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") monkeypatch.setattr("gateway.run._load_gateway_config", lambda: {"discord": {}}, raising=False) assert relay.relay_relevance_policy() is None @@ -109,7 +111,7 @@ def _arm(monkeypatch, *, url="wss://connector.example/relay"): monkeypatch.setenv("GATEWAY_RELAY_URL", url) monkeypatch.setenv("GATEWAY_RELAY_ID", "gw-x") monkeypatch.setenv("GATEWAY_RELAY_SECRET", "s" * 48) - monkeypatch.setenv("GATEWAY_RELAY_PLATFORM", "discord") + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") def test_send_posts_projected_policy_with_token(monkeypatch): @@ -137,7 +139,7 @@ def test_send_posts_projected_policy_with_token(monkeypatch): def test_send_skips_when_no_secret(monkeypatch): monkeypatch.setenv("GATEWAY_RELAY_URL", "wss://connector.example/relay") - monkeypatch.setenv("GATEWAY_RELAY_PLATFORM", "discord") + monkeypatch.setenv("GATEWAY_RELAY_PLATFORMS", "discord") # no GATEWAY_RELAY_ID / SECRET monkeypatch.setattr( "gateway.run._load_gateway_config",